diff --git a/docs/changelog.rst b/docs/changelog.rst index 7eefd533..f2182e33 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -18,6 +18,14 @@ v0.20.0 (UNRELEASED) - In stored playlist names, replace "/", which are illegal, with "|" instead of a whitespace. Pipes are more similar to forward slash. +**Audio** + +- Internal code cleanup within audio sub-system: + - Started splitting audio code into smaller better defined pieces. + - Improved GStreamer related debug logging. + - Provide better error messages for missing plugins. + - Add foundation for trying to re-add multiple output support. + v0.19.4 (2014-09-01) ==================== diff --git a/mopidy/audio/actor.py b/mopidy/audio/actor.py index fb8a0306..9db6fbf5 100644 --- a/mopidy/audio/actor.py +++ b/mopidy/audio/actor.py @@ -51,6 +51,10 @@ MB = 1 << 20 PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7) PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3) +# These are just to long to wrap nicely, so rename them locally. +_get_missing_description = gst.pbutils.missing_plugin_message_get_description +_get_missing_detail = gst.pbutils.missing_plugin_message_get_installer_detail + class _Signals(object): """Helper for tracking gobject signal registrations""" @@ -155,7 +159,7 @@ class _Outputs(gst.Bin): # the actual switch, i.e. about to switch can block for longer thanks # to this queue. # TODO: make the min-max values a setting? - # TODO: move out of this class? + # TODO: this does not belong in this class. queue = gst.element_factory_make('queue') queue.set_property('max-size-buffers', 0) queue.set_property('max-size-bytes', 0) @@ -168,9 +172,11 @@ class _Outputs(gst.Bin): ghost_pad = gst.GhostPad('sink', queue.get_pad('sink')) self.add_pad(ghost_pad) - # Add an always connected fakesink so the tee doesn't fail. - # XXX disabled for now as we get one stream changed per sink... - # self._add(gst.element_factory_make('fakesink')) + # Add an always connected fakesink which respects the clock so the tee + # doesn't fail even if we don't have any outputs. + fakesink = gst.element_factory_make('fakesink') + fakesink.set_property('sync', True) + self._add(fakesink) def add_output(self, description): # XXX This only works for pipelines not in use until #790 gets done. @@ -255,6 +261,154 @@ def setup_proxy(element, config): element.set_property('proxy-pw', config.get('password')) +class _Handler(object): + def __init__(self, audio): + self._audio = audio + self._element = None + self._pad = None + self._message_handler_id = None + self._event_handler_id = None + + def setup_message_handling(self, element): + self._element = element + bus = element.get_bus() + bus.add_signal_watch() + self._message_handler_id = bus.connect('message', self.on_message) + + def setup_event_handling(self, pad): + self._pad = pad + self._event_handler_id = pad.add_event_probe(self.on_event) + + def teardown_message_handling(self): + bus = self._element.get_bus() + bus.remove_signal_watch() + bus.disconnect(self._message_handler_id) + self._message_handler_id = None + + def teardown_event_handling(self): + self._pad.remove_event_probe(self._event_handler_id) + self._event_handler_id = None + + def on_message(self, bus, msg): + if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._element: + self.on_playbin_state_changed(*msg.parse_state_changed()) + elif msg.type == gst.MESSAGE_BUFFERING: + self.on_buffering(msg.parse_buffering()) + elif msg.type == gst.MESSAGE_EOS: + self.on_end_of_stream() + elif msg.type == gst.MESSAGE_ERROR: + self.on_error(*msg.parse_error()) + elif msg.type == gst.MESSAGE_WARNING: + self.on_warning(*msg.parse_warning()) + elif msg.type == gst.MESSAGE_ASYNC_DONE: + self.on_async_done() + elif msg.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(msg): + self.on_missing_plugin(_get_missing_description(msg), + _get_missing_detail(msg)) + + def on_event(self, pad, event): + if event.type == gst.EVENT_NEWSEGMENT: + self.on_new_segment(*event.parse_new_segment()) + elif event.type == gst.EVENT_SINK_MESSAGE: + # Handle stream changed messages when they reach our output bin. + # If we listen for it on the bus we get one per tee branch. + msg = event.parse_sink_message() + if msg.structure.has_name('playbin2-stream-changed'): + self.on_stream_changed(msg.structure['uri']) + return True + + def on_playbin_state_changed(self, old_state, new_state, pending_state): + gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s', + old_state.value_name, new_state.value_name, + pending_state.value_name) + + if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL: + # XXX: We're not called on the last state change when going down to + # NULL, so we rewrite the second to last call to get the expected + # behavior. + new_state = gst.STATE_NULL + pending_state = gst.STATE_VOID_PENDING + + if pending_state != gst.STATE_VOID_PENDING: + return # Ignore intermediate state changes + + if new_state == gst.STATE_READY: + return # Ignore READY state as it's GStreamer specific + + new_state = _GST_STATE_MAPPING[new_state] + old_state, self._audio.state = self._audio.state, new_state + + target_state = _GST_STATE_MAPPING[self._audio._target_state] + if target_state == new_state: + target_state = None + + logger.debug('Audio event: state_changed(old_state=%s, new_state=%s, ' + 'target_state=%s)', old_state, new_state, target_state) + AudioListener.send('state_changed', old_state=old_state, + new_state=new_state, target_state=target_state) + if new_state == PlaybackState.STOPPED: + logger.debug('Audio event: stream_changed(uri=None)') + AudioListener.send('stream_changed', uri=None) + + def on_buffering(self, percent): + gst_logger.debug('Got buffering message: percent=%d%%', percent) + + if percent < 10 and not self._audio._buffering: + self._audio._playbin.set_state(gst.STATE_PAUSED) + self._audio._buffering = True + if percent == 100: + self._audio._buffering = False + if self._audio._target_state == gst.STATE_PLAYING: + self._audio._playbin.set_state(gst.STATE_PLAYING) + + def on_end_of_stream(self): + gst_logger.debug('Got end-of-stream message.') + logger.debug('Audio event: reached_end_of_stream()') + AudioListener.send('reached_end_of_stream') + + def on_error(self, error, debug): + gst_logger.error(str(error).decode('utf-8')) + if debug: + gst_logger.debug(debug.decode('utf-8')) + # TODO: is this needed? + self._audio.stop_playback() + + def on_warning(self, error, debug): + gst_logger.warning(str(error).decode('utf-8')) + if debug: + gst_logger.debug(debug.decode('utf-8')) + + def on_async_done(self): + gst_logger.debug('Got async-done.') + + def on_missing_plugin(self, msg): + desc = gst.pbutils.missing_plugin_message_get_description(msg) + debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg) + + gst_logger.debug('Got missing-plugin message: description:%s', desc) + logger.warning('Could not find a %s to handle media.', desc) + if gst.pbutils.install_plugins_supported(): + logger.info('You might be able to fix this by running: ' + 'gst-installer "%s"', debug) + # TODO: store the missing plugins installer info in a file so we can + # can provide a 'mopidy install-missing-plugins' if the system has the + # required helper installed? + + def on_new_segment(self, update, rate, format_, start, stop, position): + gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s' + 'start=%s stop=%s position=%s', update, rate, + format_.value_name, start, stop, position) + position_ms = position // gst.MSECOND + logger.debug('Audio event: position_changed(position=%s)', position_ms) + AudioListener.send('position_changed', position=position_ms) + + def on_stream_changed(self, uri): + gst_logger.debug('Got stream-changed message: uri=%s', uri) + logger.debug('Audio event: stream_changed(uri=%s)', uri) + AudioListener.send('stream_changed', uri=uri) + + # TODO: create a player class which replaces the actors internals class Audio(pykka.ThreadingActor): """ @@ -278,6 +432,7 @@ class Audio(pykka.ThreadingActor): self._outputs = None self._about_to_finish_callback = None + self._handler = _Handler(self) self._appsrc = _Appsrc() self._signals = _Signals() @@ -291,13 +446,11 @@ class Audio(pykka.ThreadingActor): self._setup_output() self._setup_mixer() self._setup_visualizer() - self._setup_message_processor() except gobject.GError as ex: logger.exception(ex) process.exit_process() def on_stop(self): - self._teardown_message_processor() self._teardown_mixer() self._teardown_playbin() @@ -319,39 +472,32 @@ class Audio(pykka.ThreadingActor): playbin.set_property('buffer-duration', 2*gst.SECOND) self._signals.connect(playbin, 'source-setup', self._on_source_setup) - self._signals.connect( - playbin, 'about-to-finish', self._on_about_to_finish) + self._signals.connect(playbin, 'about-to-finish', + self._on_about_to_finish) self._playbin = playbin + self._handler.setup_message_handling(playbin) def _teardown_playbin(self): + self._handler.teardown_message_handling() + self._handler.teardown_event_handling() self._signals.disconnect(self._playbin, 'about-to-finish') self._signals.disconnect(self._playbin, 'source-setup') self._playbin.set_state(gst.STATE_NULL) - def _on_about_to_finish(self, element): - gst_logger.debug('Got about-to-finish event.') - if self._about_to_finish_callback: - self._about_to_finish_callback() - - def _on_source_setup(self, element, source): - gst_logger.debug('Got source-setup: element=%s', source) - - if source.get_factory().get_name() == 'appsrc': - self._appsrc.configure(source) - else: - self._appsrc.reset() - - if hasattr(source.props, 'proxy'): - setup_proxy(source, self._config['proxy']) - def _setup_output(self): - self._outputs = _Outputs() - self._outputs.get_pad('sink').add_event_probe(self._on_pad_event) - try: - self._outputs.add_output(self._config['audio']['output']) - except exceptions.AudioException: - process.exit_process() # TODO: move this up the chain + # We don't want to use outputs for regular testing, so just install + # an unsynced fakesink when someone asks for a 'testoutput'. + if self._config['audio']['output'] == 'testoutput': + self._outputs = gst.element_factory_make('fakesink') + else: + self._outputs = _Outputs() + try: + self._outputs.add_output(self._config['audio']['output']) + except exceptions.AudioException: + process.exit_process() # TODO: move this up the chain + + self._handler.setup_event_handling(self._outputs.get_pad('sink')) self._playbin.set_property('audio-sink', self._outputs) def _setup_mixer(self): @@ -377,126 +523,22 @@ class Audio(pykka.ThreadingActor): 'Failed to create audio visualizer "%s": %s', visualizer_element, ex) - def _setup_message_processor(self): - bus = self._playbin.get_bus() - bus.add_signal_watch() - self._signals.connect(bus, 'message', self._on_message) + def _on_about_to_finish(self, element): + gst_logger.debug('Got about-to-finish event.') + if self._about_to_finish_callback: + logger.debug('Running about to finish callback.') + self._about_to_finish_callback() - def _teardown_message_processor(self): - bus = self._playbin.get_bus() - self._signals.disconnect(bus, 'message') - bus.remove_signal_watch() + def _on_source_setup(self, element, source): + gst_logger.debug('Got source-setup: element=%s', source) - def _on_pad_event(self, pad, event): - if event.type == gst.EVENT_NEWSEGMENT: - update, rate, format_, start, stop, pos = event.parse_new_segment() - gst_logger.debug('Got new-segment event: update=%s rate=%s ' - 'format=%s start=%s stop=%s position=%s', update, - rate, format_.value_name, start, stop, pos) - pos_ms = pos // gst.MSECOND - logger.debug('Triggering: position_changed(position=%s)', pos_ms) - AudioListener.send('position_changed', position=pos_ms) + if source.get_factory().get_name() == 'appsrc': + self._appsrc.configure(source) + else: + self._appsrc.reset() - return True - - # TODO: consider splitting this out while we are at it. - def _on_message(self, bus, msg): - if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin: - self._on_playbin_state_changed(*msg.parse_state_changed()) - elif msg.type == gst.MESSAGE_BUFFERING: - self._on_buffering(msg.parse_buffering()) - elif msg.type == gst.MESSAGE_EOS: - self._on_end_of_stream() - elif msg.type == gst.MESSAGE_ERROR: - self._on_error(*msg.parse_error()) - elif msg.type == gst.MESSAGE_WARNING: - self._on_warning(*msg.parse_warning()) - elif msg.type == gst.MESSAGE_ASYNC_DONE: - gst_logger.debug('Got async-done message.') - elif msg.type == gst.MESSAGE_ELEMENT: - if msg.structure.has_name('playbin2-stream-changed'): - self._on_stream_changed(msg.structure['uri']) - elif gst.pbutils.is_missing_plugin_message(msg): - self._on_missing_plugin(msg) - - def _on_playbin_state_changed(self, old_state, new_state, pending_state): - gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s', - old_state.value_name, new_state.value_name, - pending_state.value_name) - - if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL: - # XXX: We're not called on the last state change when going down to - # NULL, so we rewrite the second to last call to get the expected - # behavior. - new_state = gst.STATE_NULL - pending_state = gst.STATE_VOID_PENDING - - if pending_state != gst.STATE_VOID_PENDING: - return # Ignore intermediate state changes - - if new_state == gst.STATE_READY: - return # Ignore READY state as it's GStreamer specific - - new_state = _GST_STATE_MAPPING[new_state] - old_state, self.state = self.state, new_state - - target_state = _GST_STATE_MAPPING[self._target_state] - if target_state == new_state: - target_state = None - - logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, ' - 'target_state=%s)', old_state, new_state, target_state) - AudioListener.send('state_changed', old_state=old_state, - new_state=new_state, target_state=target_state) - if new_state == PlaybackState.STOPPED: - logger.debug('Triggering: stream_changed(uri=None)') - AudioListener.send('stream_changed', uri=None) - - def _on_buffering(self, percent): - gst_logger.debug('Got buffering message: percent=%d%%', percent) - - if percent < 10 and not self._buffering: - self._playbin.set_state(gst.STATE_PAUSED) - self._buffering = True - if percent == 100: - self._buffering = False - if self._target_state == gst.STATE_PLAYING: - self._playbin.set_state(gst.STATE_PLAYING) - - def _on_end_of_stream(self): - gst_logger.debug('Got end-of-stream message.') - logger.debug('Triggering: reached_end_of_stream()') - AudioListener.send('reached_end_of_stream') - - def _on_error(self, error, debug): - gst_logger.error(str(error).decode('utf-8')) - if debug: - gst_logger.debug(debug.decode('utf-8')) - # TODO: is this needed? - self.stop_playback() - - def _on_warning(self, error, debug): - gst_logger.warning(str(error).decode('utf-8')) - if debug: - gst_logger.debug(debug.decode('utf-8')) - - def _on_stream_changed(self, uri): - gst_logger.debug('Got stream-changed message: uri:%s', uri) - logger.debug('Triggering: stream_changed(uri=%s)', uri) - AudioListener.send('stream_changed', uri=uri) - - def _on_missing_plugin(self, msg): - desc = gst.pbutils.missing_plugin_message_get_description(msg) - debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg) - - gst_logger.debug('Got missing-plugin message: description:%s', desc) - logger.warning('Could not find a %s to handle media.', desc) - if gst.pbutils.install_plugins_supported(): - logger.info('You might be able to fix this by running: ' - 'gst-installer "%s"', debug) - # TODO: store the missing plugins installer info in a file so we can - # can provide a 'mopidy install-missing-plugins' if the system has the - # required helper installed? + if hasattr(source.props, 'proxy'): + setup_proxy(source, self._config['proxy']) def set_uri(self, uri): """ @@ -648,7 +690,7 @@ class Audio(pykka.ThreadingActor): Should only be used by tests. """ def sync_handler(bus, message): - self._on_message(bus, message) + self._handler.on_message(bus, message) return gst.BUS_DROP bus = self._playbin.get_bus() diff --git a/tests/audio/test_actor.py b/tests/audio/test_actor.py index 2426f54e..8db7f61f 100644 --- a/tests/audio/test_actor.py +++ b/tests/audio/test_actor.py @@ -21,11 +21,9 @@ from mopidy.utils.path import path_to_uri from tests import path_to_data_dir -""" -We want to make sure both our real audio class and the fake one behave -correctly. So each test is first run against the real class, then repeated -against our dummy. -""" +# We want to make sure both our real audio class and the fake one behave +# correctly. So each test is first run against the real class, then repeated +# against our dummy. class BaseTest(unittest.TestCase): @@ -34,7 +32,7 @@ class BaseTest(unittest.TestCase): 'mixer': 'fakemixer track_max_volume=65536', 'mixer_track': None, 'mixer_volume': None, - 'output': 'fakesink', + 'output': 'testoutput', 'visualizer': None, } } @@ -49,7 +47,7 @@ class BaseTest(unittest.TestCase): 'audio': { 'mixer': 'foomixer', 'mixer_volume': None, - 'output': 'fakesink', + 'output': 'testoutput', 'visualizer': None, }, 'proxy': { @@ -359,6 +357,7 @@ class AudioEventTest(BaseTest): self.audio.wait_for_state_change().get() self.possibly_trigger_fake_about_to_finish() + self.audio.wait_for_state_change().get() if not done.wait(timeout=1.0): self.fail('EOS not received') @@ -407,17 +406,17 @@ class AudioStateTest(unittest.TestCase): self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_does_not_change_when_in_gst_ready_state(self): - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_changes_from_stopped_to_playing_on_play(self): - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state) @@ -425,7 +424,7 @@ class AudioStateTest(unittest.TestCase): def test_state_changes_from_playing_to_paused_on_pause(self): self.audio.state = audio.PlaybackState.PLAYING - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state) @@ -433,12 +432,12 @@ class AudioStateTest(unittest.TestCase): def test_state_changes_from_playing_to_stopped_on_stop(self): self.audio.state = audio.PlaybackState.PLAYING - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL) # We never get the following call, so the logic must work without it - # self.audio._on_playbin_state_changed( + # self.audio._handler.on_playbin_state_changed( # gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) @@ -455,7 +454,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.assertTrue(self.audio._buffering) @@ -465,7 +464,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock() - self.audio._on_buffering(100) + self.audio._handler.on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) @@ -475,12 +474,12 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.audio.pause_playback() playbin.set_state.reset_mock() - self.audio._on_buffering(100) + self.audio._handler.on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) @@ -490,7 +489,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock()