From 0ec662db63512d49a8bfc14ce026c0639d1f6968 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Sat, 14 May 2011 23:46:51 +0200 Subject: [PATCH 01/17] Move output connection code to gstreamer --- mopidy/gstreamer.py | 16 ++++++++++++++-- mopidy/outputs/__init__.py | 25 ++++++------------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 3c8941fa..cd0a1f69 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -70,8 +70,7 @@ class GStreamer(ThreadingActor): self._pipeline.get_by_name('convert').get_pad('sink')) for output in settings.OUTPUTS: - output_cls = get_class(output)() - output_cls.connect_bin(self._pipeline, self._tee) + self.connect_output(get_class(output)) # Setup bus and message processor bus = self._pipeline.get_bus() @@ -262,3 +261,16 @@ class GStreamer(ThreadingActor): } logger.debug('Setting tags to: %s', tags) self._taginject.set_property('tags', tags) + + def connect_output(self, cls): + """ + Connect output to pipeline. + + :param output: output to connect to our pipeline. + :type output: :class:`BaseOutput` + """ + output = cls().get_bin() + + self._pipeline.add(output) + output.sync_state_with_parent() # Required to add to running pipe + gst.element_link_many(self._tee, output) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index a3aff0d8..c2b2fc6d 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -1,38 +1,25 @@ -import logging - import pygst pygst.require('0.10') import gst -logger = logging.getLogger('mopidy.outputs') +import logging +logger = logging.getLogger('mopidy.outputs') class BaseOutput(object): """Base class for providing support for multiple pluggable outputs.""" - def connect_bin(self, pipeline, element): + def get_bin(self): """ - Connect output bin to pipeline and given element. - - In normal cases the element will probably be a `tee`, - thus allowing us to connect any number of outputs. This - however is why each bin is forced to have its own `queue` - after the `tee`. - - :param pipeline: gst.Pipeline to add output to. - :type pipeline: :class:`gst.Pipeline` - :param element: gst.Element in pipeline to connect output to. - :type element: :class:`gst.Element` + Build output bin that will attached to pipeline. """ description = 'queue ! %s' % self.describe_bin() - logger.debug('Adding new output to tee: %s', description) + logger.debug('Creating new output: %s', description) output = gst.parse_bin_from_description(description, True) self.modify_bin(output) - pipeline.add(output) - output.sync_state_with_parent() # Required to add to running pipe - gst.element_link_many(element, output) + return output def modify_bin(self, output): """ From fe3f5338dd32fb85c0a7165a64ccdf85ad30bbff Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Sat, 14 May 2011 23:54:19 +0200 Subject: [PATCH 02/17] Store outputs that have been added in gstreamer class --- mopidy/gstreamer.py | 6 ++++++ mopidy/outputs/__init__.py | 1 + 2 files changed, 7 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index cd0a1f69..a581191d 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -40,6 +40,7 @@ class GStreamer(ThreadingActor): self._tee = None self._uridecodebin = None self._volume = None + self._outputs = {} def on_start(self): self._setup_gstreamer() @@ -274,3 +275,8 @@ class GStreamer(ThreadingActor): self._pipeline.add(output) output.sync_state_with_parent() # Required to add to running pipe gst.element_link_many(self._tee, output) + + self._outputs[output.get_name()] = output + + def list_outputs(self): + return self._outputs.keys() diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index c2b2fc6d..28bad40a 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -17,6 +17,7 @@ class BaseOutput(object): logger.debug('Creating new output: %s', description) output = gst.parse_bin_from_description(description, True) + output.set_name(self.__class__.__name__) self.modify_bin(output) return output From 743235b09d7bfc93febae1ba886c2a28e6f8cea7 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Sun, 15 May 2011 23:50:52 +0200 Subject: [PATCH 03/17] Add basic remove_output code --- mopidy/gstreamer.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index a581191d..fbb5d44a 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -278,5 +278,22 @@ class GStreamer(ThreadingActor): self._outputs[output.get_name()] = output + logger.info('Added %s', output.get_name()) + def list_outputs(self): return self._outputs.keys() + + def remove_output(self, name): + if name not in self._outputs: + return # FIXME raise mopidy exception of some sort? + src = self._taginject.get_pad('src') + src.set_blocked_async(True, self._blocked_callback, name) + + def _blocked_callback(self, pad, blocked, name): + output = self._outputs.pop(name) + gst.element_unlink_many(self._tee, output) + output.set_state(gst.STATE_NULL) + self._pipeline.remove(output) + pad.set_blocked(False) + + logger.warning(u'Removed %s', name) From 3f35e9b3913bb99cf7b299c36528eefa878337f4 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 00:14:33 +0200 Subject: [PATCH 04/17] Add method to determine output name --- mopidy/outputs/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index 28bad40a..d2a67b88 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -17,11 +17,19 @@ class BaseOutput(object): logger.debug('Creating new output: %s', description) output = gst.parse_bin_from_description(description, True) - output.set_name(self.__class__.__name__) + output.set_name(self.get_name()) self.modify_bin(output) return output + def get_name(self): + """ + Return name of output in gstreamer context. + + Defaults to class name, can be overriden by sub classes if required. + """ + return self.__class__.__name__ + def modify_bin(self, output): """ Modifies bin before it is installed if needed. From 09a1d646f24efea5ff4d6170f370582d29a4eca9 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:08:01 +0200 Subject: [PATCH 05/17] Refactor BaseOutput to prepare for better error handling --- mopidy/gstreamer.py | 25 +++++++++++-------------- mopidy/outputs/__init__.py | 28 ++++++++++++++++++---------- mopidy/outputs/shoutcast.py | 5 +++-- 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index fbb5d44a..9b702b6b 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -40,7 +40,7 @@ class GStreamer(ThreadingActor): self._tee = None self._uridecodebin = None self._volume = None - self._outputs = {} + self._outputs = [] def on_start(self): self._setup_gstreamer() @@ -71,7 +71,7 @@ class GStreamer(ThreadingActor): self._pipeline.get_by_name('convert').get_pad('sink')) for output in settings.OUTPUTS: - self.connect_output(get_class(output)) + get_class(output)(self).connect() # Setup bus and message processor bus = self._pipeline.get_bus() @@ -263,37 +263,34 @@ class GStreamer(ThreadingActor): logger.debug('Setting tags to: %s', tags) self._taginject.set_property('tags', tags) - def connect_output(self, cls): + def connect_output(self, output): """ Connect output to pipeline. :param output: output to connect to our pipeline. - :type output: :class:`BaseOutput` + :type output: :class:`gst.Bin` """ - output = cls().get_bin() - self._pipeline.add(output) output.sync_state_with_parent() # Required to add to running pipe gst.element_link_many(self._tee, output) - - self._outputs[output.get_name()] = output - + self._outputs.append(output) logger.info('Added %s', output.get_name()) def list_outputs(self): return self._outputs.keys() - def remove_output(self, name): + def remove_output(self, output): + logger.debug('Trying to remove %s', output.get_name()) if name not in self._outputs: return # FIXME raise mopidy exception of some sort? src = self._taginject.get_pad('src') - src.set_blocked_async(True, self._blocked_callback, name) + src.set_blocked_async(True, self._blocked_callback, output) - def _blocked_callback(self, pad, blocked, name): - output = self._outputs.pop(name) + def _blocked_callback(self, pad, blocked, output): gst.element_unlink_many(self._tee, output) output.set_state(gst.STATE_NULL) self._pipeline.remove(output) + self._outputs.remove(output) pad.set_blocked(False) - logger.warning(u'Removed %s', name) + logger.warning(u'Removed %s', output.get_name()) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index d2a67b88..ea1f511d 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -9,18 +9,29 @@ logger = logging.getLogger('mopidy.outputs') class BaseOutput(object): """Base class for providing support for multiple pluggable outputs.""" - def get_bin(self): + def __init__(self, gstreamer): + self.gstreamer = gstreamer + self.bin = self.build_bin() + self.bin.set_name(self.get_name()) + + self.modify_bin() + + def build_bin(self): """ Build output bin that will attached to pipeline. """ description = 'queue ! %s' % self.describe_bin() logger.debug('Creating new output: %s', description) - output = gst.parse_bin_from_description(description, True) - output.set_name(self.get_name()) - self.modify_bin(output) + return gst.parse_bin_from_description(description, True) - return output + def connect(self): + """Convenience wrapper to attach output to GStreamer pipeline""" + self.gstreamer.connect_output(self.bin) + + def remove(self): + """Convenience wrapper to remove output from GStreamer pipeline""" + self.gstreamer.remove_output(self.bin) def get_name(self): """ @@ -30,16 +41,13 @@ class BaseOutput(object): """ return self.__class__.__name__ - def modify_bin(self, output): + def modify_bin(self): """ - Modifies bin before it is installed if needed. + Modifies ``self.bin`` before it is installed if needed. Overriding this method allows for outputs to modify the constructed bin before it is installed. This can for instance be a good place to call `set_properties` on elements that need to be configured. - - :param output: gst.Bin to modify in some way. - :type output: :class:`gst.Bin` """ pass diff --git a/mopidy/outputs/shoutcast.py b/mopidy/outputs/shoutcast.py index d13b1085..26af449b 100644 --- a/mopidy/outputs/shoutcast.py +++ b/mopidy/outputs/shoutcast.py @@ -15,8 +15,9 @@ class ShoutcastOutput(BaseOutput): return 'audioconvert ! %s ! shout2send name=shoutcast' \ % settings.SHOUTCAST_OUTPUT_ENCODER - def modify_bin(self, output): - self.set_properties(output.get_by_name('shoutcast'), { + def modify_bin(self): + shoutcast = self.bin.get_by_name('shoutcast') + self.set_properties(shoutcast, { u'ip': settings.SHOUTCAST_OUTPUT_SERVER, u'mount': settings.SHOUTCAST_OUTPUT_MOUNT, u'port': settings.SHOUTCAST_OUTPUT_PORT, From f7f26403afc5215c204c710fbb459fc72e989a99 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:49:19 +0200 Subject: [PATCH 06/17] Move stop playback in error handler to after the error is logged to make cause and effect more obvious --- mopidy/gstreamer.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 9b702b6b..7c12e579 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -96,11 +96,9 @@ class GStreamer(ThreadingActor): 'Telling backend ...') self._get_backend().playback.on_end_of_track() elif message.type == gst.MESSAGE_ERROR: - self.stop_playback() error, debug = message.parse_error() logger.error(u'%s %s', error, debug) - # FIXME Should we send 'stop_playback' to the backend here? Can we - # differentiate on how serious the error is? + self.stop_playback() elif message.type == gst.MESSAGE_WARNING: error, debug = message.parse_warning() logger.warning(u'%s %s', error, debug) From 64a6202ea124db6cab2d5b8030a3b97b511d5c53 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:50:02 +0200 Subject: [PATCH 07/17] Add gst MESSAGE_* constansts to BaseOutput --- mopidy/outputs/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index ea1f511d..0423a62b 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -9,6 +9,10 @@ logger = logging.getLogger('mopidy.outputs') class BaseOutput(object): """Base class for providing support for multiple pluggable outputs.""" + MESSAGE_EOS = gst.MESSAGE_EOS + MESSAGE_ERROR = gst.MESSAGE_ERROR + MESSAGE_WARNING = gst.MESSAGE_WARNING + def __init__(self, gstreamer): self.gstreamer = gstreamer self.bin = self.build_bin() From fae784b71edd23f0ed6ba5055152082e1eebff1c Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:51:38 +0200 Subject: [PATCH 08/17] Add on_remove and on_connect hooks to BaseOutput --- mopidy/outputs/__init__.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index 0423a62b..8cf57c53 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -30,12 +30,20 @@ class BaseOutput(object): return gst.parse_bin_from_description(description, True) def connect(self): - """Convenience wrapper to attach output to GStreamer pipeline""" - self.gstreamer.connect_output(self.bin) + """Attach output to GStreamer pipeline""" + self.gstreamer.connect_output(self) + self.on_connect() + + def on_connect(self): + pass def remove(self): - """Convenience wrapper to remove output from GStreamer pipeline""" - self.gstreamer.remove_output(self.bin) + """Remove output from GStreamer pipeline""" + self.gstreamer.remove_output(self) + self.on_remove() + + def on_remove(self): + pass def get_name(self): """ From 943645aecf4e7c0ef1b27c84494372e4212f46b8 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:53:01 +0200 Subject: [PATCH 09/17] Add methods to register and register message handlers to GStreamer --- mopidy/gstreamer.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 7c12e579..69258f81 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -41,6 +41,7 @@ class GStreamer(ThreadingActor): self._uridecodebin = None self._volume = None self._outputs = [] + self._handlers = {} def on_start(self): self._setup_gstreamer() @@ -292,3 +293,9 @@ class GStreamer(ThreadingActor): pad.set_blocked(False) logger.warning(u'Removed %s', output.get_name()) + + def connect_message_handler(self, element, handler): + self._handlers[element] = handler + + def remove_message_handler(self, element): + self._handlers.pop(element, None) From 217472362051f5be52c48e9d7fcfaa2d27c702aa Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:53:53 +0200 Subject: [PATCH 10/17] Add conditional to message handler so that attached handlers can take over --- mopidy/gstreamer.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 69258f81..66e46c14 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -92,6 +92,10 @@ class GStreamer(ThreadingActor): def _process_gstreamer_message(self, bus, message): """Process messages from GStreamer.""" + if message.src in self._handlers: + if self._handlers[message.src](message): + return # Message was handeled by output + if message.type == gst.MESSAGE_EOS: logger.debug(u'GStreamer signalled end-of-stream. ' 'Telling backend ...') From 94efb083ee3a5b0fc9ebac9266a48ac4bf2fd9a6 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:55:00 +0200 Subject: [PATCH 11/17] Fixed list outputs method in GStreamer class --- mopidy/gstreamer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 66e46c14..255e7d7d 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -280,7 +280,7 @@ class GStreamer(ThreadingActor): logger.info('Added %s', output.get_name()) def list_outputs(self): - return self._outputs.keys() + return [output.get_name() for output in self._outputs] def remove_output(self, output): logger.debug('Trying to remove %s', output.get_name()) From a4b03aa292c6532aa90d5e04163fb604278bd777 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 21:57:23 +0200 Subject: [PATCH 12/17] Cleanup some of output code api --- mopidy/gstreamer.py | 5 ++--- mopidy/outputs/__init__.py | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 255e7d7d..0e866135 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -284,7 +284,7 @@ class GStreamer(ThreadingActor): def remove_output(self, output): logger.debug('Trying to remove %s', output.get_name()) - if name not in self._outputs: + if output not in self._outputs: return # FIXME raise mopidy exception of some sort? src = self._taginject.get_pad('src') src.set_blocked_async(True, self._blocked_callback, output) @@ -293,9 +293,8 @@ class GStreamer(ThreadingActor): gst.element_unlink_many(self._tee, output) output.set_state(gst.STATE_NULL) self._pipeline.remove(output) - self._outputs.remove(output) pad.set_blocked(False) - + self._outputs.remove(output) logger.warning(u'Removed %s', output.get_name()) def connect_message_handler(self, element, handler): diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index 8cf57c53..b59566b3 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -31,7 +31,7 @@ class BaseOutput(object): def connect(self): """Attach output to GStreamer pipeline""" - self.gstreamer.connect_output(self) + self.gstreamer.connect_output(self.bin) self.on_connect() def on_connect(self): @@ -39,7 +39,7 @@ class BaseOutput(object): def remove(self): """Remove output from GStreamer pipeline""" - self.gstreamer.remove_output(self) + self.gstreamer.remove_output(self.bin) self.on_remove() def on_remove(self): From 2ef550eb7bb8506cf5621c5404c58443576a7fc3 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 22:01:30 +0200 Subject: [PATCH 13/17] Add error handling code for shoutcast errors --- mopidy/outputs/shoutcast.py | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/mopidy/outputs/shoutcast.py b/mopidy/outputs/shoutcast.py index 26af449b..4298bba5 100644 --- a/mopidy/outputs/shoutcast.py +++ b/mopidy/outputs/shoutcast.py @@ -1,6 +1,10 @@ +import logging + from mopidy import settings from mopidy.outputs import BaseOutput +logger = logging.getLogger('mopidy.outputs.shoutcast') + class ShoutcastOutput(BaseOutput): """ Shoutcast streaming output. @@ -16,11 +20,26 @@ class ShoutcastOutput(BaseOutput): % settings.SHOUTCAST_OUTPUT_ENCODER def modify_bin(self): - shoutcast = self.bin.get_by_name('shoutcast') - self.set_properties(shoutcast, { + self.set_properties(self.bin.get_by_name('shoutcast'), { u'ip': settings.SHOUTCAST_OUTPUT_SERVER, u'mount': settings.SHOUTCAST_OUTPUT_MOUNT, u'port': settings.SHOUTCAST_OUTPUT_PORT, u'username': settings.SHOUTCAST_OUTPUT_USERNAME, u'password': settings.SHOUTCAST_OUTPUT_PASSWORD, }) + + def on_connect(self): + self.gstreamer.connect_message_handler( + self.bin.get_by_name('shoutcast'), self.message_handler) + + def on_remove(self): + self.gstreamer.remove_message_handler( + self.bin.get_by_name('shoutcast')) + + def message_handler(self, message): + if message.type != self.MESSAGE_ERROR: + return False + error, debug = message.parse_error() + logger.warning('%s (%s)', error, debug) + self.remove() + return True From 82bd77e24b2042c7d046e9ef5c3e46eb96959fb2 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 16 May 2011 23:14:53 +0200 Subject: [PATCH 14/17] Fixed use of callbacks to prevent removeall of broken output from stopping playback --- mopidy/gstreamer.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 0e866135..36fda08b 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -283,17 +283,18 @@ class GStreamer(ThreadingActor): return [output.get_name() for output in self._outputs] def remove_output(self, output): - logger.debug('Trying to remove %s', output.get_name()) if output not in self._outputs: return # FIXME raise mopidy exception of some sort? - src = self._taginject.get_pad('src') + src = output.get_pad('sink').get_peer() src.set_blocked_async(True, self._blocked_callback, output) def _blocked_callback(self, pad, blocked, output): gst.element_unlink_many(self._tee, output) + pad.set_blocked_async(False, self._unblocked_callback, output) + + def _unblocked_callback(self, pad, blocked, output): output.set_state(gst.STATE_NULL) self._pipeline.remove(output) - pad.set_blocked(False) self._outputs.remove(output) logger.warning(u'Removed %s', output.get_name()) From 801b3d1155c00a89d2438d1bad439d296a146e1f Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Tue, 17 May 2011 01:21:50 +0200 Subject: [PATCH 15/17] Switch to event probe based solution to removing outputs. Based on http://lists.freedesktop.org/archives/gstreamer-devel/2009-August/023708.html --- mopidy/gstreamer.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 36fda08b..78a3c9bf 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -285,18 +285,38 @@ class GStreamer(ThreadingActor): def remove_output(self, output): if output not in self._outputs: return # FIXME raise mopidy exception of some sort? - src = output.get_pad('sink').get_peer() - src.set_blocked_async(True, self._blocked_callback, output) + teesrc = output.get_pad('sink').get_peer() + handler = teesrc.add_event_probe(self._handle_event_probe) - def _blocked_callback(self, pad, blocked, output): - gst.element_unlink_many(self._tee, output) - pad.set_blocked_async(False, self._unblocked_callback, output) + struct = gst.Structure('mopidy-unlink-tee') + struct.set_value('handler', handler) - def _unblocked_callback(self, pad, blocked, output): - output.set_state(gst.STATE_NULL) - self._pipeline.remove(output) - self._outputs.remove(output) - logger.warning(u'Removed %s', output.get_name()) + event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM, struct) + self._tee.send_event(event) + + def _handle_event_probe(self, teesrc, event): + if event.type == gst.EVENT_CUSTOM_DOWNSTREAM and event.has_name('mopidy-unlink-tee'): + data = self._get_structure_data(event.get_structure()) + + output = teesrc.get_peer().get_parent() + + teesrc.unlink(teesrc.get_peer()) + teesrc.remove_event_probe(data['handler']) + + output.set_state(gst.STATE_NULL) + self._pipeline.remove(output) + + logger.warning('Removed %s', output.get_name()) + return False + return True + + def _get_structure_data(self, struct): + # Ugly hack to get around missing get_value in pygst bindings :/ + data = {} + def get_data(key, value): + data[key] = value + struct.foreach(get_data) + return data def connect_message_handler(self, element, handler): self._handlers[element] = handler From eb5facd4b6cc234d9fb7c738845bef1c4fc427a6 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Tue, 17 May 2011 16:10:28 +0200 Subject: [PATCH 16/17] Add docstrings --- mopidy/gstreamer.py | 33 +++++++++++++++++++++++++++++++++ mopidy/outputs/__init__.py | 2 ++ 2 files changed, 35 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 78a3c9bf..dc1bd73f 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -283,6 +283,12 @@ class GStreamer(ThreadingActor): return [output.get_name() for output in self._outputs] def remove_output(self, output): + """ + Remove output from our pipeline. + + :param output: output to remove from our pipeline. + :type output: :class:`gst.Bin` + """ if output not in self._outputs: return # FIXME raise mopidy exception of some sort? teesrc = output.get_pad('sink').get_peer() @@ -319,7 +325,34 @@ class GStreamer(ThreadingActor): return data def connect_message_handler(self, element, handler): + """ + Attach custom message handler for given element. + + Hook to allow outputs (or other code) to register custom message + handlers for all message comming from the element in question. + + In the case of outputs ``on_connect`` should be used to attach such + handlers and care should be taken to remove them in ``on_remove``. + + The handler callback will only be given the message in question, and + is free to ignore the message. However, if the handler wants to prevent + the default handling of the message it should return ``True`` indicating + that the message has been handeled. + + (Note that there can only be on handler per element) + + :param element: element to watch messages from. + :type element: :class:`gst.Element` + :param handler: function that expects `gst.Message`, should return + ``True`` if message has been handeled. + """ self._handlers[element] = handler def remove_message_handler(self, element): + """ + Remove custom message handler. + + :param element: element to remove message handling from. + :type element: :class:`gst.Element` + """ self._handlers.pop(element, None) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index b59566b3..ab27b87f 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -35,6 +35,7 @@ class BaseOutput(object): self.on_connect() def on_connect(self): + """Called after output has been connected to GStreamer pipeline""" pass def remove(self): @@ -43,6 +44,7 @@ class BaseOutput(object): self.on_remove() def on_remove(self): + """Called after output has been remove from GStreamer pipeline""" pass def get_name(self): From d57bb281c3c5ed3d7ca2888e8408bc64941b2033 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 18 May 2011 21:16:58 +0200 Subject: [PATCH 17/17] Fixed docs based on comments on pull request --- mopidy/gstreamer.py | 16 +++++++++------- mopidy/outputs/__init__.py | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index dc1bd73f..6e4ad05f 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -290,7 +290,8 @@ class GStreamer(ThreadingActor): :type output: :class:`gst.Bin` """ if output not in self._outputs: - return # FIXME raise mopidy exception of some sort? + raise LookupError('Ouput %s not present in pipeline' + % output.get_name) teesrc = output.get_pad('sink').get_peer() handler = teesrc.add_event_probe(self._handle_event_probe) @@ -329,19 +330,20 @@ class GStreamer(ThreadingActor): Attach custom message handler for given element. Hook to allow outputs (or other code) to register custom message - handlers for all message comming from the element in question. + handlers for all messages coming from the element in question. - In the case of outputs ``on_connect`` should be used to attach such - handlers and care should be taken to remove them in ``on_remove``. + In the case of outputs :meth:`mopidy.outputs.BaseOuptut.on_connect` + should be used to attach such handlers and care should be taken to + remove them in :meth:`mopidy.outputs.BaseOuptut.on_remove`. The handler callback will only be given the message in question, and is free to ignore the message. However, if the handler wants to prevent - the default handling of the message it should return ``True`` indicating - that the message has been handeled. + the default handling of the message it should return :class:`True` + indicating that the message has been handled. (Note that there can only be on handler per element) - :param element: element to watch messages from. + :param element: element to watch messages from :type element: :class:`gst.Element` :param handler: function that expects `gst.Message`, should return ``True`` if message has been handeled. diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index ab27b87f..a85088a6 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -51,7 +51,7 @@ class BaseOutput(object): """ Return name of output in gstreamer context. - Defaults to class name, can be overriden by sub classes if required. + Defaults to class name, can be overriden by subclasses if required. """ return self.__class__.__name__