diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 7f0fc7b7..5b715175 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -34,7 +34,12 @@ class GStreamer(ThreadingActor): """ def __init__(self): - self.gst_pipeline = None + self._pipeline = None + self._source = None + self._taginject = None + self._tee = None + self._uridecodebin = None + self._volume = None def on_start(self): self._setup_gstreamer() @@ -45,46 +50,44 @@ class GStreamer(ThreadingActor): :class:`mopidy.utils.process.GObjectEventThread` to be running. This is not enforced by :class:`GStreamer` itself. """ - base_pipeline = ' ! '.join([ + description = ' ! '.join([ + 'uridecodebin name=uri', 'audioconvert name=convert', 'volume name=volume', - 'taginject name=tag', - 'tee name=tee', - ]) + 'taginject name=inject', + 'tee name=tee']) - logger.debug(u'Setting up base GStreamer pipeline: %s', base_pipeline) + logger.debug(u'Setting up base GStreamer pipeline: %s', description) - self.gst_pipeline = gst.parse_launch(base_pipeline) + self._pipeline = gst.parse_launch(description) + self._taginject = self._pipeline.get_by_name('inject') + self._tee = self._pipeline.get_by_name('tee') + self._volume = self._pipeline.get_by_name('volume') + self._uridecodebin = self._pipeline.get_by_name('uri') - self.gst_tee = self.gst_pipeline.get_by_name('tee') - self.gst_convert = self.gst_pipeline.get_by_name('convert') - self.gst_volume = self.gst_pipeline.get_by_name('volume') - self.gst_taginject = self.gst_pipeline.get_by_name('tag') - - self.gst_uridecodebin = gst.element_factory_make('uridecodebin', 'uri') - self.gst_uridecodebin.connect('notify::source', self._process_new_source) - self.gst_uridecodebin.connect('pad-added', self._process_new_pad, - self.gst_convert.get_pad('sink')) - self.gst_pipeline.add(self.gst_uridecodebin) + self._uridecodebin.connect('notify::source', self._process_new_source) + self._uridecodebin.connect('pad-added', self._process_new_pad, + self._pipeline.get_by_name('convert').get_pad('sink')) for output in settings.OUTPUTS: output_cls = get_class(output)() - output_cls.connect_bin(self.gst_pipeline, self.gst_tee) + output_cls.connect_bin(self._pipeline, self._tee) # Setup bus and message processor - gst_bus = self.gst_pipeline.get_bus() - gst_bus.add_signal_watch() - gst_bus.connect('message', self._process_gstreamer_message) + bus = self._pipeline.get_bus() + bus.add_signal_watch() + bus.connect('message', self._process_gstreamer_message) def _process_new_source(self, element, pad): - source = element.get_by_name('source') + self._source = element.get_by_name('source') try: - source.set_property('caps', default_caps) + self._source.set_property('caps', default_caps) except TypeError: pass def _process_new_pad(self, source, pad, target_pad): - pad.link(target_pad) + if not pad.is_linked(): + pad.link(target_pad) def _process_gstreamer_message(self, bus, message): """Process messages from GStreamer.""" @@ -105,8 +108,13 @@ class GStreamer(ThreadingActor): return backend_refs[0].proxy() def set_uri(self, uri): - """Change internal uridecodebin's URI""" - self.gst_uridecodebin.set_property('uri', uri) + """ + Change internal uridecodebin's URI + + :param uri: the URI to play + :type uri: string + """ + self._uridecodebin.set_property('uri', uri) def deliver_data(self, capabilities, data): """ @@ -116,12 +124,11 @@ class GStreamer(ThreadingActor): :type capabilities: string :param data: raw audio data to be played """ - source = self.gst_pipeline.get_by_name('source') caps = gst.caps_from_string(capabilities) buffer_ = gst.Buffer(buffer(data)) buffer_.set_caps(caps) - source.set_property('caps', caps) - source.emit('push-buffer', buffer_) + self._source.set_property('caps', caps) + self._source.emit('push-buffer', buffer_) def end_of_data_stream(self): """ @@ -130,7 +137,7 @@ class GStreamer(ThreadingActor): We will get a GStreamer message when the stream playback reaches the token, and can then do any end-of-stream related tasks. """ - self.gst_pipeline.get_by_name('source').emit('end-of-stream') + self._source.emit('end-of-stream') def get_position(self): """ @@ -138,10 +145,10 @@ class GStreamer(ThreadingActor): :rtype: int """ - if self.gst_pipeline.get_state()[1] == gst.STATE_NULL: + if self._pipeline.get_state()[1] == gst.STATE_NULL: return 0 try: - position = self.gst_pipeline.query_position(gst.FORMAT_TIME)[0] + position = self._pipeline.query_position(gst.FORMAT_TIME)[0] return position // gst.MSECOND except gst.QueryError, e: logger.error('time_position failed: %s', e) @@ -155,10 +162,10 @@ class GStreamer(ThreadingActor): :type volume: int :rtype: :class:`True` if successful, else :class:`False` """ - self.gst_pipeline.get_state() # block until state changes are done - handeled = self.gst_pipeline.seek_simple(gst.Format(gst.FORMAT_TIME), + self._pipeline.get_state() # block until state changes are done + handeled = self._pipeline.seek_simple(gst.Format(gst.FORMAT_TIME), gst.SEEK_FLAG_FLUSH, position * gst.MSECOND) - self.gst_pipeline.get_state() # block until seek is done + self._pipeline.get_state() # block until seek is done return handeled def start_playback(self): @@ -199,7 +206,7 @@ class GStreamer(ThreadingActor): :type state_name: string :rtype: :class:`True` or :class:`False` """ - result = self.gst_pipeline.set_state(state) + result = self._pipeline.set_state(state) if result == gst.STATE_CHANGE_FAILURE: logger.warning('Setting GStreamer state to %s: failed', state.value_name) @@ -215,7 +222,7 @@ class GStreamer(ThreadingActor): :rtype: int in range [0..100] """ - return int(self.gst_volume.get_property('volume') * 100) + return int(self._volume.get_property('volume') * 100) def set_volume(self, volume): """ @@ -225,7 +232,7 @@ class GStreamer(ThreadingActor): :type volume: int :rtype: :class:`True` if successful, else :class:`False` """ - self.gst_volume.set_property('volume', volume / 100.0) + self._volume.set_property('volume', volume / 100.0) return True def set_metadata(self, track): @@ -244,4 +251,4 @@ class GStreamer(ThreadingActor): 'title': track.name, } logger.debug('Setting tags to: %s', tags) - self.gst_taginject.set_property('tags', tags) + self._taginject.set_property('tags', tags)