Merge pull request #826 from adamcik/feature/audio-message-handler

Refactor message handling in audio
This commit is contained in:
Stein Magnus Jodal 2014-09-07 23:41:27 +02:00
commit 0091603f2a
3 changed files with 217 additions and 168 deletions

View File

@ -18,6 +18,14 @@ v0.20.0 (UNRELEASED)
- In stored playlist names, replace "/", which are illegal, with "|" instead of
a whitespace. Pipes are more similar to forward slash.
**Audio**
- Internal code cleanup within audio sub-system:
- Started splitting audio code into smaller better defined pieces.
- Improved GStreamer related debug logging.
- Provide better error messages for missing plugins.
- Add foundation for trying to re-add multiple output support.
v0.19.4 (2014-09-01)
====================

View File

@ -51,6 +51,10 @@ MB = 1 << 20
PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7)
PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3)
# These are just to long to wrap nicely, so rename them locally.
_get_missing_description = gst.pbutils.missing_plugin_message_get_description
_get_missing_detail = gst.pbutils.missing_plugin_message_get_installer_detail
class _Signals(object):
"""Helper for tracking gobject signal registrations"""
@ -155,7 +159,7 @@ class _Outputs(gst.Bin):
# the actual switch, i.e. about to switch can block for longer thanks
# to this queue.
# TODO: make the min-max values a setting?
# TODO: move out of this class?
# TODO: this does not belong in this class.
queue = gst.element_factory_make('queue')
queue.set_property('max-size-buffers', 0)
queue.set_property('max-size-bytes', 0)
@ -168,9 +172,11 @@ class _Outputs(gst.Bin):
ghost_pad = gst.GhostPad('sink', queue.get_pad('sink'))
self.add_pad(ghost_pad)
# Add an always connected fakesink so the tee doesn't fail.
# XXX disabled for now as we get one stream changed per sink...
# self._add(gst.element_factory_make('fakesink'))
# Add an always connected fakesink which respects the clock so the tee
# doesn't fail even if we don't have any outputs.
fakesink = gst.element_factory_make('fakesink')
fakesink.set_property('sync', True)
self._add(fakesink)
def add_output(self, description):
# XXX This only works for pipelines not in use until #790 gets done.
@ -255,6 +261,154 @@ def setup_proxy(element, config):
element.set_property('proxy-pw', config.get('password'))
class _Handler(object):
def __init__(self, audio):
self._audio = audio
self._element = None
self._pad = None
self._message_handler_id = None
self._event_handler_id = None
def setup_message_handling(self, element):
self._element = element
bus = element.get_bus()
bus.add_signal_watch()
self._message_handler_id = bus.connect('message', self.on_message)
def setup_event_handling(self, pad):
self._pad = pad
self._event_handler_id = pad.add_event_probe(self.on_event)
def teardown_message_handling(self):
bus = self._element.get_bus()
bus.remove_signal_watch()
bus.disconnect(self._message_handler_id)
self._message_handler_id = None
def teardown_event_handling(self):
self._pad.remove_event_probe(self._event_handler_id)
self._event_handler_id = None
def on_message(self, bus, msg):
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._element:
self.on_playbin_state_changed(*msg.parse_state_changed())
elif msg.type == gst.MESSAGE_BUFFERING:
self.on_buffering(msg.parse_buffering())
elif msg.type == gst.MESSAGE_EOS:
self.on_end_of_stream()
elif msg.type == gst.MESSAGE_ERROR:
self.on_error(*msg.parse_error())
elif msg.type == gst.MESSAGE_WARNING:
self.on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ASYNC_DONE:
self.on_async_done()
elif msg.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(msg):
self.on_missing_plugin(_get_missing_description(msg),
_get_missing_detail(msg))
def on_event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
self.on_new_segment(*event.parse_new_segment())
elif event.type == gst.EVENT_SINK_MESSAGE:
# Handle stream changed messages when they reach our output bin.
# If we listen for it on the bus we get one per tee branch.
msg = event.parse_sink_message()
if msg.structure.has_name('playbin2-stream-changed'):
self.on_stream_changed(msg.structure['uri'])
return True
def on_playbin_state_changed(self, old_state, new_state, pending_state):
gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s',
old_state.value_name, new_state.value_name,
pending_state.value_name)
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
# XXX: We're not called on the last state change when going down to
# NULL, so we rewrite the second to last call to get the expected
# behavior.
new_state = gst.STATE_NULL
pending_state = gst.STATE_VOID_PENDING
if pending_state != gst.STATE_VOID_PENDING:
return # Ignore intermediate state changes
if new_state == gst.STATE_READY:
return # Ignore READY state as it's GStreamer specific
new_state = _GST_STATE_MAPPING[new_state]
old_state, self._audio.state = self._audio.state, new_state
target_state = _GST_STATE_MAPPING[self._audio._target_state]
if target_state == new_state:
target_state = None
logger.debug('Audio event: state_changed(old_state=%s, new_state=%s, '
'target_state=%s)', old_state, new_state, target_state)
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=target_state)
if new_state == PlaybackState.STOPPED:
logger.debug('Audio event: stream_changed(uri=None)')
AudioListener.send('stream_changed', uri=None)
def on_buffering(self, percent):
gst_logger.debug('Got buffering message: percent=%d%%', percent)
if percent < 10 and not self._audio._buffering:
self._audio._playbin.set_state(gst.STATE_PAUSED)
self._audio._buffering = True
if percent == 100:
self._audio._buffering = False
if self._audio._target_state == gst.STATE_PLAYING:
self._audio._playbin.set_state(gst.STATE_PLAYING)
def on_end_of_stream(self):
gst_logger.debug('Got end-of-stream message.')
logger.debug('Audio event: reached_end_of_stream()')
AudioListener.send('reached_end_of_stream')
def on_error(self, error, debug):
gst_logger.error(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
# TODO: is this needed?
self._audio.stop_playback()
def on_warning(self, error, debug):
gst_logger.warning(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
def on_async_done(self):
gst_logger.debug('Got async-done.')
def on_missing_plugin(self, msg):
desc = gst.pbutils.missing_plugin_message_get_description(msg)
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
gst_logger.debug('Got missing-plugin message: description:%s', desc)
logger.warning('Could not find a %s to handle media.', desc)
if gst.pbutils.install_plugins_supported():
logger.info('You might be able to fix this by running: '
'gst-installer "%s"', debug)
# TODO: store the missing plugins installer info in a file so we can
# can provide a 'mopidy install-missing-plugins' if the system has the
# required helper installed?
def on_new_segment(self, update, rate, format_, start, stop, position):
gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s'
'start=%s stop=%s position=%s', update, rate,
format_.value_name, start, stop, position)
position_ms = position // gst.MSECOND
logger.debug('Audio event: position_changed(position=%s)', position_ms)
AudioListener.send('position_changed', position=position_ms)
def on_stream_changed(self, uri):
gst_logger.debug('Got stream-changed message: uri=%s', uri)
logger.debug('Audio event: stream_changed(uri=%s)', uri)
AudioListener.send('stream_changed', uri=uri)
# TODO: create a player class which replaces the actors internals
class Audio(pykka.ThreadingActor):
"""
@ -278,6 +432,7 @@ class Audio(pykka.ThreadingActor):
self._outputs = None
self._about_to_finish_callback = None
self._handler = _Handler(self)
self._appsrc = _Appsrc()
self._signals = _Signals()
@ -291,13 +446,11 @@ class Audio(pykka.ThreadingActor):
self._setup_output()
self._setup_mixer()
self._setup_visualizer()
self._setup_message_processor()
except gobject.GError as ex:
logger.exception(ex)
process.exit_process()
def on_stop(self):
self._teardown_message_processor()
self._teardown_mixer()
self._teardown_playbin()
@ -319,39 +472,32 @@ class Audio(pykka.ThreadingActor):
playbin.set_property('buffer-duration', 2*gst.SECOND)
self._signals.connect(playbin, 'source-setup', self._on_source_setup)
self._signals.connect(
playbin, 'about-to-finish', self._on_about_to_finish)
self._signals.connect(playbin, 'about-to-finish',
self._on_about_to_finish)
self._playbin = playbin
self._handler.setup_message_handling(playbin)
def _teardown_playbin(self):
self._handler.teardown_message_handling()
self._handler.teardown_event_handling()
self._signals.disconnect(self._playbin, 'about-to-finish')
self._signals.disconnect(self._playbin, 'source-setup')
self._playbin.set_state(gst.STATE_NULL)
def _on_about_to_finish(self, element):
gst_logger.debug('Got about-to-finish event.')
if self._about_to_finish_callback:
self._about_to_finish_callback()
def _on_source_setup(self, element, source):
gst_logger.debug('Got source-setup: element=%s', source)
if source.get_factory().get_name() == 'appsrc':
self._appsrc.configure(source)
else:
self._appsrc.reset()
if hasattr(source.props, 'proxy'):
setup_proxy(source, self._config['proxy'])
def _setup_output(self):
self._outputs = _Outputs()
self._outputs.get_pad('sink').add_event_probe(self._on_pad_event)
try:
self._outputs.add_output(self._config['audio']['output'])
except exceptions.AudioException:
process.exit_process() # TODO: move this up the chain
# We don't want to use outputs for regular testing, so just install
# an unsynced fakesink when someone asks for a 'testoutput'.
if self._config['audio']['output'] == 'testoutput':
self._outputs = gst.element_factory_make('fakesink')
else:
self._outputs = _Outputs()
try:
self._outputs.add_output(self._config['audio']['output'])
except exceptions.AudioException:
process.exit_process() # TODO: move this up the chain
self._handler.setup_event_handling(self._outputs.get_pad('sink'))
self._playbin.set_property('audio-sink', self._outputs)
def _setup_mixer(self):
@ -377,126 +523,22 @@ class Audio(pykka.ThreadingActor):
'Failed to create audio visualizer "%s": %s',
visualizer_element, ex)
def _setup_message_processor(self):
bus = self._playbin.get_bus()
bus.add_signal_watch()
self._signals.connect(bus, 'message', self._on_message)
def _on_about_to_finish(self, element):
gst_logger.debug('Got about-to-finish event.')
if self._about_to_finish_callback:
logger.debug('Running about to finish callback.')
self._about_to_finish_callback()
def _teardown_message_processor(self):
bus = self._playbin.get_bus()
self._signals.disconnect(bus, 'message')
bus.remove_signal_watch()
def _on_source_setup(self, element, source):
gst_logger.debug('Got source-setup: element=%s', source)
def _on_pad_event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
update, rate, format_, start, stop, pos = event.parse_new_segment()
gst_logger.debug('Got new-segment event: update=%s rate=%s '
'format=%s start=%s stop=%s position=%s', update,
rate, format_.value_name, start, stop, pos)
pos_ms = pos // gst.MSECOND
logger.debug('Triggering: position_changed(position=%s)', pos_ms)
AudioListener.send('position_changed', position=pos_ms)
if source.get_factory().get_name() == 'appsrc':
self._appsrc.configure(source)
else:
self._appsrc.reset()
return True
# TODO: consider splitting this out while we are at it.
def _on_message(self, bus, msg):
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin:
self._on_playbin_state_changed(*msg.parse_state_changed())
elif msg.type == gst.MESSAGE_BUFFERING:
self._on_buffering(msg.parse_buffering())
elif msg.type == gst.MESSAGE_EOS:
self._on_end_of_stream()
elif msg.type == gst.MESSAGE_ERROR:
self._on_error(*msg.parse_error())
elif msg.type == gst.MESSAGE_WARNING:
self._on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ASYNC_DONE:
gst_logger.debug('Got async-done message.')
elif msg.type == gst.MESSAGE_ELEMENT:
if msg.structure.has_name('playbin2-stream-changed'):
self._on_stream_changed(msg.structure['uri'])
elif gst.pbutils.is_missing_plugin_message(msg):
self._on_missing_plugin(msg)
def _on_playbin_state_changed(self, old_state, new_state, pending_state):
gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s',
old_state.value_name, new_state.value_name,
pending_state.value_name)
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
# XXX: We're not called on the last state change when going down to
# NULL, so we rewrite the second to last call to get the expected
# behavior.
new_state = gst.STATE_NULL
pending_state = gst.STATE_VOID_PENDING
if pending_state != gst.STATE_VOID_PENDING:
return # Ignore intermediate state changes
if new_state == gst.STATE_READY:
return # Ignore READY state as it's GStreamer specific
new_state = _GST_STATE_MAPPING[new_state]
old_state, self.state = self.state, new_state
target_state = _GST_STATE_MAPPING[self._target_state]
if target_state == new_state:
target_state = None
logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, '
'target_state=%s)', old_state, new_state, target_state)
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=target_state)
if new_state == PlaybackState.STOPPED:
logger.debug('Triggering: stream_changed(uri=None)')
AudioListener.send('stream_changed', uri=None)
def _on_buffering(self, percent):
gst_logger.debug('Got buffering message: percent=%d%%', percent)
if percent < 10 and not self._buffering:
self._playbin.set_state(gst.STATE_PAUSED)
self._buffering = True
if percent == 100:
self._buffering = False
if self._target_state == gst.STATE_PLAYING:
self._playbin.set_state(gst.STATE_PLAYING)
def _on_end_of_stream(self):
gst_logger.debug('Got end-of-stream message.')
logger.debug('Triggering: reached_end_of_stream()')
AudioListener.send('reached_end_of_stream')
def _on_error(self, error, debug):
gst_logger.error(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
# TODO: is this needed?
self.stop_playback()
def _on_warning(self, error, debug):
gst_logger.warning(str(error).decode('utf-8'))
if debug:
gst_logger.debug(debug.decode('utf-8'))
def _on_stream_changed(self, uri):
gst_logger.debug('Got stream-changed message: uri:%s', uri)
logger.debug('Triggering: stream_changed(uri=%s)', uri)
AudioListener.send('stream_changed', uri=uri)
def _on_missing_plugin(self, msg):
desc = gst.pbutils.missing_plugin_message_get_description(msg)
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
gst_logger.debug('Got missing-plugin message: description:%s', desc)
logger.warning('Could not find a %s to handle media.', desc)
if gst.pbutils.install_plugins_supported():
logger.info('You might be able to fix this by running: '
'gst-installer "%s"', debug)
# TODO: store the missing plugins installer info in a file so we can
# can provide a 'mopidy install-missing-plugins' if the system has the
# required helper installed?
if hasattr(source.props, 'proxy'):
setup_proxy(source, self._config['proxy'])
def set_uri(self, uri):
"""
@ -648,7 +690,7 @@ class Audio(pykka.ThreadingActor):
Should only be used by tests.
"""
def sync_handler(bus, message):
self._on_message(bus, message)
self._handler.on_message(bus, message)
return gst.BUS_DROP
bus = self._playbin.get_bus()

View File

@ -21,11 +21,9 @@ from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir
"""
We want to make sure both our real audio class and the fake one behave
correctly. So each test is first run against the real class, then repeated
against our dummy.
"""
# We want to make sure both our real audio class and the fake one behave
# correctly. So each test is first run against the real class, then repeated
# against our dummy.
class BaseTest(unittest.TestCase):
@ -34,7 +32,7 @@ class BaseTest(unittest.TestCase):
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'output': 'testoutput',
'visualizer': None,
}
}
@ -49,7 +47,7 @@ class BaseTest(unittest.TestCase):
'audio': {
'mixer': 'foomixer',
'mixer_volume': None,
'output': 'fakesink',
'output': 'testoutput',
'visualizer': None,
},
'proxy': {
@ -359,6 +357,7 @@ class AudioEventTest(BaseTest):
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
@ -407,17 +406,17 @@ class AudioStateTest(unittest.TestCase):
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_does_not_change_when_in_gst_ready_state(self):
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_changes_from_stopped_to_playing_on_play(self):
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state)
@ -425,7 +424,7 @@ class AudioStateTest(unittest.TestCase):
def test_state_changes_from_playing_to_paused_on_pause(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state)
@ -433,12 +432,12 @@ class AudioStateTest(unittest.TestCase):
def test_state_changes_from_playing_to_stopped_on_stop(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL)
self.audio._on_playbin_state_changed(
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL)
# We never get the following call, so the logic must work without it
# self.audio._on_playbin_state_changed(
# self.audio._handler.on_playbin_state_changed(
# gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
@ -455,7 +454,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.assertTrue(self.audio._buffering)
@ -465,7 +464,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()
self.audio._on_buffering(100)
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
@ -475,12 +474,12 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.audio.pause_playback()
playbin.set_state.reset_mock()
self.audio._on_buffering(100)
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
@ -490,7 +489,7 @@ class AudioBufferingTest(unittest.TestCase):
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._on_buffering(0)
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()