diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 49c9d5af..acae6493 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -17,8 +17,8 @@ class GStreamerOutput(object): Starts the :class:`GStreamerProcess`. """ - def __init__(self, core_queue): - process = GStreamerProcess(core_queue) + def __init__(self, core_queue, input_connection): + process = GStreamerProcess(core_queue, input_connection) process.start() class GStreamerProcess(BaseProcess): @@ -28,9 +28,10 @@ class GStreamerProcess(BaseProcess): The main loop processes events from both Mopidy and GStreamer. """ - def __init__(self, core_queue): + def __init__(self, core_queue, input_connection): super(GStreamerProcess, self).__init__() self.core_queue = core_queue + self.input_connection = input_connection self.gobject_context = None self.gst_pipeline = None self.gst_bus = None @@ -43,8 +44,11 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - message = self.core_queue.get() - self.process_core_message(message) + # FIXME Should we block on poll() or not? Need to see iteration() + # behaviour first. + if self.input_connection.poll(): + message = self.input_connection.recv() + self.process_core_message(message) self.gobject_context.iteration(True) def setup(self): @@ -78,12 +82,25 @@ class GStreamerProcess(BaseProcess): self.gst_sink = gst.element_factory_make('autoaudiosink', 'sink') self.gst_pipeline.add(self.gst_sink) - # The audio URI chain - gst.element_link_many(self.gst_uri_src, self.gst_volume, self.gst_sink) + # Add callback that will link uri_src output with volume filter input + # when the output pad is ready. + # See http://stackoverflow.com/questions/2993777 for details. + def on_new_decoded_pad(dbin, pad, is_last): + uri_src = pad.get_parent() + pipeline = uri_src.get_parent() + volume = pipeline.get_by_name('volume') + uri_src.link(volume) + logger.debug("Linked uri_src's new decoded pad to volume filter") + # FIXME uridecodebin got no new-decoded-pad signal, but it's + # subcomponent decodebin2 got that signal. Fixing this is postponed + # till after data_src is up and running perfectly + #self.gst_uri_src.connect('new-decoded-pad', on_new_decoded_pad) - # The audio data chain - gst.element_link_many(self.gst_data_src, self.gst_volume, - self.gst_sink) + # Link data source output with volume filter input + self.gst_data_src.link(self.gst_volume) + + # Link volume filter output to audio sink input + self.gst_volume.link(self.gst_sink) def process_core_message(self, message): """Process messages from the rest of Mopidy.""" @@ -93,7 +110,7 @@ class GStreamerProcess(BaseProcess): connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'deliver_data': - # TODO Do we care about sending responses for every data delivery? + # FIXME Do we care about sending responses for every data delivery? self.deliver_data(message['caps'], message['data']) elif message['command'] == 'set_state': response = self.set_state(message['state']) @@ -105,11 +122,13 @@ class GStreamerProcess(BaseProcess): def process_gst_message(self, bus, message): """Process messages from GStreamer.""" if message.type == gst.MESSAGE_EOS: - pass # TODO Handle end of track/stream + self.core_queue.put({'message': 'end_of_track'}) elif message.type == gst.MESSAGE_ERROR: - self.gst_bin.set_state(gst.STATE_NULL) + self.set_state('NULL') error, debug = message.parse_error() logger.error(u'%s %s', error, debug) + # FIXME Should we send 'stop_playback' to core here? Can we + # differentiate on how serious the error is? def deliver_data(self, caps_string, data): """Deliver audio data to be played""" @@ -129,11 +148,20 @@ class GStreamerProcess(BaseProcess): """ Set the GStreamer state. Returns :class:`True` if successful. - :param state_name: READY, PLAYING, or PAUSED + .. digraph:: gst_state_transitions + + "NULL" -> "READY" + "PAUSED" -> "PLAYING" + "PAUSED" -> "READY" + "PLAYING" -> "PAUSED" + "READY" -> "NULL" + "READY" -> "PAUSED" + + :param state_name: NULL, READY, PAUSED, or PLAYING :type state_name: string :rtype: :class:`True` or :class:`False` """ - result = self.gst_uri_src.set_state( + result = self.gst_pipeline.set_state( getattr(gst, 'STATE_' + state_name)) if result == gst.STATE_CHANGE_SUCCESS: logger.debug('Setting GStreamer state to %s: OK', state_name)