audio: Update tests and dummy audio actor to pass

This commit is contained in:
Thomas Adamcik 2014-01-28 23:19:33 +01:00
parent d1b91117b4
commit dd98ad8353
2 changed files with 160 additions and 105 deletions

View File

@ -13,12 +13,14 @@ from .listener import AudioListener
class DummyAudio(pykka.ThreadingActor):
def __init__(self):
def __init__(self, config=None):
super(DummyAudio, self).__init__()
self.state = PlaybackState.STOPPED
self._volume = 0
self._position = 0
self._callback = None
self._uri = None
self._state_change_result = True
def set_uri(self, uri):
assert self._uri is None, 'prepare change not called before set'
@ -55,10 +57,11 @@ class DummyAudio(pykka.ThreadingActor):
return self._change_state(PlaybackState.STOPPED)
def get_volume(self):
return 0
return self._volume
def set_volume(self, volume):
pass
self._volume = volume
return True
def set_metadata(self, track):
pass
@ -66,26 +69,44 @@ class DummyAudio(pykka.ThreadingActor):
def set_about_to_finish_callback(self, callback):
self._callback = callback
def enable_sync_handler(self):
pass
def wait_for_state_change(self):
pass
def _change_state(self, new_state):
if not self._uri:
return False
if self.state == PlaybackState.STOPPED and self._uri:
AudioListener.send('position_changed', position=0)
AudioListener.send('stream_changed', uri=self._uri)
if new_state == PlaybackState.STOPPED:
self._uri = None
AudioListener.send('stream_changed', uri=self._uri)
old_state, self.state = self.state, new_state
AudioListener.send(
'state_changed', old_state=old_state, new_state=new_state)
return True
return self._state_change_result
def trigger_fake_end_of_stream(self):
AudioListener.send('reached_end_of_stream')
def trigger_fake_playback_failure(self):
self._state_change_result = False
def trigger_fake_about_to_finish(self):
if not self._callback:
return
self.prepare_change()
self._callback()
if not self._uri:
self.trigger_fake_end_of_stream()
def get_about_to_finish_callback(self):
# This needs to be called from outside the actor or we lock up.
def wrapper():
if self._callback:
self.prepare_change()
self._callback()
if not self._uri or not self._callback:
AudioListener.send('reached_end_of_stream')
else:
AudioListener.send('position_changed', position=0)
AudioListener.send('stream_changed', uri=self._uri)
return wrapper

View File

