607 lines
20 KiB
Python
607 lines
20 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import threading
|
|
import unittest
|
|
|
|
import mock
|
|
|
|
import pykka
|
|
|
|
from mopidy import audio
|
|
from mopidy.audio.constants import PlaybackState
|
|
from mopidy.internal import path
|
|
from mopidy.internal.gi import Gst
|
|
|
|
from tests import dummy_audio, path_to_data_dir
|
|
|
|
# We want to make sure both our real audio class and the fake one behave
|
|
# correctly. So each test is first run against the real class, then repeated
|
|
# against our dummy.
|
|
|
|
|
|
class BaseTest(unittest.TestCase):
|
|
config = {
|
|
'audio': {
|
|
'mixer': 'fakemixer track_max_volume=65536',
|
|
'mixer_track': None,
|
|
'mixer_volume': None,
|
|
'output': 'testoutput',
|
|
'visualizer': None,
|
|
}
|
|
}
|
|
|
|
uris = [path.path_to_uri(path_to_data_dir('song1.wav')),
|
|
path.path_to_uri(path_to_data_dir('song2.wav'))]
|
|
|
|
audio_class = audio.Audio
|
|
|
|
def setUp(self): # noqa: N802
|
|
config = {
|
|
'audio': {
|
|
'mixer': 'foomixer',
|
|
'mixer_volume': None,
|
|
'output': 'testoutput',
|
|
'visualizer': None,
|
|
},
|
|
'proxy': {
|
|
'hostname': '',
|
|
},
|
|
}
|
|
self.song_uri = path.path_to_uri(path_to_data_dir('song1.wav'))
|
|
self.audio = self.audio_class.start(config=config, mixer=None).proxy()
|
|
|
|
def tearDown(self): # noqa
|
|
pykka.ActorRegistry.stop_all()
|
|
|
|
def possibly_trigger_fake_playback_error(self, uri):
|
|
pass
|
|
|
|
def possibly_trigger_fake_about_to_finish(self):
|
|
pass
|
|
|
|
|
|
class DummyMixin(object):
|
|
audio_class = dummy_audio.DummyAudio
|
|
|
|
def possibly_trigger_fake_playback_error(self, uri):
|
|
self.audio.trigger_fake_playback_failure(uri)
|
|
|
|
def possibly_trigger_fake_about_to_finish(self):
|
|
callback = self.audio.get_about_to_finish_callback().get()
|
|
if callback:
|
|
callback()
|
|
|
|
|
|
class AudioTest(BaseTest):
|
|
|
|
def test_start_playback_existing_file(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.assertTrue(self.audio.start_playback().get())
|
|
|
|
def test_start_playback_non_existing_file(self):
|
|
self.possibly_trigger_fake_playback_error(self.uris[0] + 'bogus')
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0] + 'bogus')
|
|
self.assertFalse(self.audio.start_playback().get())
|
|
|
|
def test_pause_playback_while_playing(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
self.assertTrue(self.audio.pause_playback().get())
|
|
|
|
def test_stop_playback_while_playing(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
self.assertTrue(self.audio.stop_playback().get())
|
|
|
|
@unittest.SkipTest
|
|
def test_deliver_data(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_end_of_data_stream(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_set_mute(self):
|
|
pass # TODO Probably needs a fakemixer with a mixer track
|
|
|
|
@unittest.SkipTest
|
|
def test_set_state_encapsulation(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_set_position(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_invalid_output_raises_error(self):
|
|
pass # TODO
|
|
|
|
|
|
class AudioDummyTest(DummyMixin, AudioTest):
|
|
pass
|
|
|
|
|
|
class DummyAudioListener(pykka.ThreadingActor, audio.AudioListener):
|
|
def __init__(self):
|
|
super(DummyAudioListener, self).__init__()
|
|
self.events = []
|
|
self.waiters = {}
|
|
|
|
def on_event(self, event, **kwargs):
|
|
self.events.append((event, kwargs))
|
|
if event in self.waiters:
|
|
self.waiters[event].set()
|
|
|
|
def wait(self, event):
|
|
self.waiters[event] = threading.Event()
|
|
return self.waiters[event]
|
|
|
|
def get_events(self):
|
|
return self.events
|
|
|
|
def clear_events(self):
|
|
self.events = []
|
|
|
|
|
|
class AudioEventTest(BaseTest):
|
|
|
|
def setUp(self): # noqa: N802
|
|
super(AudioEventTest, self).setUp()
|
|
self.audio.enable_sync_handler().get()
|
|
self.listener = DummyAudioListener.start().proxy()
|
|
|
|
def tearDown(self): # noqa: N802
|
|
super(AudioEventTest, self).tearDown()
|
|
|
|
def assertEvent(self, event, **kwargs): # noqa: N802
|
|
self.assertIn((event, kwargs), self.listener.get_events().get())
|
|
|
|
def assertNotEvent(self, event, **kwargs): # noqa: N802
|
|
self.assertNotIn((event, kwargs), self.listener.get_events().get())
|
|
|
|
# TODO: test without uri set, with bad uri and gapless...
|
|
# TODO: playing->playing triggered by seek should be removed
|
|
# TODO: codify expected state after EOS
|
|
# TODO: consider returning a future or a threading event?
|
|
|
|
def test_state_change_stopped_to_playing_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,
|
|
new_state=PlaybackState.PLAYING, target_state=None)
|
|
|
|
def test_state_change_stopped_to_paused_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.STOPPED,
|
|
new_state=PlaybackState.PAUSED, target_state=None)
|
|
|
|
def test_state_change_paused_to_playing_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,
|
|
new_state=PlaybackState.PLAYING, target_state=None)
|
|
|
|
def test_state_change_paused_to_stopped_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.stop_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.PAUSED,
|
|
new_state=PlaybackState.STOPPED, target_state=None)
|
|
|
|
def test_state_change_playing_to_paused_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,
|
|
new_state=PlaybackState.PAUSED, target_state=None)
|
|
|
|
def test_state_change_playing_to_stopped_event(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.stop_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('state_changed', old_state=PlaybackState.PLAYING,
|
|
new_state=PlaybackState.STOPPED, target_state=None)
|
|
|
|
def test_stream_changed_event_on_playing(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.listener.clear_events()
|
|
self.audio.start_playback()
|
|
|
|
# Since we are going from stopped to playing, the state change is
|
|
# enough to ensure the stream changed.
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
|
|
def test_stream_changed_event_on_multiple_changes(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.listener.clear_events()
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[1])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=self.uris[1])
|
|
|
|
def test_stream_changed_event_on_playing_to_paused(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.listener.clear_events()
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
|
|
self.listener.clear_events()
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertNotEvent('stream_changed', uri=self.uris[0])
|
|
|
|
def test_stream_changed_event_on_paused_to_stopped(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.stop_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=None)
|
|
|
|
def test_position_changed_on_pause(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
self.audio.wait_for_state_change()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('position_changed', position=0)
|
|
|
|
def test_stream_changed_event_on_paused_to_playing(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.listener.clear_events()
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
|
|
self.listener.clear_events()
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertNotEvent('stream_changed', uri=self.uris[0])
|
|
|
|
def test_position_changed_on_play(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
self.audio.wait_for_state_change()
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('position_changed', position=0)
|
|
|
|
def test_position_changed_on_seek_while_stopped(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.set_position(2000)
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertNotEvent('position_changed', position=0)
|
|
|
|
def test_position_changed_on_seek_after_play(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.set_position(2000)
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('position_changed', position=2000)
|
|
|
|
def test_position_changed_on_seek_after_pause(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback()
|
|
|
|
self.audio.wait_for_state_change()
|
|
self.listener.clear_events()
|
|
self.audio.set_position(2000)
|
|
|
|
self.audio.wait_for_state_change().get()
|
|
self.assertEvent('position_changed', position=2000)
|
|
|
|
def test_tags_changed_on_playback(self):
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
self.assertEvent('tags_changed', tags=mock.ANY)
|
|
|
|
# Unlike the other events, having the state changed done is not
|
|
# enough to ensure our event is called. So we setup a threading
|
|
# event that we can wait for with a timeout while the track playback
|
|
# completes.
|
|
|
|
def test_stream_changed_event_on_paused(self):
|
|
event = self.listener.wait('stream_changed').get()
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.pause_playback().get()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
if not event.wait(timeout=1.0):
|
|
self.fail('Stream changed not reached within deadline')
|
|
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
|
|
def test_reached_end_of_stream_event(self):
|
|
event = self.listener.wait('reached_end_of_stream').get()
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
self.possibly_trigger_fake_about_to_finish()
|
|
if not event.wait(timeout=1.0):
|
|
self.fail('End of stream not reached within deadline')
|
|
|
|
self.assertFalse(self.audio.get_current_tags().get())
|
|
|
|
def test_gapless(self):
|
|
uris = self.uris[1:]
|
|
event = self.listener.wait('reached_end_of_stream').get()
|
|
|
|
def callback():
|
|
if uris:
|
|
self.audio.set_uri(uris.pop()).get()
|
|
|
|
self.audio.set_about_to_finish_callback(callback).get()
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.possibly_trigger_fake_about_to_finish()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
self.possibly_trigger_fake_about_to_finish()
|
|
self.audio.wait_for_state_change().get()
|
|
if not event.wait(timeout=1.0):
|
|
self.fail('EOS not received')
|
|
|
|
# Check that both uris got played
|
|
self.assertEvent('stream_changed', uri=self.uris[0])
|
|
self.assertEvent('stream_changed', uri=self.uris[1])
|
|
|
|
# Check that events counts check out.
|
|
keys = [k for k, v in self.listener.get_events().get()]
|
|
self.assertEqual(2, keys.count('stream_changed'))
|
|
self.assertEqual(2, keys.count('position_changed'))
|
|
self.assertEqual(1, keys.count('state_changed'))
|
|
self.assertEqual(1, keys.count('reached_end_of_stream'))
|
|
|
|
# TODO: test tag states within gaples
|
|
|
|
# TODO: this does not belong in this testcase
|
|
def test_current_tags_are_blank_to_begin_with(self):
|
|
self.assertFalse(self.audio.get_current_tags().get())
|
|
|
|
def test_current_tags_blank_after_end_of_stream(self):
|
|
event = self.listener.wait('reached_end_of_stream').get()
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.possibly_trigger_fake_about_to_finish()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
if not event.wait(timeout=1.0):
|
|
self.fail('EOS not received')
|
|
|
|
self.assertFalse(self.audio.get_current_tags().get())
|
|
|
|
def test_current_tags_stored(self):
|
|
event = self.listener.wait('reached_end_of_stream').get()
|
|
tags = []
|
|
|
|
def callback():
|
|
tags.append(self.audio.get_current_tags().get())
|
|
|
|
self.audio.set_about_to_finish_callback(callback).get()
|
|
|
|
self.audio.prepare_change()
|
|
self.audio.set_uri(self.uris[0])
|
|
self.audio.start_playback()
|
|
|
|
self.possibly_trigger_fake_about_to_finish()
|
|
self.audio.wait_for_state_change().get()
|
|
|
|
if not event.wait(timeout=1.0):
|
|
self.fail('EOS not received')
|
|
|
|
self.assertTrue(tags[0])
|
|
|
|
# TODO: test that we reset when we expect between songs
|
|
|
|
|
|
class AudioDummyEventTest(DummyMixin, AudioEventTest):
|
|
|
|
"""Exercise the AudioEventTest against our mock audio classes."""
|
|
|
|
|
|
# TODO: move to mixer tests...
|
|
class MixerTest(BaseTest):
|
|
|
|
@unittest.SkipTest
|
|
def test_set_mute(self):
|
|
for value in (True, False):
|
|
self.assertTrue(self.audio.set_mute(value).get())
|
|
self.assertEqual(value, self.audio.get_mute().get())
|
|
|
|
@unittest.SkipTest
|
|
def test_set_state_encapsulation(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_set_position(self):
|
|
pass # TODO
|
|
|
|
@unittest.SkipTest
|
|
def test_invalid_output_raises_error(self):
|
|
pass # TODO
|
|
|
|
|
|
class AudioStateTest(unittest.TestCase):
|
|
|
|
def setUp(self): # noqa: N802
|
|
self.audio = audio.Audio(config=None, mixer=None)
|
|
|
|
def test_state_starts_as_stopped(self):
|
|
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
|
|
|
|
def test_state_does_not_change_when_in_gst_ready_state(self):
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.NULL, Gst.State.READY, Gst.State.VOID_PENDING)
|
|
|
|
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
|
|
|
|
def test_state_changes_from_stopped_to_playing_on_play(self):
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.NULL, Gst.State.READY, Gst.State.PLAYING)
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.READY, Gst.State.PAUSED, Gst.State.PLAYING)
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.PAUSED, Gst.State.PLAYING, Gst.State.VOID_PENDING)
|
|
|
|
self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state)
|
|
|
|
def test_state_changes_from_playing_to_paused_on_pause(self):
|
|
self.audio.state = audio.PlaybackState.PLAYING
|
|
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.PLAYING, Gst.State.PAUSED, Gst.State.VOID_PENDING)
|
|
|
|
self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state)
|
|
|
|
def test_state_changes_from_playing_to_stopped_on_stop(self):
|
|
self.audio.state = audio.PlaybackState.PLAYING
|
|
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.PLAYING, Gst.State.PAUSED, Gst.State.NULL)
|
|
self.audio._handler.on_playbin_state_changed(
|
|
Gst.State.PAUSED, Gst.State.READY, Gst.State.NULL)
|
|
# We never get the following call, so the logic must work without it
|
|
# self.audio._handler.on_playbin_state_changed(
|
|
# Gst.State.READY, Gst.State.NULL, Gst.State.VOID_PENDING)
|
|
|
|
self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state)
|
|
|
|
|
|
class AudioBufferingTest(unittest.TestCase):
|
|
|
|
def setUp(self): # noqa: N802
|
|
self.audio = audio.Audio(config=None, mixer=None)
|
|
self.audio._playbin = mock.Mock(spec=['set_state'])
|
|
|
|
def test_pause_when_buffer_empty(self):
|
|
playbin = self.audio._playbin
|
|
self.audio.start_playback()
|
|
playbin.set_state.assert_called_with(Gst.State.PLAYING)
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio._handler.on_buffering(0)
|
|
playbin.set_state.assert_called_with(Gst.State.PAUSED)
|
|
self.assertTrue(self.audio._buffering)
|
|
|
|
def test_stay_paused_when_buffering_finished(self):
|
|
playbin = self.audio._playbin
|
|
self.audio.pause_playback()
|
|
playbin.set_state.assert_called_with(Gst.State.PAUSED)
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio._handler.on_buffering(100)
|
|
self.assertEqual(playbin.set_state.call_count, 0)
|
|
self.assertFalse(self.audio._buffering)
|
|
|
|
def test_change_to_paused_while_buffering(self):
|
|
playbin = self.audio._playbin
|
|
self.audio.start_playback()
|
|
playbin.set_state.assert_called_with(Gst.State.PLAYING)
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio._handler.on_buffering(0)
|
|
playbin.set_state.assert_called_with(Gst.State.PAUSED)
|
|
self.audio.pause_playback()
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio._handler.on_buffering(100)
|
|
self.assertEqual(playbin.set_state.call_count, 0)
|
|
self.assertFalse(self.audio._buffering)
|
|
|
|
def test_change_to_stopped_while_buffering(self):
|
|
playbin = self.audio._playbin
|
|
self.audio.start_playback()
|
|
playbin.set_state.assert_called_with(Gst.State.PLAYING)
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio._handler.on_buffering(0)
|
|
playbin.set_state.assert_called_with(Gst.State.PAUSED)
|
|
playbin.set_state.reset_mock()
|
|
|
|
self.audio.stop_playback()
|
|
playbin.set_state.assert_called_with(Gst.State.NULL)
|
|
self.assertFalse(self.audio._buffering)
|