GStreamerProcess: Now partly tested for the first time. Bunch of fixes.

This commit is contained in:
Stein Magnus Jodal 2010-08-11 22:54:46 +02:00
parent fa4c710007
commit 296af3c2af

View File

@ -17,8 +17,8 @@ class GStreamerOutput(object):
Starts the :class:`GStreamerProcess`.
"""
def __init__(self, core_queue):
process = GStreamerProcess(core_queue)
def __init__(self, core_queue, input_connection):
process = GStreamerProcess(core_queue, input_connection)
process.start()
class GStreamerProcess(BaseProcess):
@ -28,9 +28,10 @@ class GStreamerProcess(BaseProcess):
The main loop processes events from both Mopidy and GStreamer.
"""
def __init__(self, core_queue):
def __init__(self, core_queue, input_connection):
super(GStreamerProcess, self).__init__()
self.core_queue = core_queue
self.input_connection = input_connection
self.gobject_context = None
self.gst_pipeline = None
self.gst_bus = None
@ -43,8 +44,11 @@ class GStreamerProcess(BaseProcess):
def run_inside_try(self):
self.setup()
while True:
message = self.core_queue.get()
self.process_core_message(message)
# FIXME Should we block on poll() or not? Need to see iteration()
# behaviour first.
if self.input_connection.poll():
message = self.input_connection.recv()
self.process_core_message(message)
self.gobject_context.iteration(True)
def setup(self):
@ -78,12 +82,25 @@ class GStreamerProcess(BaseProcess):
self.gst_sink = gst.element_factory_make('autoaudiosink', 'sink')
self.gst_pipeline.add(self.gst_sink)
# The audio URI chain
gst.element_link_many(self.gst_uri_src, self.gst_volume, self.gst_sink)
# Add callback that will link uri_src output with volume filter input
# when the output pad is ready.
# See http://stackoverflow.com/questions/2993777 for details.
def on_new_decoded_pad(dbin, pad, is_last):
uri_src = pad.get_parent()
pipeline = uri_src.get_parent()
volume = pipeline.get_by_name('volume')
uri_src.link(volume)
logger.debug("Linked uri_src's new decoded pad to volume filter")
# FIXME uridecodebin got no new-decoded-pad signal, but it's
# subcomponent decodebin2 got that signal. Fixing this is postponed
# till after data_src is up and running perfectly
#self.gst_uri_src.connect('new-decoded-pad', on_new_decoded_pad)
# The audio data chain
gst.element_link_many(self.gst_data_src, self.gst_volume,
self.gst_sink)
# Link data source output with volume filter input
self.gst_data_src.link(self.gst_volume)
# Link volume filter output to audio sink input
self.gst_volume.link(self.gst_sink)
def process_core_message(self, message):
"""Process messages from the rest of Mopidy."""
@ -93,7 +110,7 @@ class GStreamerProcess(BaseProcess):
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'deliver_data':
# TODO Do we care about sending responses for every data delivery?
# FIXME Do we care about sending responses for every data delivery?
self.deliver_data(message['caps'], message['data'])
elif message['command'] == 'set_state':
response = self.set_state(message['state'])
@ -105,11 +122,13 @@ class GStreamerProcess(BaseProcess):
def process_gst_message(self, bus, message):
"""Process messages from GStreamer."""
if message.type == gst.MESSAGE_EOS:
pass # TODO Handle end of track/stream
self.core_queue.put({'message': 'end_of_track'})
elif message.type == gst.MESSAGE_ERROR:
self.gst_bin.set_state(gst.STATE_NULL)
self.set_state('NULL')
error, debug = message.parse_error()
logger.error(u'%s %s', error, debug)
# FIXME Should we send 'stop_playback' to core here? Can we
# differentiate on how serious the error is?
def deliver_data(self, caps_string, data):
"""Deliver audio data to be played"""
@ -129,11 +148,20 @@ class GStreamerProcess(BaseProcess):
"""
Set the GStreamer state. Returns :class:`True` if successful.
:param state_name: READY, PLAYING, or PAUSED
.. digraph:: gst_state_transitions
"NULL" -> "READY"
"PAUSED" -> "PLAYING"
"PAUSED" -> "READY"
"PLAYING" -> "PAUSED"
"READY" -> "NULL"
"READY" -> "PAUSED"
:param state_name: NULL, READY, PAUSED, or PLAYING
:type state_name: string
:rtype: :class:`True` or :class:`False`
"""
result = self.gst_uri_src.set_state(
result = self.gst_pipeline.set_state(
getattr(gst, 'STATE_' + state_name))
if result == gst.STATE_CHANGE_SUCCESS:
logger.debug('Setting GStreamer state to %s: OK', state_name)