diff --git a/tests/local/test_playback.py b/tests/local/test_playback.py index b99f8508..617044ba 100644 --- a/tests/local/test_playback.py +++ b/tests/local/test_playback.py @@ -42,8 +42,12 @@ class LocalPlaybackProviderTest(unittest.TestCase): track = Track(uri=uri, length=4464) self.tracklist.add([track]) - def trigger_end_of_track(self): - self.playback._on_end_of_track() + def trigger_about_to_finish(self): + # Flush any queued core calls. + self.playback.get_current_tl_track().get() + + callback = self.audio.get_about_to_finish_callback().get() + callback() def run(self, result=None): with deprecation.ignore('core.tracklist.add:tracks_arg'): @@ -53,7 +57,9 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.audio = dummy_audio.create_proxy() self.backend = actor.LocalBackend.start( config=self.config, audio=self.audio).proxy() - self.core = core.Core(self.config, backends=[self.backend]) + self.core = core.Core.start(audio=self.audio, + backends=[self.backend], + config=self.config).proxy() self.playback = self.core.playback self.tracklist = self.core.tracklist @@ -65,24 +71,58 @@ class LocalPlaybackProviderTest(unittest.TestCase): def tearDown(self): # noqa: N802 pykka.ActorRegistry.stop_all() + def assert_state_is(self, state): + self.assertEqual(self.playback.get_state().get(), state) + + def assert_current_track_is(self, track): + self.assertEqual(self.playback.get_current_track().get(), track) + + def assert_current_track_is_not(self, track): + self.assertNotEqual(self.playback.get_current_track().get(), track) + + def assert_current_track_index_is(self, index): + tl_track = self.playback.get_current_tl_track().get() + self.assertEqual(self.tracklist.index(tl_track).get(), index) + + def assert_next_tl_track_is(self, tl_track): + current = self.playback.get_current_tl_track().get() + self.assertEqual(self.tracklist.next_track(current).get(), tl_track) + + def assert_next_tl_track_is_not(self, tl_track): + current = self.playback.get_current_tl_track().get() + self.assertNotEqual(self.tracklist.next_track(current).get(), tl_track) + + def assert_previous_tl_track_is(self, tl_track): + current = self.playback.get_current_tl_track().get() + previous = self.tracklist.previous_track(current).get() + self.assertEqual(previous, tl_track) + + def assert_eot_tl_track_is(self, tl_track): + current = self.playback.get_current_tl_track().get() + self.assertEqual(self.tracklist.eot_track(current).get(), tl_track) + + def assert_eot_tl_track_is_not(self, tl_track): + current = self.playback.get_current_tl_track().get() + self.assertNotEqual(self.tracklist.eot_track(current).get(), tl_track) + def test_uri_scheme(self): - self.assertNotIn('file', self.core.uri_schemes) - self.assertIn('local', self.core.uri_schemes) + self.assertNotIn('file', self.core.uri_schemes.get()) + self.assertIn('local', self.core.uri_schemes.get()) def test_play_mp3(self): self.add_track('local:track:blank.mp3') self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) def test_play_ogg(self): self.add_track('local:track:blank.ogg') self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) def test_play_flac(self): self.add_track('local:track:blank.flac') self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) def test_play_uri_with_non_ascii_bytes(self): # Regression test: If trying to do .split(u':') on a bytestring, the @@ -90,76 +130,75 @@ class LocalPlaybackProviderTest(unittest.TestCase): # non-ASCII strings, like the bytestring the following URI decodes to. self.add_track('local:track:12%20Doin%E2%80%99%20It%20Right.flac') self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) def test_initial_state_is_stopped(self): - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) def test_play_with_empty_playlist(self): - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) def test_play_with_empty_playlist_return_value(self): - self.assertEqual(self.playback.play(), None) + self.assertEqual(self.playback.play().get(), None) @populate_tracklist def test_play_state(self): - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_play_return_value(self): - self.assertEqual(self.playback.play(), None) + self.assertEqual(self.playback.play().get(), None) @populate_tracklist def test_play_track_state(self): - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.playback.play(self.tracklist.tl_tracks[-1]) - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.STOPPED) + self.playback.play(self.tl_tracks.get()[-1]) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_play_track_return_value(self): - self.assertEqual(self.playback.play( - self.tracklist.tl_tracks[-1]), None) + self.assertIsNone(self.playback.play(self.tl_tracks.get()[-1]).get()) @populate_tracklist def test_play_when_playing(self): self.playback.play() - track = self.playback.current_track + track = self.playback.get_current_track().get() self.playback.play() - self.assertEqual(track, self.playback.current_track) + self.assert_current_track_is(track) @populate_tracklist def test_play_when_paused(self): self.playback.play() - track = self.playback.current_track + track = self.playback.get_current_track().get() self.playback.pause() self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(track, self.playback.current_track) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(track) @populate_tracklist def test_play_when_pause_after_next(self): self.playback.play() self.playback.next() self.playback.next() - track = self.playback.current_track + track = self.playback.get_current_track().get() self.playback.pause() self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(track, self.playback.current_track) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(track) @populate_tracklist def test_play_sets_current_track(self): self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_play_track_sets_current_track(self): - self.playback.play(self.tracklist.tl_tracks[-1]) - self.assertEqual(self.playback.current_track, self.tracks[-1]) + self.playback.play(self.tl_tracks.get()[-1]) + self.assert_current_track_is(self.tracks[-1]) @populate_tracklist def test_play_skips_to_next_track_on_failure(self): @@ -167,27 +206,28 @@ class LocalPlaybackProviderTest(unittest.TestCase): return_values = [True, False] self.backend.playback.play = lambda: return_values.pop() self.playback.play() - self.assertNotEqual(self.playback.current_track, self.tracks[0]) - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.assert_current_track_is_not(self.tracks[0]) + self.assert_current_track_is(self.tracks[1]) @populate_tracklist def test_current_track_after_completed_playlist(self): - self.playback.play(self.tracklist.tl_tracks[-1]) - self.trigger_end_of_track() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.assertEqual(self.playback.current_track, None) + self.playback.play(self.tl_tracks.get()[-1]) + self.trigger_about_to_finish() + # EOS should have triggered + self.assert_state_is(PlaybackState.STOPPED) + self.assert_current_track_is(None) - self.playback.play(self.tracklist.tl_tracks[-1]) + self.playback.play(self.tl_tracks.get()[-1]) self.playback.next() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.assertEqual(self.playback.current_track, None) + self.assert_state_is(PlaybackState.STOPPED) + self.assert_current_track_is(None) @populate_tracklist def test_previous(self): self.playback.play() self.playback.next() self.playback.previous() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_previous_more(self): @@ -195,13 +235,13 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.next() # At track 1 self.playback.next() # At track 2 self.playback.previous() # At track 1 - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.assert_current_track_is(self.tracks[1]) @populate_tracklist def test_previous_return_value(self): self.playback.play() self.playback.next() - self.assertEqual(self.playback.previous(), None) + self.assertIsNone(self.playback.previous().get()) @populate_tracklist def test_previous_does_not_trigger_playback(self): @@ -209,68 +249,64 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.next() self.playback.stop() self.playback.previous() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_previous_at_start_of_playlist(self): self.playback.previous() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.assertEqual(self.playback.current_track, None) + self.assert_state_is(PlaybackState.STOPPED) + self.assert_current_track_is(None) def test_previous_for_empty_playlist(self): self.playback.previous() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.assertEqual(self.playback.current_track, None) + self.assert_state_is(PlaybackState.STOPPED) + self.assert_current_track_is(None) @populate_tracklist def test_previous_skips_to_previous_track_on_failure(self): # If backend's play() returns False, it is a failure. return_values = [True, False, True] self.backend.playback.play = lambda: return_values.pop() - self.playback.play(self.tracklist.tl_tracks[2]) - self.assertEqual(self.playback.current_track, self.tracks[2]) + self.playback.play(self.tl_tracks.get()[2]) + self.assert_current_track_is(self.tracks[2]) self.playback.previous() - self.assertNotEqual(self.playback.current_track, self.tracks[1]) - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is_not(self.tracks[1]) + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_next(self): self.playback.play() - tl_track = self.playback.current_tl_track - old_position = self.tracklist.index(tl_track) - old_uri = tl_track.track.uri + old_track = self.playback.get_current_track().get() + old_position = self.tracklist.index().get() self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.index(tl_track), old_position + 1) - self.assertNotEqual(self.playback.current_track.uri, old_uri) + self.assertEqual(self.tracklist.index().get(), old_position + 1) + self.assert_current_track_is_not(old_track) @populate_tracklist def test_next_return_value(self): self.playback.play() - self.assertEqual(self.playback.next(), None) + self.assertEqual(self.playback.next().get(), None) @populate_tracklist def test_next_does_not_trigger_playback(self): self.playback.next() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_next_at_end_of_playlist(self): self.playback.play() for i, track in enumerate(self.tracks): - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(self.playback.current_track, track) - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), i) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(track) + self.assertEqual(self.tracklist.index().get(), i) self.playback.next() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_next_until_end_of_playlist_and_play_from_start(self): @@ -279,16 +315,16 @@ class LocalPlaybackProviderTest(unittest.TestCase): for _ in self.tracks: self.playback.next() - self.assertEqual(self.playback.current_track, None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_current_track_is(None) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(self.tracks[0]) def test_next_for_empty_playlist(self): self.playback.next() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_next_skips_to_next_track_on_failure(self): @@ -296,44 +332,36 @@ class LocalPlaybackProviderTest(unittest.TestCase): return_values = [True, False, True] self.backend.playback.play = lambda: return_values.pop() self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) self.playback.next() - self.assertNotEqual(self.playback.current_track, self.tracks[1]) - self.assertEqual(self.playback.current_track, self.tracks[2]) + self.assert_current_track_is_not(self.tracks[1]) + self.assert_current_track_is(self.tracks[2]) @populate_tracklist def test_next_track_before_play(self): - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[0]) + self.assert_next_tl_track_is(self.tl_tracks.get()[0]) @populate_tracklist def test_next_track_during_play(self): self.playback.play() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[1]) + self.assert_next_tl_track_is(self.tl_tracks.get()[1]) @populate_tracklist def test_next_track_after_previous(self): self.playback.play() self.playback.next() self.playback.previous() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[1]) + self.assert_next_tl_track_is(self.tl_tracks.get()[1]) def test_next_track_empty_playlist(self): - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.next_track(tl_track), None) + self.assert_next_tl_track_is(None) @populate_tracklist def test_next_track_at_end_of_playlist(self): self.playback.play() - for _ in self.tracklist.tl_tracks[1:]: + for _ in self.tl_tracks.get()[1:]: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.next_track(tl_track), None) + self.assert_next_tl_track_is(None) @populate_tracklist def test_next_track_at_end_of_playlist_with_repeat(self): @@ -341,9 +369,7 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.play() for _ in self.tracks[1:]: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[0]) + self.assert_next_tl_track_is(self.tl_tracks.get()[0]) @populate_tracklist @mock.patch('random.shuffle') @@ -351,25 +377,23 @@ class LocalPlaybackProviderTest(unittest.TestCase): shuffle_mock.side_effect = lambda tracks: tracks.reverse() self.tracklist.random = True - current_tl_track = self.playback.current_tl_track - next_tl_track = self.tracklist.next_track(current_tl_track) - self.assertEqual(next_tl_track, self.tl_tracks[-1]) + self.assert_next_tl_track_is(self.tl_tracks.get()[-1]) @populate_tracklist def test_next_with_consume(self): self.tracklist.consume = True self.playback.play() self.playback.next() - self.assertNotIn(self.tracks[0], self.tracklist.tracks) + self.assertNotIn(self.tracks[0], self.tracklist.get_tracks().get()) @populate_tracklist def test_next_with_single_and_repeat(self): self.tracklist.single = True self.tracklist.repeat = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) self.playback.next() - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.assert_current_track_is(self.tracks[1]) @populate_tracklist @mock.patch('random.shuffle') @@ -378,9 +402,9 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.random = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[-1]) + self.assert_current_track_is(self.tracks[-1]) self.playback.next() - self.assertEqual(self.playback.current_track, self.tracks[-2]) + self.assert_current_track_is(self.tracks[-2]) @populate_tracklist @mock.patch('random.shuffle') @@ -388,10 +412,10 @@ class LocalPlaybackProviderTest(unittest.TestCase): shuffle_mock.side_effect = lambda tracks: tracks.reverse() self.tracklist.random = True - current_tl_track = self.playback.current_tl_track + current_tl_track = self.playback.get_current_tl_track().get() - expected_tl_track = self.tracklist.tl_tracks[-1] - next_tl_track = self.tracklist.next_track(current_tl_track) + expected_tl_track = self.tl_tracks.get()[-1] + next_tl_track = self.tracklist.next_track(current_tl_track).get() # Baseline checking that first next_track is last tl track per our fake # shuffle. @@ -400,8 +424,8 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.add(self.tracks[:1]) old_next_tl_track = next_tl_track - expected_tl_track = self.tracklist.tl_tracks[-1] - next_tl_track = self.tracklist.next_track(current_tl_track) + expected_tl_track = self.tracklist.tl_tracks.get()[-1] + next_tl_track = self.tracklist.next_track(current_tl_track).get() # Verify that first next track has changed since we added to the # playlist. @@ -412,58 +436,55 @@ class LocalPlaybackProviderTest(unittest.TestCase): def test_end_of_track(self): self.playback.play() - tl_track = self.playback.current_tl_track - old_position = self.tracklist.index(tl_track) - old_uri = tl_track.track.uri + old_track = self.playback.get_current_track().get() + old_position = self.tracklist.index().get() - self.trigger_end_of_track() + self.trigger_about_to_finish() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.index(tl_track), old_position + 1) - self.assertNotEqual(self.playback.current_track.uri, old_uri) + new_track = self.playback.get_current_track().get() + self.assertEqual(self.tracklist.index().get(), old_position + 1) + self.assertNotEqual(new_track.uri, old_track.uri) @populate_tracklist def test_end_of_track_return_value(self): self.playback.play() - self.assertEqual(self.trigger_end_of_track(), None) + self.assertEqual(self.trigger_about_to_finish(), None) @populate_tracklist def test_end_of_track_does_not_trigger_playback(self): - self.trigger_end_of_track() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.trigger_about_to_finish() + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_end_of_track_at_end_of_playlist(self): self.playback.play() for i, track in enumerate(self.tracks): - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(self.playback.current_track, track) - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), i) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(track) + self.assertEqual(self.tracklist.index().get(), i) - self.trigger_end_of_track() + self.trigger_about_to_finish() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_end_of_track_until_end_of_playlist_and_play_from_start(self): self.playback.play() for _ in self.tracks: - self.trigger_end_of_track() + self.trigger_about_to_finish() - self.assertEqual(self.playback.current_track, None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assertEqual(self.playback.get_current_track().get(), None) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(self.tracks[0]) def test_end_of_track_for_empty_playlist(self): - self.trigger_end_of_track() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.trigger_about_to_finish() + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_end_of_track_skips_to_next_track_on_failure(self): @@ -471,54 +492,46 @@ class LocalPlaybackProviderTest(unittest.TestCase): return_values = [True, False, True] self.backend.playback.play = lambda: return_values.pop() self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) - self.trigger_end_of_track() - self.assertNotEqual(self.playback.current_track, self.tracks[1]) - self.assertEqual(self.playback.current_track, self.tracks[2]) + self.assert_current_track_is(self.tracks[0]) + self.trigger_about_to_finish() + self.assert_current_track_is_not(self.tracks[1]) + self.assert_current_track_is(self.tracks[2]) @populate_tracklist def test_end_of_track_track_before_play(self): - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[0]) + self.assert_next_tl_track_is(self.tl_tracks.get()[0]) @populate_tracklist def test_end_of_track_track_during_play(self): self.playback.play() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[1]) + self.assert_next_tl_track_is(self.tl_tracks.get()[1]) @populate_tracklist - def test_end_of_track_track_after_previous(self): - self.playback.play() - self.trigger_end_of_track() - self.playback.previous() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[1]) + def test_about_to_finish_after_previous(self): + self.playback.play().get() + self.trigger_about_to_finish() + self.playback.previous().get() + self.assert_next_tl_track_is(self.tl_tracks.get()[1]) def test_end_of_track_track_empty_playlist(self): - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.next_track(tl_track), None) + self.assert_next_tl_track_is(None) @populate_tracklist def test_end_of_track_track_at_end_of_playlist(self): self.playback.play() - for _ in self.tracklist.tl_tracks[1:]: - self.trigger_end_of_track() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.next_track(tl_track), None) + for _ in self.tracks[1:]: + self.trigger_about_to_finish() + + self.assert_next_tl_track_is(None) @populate_tracklist def test_end_of_track_track_at_end_of_playlist_with_repeat(self): self.tracklist.repeat = True self.playback.play() for _ in self.tracks[1:]: - self.trigger_end_of_track() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[0]) + self.trigger_about_to_finish() + + self.assert_next_tl_track_is(self.tl_tracks.get()[0]) @populate_tracklist @mock.patch('random.shuffle') @@ -526,16 +539,14 @@ class LocalPlaybackProviderTest(unittest.TestCase): shuffle_mock.side_effect = lambda tracks: tracks.reverse() self.tracklist.random = True - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.next_track(tl_track), self.tl_tracks[-1]) + self.assert_next_tl_track_is(self.tl_tracks.get()[-1]) @populate_tracklist def test_end_of_track_with_consume(self): self.tracklist.consume = True self.playback.play() - self.trigger_end_of_track() - self.assertNotIn(self.tracks[0], self.tracklist.tracks) + self.trigger_about_to_finish() + self.assertNotIn(self.tracks[0], self.tracklist.get_tracks().get()) @populate_tracklist @mock.patch('random.shuffle') @@ -544,9 +555,9 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.random = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[-1]) - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, self.tracks[-2]) + self.assert_current_track_is(self.tracks[-1]) + self.trigger_about_to_finish() + self.assert_current_track_is(self.tracks[-2]) @populate_tracklist @mock.patch('random.shuffle') @@ -555,10 +566,10 @@ class LocalPlaybackProviderTest(unittest.TestCase): shuffle_mock.side_effect = lambda tracks: tracks.reverse() self.tracklist.random = True - current_tl_track = self.playback.current_tl_track + current_tl_track = self.playback.get_current_tl_track().get() - expected_tl_track = self.tracklist.tl_tracks[-1] - eot_tl_track = self.tracklist.eot_track(current_tl_track) + expected_tl_track = self.tracklist.get_tl_tracks().get()[-1] + eot_tl_track = self.tracklist.eot_track(current_tl_track).get() # Baseline checking that first eot_track is last tl track per our fake # shuffle. @@ -567,8 +578,8 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.add(self.tracks[:1]) old_eot_tl_track = eot_tl_track - expected_tl_track = self.tracklist.tl_tracks[-1] - eot_tl_track = self.tracklist.eot_track(current_tl_track) + expected_tl_track = self.tracklist.get_tl_tracks().get()[-1] + eot_tl_track = self.tracklist.eot_track(current_tl_track).get() # Verify that first next track has changed since we added to the # playlist. @@ -577,22 +588,18 @@ class LocalPlaybackProviderTest(unittest.TestCase): @populate_tracklist def test_previous_track_before_play(self): - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.previous_track(tl_track), None) + self.assert_previous_tl_track_is(None) @populate_tracklist def test_previous_track_after_play(self): self.playback.play() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.previous_track(tl_track), None) + self.assert_previous_tl_track_is(None) @populate_tracklist def test_previous_track_after_next(self): self.playback.play() self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.previous_track(tl_track), self.tl_tracks[0]) + self.assert_previous_tl_track_is(self.tl_tracks.get()[0]) @populate_tracklist def test_previous_track_after_previous(self): @@ -600,165 +607,138 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.next() # At track 1 self.playback.next() # At track 2 self.playback.previous() # At track 1 - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.previous_track(tl_track), self.tl_tracks[0]) + self.assert_previous_tl_track_is(self.tl_tracks.get()[0]) def test_previous_track_empty_playlist(self): - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.previous_track(tl_track), None) + self.assert_previous_tl_track_is(None) @populate_tracklist def test_previous_track_with_consume(self): self.tracklist.consume = True for _ in self.tracks: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.previous_track(tl_track), - self.playback.current_tl_track) + current = self.playback.get_current_tl_track().get() + self.assert_previous_tl_track_is(current) @populate_tracklist def test_previous_track_with_random(self): self.tracklist.random = True for _ in self.tracks: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual( - self.tracklist.previous_track(tl_track), - self.playback.current_tl_track) + current = self.playback.get_current_tl_track().get() + self.assert_previous_tl_track_is(current) @populate_tracklist def test_initial_current_track(self): - self.assertEqual(self.playback.current_track, None) + self.assert_current_track_is(None) @populate_tracklist def test_current_track_during_play(self): self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_current_track_after_next(self): self.playback.play() self.playback.next() - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.assert_current_track_is(self.tracks[1]) @populate_tracklist def test_initial_tracklist_position(self): - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), None) + self.assertEqual(self.tracklist.index().get(), None) @populate_tracklist def test_tracklist_position_during_play(self): self.playback.play() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), 0) + self.assert_current_track_index_is(0) @populate_tracklist def test_tracklist_position_after_next(self): self.playback.play() self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), 1) + self.assert_current_track_index_is(1) @populate_tracklist def test_tracklist_position_at_end_of_playlist(self): - self.playback.play(self.tracklist.tl_tracks[-1]) - self.trigger_end_of_track() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.index(tl_track), None) + self.playback.play(self.tl_tracks.get()[-1]) + self.trigger_about_to_finish() + # EOS should have triggered + self.assert_current_track_index_is(None) - def test_on_tracklist_change_gets_called(self): - callback = self.playback._on_tracklist_change - - def wrapper(): - wrapper.called = True - return callback() - wrapper.called = False - - self.playback._on_tracklist_change = wrapper - self.tracklist.add([Track()]) - - self.assert_(wrapper.called) - - @unittest.SkipTest # Blocks for 10ms - @populate_tracklist - def test_end_of_track_callback_gets_called(self): - self.playback.play() - result = self.playback.seek(self.tracks[0].length - 10) - self.assertTrue(result, 'Seek failed') - message = self.core_queue.get(True, 1) - self.assertEqual('end_of_track', message['command']) + @mock.patch('mopidy.core.playback.PlaybackController._on_tracklist_change') + def test_on_tracklist_change_gets_called(self, change_mock): + self.tracklist.add([Track()]).get() + change_mock.assert_called_once_with() @populate_tracklist def test_on_tracklist_change_when_playing(self): self.playback.play() - current_track = self.playback.current_track + current_track = self.playback.get_current_track().get() self.tracklist.add([self.tracks[2]]) - self.assertEqual(self.playback.state, PlaybackState.PLAYING) - self.assertEqual(self.playback.current_track, current_track) + self.assert_state_is(PlaybackState.PLAYING) + self.assert_current_track_is(current_track) @populate_tracklist def test_on_tracklist_change_when_stopped(self): self.tracklist.add([self.tracks[2]]) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) - self.assertEqual(self.playback.current_track, None) + self.assert_state_is(PlaybackState.STOPPED) + self.assert_current_track_is(None) @populate_tracklist def test_on_tracklist_change_when_paused(self): self.playback.play() self.playback.pause() - current_track = self.playback.current_track + current_track = self.playback.get_current_track().get() self.tracklist.add([self.tracks[2]]) - self.assertEqual(self.playback.state, PlaybackState.PAUSED) - self.assertEqual(self.playback.current_track, current_track) + self.assert_state_is(PlaybackState.PAUSED) + self.assert_current_track_is(current_track) @populate_tracklist def test_pause_when_stopped(self): self.playback.pause() - self.assertEqual(self.playback.state, PlaybackState.PAUSED) + self.assert_state_is(PlaybackState.PAUSED) @populate_tracklist def test_pause_when_playing(self): self.playback.play() self.playback.pause() - self.assertEqual(self.playback.state, PlaybackState.PAUSED) + self.assert_state_is(PlaybackState.PAUSED) @populate_tracklist def test_pause_when_paused(self): self.playback.play() self.playback.pause() self.playback.pause() - self.assertEqual(self.playback.state, PlaybackState.PAUSED) + self.assert_state_is(PlaybackState.PAUSED) @populate_tracklist def test_pause_return_value(self): self.playback.play() - self.assertEqual(self.playback.pause(), None) + self.assertIsNone(self.playback.pause().get()) @populate_tracklist def test_resume_when_stopped(self): self.playback.resume() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_resume_when_playing(self): self.playback.play() self.playback.resume() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_resume_when_paused(self): self.playback.play() self.playback.pause() self.playback.resume() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_resume_return_value(self): self.playback.play() self.playback.pause() - self.assertEqual(self.playback.resume(), None) + self.assertIsNone(self.playback.resume().get()) @unittest.SkipTest # Uses sleep and might not work with LocalBackend @populate_tracklist @@ -781,16 +761,16 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.assertGreaterEqual(position, 990) def test_seek_on_empty_playlist(self): - self.assertFalse(self.playback.seek(0)) + self.assertFalse(self.playback.seek(0).get()) def test_seek_on_empty_playlist_updates_position(self): self.playback.seek(0) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_seek_when_stopped_triggers_play(self): self.playback.seek(0) - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_seek_when_playing(self): @@ -800,10 +780,10 @@ class LocalPlaybackProviderTest(unittest.TestCase): @populate_tracklist def test_seek_when_playing_updates_position(self): - length = self.tracklist.tracks[0].length + length = self.tracks[0].length self.playback.play() self.playback.seek(length - 1000) - position = self.playback.time_position + position = self.playback.get_time_position().get() self.assertGreaterEqual(position, length - 1010) @populate_tracklist @@ -812,15 +792,15 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.pause() result = self.playback.seek(self.tracks[0].length - 1000) self.assert_(result, 'Seek return value was %s' % result) - self.assertEqual(self.playback.state, PlaybackState.PAUSED) + self.assert_state_is(PlaybackState.PAUSED) @populate_tracklist def test_seek_when_paused_updates_position(self): - length = self.tracklist.tracks[0].length + length = self.tracks[0].length self.playback.play() self.playback.pause() self.playback.seek(length - 1000) - position = self.playback.time_position + position = self.playback.get_time_position().get() self.assertGreaterEqual(position, length - 1010) @unittest.SkipTest @@ -835,50 +815,42 @@ class LocalPlaybackProviderTest(unittest.TestCase): def test_seek_beyond_end_of_song_jumps_to_next_song(self): self.playback.play() self.playback.seek(self.tracks[0].length * 100) - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.assert_current_track_is(self.tracks[1]) @populate_tracklist def test_seek_beyond_end_of_song_for_last_track(self): - self.playback.play(self.tracklist.tl_tracks[-1]) - self.playback.seek(self.tracklist.tracks[-1].length * 100) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.playback.play(self.tl_tracks.get()[-1]) + self.playback.seek(self.tracks[-1].length * 100) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_stop_when_stopped(self): self.playback.stop() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_stop_when_playing(self): self.playback.play() self.playback.stop() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_stop_when_paused(self): self.playback.play() self.playback.pause() self.playback.stop() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_state_is(PlaybackState.STOPPED) def test_stop_return_value(self): self.playback.play() - self.assertEqual(self.playback.stop(), None) + self.assertIsNone(self.playback.stop().get()) def test_time_position_when_stopped(self): - future = mock.Mock() - future.get = mock.Mock(return_value=0) - self.audio.get_position = mock.Mock(return_value=future) - - self.assertEqual(self.playback.time_position, 0) + self.assertEqual(self.playback.get_time_position().get(), 0) @populate_tracklist def test_time_position_when_stopped_with_playlist(self): - future = mock.Mock() - future.get = mock.Mock(return_value=0) - self.audio.get_position = mock.Mock(return_value=future) - - self.assertEqual(self.playback.time_position, 0) + self.assertEqual(self.playback.get_time_position().get(), 0) @unittest.SkipTest # Uses sleep and does might not work with LocalBackend @populate_tracklist @@ -889,30 +861,30 @@ class LocalPlaybackProviderTest(unittest.TestCase): second = self.playback.time_position self.assertGreater(second, first) - @unittest.SkipTest # Uses sleep @populate_tracklist def test_time_position_when_paused(self): self.playback.play() - time.sleep(0.2) - self.playback.pause() - time.sleep(0.2) - first = self.playback.time_position - second = self.playback.time_position + self.playback.pause().get() + first = self.playback.get_time_position().get() + second = self.playback.get_time_position().get() self.assertEqual(first, second) @populate_tracklist def test_play_with_consume(self): self.tracklist.consume = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_playlist_is_empty_after_all_tracks_are_played_with_consume(self): self.tracklist.consume = True self.playback.play() - for _ in range(len(self.tracklist.tracks)): - self.trigger_end_of_track() - self.assertEqual(len(self.tracklist.tracks), 0) + + for t in self.tracks: + self.trigger_about_to_finish() + # EOS should have trigger + + self.assertEqual(len(self.tracklist.get_tracks().get()), 0) @populate_tracklist @mock.patch('random.shuffle') @@ -921,7 +893,7 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.random = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[-1]) + self.assert_current_track_is(self.tracks[-1]) @populate_tracklist @mock.patch('random.shuffle') @@ -931,24 +903,24 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.random = True self.playback.play() self.playback.next() - current_track = self.playback.current_track + current_track = self.playback.get_current_track().get() self.playback.previous() - self.assertEqual(self.playback.current_track, current_track) + self.assert_current_track_is(current_track) @populate_tracklist def test_end_of_song_starts_next_track(self): self.playback.play() - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, self.tracks[1]) + self.trigger_about_to_finish() + self.assert_current_track_is(self.tracks[1]) @populate_tracklist def test_end_of_song_with_single_and_repeat_starts_same(self): self.tracklist.single = True self.tracklist.repeat = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, self.tracks[0]) + self.assert_current_track_is(self.tracks[0]) + self.trigger_about_to_finish() + self.assert_current_track_is(self.tracks[0]) @populate_tracklist def test_end_of_song_with_single_random_and_repeat_starts_same(self): @@ -956,42 +928,45 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.tracklist.repeat = True self.tracklist.random = True self.playback.play() - current_track = self.playback.current_track - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, current_track) + current_track = self.playback.get_current_track().get() + self.trigger_about_to_finish() + self.assert_current_track_is(current_track) @populate_tracklist def test_end_of_song_with_single_stops(self): self.tracklist.single = True self.playback.play() - self.assertEqual(self.playback.current_track, self.tracks[0]) - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_current_track_is(self.tracks[0]) + self.trigger_about_to_finish() + self.assert_current_track_is(None) + # EOS should have triggered + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_end_of_song_with_single_and_random_stops(self): self.tracklist.single = True self.tracklist.random = True self.playback.play() - self.trigger_end_of_track() - self.assertEqual(self.playback.current_track, None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.trigger_about_to_finish() + # EOS should have triggered + self.assert_current_track_is(None) + self.assert_state_is(PlaybackState.STOPPED) @populate_tracklist def test_end_of_playlist_stops(self): - self.playback.play(self.tracklist.tl_tracks[-1]) - self.trigger_end_of_track() - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.playback.play(self.tl_tracks.get()[-1]) + self.trigger_about_to_finish() + # EOS should have triggered + self.assert_state_is(PlaybackState.STOPPED) def test_repeat_off_by_default(self): - self.assertEqual(self.tracklist.repeat, False) + self.assertEqual(self.tracklist.get_repeat().get(), False) def test_random_off_by_default(self): - self.assertEqual(self.tracklist.random, False) + self.assertEqual(self.tracklist.get_random().get(), False) def test_consume_off_by_default(self): - self.assertEqual(self.tracklist.consume, False) + self.assertEqual(self.tracklist.get_consume().get(), False) @populate_tracklist def test_random_until_end_of_playlist(self): @@ -999,17 +974,16 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.play() for _ in self.tracks[1:]: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.next_track(tl_track), None) + self.assert_next_tl_track_is(None) @populate_tracklist def test_random_with_eot_until_end_of_playlist(self): self.tracklist.random = True self.playback.play() for _ in self.tracks[1:]: - self.trigger_end_of_track() - tl_track = self.playback.current_tl_track - self.assertEqual(self.tracklist.eot_track(tl_track), None) + self.trigger_about_to_finish() + + self.assert_eot_tl_track_is(None) @populate_tracklist def test_random_until_end_of_playlist_and_play_from_start(self): @@ -1017,23 +991,23 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.play() for _ in self.tracks: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertNotEqual(self.tracklist.next_track(tl_track), None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.assert_next_tl_track_is_not(None) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_random_with_eot_until_end_of_playlist_and_play_from_start(self): self.tracklist.random = True self.playback.play() for _ in self.tracks: - self.trigger_end_of_track() - tl_track = self.playback.current_tl_track - self.assertNotEqual(self.tracklist.eot_track(tl_track), None) - self.assertEqual(self.playback.state, PlaybackState.STOPPED) + self.trigger_about_to_finish() + # EOS should have triggered + + self.assert_eot_tl_track_is_not(None) + self.assert_state_is(PlaybackState.STOPPED) self.playback.play() - self.assertEqual(self.playback.state, PlaybackState.PLAYING) + self.assert_state_is(PlaybackState.PLAYING) @populate_tracklist def test_random_until_end_of_playlist_with_repeat(self): @@ -1042,8 +1016,7 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.play() for _ in self.tracks[1:]: self.playback.next() - tl_track = self.playback.current_tl_track - self.assertNotEqual(self.tracklist.next_track(tl_track), None) + self.assert_next_tl_track_is_not(None) @populate_tracklist def test_played_track_during_random_not_played_again(self): @@ -1051,8 +1024,9 @@ class LocalPlaybackProviderTest(unittest.TestCase): self.playback.play() played = [] for _ in self.tracks: - self.assertNotIn(self.playback.current_track, played) - played.append(self.playback.current_track) + track = self.playback.get_current_track().get() + self.assertNotIn(track, played) + played.append(track) self.playback.next() @populate_tracklist @@ -1061,17 +1035,19 @@ class LocalPlaybackProviderTest(unittest.TestCase): # Covers underlying issue IssueGH17RegressionTest tests for. shuffle_mock.side_effect = lambda tracks: tracks.reverse() - expected = self.tl_tracks[::-1] + [None] + expected = self.tl_tracks.get()[::-1] + [None] actual = [] self.playback.play() self.tracklist.random = True - while self.playback.state != PlaybackState.STOPPED: + while self.playback.get_state().get() != PlaybackState.STOPPED: self.playback.next() - actual.append(self.playback.current_tl_track) + actual.append(self.playback.get_current_tl_track().get()) + if len(actual) > len(expected): + break self.assertEqual(actual, expected) @populate_tracklist def test_playing_track_that_isnt_in_playlist(self): with self.assertRaises(AssertionError): - self.playback.play(TlTrack(17, Track())) + self.playback.play(TlTrack(17, Track())).get()