From ac5bf9af1795e9d8e33a42092340f4341a81815e Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Mon, 4 Aug 2014 23:51:47 +0200 Subject: [PATCH] audio: Move most of event handling out of audio. Some of the signal handling still needs to be moved. --- mopidy/audio/actor.py | 313 +++++++++++++++++++++----------------- tests/audio/test_actor.py | 27 ++-- 2 files changed, 185 insertions(+), 155 deletions(-) diff --git a/mopidy/audio/actor.py b/mopidy/audio/actor.py index 029913ac..8d86858b 100644 --- a/mopidy/audio/actor.py +++ b/mopidy/audio/actor.py @@ -51,6 +51,10 @@ MB = 1 << 20 PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7) PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3) +# These are just to long to wrap nicely, so rename them locally. +_get_missing_description = gst.pbutils.missing_plugin_message_get_description +_get_missing_detail = gst.pbutils.missing_plugin_message_get_installer_detail + class _Signals(object): """Helper for tracking gobject signal registrations""" @@ -257,6 +261,153 @@ def setup_proxy(element, config): element.set_property('proxy-pw', config.get('password')) +class _Handler(object): + def __init__(self, audio): + self._audio = audio + self._element = None + self._pad = None + self._message_handler_id = None + self._event_handler_id = None + + def setup_message_handling(self, element): + self._element = element + bus = element.get_bus() + bus.add_signal_watch() + self._message_handler_id = bus.connect('message', self.on_message) + + def setup_event_handling(self, pad): + self._pad = pad + self._event_handler_id = pad.add_event_probe(self.on_event) + + def teardown(self): + bus = self._element.get_bus() + bus.remove_signal_watch() + bus.disconnect(self._message_handler_id) + self._pad.remove_event_probe(self._event_handler_id) + + self._message_handler_id = None + self._event_handler_id = None + + def on_message(self, bus, msg): + if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._element: + self.on_playbin_state_changed(*msg.parse_state_changed()) + elif msg.type == gst.MESSAGE_BUFFERING: + self.on_buffering(msg.parse_buffering()) + elif msg.type == gst.MESSAGE_EOS: + self.on_end_of_stream() + elif msg.type == gst.MESSAGE_ERROR: + self.on_error(*msg.parse_error()) + elif msg.type == gst.MESSAGE_WARNING: + self.on_warning(*msg.parse_warning()) + elif msg.type == gst.MESSAGE_ASYNC_DONE: + self.on_async_done() + elif msg.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(msg): + self.on_missing_plugin(_get_missing_description(msg), + _get_missing_detail(msg)) + + def on_event(self, pad, event): + if event.type == gst.EVENT_NEWSEGMENT: + self.on_new_segment(*event.parse_new_segment()) + elif event.type == gst.EVENT_SINK_MESSAGE: + # Handle stream changed messages when they reach our output bin. + # If we listen for it on the bus we get one per tee branch. + msg = event.parse_sink_message() + if msg.structure.has_name('playbin2-stream-changed'): + self.on_stream_changed(msg.structure['uri']) + return True + + def on_playbin_state_changed(self, old_state, new_state, pending_state): + gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s', + old_state.value_name, new_state.value_name, + pending_state.value_name) + + if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL: + # XXX: We're not called on the last state change when going down to + # NULL, so we rewrite the second to last call to get the expected + # behavior. + new_state = gst.STATE_NULL + pending_state = gst.STATE_VOID_PENDING + + if pending_state != gst.STATE_VOID_PENDING: + return # Ignore intermediate state changes + + if new_state == gst.STATE_READY: + return # Ignore READY state as it's GStreamer specific + + new_state = _GST_STATE_MAPPING[new_state] + old_state, self._audio.state = self._audio.state, new_state + + target_state = _GST_STATE_MAPPING[self._audio._target_state] + if target_state == new_state: + target_state = None + + logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, ' + 'target_state=%s)', old_state, new_state, target_state) + AudioListener.send('state_changed', old_state=old_state, + new_state=new_state, target_state=target_state) + if new_state == PlaybackState.STOPPED: + logger.debug('Triggering: stream_changed(uri=None)') + AudioListener.send('stream_changed', uri=None) + + def on_buffering(self, percent): + gst_logger.debug('Got buffering message: percent=%d%%', percent) + + if percent < 10 and not self._audio._buffering: + self._audio._playbin.set_state(gst.STATE_PAUSED) + self._audio._buffering = True + if percent == 100: + self._audio._buffering = False + if self._audio._target_state == gst.STATE_PLAYING: + self._audio._playbin.set_state(gst.STATE_PLAYING) + + def on_end_of_stream(self): + gst_logger.debug('Got end-of-stream message.') + logger.debug('Triggering: reached_end_of_stream()') + AudioListener.send('reached_end_of_stream') + + def on_error(self, error, debug): + gst_logger.error(str(error).decode('utf-8')) + if debug: + gst_logger.debug(debug.decode('utf-8')) + # TODO: is this needed? + self._audio.stop_playback() + + def on_warning(self, error, debug): + gst_logger.warning(str(error).decode('utf-8')) + if debug: + gst_logger.debug(debug.decode('utf-8')) + + def on_async_done(self): + gst_logger.debug('Got async-done.') + + def on_missing_plugin(self, msg): + desc = gst.pbutils.missing_plugin_message_get_description(msg) + debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg) + + gst_logger.debug('Got missing-plugin message: description:%s', desc) + logger.warning('Could not find a %s to handle media.', desc) + if gst.pbutils.install_plugins_supported(): + logger.info('You might be able to fix this by running: ' + 'gst-installer "%s"', debug) + # TODO: store the missing plugins installer info in a file so we can + # can provide a 'mopidy install-missing-plugins' if the system has the + # required helper installed? + + def on_new_segment(self, update, rate, format_, start, stop, position): + gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s' + 'start=%s stop=%s position=%s', update, rate, + format_.value_name, start, stop, position) + position_ms = position // gst.MSECOND + logger.debug('Triggering: position_changed(position=%s)', position_ms) + AudioListener.send('position_changed', position=position_ms) + + def on_stream_changed(self, uri): + gst_logger.debug('Got stream-changed message: uri:%s', uri) + logger.debug('Triggering: stream_changed(uri=%s)', uri) + AudioListener.send('stream_changed', uri=uri) + + # TODO: create a player class which replaces the actors internals class Audio(pykka.ThreadingActor): """ @@ -280,6 +431,7 @@ class Audio(pykka.ThreadingActor): self._outputs = None self._about_to_finish_callback = None + self._handler = _Handler(self) self._appsrc = _Appsrc() self._signals = _Signals() @@ -293,13 +445,11 @@ class Audio(pykka.ThreadingActor): self._setup_output() self._setup_mixer() self._setup_visualizer() - self._setup_message_processor() except gobject.GError as ex: logger.exception(ex) process.exit_process() def on_stop(self): - self._teardown_message_processor() self._teardown_mixer() self._teardown_playbin() @@ -321,32 +471,18 @@ class Audio(pykka.ThreadingActor): playbin.set_property('buffer-duration', 2*gst.SECOND) self._signals.connect(playbin, 'source-setup', self._on_source_setup) - self._signals.connect( - playbin, 'about-to-finish', self._on_about_to_finish) + self._signals.connect(playbin, 'about-to-finish', + self._on_about_to_finish) self._playbin = playbin + self._handler.setup_message_handling(playbin) def _teardown_playbin(self): + self._handler.teardown() self._signals.disconnect(self._playbin, 'about-to-finish') self._signals.disconnect(self._playbin, 'source-setup') self._playbin.set_state(gst.STATE_NULL) - def _on_about_to_finish(self, element): - gst_logger.debug('Got about-to-finish event.') - if self._about_to_finish_callback: - self._about_to_finish_callback() - - def _on_source_setup(self, element, source): - gst_logger.debug('Got source-setup: element=%s', source) - - if source.get_factory().get_name() == 'appsrc': - self._appsrc.configure(source) - else: - self._appsrc.reset() - - if hasattr(source.props, 'proxy'): - setup_proxy(source, self._config['proxy']) - def _setup_output(self): # We don't want to test outputs for regular testing, so just instal # an unsynced fakesink when someone asks for a testouput. @@ -359,7 +495,7 @@ class Audio(pykka.ThreadingActor): except exceptions.AudioException: process.exit_process() # TODO: move this up the chain - self._outputs.get_pad('sink').add_event_probe(self._on_pad_event) + self._handler.setup_event_handling(self._outputs.get_pad('sink')) self._playbin.set_property('audio-sink', self._outputs) def _setup_mixer(self): @@ -385,129 +521,22 @@ class Audio(pykka.ThreadingActor): 'Failed to create audio visualizer "%s": %s', visualizer_element, ex) - def _setup_message_processor(self): - bus = self._playbin.get_bus() - bus.add_signal_watch() - self._signals.connect(bus, 'message', self._on_message) + def _on_about_to_finish(self, element): + gst_logger.debug('Got about-to-finish event.') + if self._about_to_finish_callback: + logger.debug('Running about to finish callback.') + self._about_to_finish_callback() - def _teardown_message_processor(self): - bus = self._playbin.get_bus() - self._signals.disconnect(bus, 'message') - bus.remove_signal_watch() + def _on_source_setup(self, element, source): + gst_logger.debug('Got source-setup: element=%s', source) - def _on_pad_event(self, pad, event): - if event.type == gst.EVENT_NEWSEGMENT: - update, rate, format_, start, stop, pos = event.parse_new_segment() - gst_logger.debug('Got new-segment event: update=%s rate=%s ' - 'format=%s start=%s stop=%s position=%s', update, - rate, format_.value_name, start, stop, pos) - pos_ms = pos // gst.MSECOND - logger.debug('Triggering: position_changed(position=%s)', pos_ms) - AudioListener.send('position_changed', position=pos_ms) - elif event.type == gst.EVENT_SINK_MESSAGE: - # Handle stream changed messages when they reach our output bin. - # If we listen for it on the bus we get one per tee branch. - msg = event.parse_sink_message() - if msg.structure.has_name('playbin2-stream-changed'): - self._on_stream_changed(msg.structure['uri']) - return True + if source.get_factory().get_name() == 'appsrc': + self._appsrc.configure(source) + else: + self._appsrc.reset() - # TODO: consider splitting this out while we are at it. - def _on_message(self, bus, msg): - if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin: - self._on_playbin_state_changed(*msg.parse_state_changed()) - elif msg.type == gst.MESSAGE_BUFFERING: - self._on_buffering(msg.parse_buffering()) - elif msg.type == gst.MESSAGE_EOS: - self._on_end_of_stream() - elif msg.type == gst.MESSAGE_ERROR: - self._on_error(*msg.parse_error()) - elif msg.type == gst.MESSAGE_WARNING: - self._on_warning(*msg.parse_warning()) - elif msg.type == gst.MESSAGE_ASYNC_DONE: - gst_logger.debug('Got async-done message.') - elif msg.type == gst.MESSAGE_ELEMENT: - if gst.pbutils.is_missing_plugin_message(msg): - self._on_missing_plugin(msg) - - def _on_playbin_state_changed(self, old_state, new_state, pending_state): - gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s', - old_state.value_name, new_state.value_name, - pending_state.value_name) - - if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL: - # XXX: We're not called on the last state change when going down to - # NULL, so we rewrite the second to last call to get the expected - # behavior. - new_state = gst.STATE_NULL - pending_state = gst.STATE_VOID_PENDING - - if pending_state != gst.STATE_VOID_PENDING: - return # Ignore intermediate state changes - - if new_state == gst.STATE_READY: - return # Ignore READY state as it's GStreamer specific - - new_state = _GST_STATE_MAPPING[new_state] - old_state, self.state = self.state, new_state - - target_state = _GST_STATE_MAPPING[self._target_state] - if target_state == new_state: - target_state = None - - logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, ' - 'target_state=%s)', old_state, new_state, target_state) - AudioListener.send('state_changed', old_state=old_state, - new_state=new_state, target_state=target_state) - if new_state == PlaybackState.STOPPED: - logger.debug('Triggering: stream_changed(uri=None)') - AudioListener.send('stream_changed', uri=None) - - def _on_buffering(self, percent): - gst_logger.debug('Got buffering message: percent=%d%%', percent) - - if percent < 10 and not self._buffering: - self._playbin.set_state(gst.STATE_PAUSED) - self._buffering = True - if percent == 100: - self._buffering = False - if self._target_state == gst.STATE_PLAYING: - self._playbin.set_state(gst.STATE_PLAYING) - - def _on_end_of_stream(self): - gst_logger.debug('Got end-of-stream message.') - logger.debug('Triggering: reached_end_of_stream()') - AudioListener.send('reached_end_of_stream') - - def _on_error(self, error, debug): - gst_logger.error(str(error).decode('utf-8')) - if debug: - gst_logger.debug(debug.decode('utf-8')) - # TODO: is this needed? - self.stop_playback() - - def _on_warning(self, error, debug): - gst_logger.warning(str(error).decode('utf-8')) - if debug: - gst_logger.debug(debug.decode('utf-8')) - - def _on_stream_changed(self, uri): - gst_logger.debug('Got stream-changed message: uri:%s', uri) - logger.debug('Triggering: stream_changed(uri=%s)', uri) - AudioListener.send('stream_changed', uri=uri) - - def _on_missing_plugin(self, msg): - desc = gst.pbutils.missing_plugin_message_get_description(msg) - debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg) - - gst_logger.debug('Got missing-plugin message: description:%s', desc) - logger.warning('Could not find a %s to handle media.', desc) - if gst.pbutils.install_plugins_supported(): - logger.info('You might be able to fix this by running: ' - 'gst-installer "%s"', debug) - # TODO: store the missing plugins installer info in a file so we can - # can provide a 'mopidy install-missing-plugins' if the system has the - # required helper installed? + if hasattr(source.props, 'proxy'): + setup_proxy(source, self._config['proxy']) def set_uri(self, uri): """ @@ -659,7 +688,7 @@ class Audio(pykka.ThreadingActor): Should only be used by tests. """ def sync_handler(bus, message): - self._on_message(bus, message) + self._handler.on_message(bus, message) return gst.BUS_DROP bus = self._playbin.get_bus() diff --git a/tests/audio/test_actor.py b/tests/audio/test_actor.py index fc3321d2..8db7f61f 100644 --- a/tests/audio/test_actor.py +++ b/tests/audio/test_actor.py @@ -357,6 +357,7 @@ class AudioEventTest(BaseTest): self.audio.wait_for_state_change().get() self.possibly_trigger_fake_about_to_finish() + self.audio.wait_for_state_change().get() if not done.wait(timeout=1.0): self.fail('EOS not received') @@ -405,17 +406,17 @@ class AudioStateTest(unittest.TestCase): self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_does_not_change_when_in_gst_ready_state(self): - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_changes_from_stopped_to_playing_on_play(self): - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state) @@ -423,7 +424,7 @@ class AudioStateTest(unittest.TestCase): def test_state_changes_from_playing_to_paused_on_pause(self): self.audio.state = audio.PlaybackState.PLAYING - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state) @@ -431,12 +432,12 @@ class AudioStateTest(unittest.TestCase): def test_state_changes_from_playing_to_stopped_on_stop(self): self.audio.state = audio.PlaybackState.PLAYING - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL) - self.audio._on_playbin_state_changed( + self.audio._handler.on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL) # We never get the following call, so the logic must work without it - # self.audio._on_playbin_state_changed( + # self.audio._handler.on_playbin_state_changed( # gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) @@ -453,7 +454,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.assertTrue(self.audio._buffering) @@ -463,7 +464,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock() - self.audio._on_buffering(100) + self.audio._handler.on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) @@ -473,12 +474,12 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.audio.pause_playback() playbin.set_state.reset_mock() - self.audio._on_buffering(100) + self.audio._handler.on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) @@ -488,7 +489,7 @@ class AudioBufferingTest(unittest.TestCase): playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() - self.audio._on_buffering(0) + self.audio._handler.on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock()