audio: Move most of event handling out of audio.

Some of the signal handling still needs to be moved.
This commit is contained in:
Thomas Adamcik 2014-08-04 23:51:47 +02:00
parent 101b2a9817
commit ac5bf9af17
2 changed files with 185 additions and 155 deletions

View File

@ -51,6 +51,10 @@ MB = 1 << 20
PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7)
PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3)
# These are just to long to wrap nicely, so rename them locally.
_get_missing_description = gst.pbutils.missing_plugin_message_get_description
_get_missing_detail = gst.pbutils.missing_plugin_message_get_installer_detail
class _Signals(object):
"""Helper for tracking gobject signal registrations"""
@ -257,6 +261,153 @@ def setup_proxy(element, config):
element.set_property('proxy-pw', config.get('password'))
class _Handler(object):
def __init__(self, audio):
self._audio = audio
self._element = None
self._pad = None
self._message_handler_id = None
self._event_handler_id = None
def setup_message_handling(self, element):
self._element = element
bus = element.get_bus()
bus.add_signal_watch()
self._message_handler_id = bus.connect('message', self.on_message)
def setup_event_handling(self, pad):
self._pad = pad
self._event_handler_id = pad.add_event_probe(self.on_event)
def teardown(self):
bus = self._element.get_bus()
bus.remove_signal_watch()
bus.disconnect(self._message_handler_id)
self._pad.remove_event_probe(self._event_handler_id)
self._message_handler_id = None
self._event_handler_id = None
def on_message(self, bus, msg):
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._element:
self.on_playbin_state_changed(*msg.parse_state_changed())
elif msg.type == gst.MESSAGE_BUFFERING:
self.on_buffering(msg.parse_buffering())
elif msg.type == gst.MESSAGE_EOS:
self.on_end_of_stream()
elif msg.type == gst.MESSAGE_ERROR:
self.on_error(*msg.parse_error())
elif msg.type == gst.MESSAGE_WARNING:
self.on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ASYNC_DONE:
self.on_async_done()
elif msg.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(msg):
self.on_missing_plugin(_get_missing_description(msg),
_get_missing_detail(msg))
def on_event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
self.on_new_segment(*event.parse_new_segment())
elif event.type == gst.EVENT_SINK_MESSAGE:
# Handle stream changed messages when they reach our output bin.
# If we listen for it on the bus we get one per tee branch.
msg = event.parse_sink_message()
if msg.structure.has_name('playbin2-stream-changed'):
self.on_stream_changed(msg.structure['uri'])
return True
def on_playbin_state_changed(self, old_state, new_state, pending_state):
gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s',
old_state.value_name, new_state.value_name,
pending_state.value_name)
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
# XXX: We're not called on the last state change when going down to
# NULL, so we rewrite the second to last call to get the expected
# behavior.
new_state = gst.STATE_NULL
pending_state = gst.STATE_VOID_PENDING
if pending_state != gst.STATE_VOID_PENDING:
return # Ignore intermediate state changes
if new_state == gst.STATE_READY:
return # Ignore READY state as it's GStreamer specific
new_state = _GST_STATE_MAPPING[new_state]
old_state, self._audio.state = self._audio.state, new_state
target_state = _GST_STATE_MAPPING[self._audio._target_state]
if target_state == new_state:
target_state = None
logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, '
'target_state=%s)', old_state, new_state, target_state)
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=target_state)
if new_state == PlaybackState.STOPPED:
logger.debug('Triggering: stream_changed(uri=None)')
AudioListener.send('stream_changed', uri=None)
def on_buffering(self, percent):
gst_logger.debug('Got buffering message: percent=%d%%', percent)
if percent < 10 and not self._audio._buffering:
self._audio._playbin.set_state(gst.STATE_PAUSED)
self._audio._buffering = True
if percent == 100:
self._audio._buffering = False
if self._audio._target_state == gst.STATE_PLAYING:
self._audio._playbin.set_state(gst.STATE_PLAYING)
def on_end_of_stream(self):
gst_logger.debug('Got end-of-stream message.')
logger.debug('Triggering: reached_end_of_stream()')
AudioListener.send('reached_end_of_stream')
def on_error(self, error, debug):
gst_logger.error(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
# TODO: is this needed?
self._audio.stop_playback()
def on_warning(self, error, debug):
gst_logger.warning(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
def on_async_done(self):
gst_logger.debug('Got async-done.')
def on_missing_plugin(self, msg):
desc = gst.pbutils.missing_plugin_message_get_description(msg)
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
gst_logger.debug('Got missing-plugin message: description:%s', desc)
logger.warning('Could not find a %s to handle media.', desc)
if gst.pbutils.install_plugins_supported():
logger.info('You might be able to fix this by running: '
'gst-installer "%s"', debug)
# TODO: store the missing plugins installer info in a file so we can
# can provide a 'mopidy install-missing-plugins' if the system has the
# required helper installed?
def on_new_segment(self, update, rate, format_, start, stop, position):
gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s'
'start=%s stop=%s position=%s', update, rate,
format_.value_name, start, stop, position)
position_ms = position // gst.MSECOND
logger.debug('Triggering: position_changed(position=%s)', position_ms)
AudioListener.send('position_changed', position=position_ms)
def on_stream_changed(self, uri):
gst_logger.debug('Got stream-changed message: uri:%s', uri)
logger.debug('Triggering: stream_changed(uri=%s)', uri)
AudioListener.send('stream_changed', uri=uri)
# TODO: create a player class which replaces the actors internals
class Audio(pykka.ThreadingActor):
"""
@ -280,6 +431,7 @@ class Audio(pykka.ThreadingActor):
self._outputs = None
self._about_to_finish_callback = None
self._handler = _Handler(self)
self._appsrc = _Appsrc()
self._signals = _Signals()
@ -293,13 +445,11 @@ class Audio(pykka.ThreadingActor):
self._setup_output()
self._setup_mixer()
self._setup_visualizer()
self._setup_message_processor()
except gobject.GError as ex:
logger.exception(ex)
process.exit_process()
def on_stop(self):
self._teardown_message_processor()
self._teardown_mixer()
self._teardown_playbin()
@ -321,32 +471,18 @@ class Audio(pykka.ThreadingActor):
playbin.set_property('buffer-duration', 2*gst.SECOND)
self._signals.connect(playbin, 'source-setup', self._on_source_setup)
self._signals.connect(
playbin, 'about-to-finish', self._on_about_to_finish)
self._signals.connect(playbin, 'about-to-finish',
self._on_about_to_finish)
self._playbin = playbin
self._handler.setup_message_handling(playbin)
def _teardown_playbin(self):
self._handler.teardown()
self._signals.disconnect(self._playbin, 'about-to-finish')
self._signals.disconnect(self._playbin, 'source-setup')
self._playbin.set_state(gst.STATE_NULL)
def _on_about_to_finish(self, element):
gst_logger.debug('Got about-to-finish event.')
if self._about_to_finish_callback:
self._about_to_finish_callback()
def _on_source_setup(self, element, source):
gst_logger.debug('Got source-setup: element=%s', source)
if source.get_factory().get_name() == 'appsrc':
self._appsrc.configure(source)
else:
self._appsrc.reset()
if hasattr(source.props, 'proxy'):
setup_proxy(source, self._config['proxy'])
def _setup_output(self):
# We don't want to test outputs for regular testing, so just instal
# an unsynced fakesink when someone asks for a testouput.
@ -359,7 +495,7 @@ class Audio(pykka.ThreadingActor):
except exceptions.AudioException:
process.exit_process() # TODO: move this up the chain
self._outputs.get_pad('sink').add_event_probe(self._on_pad_event)
self._handler.setup_event_handling(self._outputs.get_pad('sink'))
self._playbin.set_property('audio-sink', self._outputs)
def _setup_mixer(self):
@ -385,129 +521,22 @@ class Audio(pykka.ThreadingActor):
'Failed to create audio visualizer "%s": %s',
visualizer_element, ex)
def _setup_message_processor(self):
bus = self._playbin.get_bus()
bus.add_signal_watch()
self._signals.connect(bus, 'message', self._on_message)
def _on_about_to_finish(self, element):
gst_logger.debug('Got about-to-finish event.')
if self._about_to_finish_callback:
logger.debug('Running about to finish callback.')
self._about_to_finish_callback()
def _teardown_message_processor(self):
bus = self._playbin.get_bus()
self._signals.disconnect(bus, 'message')
bus.remove_signal_watch()
def _on_source_setup(self, element, source):
gst_logger.debug('Got source-setup: element=%s', source)
def _on_pad_event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
update, rate, format_, start, stop, pos = event.parse_new_segment()
gst_logger.debug('Got new-segment event: update=%s rate=%s '
'format=%s start=%s stop=%s position=%s', update,
rate, format_.value_name, start, stop, pos)
pos_ms = pos // gst.MSECOND
logger.debug('Triggering: position_changed(position=%s)', pos_ms)
AudioListener.send('position_changed', position=pos_ms)
elif event.type == gst.EVENT_SINK_MESSAGE:
# Handle stream changed messages when they reach our output bin.
# If we listen for it on the bus we get one per tee branch.
msg = event.parse_sink_message()
if msg.structure.has_name('playbin2-stream-changed'):
self._on_stream_changed(msg.structure['uri'])
return True
if source.get_factory().get_name() == 'appsrc':
self._appsrc.configure(source)
else:
self._appsrc.reset()
# TODO: consider splitting this out while we are at it.
def _on_message(self, bus, msg):
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin:
self._on_playbin_state_changed(*msg.parse_state_changed())
elif msg.type == gst.MESSAGE_BUFFERING:
self._on_buffering(msg.parse_buffering())
elif msg.type == gst.MESSAGE_EOS:
self._on_end_of_stream()
elif msg.type == gst.MESSAGE_ERROR:
self._on_error(*msg.parse_error())
elif msg.type == gst.MESSAGE_WARNING:
self._on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ASYNC_DONE:
gst_logger.debug('Got async-done message.')
elif msg.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(msg):
self._on_missing_plugin(msg)
def _on_playbin_state_changed(self, old_state, new_state, pending_state):
gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s',
old_state.value_name, new_state.value_name,
pending_state.value_name)
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
# XXX: We're not called on the last state change when going down to
# NULL, so we rewrite the second to last call to get the expected
# behavior.
new_state = gst.STATE_NULL
pending_state = gst.STATE_VOID_PENDING
if pending_state != gst.STATE_VOID_PENDING:
return # Ignore intermediate state changes
if new_state == gst.STATE_READY:
return # Ignore READY state as it's GStreamer specific
new_state = _GST_STATE_MAPPING[new_state]
old_state, self.state = self.state, new_state
target_state = _GST_STATE_MAPPING[self._target_state]
if target_state == new_state:
target_state = None
logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, '
'target_state=%s)', old_state, new_state, target_state)
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=target_state)
if new_state == PlaybackState.STOPPED:
logger.debug('Triggering: stream_changed(uri=None)')
AudioListener.send('stream_changed', uri=None)
def _on_buffering(self, percent):
gst_logger.debug('Got buffering message: percent=%d%%', percent)
if percent < 10 and not self._buffering:
self._playbin.set_state(gst.STATE_PAUSED)
self._buffering = True
if percent == 100:
self._buffering = False
if self._target_state == gst.STATE_PLAYING:
self._playbin.set_state(gst.STATE_PLAYING)
def _on_end_of_stream(self):
gst_logger.debug('Got end-of-stream message.')
logger.debug('Triggering: reached_end_of_stream()')
AudioListener.send('reached_end_of_stream')
def _on_error(self, error, debug):
gst_logger.error(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
# TODO: is this needed?
self.stop_playback()
def _on_warning(self, error, debug):
gst_logger.warning(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
def _on_stream_changed(self, uri):
gst_logger.debug('Got stream-changed message: uri:%s', uri)
logger.debug('Triggering: stream_changed(uri=%s)', uri)
AudioListener.send('stream_changed', uri=uri)
def _on_missing_plugin(self, msg):
desc = gst.pbutils.missing_plugin_message_get_description(msg)
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
gst_logger.debug('Got missing-plugin message: description:%s', desc)
logger.warning('Could not find a %s to handle media.', desc)
if gst.pbutils.install_plugins_supported():
logger.info('You might be able to fix this by running: '
'gst-installer "%s"', debug)
# TODO: store the missing plugins installer info in a file so we can
# can provide a 'mopidy install-missing-plugins' if the system has the
# required helper installed?
if hasattr(source.props, 'proxy'):
setup_proxy(source, self._config['proxy'])
def set_uri(self, uri):
"""
@ -659,7 +688,7 @@ class Audio(pykka.ThreadingActor):
Should only be used by tests.
"""
def sync_handler(bus, message):
self._on_message(bus, message)
self._handler.on_message(bus, message)
return gst.BUS_DROP
bus = self._playbin.get_bus()

View File

@ -357,6 +357,7 @@ class AudioEventTest(BaseTest):
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
@ -405,17 +406,17 @@ class AudioStateTest(unittest.TestCase):
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_does_not_change_when_in_gst_ready_state(self):
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_changes_from_stopped_to_playing_on_play(self):
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state)
@ -423,7 +424,7 @@ class AudioStateTest(unittest.TestCase):
def test_state_changes_from_playing_to_paused_on_pause(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state)
@ -431,12 +432,12 @@ class AudioStateTest(unittest.TestCase):
def test_state_changes_from_playing_to_stopped_on_stop(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL)
# We never get the following call, so the logic must work without it
# self.audio._on_playbin_state_changed(
# self.audio._handler.on_playbin_state_changed(
# gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
@ -453,7 +454,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.assertTrue(self.audio._buffering)
@ -463,7 +464,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()
self.audio._on_buffering(100)
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
@ -473,12 +474,12 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.audio.pause_playback()
playbin.set_state.reset_mock()
self.audio._on_buffering(100)
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
@ -488,7 +489,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()