diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 5415a5ee..bda04b25 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -1,10 +1,12 @@ import gobject +gobject.threads_init() import pygst pygst.require('0.10') - import gst + import logging +import threading from mopidy.process import BaseProcess, unpickle_connection @@ -21,18 +23,26 @@ class GStreamerOutput(object): process = GStreamerProcess(core_queue, input_connection) process.start() +class GStreamerMessagesThread(threading.Thread): + def run(self): + gobject.MainLoop().run() + class GStreamerProcess(BaseProcess): """ A process for all work related to GStreamer. The main loop processes events from both Mopidy and GStreamer. + + Make sure this subprocess is started by the MainThread in the top-most + parent process, and not some other thread. If not, we can get into the + problems described at + http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. """ def __init__(self, core_queue, input_connection): super(GStreamerProcess, self).__init__() self.core_queue = core_queue self.input_connection = input_connection - self.gobject_context = None self.gst_pipeline = None self.gst_bus = None self.gst_bus_id = None @@ -44,18 +54,17 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - # FIXME Should we block on poll() or not? Need to see iteration() - # behaviour first. - if self.input_connection.poll(): + if self.input_connection.poll(None): message = self.input_connection.recv() self.process_mopidy_message(message) - self.gobject_context.iteration(True) def setup(self): - # See http://www.jejik.com/articles/2007/01/ - # python-gstreamer_threading_and_the_main_loop/ for details. - gobject.threads_init() - self.gobject_context = gobject.MainLoop().get_context() + logger.debug(u'Setting up GStreamer pipeline') + + # Start a helper thread that can run the gobject.MainLoop + messages_thread = GStreamerMessagesThread() + messages_thread.daemon = True + messages_thread.start() # A pipeline consisting of many elements self.gst_pipeline = gst.Pipeline("pipeline") @@ -67,8 +76,8 @@ class GStreamerProcess(BaseProcess): self.process_gst_message) # Bin for playing audio URIs - self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') - self.gst_pipeline.add(self.gst_uri_src) + #self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') + #self.gst_pipeline.add(self.gst_uri_src) # Bin for playing audio data self.gst_data_src = gst.element_factory_make('appsrc', 'data_src') @@ -109,8 +118,9 @@ class GStreamerProcess(BaseProcess): connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'deliver_data': - # FIXME Do we care about sending responses for every data delivery? self.deliver_data(message['caps'], message['data']) + elif message['command'] == 'end_of_data_stream': + self.end_of_data_stream() elif message['command'] == 'set_state': response = self.set_state(message['state']) connection = unpickle_connection(message['reply_to']) @@ -121,7 +131,9 @@ class GStreamerProcess(BaseProcess): def process_gst_message(self, bus, message): """Process messages from GStreamer.""" if message.type == gst.MESSAGE_EOS: - self.core_queue.put({'message': 'end_of_track'}) + logger.debug(u'GStreamer signalled end-of-stream. ' + 'Sending end_of_track to core_queue ...') + self.core_queue.put({'command': 'end_of_track'}) elif message.type == gst.MESSAGE_ERROR: self.set_state('NULL') error, debug = message.parse_error() @@ -129,13 +141,6 @@ class GStreamerProcess(BaseProcess): # FIXME Should we send 'stop_playback' to core here? Can we # differentiate on how serious the error is? - def deliver_data(self, caps_string, data): - """Deliver audio data to be played""" - caps = gst.caps_from_string(caps_string) - buffer_ = gst.Buffer(data) - buffer_.set_caps(caps) - self.gst_data_src.emit('push-buffer', buffer_) - def play_uri(self, uri): """Play audio at URI""" self.set_state('READY') @@ -143,6 +148,23 @@ class GStreamerProcess(BaseProcess): self.set_state('PLAYING') # TODO Return status + def deliver_data(self, caps_string, data): + """Deliver audio data to be played""" + caps = gst.caps_from_string(caps_string) + buffer_ = gst.Buffer(buffer(data)) + buffer_.set_caps(caps) + self.gst_data_src.set_property('caps', caps) + self.gst_data_src.emit('push-buffer', buffer_) + + def end_of_data_stream(self): + """ + Add end-of-stream token to source. + + We will get a GStreamer message when the stream playback reaches the + token, and can then do any end-of-stream related tasks. + """ + self.gst_data_src.emit('end-of-stream') + def set_state(self, state_name): """ Set the GStreamer state. Returns :class:`True` if successful.