@ -14,48 +14,82 @@ gobject.threads_init()
import pykka
from mopidy import audio
from mopidy.audio import dummy as dummy_audio
from mopidy.audio.constants import PlaybackState
from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir
"""
We want to make sure both our real audio class and the fake one behave
correctly. So each test is first run against the real class, then repeated
against our dummy.
"""
class AudioTest(unittest.TestCase):
def setUp(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
class BaseTest(unittest.TestCase):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
self.song_uri = path_to_uri(path_to_data_dir('song1.wav'))
self.audio = audio.Audio.start(config=config).proxy()
}
uris = [path_to_uri(path_to_data_dir('song1.wav')),
path_to_uri(path_to_data_dir('song2.wav'))]
audio_class = audio.Audio
def setUp(self):
self.audio = self.audio_class.start(config=self.config).proxy()
def tearDown(self):
pykka.ActorRegistry.stop_all()
def possibly_trigger_fake_playback_error(self):
pass
def possibly_trigger_fake_about_to_finish(self):
pass
class DummyMixin(object):
audio_class = dummy_audio.DummyAudio
def possibly_trigger_fake_playback_error(self):
self.audio.trigger_fake_playback_failure()
def possibly_trigger_fake_about_to_finish(self):
callback = self.audio.get_about_to_finish_callback().get()
if callback:
callback()
class AudioTest(BaseTest):
def test_start_playback_existing_file(self):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.assertTrue(self.audio.start_playback().get())
def test_start_playback_non_existing_file(self):
self.possibly_trigger_fake_playback_error()
self.audio.prepare_change()
self.audio.set_uri(self.song_uri + 'bogus')
self.audio.set_uri(self.uris[0] + 'bogus')
self.assertFalse(self.audio.start_playback().get())
def test_pause_playback_while_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.pause_playback().get())
def test_stop_playback_while_playing(self):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.assertTrue(self.audio.stop_playback().get())
@ -67,40 +101,6 @@ class AudioTest(unittest.TestCase):
def test_end_of_data_stream(self):
pass # TODO
def test_set_volume(self):
for value in range(0, 101):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
def test_set_volume_with_mixer_max_below_100(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=40',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.audio = audio.Audio.start(config=config).proxy()
for value in range(0, 101):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
def test_set_volume_with_mixer_min_equal_max(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=0',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.audio = audio.Audio.start(config=config).proxy()
self.assertEqual(0, self.audio.get_volume().get())
@unittest.SkipTest
def test_set_mute(self):
pass # TODO Probably needs a fakemixer with a mixer track
@ -118,31 +118,22 @@ class AudioTest(unittest.TestCase):
pass # TODO
@mock.patch.object(audio.AudioListener, 'send')
class AudioEventTest(unittest.TestCase):
def setUp(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=65536',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.song_uri = path_to_uri(path_to_data_dir('song1.wav'))
self.audio = audio.Audio.start(config=config).proxy()
self.audio.enable_sync_handler().get()
class AudioDummyTest(DummyMixin, AudioTest):
pass
def tearDown(self):
pykka.ActorRegistry.stop_all()
@mock.patch.object(audio.AudioListener, 'send')
class AudioEventTest(BaseTest):
def setUp(self):
super(AudioEventTest, self).setUp()
self.audio.enable_sync_handler().get()
# TODO: test wihtout uri set, with bad uri and gapless...
# TODO: playing->playing triggered by seek should be removed
def test_state_change_stopped_to_playing_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
@ -152,7 +143,7 @@ class AudioEventTest(unittest.TestCase):
def test_state_change_stopped_to_paused_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change().get()
@ -162,7 +153,7 @@ class AudioEventTest(unittest.TestCase):
def test_state_change_paused_to_playing_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.start_playback()
@ -174,7 +165,7 @@ class AudioEventTest(unittest.TestCase):
def test_state_change_paused_to_stopped_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
@ -186,7 +177,7 @@ class AudioEventTest(unittest.TestCase):
def test_state_change_playing_to_paused_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.pause_playback()
@ -198,7 +189,7 @@ class AudioEventTest(unittest.TestCase):
def test_state_change_playing_to_stopped_event(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
@ -210,19 +201,19 @@ class AudioEventTest(unittest.TestCase):
def test_stream_changed_event_on_playing(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
# Since we are going from stopped to playing, the state change is
# enough to ensure the stream changed.
self.audio.wait_for_state_change().get()
call = mock.call('stream_changed', uri=self.song_uri)
call = mock.call('stream_changed', uri=self.uris[0])
self.assertIn(call, send_mock.call_args_list)
def test_stream_changed_event_on_paused_to_stopped(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.stop_playback()
@ -234,7 +225,7 @@ class AudioEventTest(unittest.TestCase):
def test_position_changed_on_pause(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
@ -245,7 +236,7 @@ class AudioEventTest(unittest.TestCase):
def test_position_changed_on_play(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
@ -256,7 +247,7 @@ class AudioEventTest(unittest.TestCase):
def test_position_changed_on_seek(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.set_position(2000)
self.audio.wait_for_state_change().get()
@ -266,7 +257,7 @@ class AudioEventTest(unittest.TestCase):
def test_position_changed_on_seek_after_play(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change()
self.audio.set_position(2000)
@ -278,7 +269,7 @@ class AudioEventTest(unittest.TestCase):
def test_position_changed_on_seek_after_pause(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback()
self.audio.wait_for_state_change()
self.audio.set_position(2000)
@ -302,11 +293,11 @@ class AudioEventTest(unittest.TestCase):
send_mock.side_effect = send
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.pause_playback().get()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=5.0):
if not event.wait(timeout=1.0):
self.fail('Stream changed not reached within deadline')
def test_reached_end_of_stream_event(self, send_mock):
@ -318,19 +309,18 @@ class AudioEventTest(unittest.TestCase):
send_mock.side_effect = send
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
if not event.wait(timeout=5.0):
self.possibly_trigger_fake_about_to_finish()
if not event.wait(timeout=1.0):
self.fail('End of stream not reached within deadline')
# Make sure that gapless really works:
def test_gapless(self, send_mock):
song2_uri = path_to_uri(path_to_data_dir('song2.wav'))
uris = [song2_uri]
uris = self.uris[1:]
events = []
done = threading.Event()
@ -347,24 +337,68 @@ class AudioEventTest(unittest.TestCase):
self.audio.set_about_to_finish_callback(callback).get()
self.audio.prepare_change()
self.audio.set_uri(self.song_uri)
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not done.wait(timeout=5.0):
self.possibly_trigger_fake_about_to_finish()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
excepted = [
('position_changed', {'position': 0}),
('stream_changed', {'uri': self.song_uri}),
('stream_changed', {'uri': self.uris[0]}),
('state_changed', {'old_state': PlaybackState.STOPPED,
'new_state': PlaybackState.PLAYING}),
('position_changed', {'position': 0}),
('stream_changed', {'uri': song2_uri}),
('stream_changed', {'uri': self.uris[1]}),
('reached_end_of_stream', {})]
self.assertEqual(excepted, events)
class AudioDummyEventTest(DummyMixin, AudioEventTest):
pass
# TODO: this is really a mixer scaling test, has nothing to do with audio
class MixerTest(BaseTest):
def test_set_volume(self):
for value in range(0, 101):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
def test_set_volume_with_mixer_max_below_100(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=40',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.audio = self.audio_class.start(config=config).proxy()
for value in range(0, 101):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
def test_set_volume_with_mixer_min_equal_max(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=0',
'mixer_track': None,
'mixer_volume': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.audio = self.audio_class.start(config=config).proxy()
self.assertEqual(0, self.audio.get_volume().get())
class AudioStateTest(unittest.TestCase):
def setUp(self):
self.audio = audio.Audio(config=None)