Cleanup gstreamer. Simplified code and reduced get_by_name usage.

This commit is contained in:
Thomas Adamcik 2011-05-04 23:21:26 +02:00
parent 03c30369b3
commit 6f9be11594

View File

@ -34,7 +34,12 @@ class GStreamer(ThreadingActor):
""" """
def __init__(self): def __init__(self):
self.gst_pipeline = None self._pipeline = None
self._source = None
self._taginject = None
self._tee = None
self._uridecodebin = None
self._volume = None
def on_start(self): def on_start(self):
self._setup_gstreamer() self._setup_gstreamer()
@ -45,46 +50,44 @@ class GStreamer(ThreadingActor):
:class:`mopidy.utils.process.GObjectEventThread` to be running. This is :class:`mopidy.utils.process.GObjectEventThread` to be running. This is
not enforced by :class:`GStreamer` itself. not enforced by :class:`GStreamer` itself.
""" """
base_pipeline = ' ! '.join([ description = ' ! '.join([
'uridecodebin name=uri',
'audioconvert name=convert', 'audioconvert name=convert',
'volume name=volume', 'volume name=volume',
'taginject name=tag', 'taginject name=inject',
'tee name=tee', 'tee name=tee'])
])
logger.debug(u'Setting up base GStreamer pipeline: %s', base_pipeline) logger.debug(u'Setting up base GStreamer pipeline: %s', description)
self.gst_pipeline = gst.parse_launch(base_pipeline) self._pipeline = gst.parse_launch(description)
self._taginject = self._pipeline.get_by_name('inject')
self._tee = self._pipeline.get_by_name('tee')
self._volume = self._pipeline.get_by_name('volume')
self._uridecodebin = self._pipeline.get_by_name('uri')
self.gst_tee = self.gst_pipeline.get_by_name('tee') self._uridecodebin.connect('notify::source', self._process_new_source)
self.gst_convert = self.gst_pipeline.get_by_name('convert') self._uridecodebin.connect('pad-added', self._process_new_pad,
self.gst_volume = self.gst_pipeline.get_by_name('volume') self._pipeline.get_by_name('convert').get_pad('sink'))
self.gst_taginject = self.gst_pipeline.get_by_name('tag')
self.gst_uridecodebin = gst.element_factory_make('uridecodebin', 'uri')
self.gst_uridecodebin.connect('notify::source', self._process_new_source)
self.gst_uridecodebin.connect('pad-added', self._process_new_pad,
self.gst_convert.get_pad('sink'))
self.gst_pipeline.add(self.gst_uridecodebin)
for output in settings.OUTPUTS: for output in settings.OUTPUTS:
output_cls = get_class(output)() output_cls = get_class(output)()
output_cls.connect_bin(self.gst_pipeline, self.gst_tee) output_cls.connect_bin(self._pipeline, self._tee)
# Setup bus and message processor # Setup bus and message processor
gst_bus = self.gst_pipeline.get_bus() bus = self._pipeline.get_bus()
gst_bus.add_signal_watch() bus.add_signal_watch()
gst_bus.connect('message', self._process_gstreamer_message) bus.connect('message', self._process_gstreamer_message)
def _process_new_source(self, element, pad): def _process_new_source(self, element, pad):
source = element.get_by_name('source') self._source = element.get_by_name('source')
try: try:
source.set_property('caps', default_caps) self._source.set_property('caps', default_caps)
except TypeError: except TypeError:
pass pass
def _process_new_pad(self, source, pad, target_pad): def _process_new_pad(self, source, pad, target_pad):
pad.link(target_pad) if not pad.is_linked():
pad.link(target_pad)
def _process_gstreamer_message(self, bus, message): def _process_gstreamer_message(self, bus, message):
"""Process messages from GStreamer.""" """Process messages from GStreamer."""
@ -105,8 +108,13 @@ class GStreamer(ThreadingActor):
return backend_refs[0].proxy() return backend_refs[0].proxy()
def set_uri(self, uri): def set_uri(self, uri):
"""Change internal uridecodebin's URI""" """
self.gst_uridecodebin.set_property('uri', uri) Change internal uridecodebin's URI
:param uri: the URI to play
:type uri: string
"""
self._uridecodebin.set_property('uri', uri)
def deliver_data(self, capabilities, data): def deliver_data(self, capabilities, data):
""" """
@ -116,12 +124,11 @@ class GStreamer(ThreadingActor):
:type capabilities: string :type capabilities: string
:param data: raw audio data to be played :param data: raw audio data to be played
""" """
source = self.gst_pipeline.get_by_name('source')
caps = gst.caps_from_string(capabilities) caps = gst.caps_from_string(capabilities)
buffer_ = gst.Buffer(buffer(data)) buffer_ = gst.Buffer(buffer(data))
buffer_.set_caps(caps) buffer_.set_caps(caps)
source.set_property('caps', caps) self._source.set_property('caps', caps)
source.emit('push-buffer', buffer_) self._source.emit('push-buffer', buffer_)
def end_of_data_stream(self): def end_of_data_stream(self):
""" """
@ -130,7 +137,7 @@ class GStreamer(ThreadingActor):
We will get a GStreamer message when the stream playback reaches the We will get a GStreamer message when the stream playback reaches the
token, and can then do any end-of-stream related tasks. token, and can then do any end-of-stream related tasks.
""" """
self.gst_pipeline.get_by_name('source').emit('end-of-stream') self._source.emit('end-of-stream')
def get_position(self): def get_position(self):
""" """
@ -138,10 +145,10 @@ class GStreamer(ThreadingActor):
:rtype: int :rtype: int
""" """
if self.gst_pipeline.get_state()[1] == gst.STATE_NULL: if self._pipeline.get_state()[1] == gst.STATE_NULL:
return 0 return 0
try: try:
position = self.gst_pipeline.query_position(gst.FORMAT_TIME)[0] position = self._pipeline.query_position(gst.FORMAT_TIME)[0]
return position // gst.MSECOND return position // gst.MSECOND
except gst.QueryError, e: except gst.QueryError, e:
logger.error('time_position failed: %s', e) logger.error('time_position failed: %s', e)
@ -155,10 +162,10 @@ class GStreamer(ThreadingActor):
:type volume: int :type volume: int
:rtype: :class:`True` if successful, else :class:`False` :rtype: :class:`True` if successful, else :class:`False`
""" """
self.gst_pipeline.get_state() # block until state changes are done self._pipeline.get_state() # block until state changes are done
handeled = self.gst_pipeline.seek_simple(gst.Format(gst.FORMAT_TIME), handeled = self._pipeline.seek_simple(gst.Format(gst.FORMAT_TIME),
gst.SEEK_FLAG_FLUSH, position * gst.MSECOND) gst.SEEK_FLAG_FLUSH, position * gst.MSECOND)
self.gst_pipeline.get_state() # block until seek is done self._pipeline.get_state() # block until seek is done
return handeled return handeled
def start_playback(self): def start_playback(self):
@ -199,7 +206,7 @@ class GStreamer(ThreadingActor):
:type state_name: string :type state_name: string
:rtype: :class:`True` or :class:`False` :rtype: :class:`True` or :class:`False`
""" """
result = self.gst_pipeline.set_state(state) result = self._pipeline.set_state(state)
if result == gst.STATE_CHANGE_FAILURE: if result == gst.STATE_CHANGE_FAILURE:
logger.warning('Setting GStreamer state to %s: failed', logger.warning('Setting GStreamer state to %s: failed',
state.value_name) state.value_name)
@ -215,7 +222,7 @@ class GStreamer(ThreadingActor):
:rtype: int in range [0..100] :rtype: int in range [0..100]
""" """
return int(self.gst_volume.get_property('volume') * 100) return int(self._volume.get_property('volume') * 100)
def set_volume(self, volume): def set_volume(self, volume):
""" """
@ -225,7 +232,7 @@ class GStreamer(ThreadingActor):
:type volume: int :type volume: int
:rtype: :class:`True` if successful, else :class:`False` :rtype: :class:`True` if successful, else :class:`False`
""" """
self.gst_volume.set_property('volume', volume / 100.0) self._volume.set_property('volume', volume / 100.0)
return True return True
def set_metadata(self, track): def set_metadata(self, track):
@ -244,4 +251,4 @@ class GStreamer(ThreadingActor):
'title': track.name, 'title': track.name,
} }
logger.debug('Setting tags to: %s', tags) logger.debug('Setting tags to: %s', tags)
self.gst_taginject.set_property('tags', tags) self._taginject.set_property('tags', tags)