mopidy/tests/audio/test_actor.py

617 lines
20 KiB
Python

from __future__ import absolute_import, unicode_literals
import threading
import unittest
import gobject
gobject.threads_init()
import mock
import pygst
pygst.require('0.10')
import gst # noqa
import pykka
from mopidy import audio, listener
from mopidy.audio.constants import PlaybackState
from mopidy.internal import path
from tests import dummy_audio, path_to_data_dir
# We want to make sure both our real audio class and the fake one behave
# correctly. So each test is first run against the real class, then repeated
# against our dummy.
class BaseTest(unittest.TestCase):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'testoutput',
'visualizer': None,
}
}
uris = [path.path_to_uri(path_to_data_dir('song1.wav')),
path.path_to_uri(path_to_data_dir('song2.wav'))]
audio_class = audio.Audio
def setUp(self): # noqa: N802
config = {
'audio': {
'mixer': 'foomixer',
'mixer_volume': None,
'output': 'testoutput',
'visualizer': None,
},
'proxy': {
'hostname': '',
},
}
self.song_uri = path.path_to_uri(path_to_data_dir('song1.wav'))
self.audio = self.audio_class.start(config=config, mixer=None).proxy()
def tearDown(self): # noqa
pykka.ActorRegistry.stop_all()
def possibly_trigger_fake_playback_error(self):
pass
def possibly_trigger_fake_about_to_finish(self):
pass
class DummyMixin(object):
audio_class = dummy_audio.DummyAudio
def possibly_trigger_fake_playback_error(self):
self.audio.trigger_fake_playback_failure()
def possibly_trigger_fake_about_to_finish(self):
callback = self.audio.get_about_to_finish_callback().get()
if callback:
callback()
class AudioTest(BaseTest):
def test_start_playback_existing_file(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.assertTrue(self.audio.start_playback().get())
def test_start_playback_non_existing_file(self):
self.possibly_trigger_fake_playback_error()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0] + 'bogus')
self.assertFalse(self.audio.start_playback().get())
def test_pause_playback_while_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.pause_playback().get())
def test_stop_playback_while_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.stop_playback().get())
@unittest.SkipTest
def test_deliver_data(self):
pass # TODO
@unittest.SkipTest
def test_end_of_data_stream(self):
pass # TODO
@unittest.SkipTest
def test_set_mute(self):
pass # TODO Probably needs a fakemixer with a mixer track
@unittest.SkipTest
def test_set_state_encapsulation(self):
pass # TODO
@unittest.SkipTest
def test_set_position(self):
pass # TODO
@unittest.SkipTest
def test_invalid_output_raises_error(self):
pass # TODO
class AudioDummyTest(DummyMixin, AudioTest):
pass
class DummyAudioListener(pykka.ThreadingActor, audio.AudioListener):
def __init__(self):
super(DummyAudioListener, self).__init__()
self.events = []
self.waiters = {}
def on_event(self, event, **kwargs):
self.events.append((event, kwargs))
if event in self.waiters:
self.waiters[event].set()
def wait(self, event):
self.waiters[event] = threading.Event()
return self.waiters[event]
def get_events(self):
return self.events
def clear_events(self):
self.events = []
class AudioEventTest(BaseTest):
def setUp(self): # noqa: N802
super(AudioEventTest, self).setUp()
self.audio.enable_sync_handler().get()
self.listener = DummyAudioListener.start().proxy()
self.original_send_async = listener.send_async
listener.send_async = listener.send
def tearDown(self): # noqa: N802
super(AudioEventTest, self).setUp()
listener.send_async = self.original_send_async
def assertEvent(self, event, **kwargs): # noqa: N802
self.assertIn((event, kwargs), self.listener.get_events().get())
def assertNotEvent(self, event, **kwargs): # noqa: N802
self.assertNotIn((event, kwargs), self.listener.get_events().get())
# TODO: test without uri set, with bad uri and gapless...
# TODO: playing->playing triggered by seek should be removed
# TODO: codify expected state after EOS
# TODO: consider returning a future or a threading event?
def test_state_change_stopped_to_playing_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,
new_state=PlaybackState.PLAYING, target_state=None)
def test_state_change_stopped_to_paused_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,
new_state=PlaybackState.PAUSED, target_state=None)
def test_state_change_paused_to_playing_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,
new_state=PlaybackState.PLAYING, target_state=None)
def test_state_change_paused_to_stopped_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,
new_state=PlaybackState.STOPPED, target_state=None)
def test_state_change_playing_to_paused_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,
new_state=PlaybackState.PAUSED, target_state=None)
def test_state_change_playing_to_stopped_event(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,
new_state=PlaybackState.STOPPED, target_state=None)
def test_stream_changed_event_on_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.listener.clear_events()
self.audio.start_playback()
# Since we are going from stopped to playing, the state change is
# enough to ensure the stream changed.
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=self.uris[0])
def test_stream_changed_event_on_multiple_changes(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.listener.clear_events()
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=self.uris[0])
self.audio.prepare_change()
self.audio.set_uri(self.uris[1])
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=self.uris[1])
def test_stream_changed_event_on_playing_to_paused(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.listener.clear_events()
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=self.uris[0])
self.listener.clear_events()
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
self.assertNotEvent('stream_changed', uri=self.uris[0])
def test_stream_changed_event_on_paused_to_stopped(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.stop_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=None)
def test_position_changed_on_pause(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.wait_for_state_change().get()
self.assertEvent('position_changed', position=0)
def test_stream_changed_event_on_paused_to_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.listener.clear_events()
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('stream_changed', uri=self.uris[0])
self.listener.clear_events()
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertNotEvent('stream_changed', uri=self.uris[0])
def test_position_changed_on_play(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.wait_for_state_change().get()
self.assertEvent('position_changed', position=0)
def test_position_changed_on_seek_while_stopped(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
self.assertNotEvent('position_changed', position=0)
def test_position_changed_on_seek_after_play(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
self.assertEvent('position_changed', position=2000)
def test_position_changed_on_seek_after_pause(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.listener.clear_events()
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
self.assertEvent('position_changed', position=2000)
def test_tags_changed_on_playback(self):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.assertEvent('tags_changed', tags=mock.ANY)
# Unlike the other events, having the state changed done is not
# enough to ensure our event is called. So we setup a threading
# event that we can wait for with a timeout while the track playback
# completes.
def test_stream_changed_event_on_paused(self):
event = self.listener.wait('stream_changed').get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.pause_playback().get()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=1.0):
self.fail('Stream changed not reached within deadline')
self.assertEvent('stream_changed', uri=self.uris[0])
def test_reached_end_of_stream_event(self):
event = self.listener.wait('reached_end_of_stream').get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
if not event.wait(timeout=1.0):
self.fail('End of stream not reached within deadline')
self.assertFalse(self.audio.get_current_tags().get())
def test_gapless(self):
uris = self.uris[1:]
event = self.listener.wait('reached_end_of_stream').get()
def callback():
if uris:
self.audio.set_uri(uris.pop()).get()
self.audio.set_about_to_finish_callback(callback).get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=1.0):
self.fail('EOS not received')
# Check that both uris got played
self.assertEvent('stream_changed', uri=self.uris[0])
self.assertEvent('stream_changed', uri=self.uris[1])
# Check that events counts check out.
keys = [k for k, v in self.listener.get_events().get()]
self.assertEqual(2, keys.count('stream_changed'))
self.assertEqual(2, keys.count('position_changed'))
self.assertEqual(1, keys.count('state_changed'))
self.assertEqual(1, keys.count('reached_end_of_stream'))
# TODO: test tag states within gaples
# TODO: this does not belong in this testcase
def test_current_tags_are_blank_to_begin_with(self):
self.assertFalse(self.audio.get_current_tags().get())
def test_current_tags_blank_after_end_of_stream(self):
event = self.listener.wait('reached_end_of_stream').get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=1.0):
self.fail('EOS not received')
self.assertFalse(self.audio.get_current_tags().get())
def test_current_tags_stored(self):
event = self.listener.wait('reached_end_of_stream').get()
tags = []
def callback():
tags.append(self.audio.get_current_tags().get())
self.audio.set_about_to_finish_callback(callback).get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=1.0):
self.fail('EOS not received')
self.assertTrue(tags[0])
# TODO: test that we reset when we expect between songs
class AudioDummyEventTest(DummyMixin, AudioEventTest):
"""Exercise the AudioEventTest against our mock audio classes."""
# TODO: move to mixer tests...
class MixerTest(BaseTest):
@unittest.SkipTest
def test_set_mute(self):
for value in (True, False):
self.assertTrue(self.audio.set_mute(value).get())
self.assertEqual(value, self.audio.get_mute().get())
@unittest.SkipTest
def test_set_state_encapsulation(self):
pass # TODO
@unittest.SkipTest
def test_set_position(self):
pass # TODO
@unittest.SkipTest
def test_invalid_output_raises_error(self):
pass # TODO
class AudioStateTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.audio = audio.Audio(config=None, mixer=None)
def test_state_starts_as_stopped(self):
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_does_not_change_when_in_gst_ready_state(self):
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
def test_state_changes_from_stopped_to_playing_on_play(self):
self.audio._handler.on_playbin_state_changed(
gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING)
self.audio._handler.on_playbin_state_changed(
gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING)
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state)
def test_state_changes_from_playing_to_paused_on_pause(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state)
def test_state_changes_from_playing_to_stopped_on_stop(self):
self.audio.state = audio.PlaybackState.PLAYING
self.audio._handler.on_playbin_state_changed(
gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL)
self.audio._handler.on_playbin_state_changed(
gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL)
# We never get the following call, so the logic must work without it
# self.audio._handler.on_playbin_state_changed(
# gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING)
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
class AudioBufferingTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.audio = audio.Audio(config=None, mixer=None)
self.audio._playbin = mock.Mock(spec=['set_state'])
def test_pause_when_buffer_empty(self):
playbin = self.audio._playbin
self.audio.start_playback()
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.assertTrue(self.audio._buffering)
def test_stay_paused_when_buffering_finished(self):
playbin = self.audio._playbin
self.audio.pause_playback()
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
def test_change_to_paused_while_buffering(self):
playbin = self.audio._playbin
self.audio.start_playback()
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
self.audio.pause_playback()
playbin.set_state.reset_mock()
self.audio._handler.on_buffering(100)
self.assertEqual(playbin.set_state.call_count, 0)
self.assertFalse(self.audio._buffering)
def test_change_to_stopped_while_buffering(self):
playbin = self.audio._playbin
self.audio.start_playback()
playbin.set_state.assert_called_with(gst.STATE_PLAYING)
playbin.set_state.reset_mock()
self.audio._handler.on_buffering(0)
playbin.set_state.assert_called_with(gst.STATE_PAUSED)
playbin.set_state.reset_mock()
self.audio.stop_playback()
playbin.set_state.assert_called_with(gst.STATE_NULL)
self.assertFalse(self.audio._buffering)