Merge pull request #680 from adamcik/feature/audio-prep-work-for-gapless

audio: Preparation work for gapless playback support
This commit is contained in:
Stein Magnus Jodal 2014-08-02 20:55:54 +02:00
commit 4f34f0103d
6 changed files with 510 additions and 33 deletions

View File

@ -45,6 +45,7 @@ PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7)
PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3)
# TODO: split out mixer as these are too intertwined right now
class Audio(pykka.ThreadingActor):
"""
Audio output through `GStreamer <http://gstreamer.freedesktop.org/>`_.
@ -63,6 +64,7 @@ class Audio(pykka.ThreadingActor):
self._playbin = None
self._signal_ids = {} # {(element, event): signal_id}
self._about_to_finish_callback = None
self._appsrc = None
self._appsrc_caps = None
@ -87,6 +89,7 @@ class Audio(pykka.ThreadingActor):
self._teardown_mixer()
self._teardown_playbin()
# TODO: split out signal tracking helper class.
def _connect(self, element, event, *args):
"""Helper to keep track of signal ids based on element+event"""
self._signal_ids[(element, event)] = element.connect(event, *args)
@ -120,13 +123,15 @@ class Audio(pykka.ThreadingActor):
def _on_about_to_finish(self, element):
source, self._appsrc = self._appsrc, None
if source is None:
return
self._appsrc_caps = None
if source is not None:
self._appsrc_caps = None
self._disconnect(source, 'need-data')
self._disconnect(source, 'enough-data')
self._disconnect(source, 'seek-data')
self._disconnect(source, 'need-data')
self._disconnect(source, 'enough-data')
self._disconnect(source, 'seek-data')
if self._about_to_finish_callback:
logger.debug('Calling about to finish callback.')
self._about_to_finish_callback()
def _on_new_source(self, element, pad):
uri = element.get_property('uri')
@ -188,15 +193,37 @@ class Audio(pykka.ThreadingActor):
def _setup_output(self):
output_desc = self._config['audio']['output']
try:
output = gst.parse_bin_from_description(
user_output = gst.parse_bin_from_description(
output_desc, ghost_unconnected_pads=True)
self._playbin.set_property('audio-sink', output)
logger.info('Audio output set to "%s"', output_desc)
except gobject.GError as ex:
logger.error(
'Failed to create audio output "%s": %s', output_desc, ex)
process.exit_process()
output = gst.Bin('output')
# Queue element to buy us time between the about to finish event and
# the actual switch, i.e. about to switch can block for longer thanks
# to this queue.
# TODO: make the min-max values a setting?
queue = gst.element_factory_make('queue')
queue.set_property('max-size-buffers', 0)
queue.set_property('max-size-bytes', 0)
queue.set_property('max-size-time', 5 * gst.SECOND)
queue.set_property('min-threshold-time', 3 * gst.SECOND)
queue.get_pad('src').add_event_probe(self._on_pad_event)
output.add(user_output)
output.add(queue)
queue.link(user_output)
ghost_pad = gst.GhostPad('sink', queue.get_pad('sink'))
output.add_pad(ghost_pad)
logger.info('Audio output set to "%s"', output_desc)
self._playbin.set_property('audio-sink', output)
def _setup_mixer(self):
if self._config['audio']['mixer'] != 'software':
return
@ -246,6 +273,15 @@ class Audio(pykka.ThreadingActor):
self._disconnect(bus, 'message')
bus.remove_signal_watch()
def _on_pad_event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
# update, rate, format, start, stop, position
position = event.parse_new_segment()[5] // gst.MSECOND
logger.debug('Triggering event: position_changed(position=%s)',
position)
AudioListener.send('position_changed', position=position)
return True
def _on_message(self, bus, msg):
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin:
self._on_playbin_state_changed(*msg.parse_state_changed())
@ -257,6 +293,9 @@ class Audio(pykka.ThreadingActor):
self._on_error(*msg.parse_error())
elif msg.type == gst.MESSAGE_WARNING:
self._on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ELEMENT:
if msg.structure.has_name('playbin2-stream-changed'):
self._on_stream_changed(msg.structure['uri'])
def _on_playbin_state_changed(self, old_state, new_state, pending_state):
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
@ -284,6 +323,8 @@ class Audio(pykka.ThreadingActor):
'target_state=%s)', old_state, new_state, target_state)
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=target_state)
if new_state == PlaybackState.STOPPED:
AudioListener.send('stream_changed', uri=None)
def _on_buffering(self, percent):
if percent < 10 and not self._buffering:
@ -297,7 +338,7 @@ class Audio(pykka.ThreadingActor):
logger.debug('Buffer %d%% full', percent)
def _on_end_of_stream(self):
logger.debug('Triggering reached_end_of_stream event')
logger.debug('Audio event: reached_end_of_stream')
AudioListener.send('reached_end_of_stream')
def _on_error(self, error, debug):
@ -311,6 +352,10 @@ class Audio(pykka.ThreadingActor):
'%s Debug message: %s',
str(error).decode('utf-8'), debug.decode('utf-8') or 'None')
def _on_stream_changed(self, uri):
logger.debug('Triggering event: stream_changed(uri=%s)', uri)
AudioListener.send('stream_changed', uri=uri)
def set_uri(self, uri):
"""
Set URI of audio to be played.
@ -372,8 +417,22 @@ class Audio(pykka.ThreadingActor):
We will get a GStreamer message when the stream playback reaches the
token, and can then do any end-of-stream related tasks.
"""
# TODO: replace this with emit_data(None)?
self._playbin.get_property('source').emit('end-of-stream')
def set_about_to_finish_callback(self, callback):
"""
Configure audio to use an about-to-finish callback.
This should be used to achieve gapless playback. For this to work the
callback *MUST* call :meth:`set_uri` with the new URI to play and
block until this call has been made. :meth:`prepare_change` is not
needed before :meth:`set_uri` in this one special case.
:param callable callback: Callback to run when we need the next URI.
"""
self._about_to_finish_callback = callback
def get_position(self):
"""
Get position in milliseconds.
@ -435,6 +494,25 @@ class Audio(pykka.ThreadingActor):
self._buffering = False
return self._set_state(gst.STATE_NULL)
def wait_for_state_change(self):
"""Block until any pending state changes are complete.
Should only be used by tests.
"""
self._playbin.get_state()
def enable_sync_handler(self):
"""Enable manual processing of messages from bus.
Should only be used by tests.
"""
def sync_handler(bus, message):
self._on_message(bus, message)
return gst.BUS_DROP
bus = self._playbin.get_bus()
bus.set_sync_handler(sync_handler)
def _set_state(self, state):
"""
Internal method for setting the raw GStreamer state.

View File

@ -13,16 +13,18 @@ from .listener import AudioListener
class DummyAudio(pykka.ThreadingActor):
def __init__(self):
def __init__(self, config=None, mixer=None):
super(DummyAudio, self).__init__()
self.state = PlaybackState.STOPPED
self._volume = 0
self._position = 0
def set_on_end_of_track(self, callback):
pass
self._callback = None
self._uri = None
self._state_change_result = True
def set_uri(self, uri):
pass
assert self._uri is None, 'prepare change not called before set'
self._uri = uri
def set_appsrc(self, *args, **kwargs):
pass
@ -38,6 +40,7 @@ class DummyAudio(pykka.ThreadingActor):
def set_position(self, position):
self._position = position
AudioListener.send('position_changed', position=position)
return True
def start_playback(self):
@ -47,22 +50,63 @@ class DummyAudio(pykka.ThreadingActor):
return self._change_state(PlaybackState.PAUSED)
def prepare_change(self):
self._uri = None
return True
def stop_playback(self):
return self._change_state(PlaybackState.STOPPED)
def get_volume(self):
return 0
return self._volume
def set_volume(self, volume):
pass
self._volume = volume
return True
def set_metadata(self, track):
pass
def set_about_to_finish_callback(self, callback):
self._callback = callback
def enable_sync_handler(self):
pass
def wait_for_state_change(self):
pass
def _change_state(self, new_state):
if not self._uri:
return False
if self.state == PlaybackState.STOPPED and self._uri:
AudioListener.send('position_changed', position=0)
AudioListener.send('stream_changed', uri=self._uri)
if new_state == PlaybackState.STOPPED:
self._uri = None
AudioListener.send('stream_changed', uri=self._uri)
old_state, self.state = self.state, new_state
AudioListener.send(
'state_changed', old_state=old_state, new_state=new_state)
return True
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=None)
return self._state_change_result
def trigger_fake_playback_failure(self):
self._state_change_result = False
def get_about_to_finish_callback(self):
# This needs to be called from outside the actor or we lock up.
def wrapper():
if self._callback:
self.prepare_change()
self._callback()
if not self._uri or not self._callback:
AudioListener.send('reached_end_of_stream')
else:
AudioListener.send('position_changed', position=0)
AudioListener.send('stream_changed', uri=self._uri)
return wrapper

View File

@ -27,6 +27,26 @@ class AudioListener(listener.Listener):
"""
pass
def stream_changed(self, uri):
"""
Called whenever the audio stream changes.
*MAY* be implemented by actor.
:param string uri: URI the stream has started playing.
"""
pass
def position_changed(self, position_changed):
"""
Called whenever the position of the stream changes.
*MAY* be implemented by actor.
:param int position: Position in milliseconds.
"""
pass
def state_changed(self, old_state, new_state, target_state):
"""
Called after the playback state have changed.

View File

@ -10,6 +10,7 @@ from mopidy.core import listener
logger = logging.getLogger(__name__)
# TODO: split mixing out from playback?
class PlaybackController(object):
pykka_traversable = True
@ -23,6 +24,7 @@ class PlaybackController(object):
self._mute = False
def _get_backend(self):
# TODO: take in track instead
if self.current_tl_track is None:
return None
uri = self.current_tl_track.track.uri
@ -126,6 +128,7 @@ class PlaybackController(object):
# Methods
# TODO: remove this.
def change_track(self, tl_track, on_error_step=1):
"""
Change to the given track, keeping the current playback state.
@ -144,6 +147,7 @@ class PlaybackController(object):
elif old_state == PlaybackState.PAUSED:
self.pause()
# TODO: this is not really end of track, this is on_need_next_track
def on_end_of_track(self):
"""
Tell the playback controller that end of track is reached.
@ -181,6 +185,9 @@ class PlaybackController(object):
"""
tl_track = self.core.tracklist.next_track(self.current_tl_track)
if tl_track:
# TODO: switch to:
# backend.play(track)
# wait for state change?
self.change_track(tl_track)
else:
self.stop(clear_current_track=True)
@ -189,6 +196,9 @@ class PlaybackController(object):
"""Pause playback."""
backend = self._get_backend()
if not backend or backend.playback.pause().get():
# TODO: switch to:
# backend.track(pause)
# wait for state change?
self.state = PlaybackState.PAUSED
self._trigger_track_playback_paused()
@ -223,6 +233,10 @@ class PlaybackController(object):
assert tl_track in self.core.tracklist.tl_tracks
# TODO: switch to:
# backend.play(track)
# wait for state change?
if self.state == PlaybackState.PLAYING:
self.stop()
@ -233,6 +247,7 @@ class PlaybackController(object):
if success:
self.core.tracklist.mark_playing(tl_track)
# TODO: replace with stream-changed
self._trigger_track_playback_started()
else:
self.core.tracklist.mark_unplayable(tl_track)
@ -250,6 +265,9 @@ class PlaybackController(object):
will continue. If it was paused, it will still be paused, etc.
"""
tl_track = self.current_tl_track
# TODO: switch to:
# self.play(....)
# wait for state change?
self.change_track(
self.core.tracklist.previous_track(tl_track), on_error_step=-1)
@ -260,7 +278,11 @@ class PlaybackController(object):
backend = self._get_backend()
if backend and backend.playback.resume().get():
self.state = PlaybackState.PLAYING
# TODO: trigger via gst messages
self._trigger_track_playback_resumed()
# TODO: switch to:
# backend.resume()
# wait for state change?
def seek(self, time_position):
"""

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import threading
import unittest
import gobject
@ -14,12 +15,35 @@ import mock
import pykka
from mopidy import audio
from mopidy.audio import dummy as dummy_audio
from mopidy.audio.constants import PlaybackState
from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir
"""
We want to make sure both our real audio class and the fake one behave
correctly. So each test is first run against the real class, then repeated
against our dummy.
"""
class BaseTest(unittest.TestCase):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
uris = [path_to_uri(path_to_data_dir('song1.wav')),
path_to_uri(path_to_data_dir('song2.wav'))]
audio_class = audio.Audio
class AudioTest(unittest.TestCase):
def setUp(self):
config = {
'audio': {
@ -33,30 +57,52 @@ class AudioTest(unittest.TestCase):
},
}
self.song_uri = path_to_uri(path_to_data_dir('song1.wav'))
self.audio = audio.Audio.start(config=config, mixer=None).proxy()
self.audio = self.audio_class.start(config=config, mixer=None).proxy()
def tearDown(self):
pykka.ActorRegistry.stop_all()
def prepare_uri(self, uri):
self.audio.prepare_change()
self.audio.set_uri(uri)
def possibly_trigger_fake_playback_error(self):
pass
def possibly_trigger_fake_about_to_finish(self):
pass
class DummyMixin(object):
audio_class = dummy_audio.DummyAudio
def possibly_trigger_fake_playback_error(self):
self.audio.trigger_fake_playback_failure()
def possibly_trigger_fake_about_to_finish(self):
callback = self.audio.get_about_to_finish_callback().get()
if callback:
callback()
class AudioTest(BaseTest):
def test_start_playback_existing_file(self):
self.prepare_uri(self.song_uri)
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.assertTrue(self.audio.start_playback().get())
def test_start_playback_non_existing_file(self):
self.prepare_uri(self.song_uri + 'bogus')
self.possibly_trigger_fake_playback_error()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0] + 'bogus')
self.assertFalse(self.audio.start_playback().get())
def test_pause_playback_while_playing(self):
self.prepare_uri(self.song_uri)
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.pause_playback().get())
def test_stop_playback_while_playing(self):
self.prepare_uri(self.song_uri)
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.stop_playback().get())
@ -68,11 +114,272 @@ class AudioTest(unittest.TestCase):
def test_end_of_data_stream(self):
pass # TODO
def test_set_volume(self):
for value in range(0, 101):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
@unittest.SkipTest
def test_set_mute(self):
pass # TODO Probably needs a fakemixer with a mixer track
@unittest.SkipTest
def test_set_state_encapsulation(self):
pass # TODO
@unittest.SkipTest
def test_set_position(self):
pass # TODO
@unittest.SkipTest
def test_invalid_output_raises_error(self):
pass # TODO
class AudioDummyTest(DummyMixin, AudioTest):
pass
@mock.patch.object(audio.AudioListener, 'send')
class AudioEventTest(BaseTest):
def setUp(self):
super(AudioEventTest, self).setUp()
self.audio.enable_sync_handler().get()
# TODO: test without uri set, with bad uri and gapless...
# TODO: playing->playing triggered by seek should be removed
# TODO: codify expected state after EOS
# TODO: consider returning a future or a threading event?
def test_state_change_stopped_to_playing_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.STOPPED,
new_state=PlaybackState.PLAYING, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_state_change_stopped_to_paused_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.STOPPED,
new_state=PlaybackState.PAUSED, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_state_change_paused_to_playing_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.start_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.PAUSED,
new_state=PlaybackState.PLAYING, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_state_change_paused_to_stopped_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.PAUSED,
new_state=PlaybackState.STOPPED, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_state_change_playing_to_paused_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.PLAYING,
new_state=PlaybackState.PAUSED, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_state_change_playing_to_stopped_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
call = mock.call('state_changed', old_state=PlaybackState.PLAYING,
new_state=PlaybackState.STOPPED, target_state=None)
self.assertIn(call, send_mock.call_args_list)
def test_stream_changed_event_on_playing(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
# Since we are going from stopped to playing, the state change is
# enough to ensure the stream changed.
self.audio.wait_for_state_change().get()
call = mock.call('stream_changed', uri=self.uris[0])
self.assertIn(call, send_mock.call_args_list)
def test_stream_changed_event_on_paused_to_stopped(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
call = mock.call('stream_changed', uri=None)
self.assertIn(call, send_mock.call_args_list)
def test_position_changed_on_pause(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.wait_for_state_change().get()
call = mock.call('position_changed', position=0)
self.assertIn(call, send_mock.call_args_list)
def test_position_changed_on_play(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.wait_for_state_change().get()
call = mock.call('position_changed', position=0)
self.assertIn(call, send_mock.call_args_list)
def test_position_changed_on_seek(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
call = mock.call('position_changed', position=0)
self.assertNotIn(call, send_mock.call_args_list)
def test_position_changed_on_seek_after_play(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
call = mock.call('position_changed', position=2000)
self.assertIn(call, send_mock.call_args_list)
def test_position_changed_on_seek_after_pause(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
call = mock.call('position_changed', position=2000)
self.assertIn(call, send_mock.call_args_list)
# Unlike the other events, having the state changed done is not
# enough to ensure our event is called. So we setup a threading
# event that we can wait for with a timeout while the track playback
# completes.
def test_stream_changed_event_on_paused(self, send_mock):
event = threading.Event()
def send(name, **kwargs):
if name == 'stream_changed':
event.set()
send_mock.side_effect = send
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback().get()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=1.0):
self.fail('Stream changed not reached within deadline')
def test_reached_end_of_stream_event(self, send_mock):
event = threading.Event()
def send(name, **kwargs):
if name == 'reached_end_of_stream':
event.set()
send_mock.side_effect = send
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
if not event.wait(timeout=1.0):
self.fail('End of stream not reached within deadline')
# Make sure that gapless really works:
def test_gapless(self, send_mock):
uris = self.uris[1:]
events = []
done = threading.Event()
def callback():
if uris:
self.audio.set_uri(uris.pop()).get()
def send(name, **kwargs):
events.append((name, kwargs))
if name == 'reached_end_of_stream':
done.set()
send_mock.side_effect = send
self.audio.set_about_to_finish_callback(callback).get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
excepted = [
('position_changed', {'position': 0}),
('stream_changed', {'uri': self.uris[0]}),
('state_changed', {'old_state': PlaybackState.STOPPED,
'new_state': PlaybackState.PLAYING,
'target_state': None}),
('position_changed', {'position': 0}),
('stream_changed', {'uri': self.uris[1]}),
('reached_end_of_stream', {})]
self.assertEqual(excepted, events)
class AudioDummyEventTest(DummyMixin, AudioEventTest):
pass
# TODO: move to mixer tests...
class MixerTest(BaseTest):
@unittest.SkipTest
def test_set_mute(self):
for value in (True, False):

View File

@ -26,3 +26,9 @@ class AudioListenerTest(unittest.TestCase):
def test_listener_has_default_impl_for_state_changed(self):
self.listener.state_changed(None, None, None)
def test_listener_has_default_impl_for_stream_changed(self):
self.listener.stream_changed(None)
def test_listener_has_default_impl_for_position_changed(self):
self.listener.position_changed(None)