diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 3c8941fa..6e4ad05f 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -40,6 +40,8 @@ class GStreamer(ThreadingActor): self._tee = None self._uridecodebin = None self._volume = None + self._outputs = [] + self._handlers = {} def on_start(self): self._setup_gstreamer() @@ -70,8 +72,7 @@ class GStreamer(ThreadingActor): self._pipeline.get_by_name('convert').get_pad('sink')) for output in settings.OUTPUTS: - output_cls = get_class(output)() - output_cls.connect_bin(self._pipeline, self._tee) + get_class(output)(self).connect() # Setup bus and message processor bus = self._pipeline.get_bus() @@ -91,16 +92,18 @@ class GStreamer(ThreadingActor): def _process_gstreamer_message(self, bus, message): """Process messages from GStreamer.""" + if message.src in self._handlers: + if self._handlers[message.src](message): + return # Message was handeled by output + if message.type == gst.MESSAGE_EOS: logger.debug(u'GStreamer signalled end-of-stream. ' 'Telling backend ...') self._get_backend().playback.on_end_of_track() elif message.type == gst.MESSAGE_ERROR: - self.stop_playback() error, debug = message.parse_error() logger.error(u'%s %s', error, debug) - # FIXME Should we send 'stop_playback' to the backend here? Can we - # differentiate on how serious the error is? + self.stop_playback() elif message.type == gst.MESSAGE_WARNING: error, debug = message.parse_warning() logger.warning(u'%s %s', error, debug) @@ -262,3 +265,96 @@ class GStreamer(ThreadingActor): } logger.debug('Setting tags to: %s', tags) self._taginject.set_property('tags', tags) + + def connect_output(self, output): + """ + Connect output to pipeline. + + :param output: output to connect to our pipeline. + :type output: :class:`gst.Bin` + """ + self._pipeline.add(output) + output.sync_state_with_parent() # Required to add to running pipe + gst.element_link_many(self._tee, output) + self._outputs.append(output) + logger.info('Added %s', output.get_name()) + + def list_outputs(self): + return [output.get_name() for output in self._outputs] + + def remove_output(self, output): + """ + Remove output from our pipeline. + + :param output: output to remove from our pipeline. + :type output: :class:`gst.Bin` + """ + if output not in self._outputs: + raise LookupError('Ouput %s not present in pipeline' + % output.get_name) + teesrc = output.get_pad('sink').get_peer() + handler = teesrc.add_event_probe(self._handle_event_probe) + + struct = gst.Structure('mopidy-unlink-tee') + struct.set_value('handler', handler) + + event = gst.event_new_custom(gst.EVENT_CUSTOM_DOWNSTREAM, struct) + self._tee.send_event(event) + + def _handle_event_probe(self, teesrc, event): + if event.type == gst.EVENT_CUSTOM_DOWNSTREAM and event.has_name('mopidy-unlink-tee'): + data = self._get_structure_data(event.get_structure()) + + output = teesrc.get_peer().get_parent() + + teesrc.unlink(teesrc.get_peer()) + teesrc.remove_event_probe(data['handler']) + + output.set_state(gst.STATE_NULL) + self._pipeline.remove(output) + + logger.warning('Removed %s', output.get_name()) + return False + return True + + def _get_structure_data(self, struct): + # Ugly hack to get around missing get_value in pygst bindings :/ + data = {} + def get_data(key, value): + data[key] = value + struct.foreach(get_data) + return data + + def connect_message_handler(self, element, handler): + """ + Attach custom message handler for given element. + + Hook to allow outputs (or other code) to register custom message + handlers for all messages coming from the element in question. + + In the case of outputs :meth:`mopidy.outputs.BaseOuptut.on_connect` + should be used to attach such handlers and care should be taken to + remove them in :meth:`mopidy.outputs.BaseOuptut.on_remove`. + + The handler callback will only be given the message in question, and + is free to ignore the message. However, if the handler wants to prevent + the default handling of the message it should return :class:`True` + indicating that the message has been handled. + + (Note that there can only be on handler per element) + + :param element: element to watch messages from + :type element: :class:`gst.Element` + :param handler: function that expects `gst.Message`, should return + ``True`` if message has been handeled. + """ + self._handlers[element] = handler + + def remove_message_handler(self, element): + """ + Remove custom message handler. + + :param element: element to remove message handling from. + :type element: :class:`gst.Element` + """ + self._handlers.pop(element, None) diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py index a3aff0d8..a85088a6 100644 --- a/mopidy/outputs/__init__.py +++ b/mopidy/outputs/__init__.py @@ -1,49 +1,67 @@ -import logging - import pygst pygst.require('0.10') import gst -logger = logging.getLogger('mopidy.outputs') +import logging +logger = logging.getLogger('mopidy.outputs') class BaseOutput(object): """Base class for providing support for multiple pluggable outputs.""" - def connect_bin(self, pipeline, element): + MESSAGE_EOS = gst.MESSAGE_EOS + MESSAGE_ERROR = gst.MESSAGE_ERROR + MESSAGE_WARNING = gst.MESSAGE_WARNING + + def __init__(self, gstreamer): + self.gstreamer = gstreamer + self.bin = self.build_bin() + self.bin.set_name(self.get_name()) + + self.modify_bin() + + def build_bin(self): """ - Connect output bin to pipeline and given element. - - In normal cases the element will probably be a `tee`, - thus allowing us to connect any number of outputs. This - however is why each bin is forced to have its own `queue` - after the `tee`. - - :param pipeline: gst.Pipeline to add output to. - :type pipeline: :class:`gst.Pipeline` - :param element: gst.Element in pipeline to connect output to. - :type element: :class:`gst.Element` + Build output bin that will attached to pipeline. """ description = 'queue ! %s' % self.describe_bin() - logger.debug('Adding new output to tee: %s', description) + logger.debug('Creating new output: %s', description) - output = gst.parse_bin_from_description(description, True) - self.modify_bin(output) + return gst.parse_bin_from_description(description, True) - pipeline.add(output) - output.sync_state_with_parent() # Required to add to running pipe - gst.element_link_many(element, output) + def connect(self): + """Attach output to GStreamer pipeline""" + self.gstreamer.connect_output(self.bin) + self.on_connect() - def modify_bin(self, output): + def on_connect(self): + """Called after output has been connected to GStreamer pipeline""" + pass + + def remove(self): + """Remove output from GStreamer pipeline""" + self.gstreamer.remove_output(self.bin) + self.on_remove() + + def on_remove(self): + """Called after output has been remove from GStreamer pipeline""" + pass + + def get_name(self): """ - Modifies bin before it is installed if needed. + Return name of output in gstreamer context. + + Defaults to class name, can be overriden by subclasses if required. + """ + return self.__class__.__name__ + + def modify_bin(self): + """ + Modifies ``self.bin`` before it is installed if needed. Overriding this method allows for outputs to modify the constructed bin before it is installed. This can for instance be a good place to call `set_properties` on elements that need to be configured. - - :param output: gst.Bin to modify in some way. - :type output: :class:`gst.Bin` """ pass diff --git a/mopidy/outputs/shoutcast.py b/mopidy/outputs/shoutcast.py index d13b1085..4298bba5 100644 --- a/mopidy/outputs/shoutcast.py +++ b/mopidy/outputs/shoutcast.py @@ -1,6 +1,10 @@ +import logging + from mopidy import settings from mopidy.outputs import BaseOutput +logger = logging.getLogger('mopidy.outputs.shoutcast') + class ShoutcastOutput(BaseOutput): """ Shoutcast streaming output. @@ -15,11 +19,27 @@ class ShoutcastOutput(BaseOutput): return 'audioconvert ! %s ! shout2send name=shoutcast' \ % settings.SHOUTCAST_OUTPUT_ENCODER - def modify_bin(self, output): - self.set_properties(output.get_by_name('shoutcast'), { + def modify_bin(self): + self.set_properties(self.bin.get_by_name('shoutcast'), { u'ip': settings.SHOUTCAST_OUTPUT_SERVER, u'mount': settings.SHOUTCAST_OUTPUT_MOUNT, u'port': settings.SHOUTCAST_OUTPUT_PORT, u'username': settings.SHOUTCAST_OUTPUT_USERNAME, u'password': settings.SHOUTCAST_OUTPUT_PASSWORD, }) + + def on_connect(self): + self.gstreamer.connect_message_handler( + self.bin.get_by_name('shoutcast'), self.message_handler) + + def on_remove(self): + self.gstreamer.remove_message_handler( + self.bin.get_by_name('shoutcast')) + + def message_handler(self, message): + if message.type != self.MESSAGE_ERROR: + return False + error, debug = message.parse_error() + logger.warning('%s (%s)', error, debug) + self.remove() + return True