from __future__ import unicode_literals import threading import unittest import gobject gobject.threads_init() import pygst pygst.require('0.10') import gst # noqa import mock import pykka from mopidy import audio from mopidy.audio import dummy as dummy_audio from mopidy.audio.constants import PlaybackState from mopidy.utils.path import path_to_uri from tests import path_to_data_dir """ We want to make sure both our real audio class and the fake one behave correctly. So each test is first run against the real class, then repeated against our dummy. """ class BaseTest(unittest.TestCase): config = { 'audio': { 'mixer': 'fakemixer track_max_volume=65536', 'mixer_track': None, 'mixer_volume': None, 'output': 'fakesink', 'visualizer': None, } } uris = [path_to_uri(path_to_data_dir('song1.wav')), path_to_uri(path_to_data_dir('song2.wav'))] audio_class = audio.Audio def setUp(self): config = { 'audio': { 'mixer': 'foomixer', 'mixer_volume': None, 'output': 'fakesink', 'visualizer': None, }, 'proxy': { 'hostname': '', }, } self.song_uri = path_to_uri(path_to_data_dir('song1.wav')) self.audio = self.audio_class.start(config=config, mixer=None).proxy() def tearDown(self): pykka.ActorRegistry.stop_all() def possibly_trigger_fake_playback_error(self): pass def possibly_trigger_fake_about_to_finish(self): pass class DummyMixin(object): audio_class = dummy_audio.DummyAudio def possibly_trigger_fake_playback_error(self): self.audio.trigger_fake_playback_failure() def possibly_trigger_fake_about_to_finish(self): callback = self.audio.get_about_to_finish_callback().get() if callback: callback() class AudioTest(BaseTest): def test_start_playback_existing_file(self): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.assertTrue(self.audio.start_playback().get()) def test_start_playback_non_existing_file(self): self.possibly_trigger_fake_playback_error() self.audio.prepare_change() self.audio.set_uri(self.uris[0] + 'bogus') self.assertFalse(self.audio.start_playback().get()) def test_pause_playback_while_playing(self): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.assertTrue(self.audio.pause_playback().get()) def test_stop_playback_while_playing(self): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.assertTrue(self.audio.stop_playback().get()) @unittest.SkipTest def test_deliver_data(self): pass # TODO @unittest.SkipTest def test_end_of_data_stream(self): pass # TODO @unittest.SkipTest def test_set_mute(self): pass # TODO Probably needs a fakemixer with a mixer track @unittest.SkipTest def test_set_state_encapsulation(self): pass # TODO @unittest.SkipTest def test_set_position(self): pass # TODO @unittest.SkipTest def test_invalid_output_raises_error(self): pass # TODO class AudioDummyTest(DummyMixin, AudioTest): pass @mock.patch.object(audio.AudioListener, 'send') class AudioEventTest(BaseTest): def setUp(self): super(AudioEventTest, self).setUp() self.audio.enable_sync_handler().get() # TODO: test without uri set, with bad uri and gapless... # TODO: playing->playing triggered by seek should be removed # TODO: codify expected state after EOS # TODO: consider returning a future or a threading event? def test_state_change_stopped_to_playing_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.STOPPED, new_state=PlaybackState.PLAYING, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_state_change_stopped_to_paused_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.STOPPED, new_state=PlaybackState.PAUSED, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_state_change_paused_to_playing_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change() self.audio.start_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.PAUSED, new_state=PlaybackState.PLAYING, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_state_change_paused_to_stopped_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change() self.audio.stop_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.PAUSED, new_state=PlaybackState.STOPPED, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_state_change_playing_to_paused_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change() self.audio.pause_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.PLAYING, new_state=PlaybackState.PAUSED, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_state_change_playing_to_stopped_event(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change() self.audio.stop_playback() self.audio.wait_for_state_change().get() call = mock.call('state_changed', old_state=PlaybackState.PLAYING, new_state=PlaybackState.STOPPED, target_state=None) self.assertIn(call, send_mock.call_args_list) def test_stream_changed_event_on_playing(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() # Since we are going from stopped to playing, the state change is # enough to ensure the stream changed. self.audio.wait_for_state_change().get() call = mock.call('stream_changed', uri=self.uris[0]) self.assertIn(call, send_mock.call_args_list) def test_stream_changed_event_on_paused_to_stopped(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change() self.audio.stop_playback() self.audio.wait_for_state_change().get() call = mock.call('stream_changed', uri=None) self.assertIn(call, send_mock.call_args_list) def test_position_changed_on_pause(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change() self.audio.wait_for_state_change().get() call = mock.call('position_changed', position=0) self.assertIn(call, send_mock.call_args_list) def test_position_changed_on_play(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change() self.audio.wait_for_state_change().get() call = mock.call('position_changed', position=0) self.assertIn(call, send_mock.call_args_list) def test_position_changed_on_seek(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.set_position(2000) self.audio.wait_for_state_change().get() call = mock.call('position_changed', position=0) self.assertNotIn(call, send_mock.call_args_list) def test_position_changed_on_seek_after_play(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change() self.audio.set_position(2000) self.audio.wait_for_state_change().get() call = mock.call('position_changed', position=2000) self.assertIn(call, send_mock.call_args_list) def test_position_changed_on_seek_after_pause(self, send_mock): self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback() self.audio.wait_for_state_change() self.audio.set_position(2000) self.audio.wait_for_state_change().get() call = mock.call('position_changed', position=2000) self.assertIn(call, send_mock.call_args_list) # Unlike the other events, having the state changed done is not # enough to ensure our event is called. So we setup a threading # event that we can wait for with a timeout while the track playback # completes. def test_stream_changed_event_on_paused(self, send_mock): event = threading.Event() def send(name, **kwargs): if name == 'stream_changed': event.set() send_mock.side_effect = send self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.pause_playback().get() self.audio.wait_for_state_change().get() if not event.wait(timeout=1.0): self.fail('Stream changed not reached within deadline') def test_reached_end_of_stream_event(self, send_mock): event = threading.Event() def send(name, **kwargs): if name == 'reached_end_of_stream': event.set() send_mock.side_effect = send self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.audio.wait_for_state_change().get() self.possibly_trigger_fake_about_to_finish() if not event.wait(timeout=1.0): self.fail('End of stream not reached within deadline') # Make sure that gapless really works: def test_gapless(self, send_mock): uris = self.uris[1:] events = [] done = threading.Event() def callback(): if uris: self.audio.set_uri(uris.pop()).get() def send(name, **kwargs): events.append((name, kwargs)) if name == 'reached_end_of_stream': done.set() send_mock.side_effect = send self.audio.set_about_to_finish_callback(callback).get() self.audio.prepare_change() self.audio.set_uri(self.uris[0]) self.audio.start_playback() self.possibly_trigger_fake_about_to_finish() self.audio.wait_for_state_change().get() self.possibly_trigger_fake_about_to_finish() if not done.wait(timeout=1.0): self.fail('EOS not received') excepted = [ ('position_changed', {'position': 0}), ('stream_changed', {'uri': self.uris[0]}), ('state_changed', {'old_state': PlaybackState.STOPPED, 'new_state': PlaybackState.PLAYING, 'target_state': None}), ('position_changed', {'position': 0}), ('stream_changed', {'uri': self.uris[1]}), ('reached_end_of_stream', {})] self.assertEqual(excepted, events) class AudioDummyEventTest(DummyMixin, AudioEventTest): pass # TODO: move to mixer tests... class MixerTest(BaseTest): @unittest.SkipTest def test_set_mute(self): for value in (True, False): self.assertTrue(self.audio.set_mute(value).get()) self.assertEqual(value, self.audio.get_mute().get()) @unittest.SkipTest def test_set_state_encapsulation(self): pass # TODO @unittest.SkipTest def test_set_position(self): pass # TODO @unittest.SkipTest def test_invalid_output_raises_error(self): pass # TODO class AudioStateTest(unittest.TestCase): def setUp(self): self.audio = audio.Audio(config=None, mixer=None) def test_state_starts_as_stopped(self): self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_does_not_change_when_in_gst_ready_state(self): self.audio._on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) def test_state_changes_from_stopped_to_playing_on_play(self): self.audio._on_playbin_state_changed( gst.STATE_NULL, gst.STATE_READY, gst.STATE_PLAYING) self.audio._on_playbin_state_changed( gst.STATE_READY, gst.STATE_PAUSED, gst.STATE_PLAYING) self.audio._on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_PLAYING, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PLAYING, self.audio.state) def test_state_changes_from_playing_to_paused_on_pause(self): self.audio.state = audio.PlaybackState.PLAYING self.audio._on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.PAUSED, self.audio.state) def test_state_changes_from_playing_to_stopped_on_stop(self): self.audio.state = audio.PlaybackState.PLAYING self.audio._on_playbin_state_changed( gst.STATE_PLAYING, gst.STATE_PAUSED, gst.STATE_NULL) self.audio._on_playbin_state_changed( gst.STATE_PAUSED, gst.STATE_READY, gst.STATE_NULL) # We never get the following call, so the logic must work without it # self.audio._on_playbin_state_changed( # gst.STATE_READY, gst.STATE_NULL, gst.STATE_VOID_PENDING) self.assertEqual(audio.PlaybackState.STOPPED, self.audio.state) class AudioBufferingTest(unittest.TestCase): def setUp(self): self.audio = audio.Audio(config=None, mixer=None) self.audio._playbin = mock.Mock(spec=['set_state']) def test_pause_when_buffer_empty(self): playbin = self.audio._playbin self.audio.start_playback() playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() self.audio._on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.assertTrue(self.audio._buffering) def test_stay_paused_when_buffering_finished(self): playbin = self.audio._playbin self.audio.pause_playback() playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock() self.audio._on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) def test_change_to_paused_while_buffering(self): playbin = self.audio._playbin self.audio.start_playback() playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() self.audio._on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) self.audio.pause_playback() playbin.set_state.reset_mock() self.audio._on_buffering(100) self.assertEqual(playbin.set_state.call_count, 0) self.assertFalse(self.audio._buffering) def test_change_to_stopped_while_buffering(self): playbin = self.audio._playbin self.audio.start_playback() playbin.set_state.assert_called_with(gst.STATE_PLAYING) playbin.set_state.reset_mock() self.audio._on_buffering(0) playbin.set_state.assert_called_with(gst.STATE_PAUSED) playbin.set_state.reset_mock() self.audio.stop_playback() playbin.set_state.assert_called_with(gst.STATE_NULL) self.assertFalse(self.audio._buffering)