diff --git a/mopidy/core/playback.py b/mopidy/core/playback.py index 9cbbf874..7317550e 100644 --- a/mopidy/core/playback.py +++ b/mopidy/core/playback.py @@ -207,6 +207,8 @@ class PlaybackController(object): self._trigger_track_playback_started() def _on_about_to_finish(self): + self._trigger_track_playback_ended(self.get_time_position()) + # TODO: check that we always have a current track original_tl_track = self.get_current_tl_track() next_tl_track = self.core.tracklist.eot_track(original_tl_track) @@ -244,6 +246,7 @@ class PlaybackController(object): current = self._pending_tl_track or self._current_tl_track # TODO: move to pending track? + self._trigger_track_playback_ended(self.get_time_position()) self.core.tracklist._mark_played(self._current_tl_track) while current: @@ -290,7 +293,43 @@ class PlaybackController(object): if tl_track: deprecation.warn('core.playback.play:tl_track_kwarg', pending=True) - self._play(tl_track=tl_track, tlid=tlid, on_error_step=1) + if tl_track is None and tlid is not None: + for tl_track in self.core.tracklist.get_tl_tracks(): + if tl_track.tlid == tlid: + break + else: + tl_track = None + + if tl_track is not None: + # TODO: allow from outside tracklist, would make sense given refs? + assert tl_track in self.core.tracklist.get_tl_tracks() + elif tl_track is None and self.get_state() == PlaybackState.PAUSED: + self.resume() + return + + original = self._current_tl_track + current = self._pending_tl_track or self._current_tl_track + pending = tl_track or current or self.core.tracklist.next_track(None) + + if original != pending and self.get_state() != PlaybackState.STOPPED: + self._trigger_track_playback_ended(self.get_time_position()) + + if pending: + # TODO: remove? + self.set_state(PlaybackState.PLAYING) + + while pending: + # TODO: should we consume unplayable tracks in this loop? + if self._change(pending, PlaybackState.PLAYING): + break + else: + self.core.tracklist._mark_unplayable(pending) + current = pending + pending = self.core.tracklist.next_track(current) + + # TODO: move to top and get rid of original? + self.core.tracklist._mark_played(original) + # TODO return result? def _change(self, pending_tl_track, state): self._pending_tl_track = pending_tl_track @@ -327,67 +366,6 @@ class PlaybackController(object): raise Exception('Unknown state: %s' % state) - def _play(self, tl_track=None, tlid=None, on_error_step=1): - if tl_track is None and tlid is not None: - for tl_track in self.core.tracklist.get_tl_tracks(): - if tl_track.tlid == tlid: - break - else: - tl_track = None - - if tl_track is None: - if self.get_state() == PlaybackState.PAUSED: - return self.resume() - - if self.get_current_tl_track() is not None: - tl_track = self.get_current_tl_track() - else: - if on_error_step == 1: - tl_track = self.core.tracklist.next_track(tl_track) - elif on_error_step == -1: - tl_track = self.core.tracklist.previous_track(tl_track) - - if tl_track is None: - return - - assert tl_track in self.core.tracklist.get_tl_tracks() - - # TODO: switch to: - # backend.play(track) - # wait for state change? - - if self.get_state() == PlaybackState.PLAYING: - self.stop() - - self._set_current_tl_track(tl_track) - self.set_state(PlaybackState.PLAYING) - backend = self._get_backend(tl_track) - success = False - - if backend: - backend.playback.prepare_change() - try: - success = ( - backend.playback.change_track(tl_track.track).get() and - backend.playback.play().get()) - except TypeError: - logger.error( - '%s needs to be updated to work with this ' - 'version of Mopidy.', - backend.actor_ref.actor_class.__name__) - logger.debug('Backend exception', exc_info=True) - - if success: - # TODO: replace with stream-changed - self._trigger_track_playback_started() - else: - self.core.tracklist._mark_unplayable(tl_track) - if on_error_step == 1: - # TODO: can cause an endless loop for single track repeat. - self.next() - elif on_error_step == -1: - self.previous() - def previous(self): """ Change to the previous track. @@ -395,6 +373,8 @@ class PlaybackController(object): The current playback state will be kept. If it was playing, playing will continue. If it was paused, it will still be paused, etc. """ + self._trigger_track_playback_ended(self.get_time_position()) + state = self.get_state() current = self._pending_tl_track or self._current_tl_track @@ -443,15 +423,23 @@ class PlaybackController(object): if not self.core.tracklist.tracks: return False - if self.current_track and self.current_track.length is None: - return False - if self.get_state() == PlaybackState.STOPPED: self.play() + # TODO: uncomment once we have tests for this. Should fix seek after + # about to finish doing wrong track. + # if self._current_tl_track and self._pending_tl_track: + # self.play(self._current_tl_track) + + # We need to prefer the still playing track, but if nothing is playing + # we fall back to the pending one. + tl_track = self._current_tl_track or self._pending_tl_track + if tl_track and tl_track.track.length is None: + return False + if time_position < 0: time_position = 0 - elif time_position > self.current_track.length: + elif time_position > tl_track.track.length: # TODO: gstreamer will trigger a about to finish for us, use that? self.next() return True diff --git a/tests/core/test_playback.py b/tests/core/test_playback.py index 5880ace3..23a9845d 100644 --- a/tests/core/test_playback.py +++ b/tests/core/test_playback.py @@ -24,12 +24,14 @@ class TestBackend(pykka.ThreadingActor, backend.Backend): self.playback = backend.PlaybackProvider(audio=audio, backend=self) -class PlaybackBaseTest(unittest.TestCase): +class BaseTest(unittest.TestCase): config = {'core': {'max_tracklist_length': 10000}} tracks = [Track(uri='dummy:a', length=1234), - Track(uri='dummy:b', length=1234)] + Track(uri='dummy:b', length=1234), + Track(uri='dummy:c', length=1234)] def setUp(self): # noqa: N802 + # TODO: use create_proxy helpers. self.audio = dummy_audio.DummyAudio.start().proxy() self.backend = TestBackend.start( audio=self.audio, config=self.config).proxy() @@ -67,7 +69,58 @@ class PlaybackBaseTest(unittest.TestCase): self.replay_events(until=replay_until) -class TestNextHandling(PlaybackBaseTest): +class TestPlayHandling(BaseTest): + + def test_get_current_tl_track_play(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.assertEqual( + self.core.playback.get_current_tl_track(), tl_tracks[0]) + + def test_get_current_track_play(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.assertEqual( + self.core.playback.get_current_track(), self.tracks[0]) + + def test_get_current_tlid_play(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.assertEqual( + self.core.playback.get_current_tlid(), tl_tracks[0].tlid) + + def test_play_skips_to_next_on_unplayable_track(self): + """Checks that we handle backend.change_track failing.""" + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.audio.trigger_fake_playback_failure(tl_tracks[0].track.uri) + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + current_tl_track = self.core.playback.get_current_tl_track() + self.assertEqual(tl_tracks[1], current_tl_track) + + def test_play_tlid(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tlid=tl_tracks[1].tlid) + self.replay_events() + + current_tl_track = self.core.playback.get_current_tl_track() + self.assertEqual(tl_tracks[1], current_tl_track) + + +class TestNextHandling(BaseTest): def test_get_current_tl_track_next(self): self.core.playback.play() @@ -99,8 +152,19 @@ class TestNextHandling(PlaybackBaseTest): current_track = self.core.playback.get_current_track() self.assertEqual(current_track, self.tracks[1]) + def test_next_keeps_finished_track_in_tracklist(self): + tl_track = self.core.tracklist.get_tl_tracks()[0] -class TestPreviousHandling(PlaybackBaseTest): + self.core.playback.play(tl_track) + self.replay_events() + + self.core.playback.next() + self.replay_events() + + self.assertIn(tl_track, self.core.tracklist.tl_tracks) + + +class TestPreviousHandling(BaseTest): # TODO Test previous() more def test_get_current_tl_track_prev(self): @@ -144,37 +208,13 @@ class TestPreviousHandling(PlaybackBaseTest): self.assertIn(tl_tracks[1], self.core.tracklist.tl_tracks) - @unittest.skip('Currently tests wrong events, and nothing generates them.') - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) - def test_previous_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[1]) - listener_mock.reset_mock() - self.core.playback.previous() - - self.assertListEqual( - listener_mock.send.mock_calls, - [ - mock.call( - 'playback_state_changed', - old_state='playing', new_state='stopped'), - mock.call( - 'track_playback_ended', - tl_track=self.tl_tracks[1], time_position=mock.ANY), - mock.call( - 'playback_state_changed', - old_state='stopped', new_state='playing'), - mock.call( - 'track_playback_started', tl_track=self.tl_tracks[0]), - ]) - - -class TestPlayUnknownHanlding(PlaybackBaseTest): +class TestPlayUnknownHanlding(BaseTest): tracks = [Track(uri='unknown:a', length=1234), Track(uri='dummy:b', length=1234)] + # TODO: move to UnplayableTest? def test_play_skips_to_next_on_track_without_playback_backend(self): self.core.playback.play() @@ -184,7 +224,18 @@ class TestPlayUnknownHanlding(PlaybackBaseTest): self.assertEqual(current_track, self.tracks[1]) -class TestConsumeHandling(PlaybackBaseTest): +class OnAboutToFinishTest(BaseTest): + + def test_on_about_to_finish_keeps_finished_track_in_tracklist(self): + tl_track = self.core.tracklist.get_tl_tracks()[0] + + self.core.playback.play(tl_track) + self.trigger_about_to_finish() + + self.assertIn(tl_track, self.core.tracklist.tl_tracks) + + +class TestConsumeHandling(BaseTest): def test_next_in_consume_mode_removes_finished_track(self): tl_track = self.core.tracklist.get_tl_tracks()[0] @@ -208,52 +259,25 @@ class TestConsumeHandling(PlaybackBaseTest): self.assertNotIn(tl_track, self.core.tracklist.get_tl_tracks()) -class TestCurrentAndPendingTlTrack(unittest.TestCase): - config = {'core': {'max_tracklist_length': 10000}} +class TestCurrentAndPendingTlTrack(BaseTest): - def setUp(self): # noqa: N802 - self.audio = dummy_audio.DummyAudio.start().proxy() - self.backend = TestBackend.start(config={}, audio=self.audio).proxy() - self.core = core.Core( - audio=self.audio, backends=[self.backend], config=self.config) - self.playback = self.core.playback + def test_get_current_tl_track_none(self): + self.assertEqual( + self.core.playback.get_current_tl_track(), None) - self.tracks = [Track(uri='dummy:a', length=1234), - Track(uri='dummy:b', length=1234)] - - self.core.tracklist.add(self.tracks) - - self.events = [] - self.patcher = mock.patch('mopidy.audio.listener.AudioListener.send') - self.send_mock = self.patcher.start() - - def send(event, **kwargs): - self.events.append((event, kwargs)) - - self.send_mock.side_effect = send - - def tearDown(self): # noqa: N802 - pykka.ActorRegistry.stop_all() - self.patcher.stop() - - def trigger_about_to_finish(self, block_stream_changed=False): - callback = self.audio.get_about_to_finish_callback().get() - callback() - - while self.events: - event, kwargs = self.events.pop(0) - if event == 'stream_changed' and block_stream_changed: - continue - self.core.on_event(event, **kwargs) + def test_get_current_tlid_none(self): + self.assertEqual(self.core.playback.get_current_tlid(), None) def test_pending_tl_track_is_none(self): self.core.playback.play() + self.replay_events() self.assertEqual(self.playback._pending_tl_track, None) def test_pending_tl_track_after_about_to_finish(self): self.core.playback.play() - self.trigger_about_to_finish(block_stream_changed=True) + self.replay_events() + self.trigger_about_to_finish(replay_until='stream_changed') self.assertEqual(self.playback._pending_tl_track.track.uri, 'dummy:b') def test_pending_tl_track_after_stream_changed(self): @@ -262,156 +286,34 @@ class TestCurrentAndPendingTlTrack(unittest.TestCase): def test_current_tl_track_after_about_to_finish(self): self.core.playback.play() - self.trigger_about_to_finish(block_stream_changed=True) + self.replay_events() + self.trigger_about_to_finish(replay_until='stream_changed') self.assertEqual(self.playback.current_tl_track.track.uri, 'dummy:a') def test_current_tl_track_after_stream_changed(self): self.core.playback.play() + self.replay_events() self.trigger_about_to_finish() self.assertEqual(self.playback.current_tl_track.track.uri, 'dummy:b') def test_current_tl_track_after_end_of_stream(self): self.core.playback.play() + self.replay_events() + self.trigger_about_to_finish() self.trigger_about_to_finish() self.trigger_about_to_finish() # EOS self.assertEqual(self.playback.current_tl_track, None) -# TODO: split into smaller easier to follow tests. setup is way to complex. -# TODO: just mock tracklist? -class CorePlaybackTest(unittest.TestCase): +@mock.patch( + 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) +class EventEmissionTest(BaseTest): - def setUp(self): # noqa: N802 - config = { - 'core': { - 'max_tracklist_length': 10000, - } - } - - self.backend1 = mock.Mock() - self.backend1.uri_schemes.get.return_value = ['dummy1'] - self.playback1 = mock.Mock(spec=backend.PlaybackProvider) - self.playback1.get_time_position.return_value.get.return_value = 1000 - self.backend1.playback = self.playback1 - - self.backend2 = mock.Mock() - self.backend2.uri_schemes.get.return_value = ['dummy2'] - self.playback2 = mock.Mock(spec=backend.PlaybackProvider) - self.playback2.get_time_position.return_value.get.return_value = 2000 - self.backend2.playback = self.playback2 - - # A backend without the optional playback provider - self.backend3 = mock.Mock() - self.backend3.uri_schemes.get.return_value = ['dummy3'] - self.backend3.has_playback().get.return_value = False - - self.tracks = [ - Track(uri='dummy1:a', length=40000), - Track(uri='dummy2:a', length=40000), - Track(uri='dummy3:a', length=40000), # Unplayable - Track(uri='dummy1:b', length=40000), - Track(uri='dummy1:c', length=None), # No duration - ] - - self.uris = [ - 'dummy1:a', 'dummy2:a', 'dummy3:a', 'dummy1:b', 'dummy1:c'] - - self.core = core.Core(config, mixer=None, backends=[ - self.backend1, self.backend2, self.backend3]) - - def lookup(uris): - result = {uri: [] for uri in uris} - for track in self.tracks: - if track.uri in result: - result[track.uri].append(track) - return result - - self.lookup_patcher = mock.patch.object(self.core.library, 'lookup') - self.lookup_mock = self.lookup_patcher.start() - self.lookup_mock.side_effect = lookup - - self.core.tracklist.add(uris=self.uris) - - self.tl_tracks = self.core.tracklist.tl_tracks - self.unplayable_tl_track = self.tl_tracks[2] - self.duration_less_tl_track = self.tl_tracks[4] - - def tearDown(self): # noqa: N802 - self.lookup_patcher.stop() - - def trigger_end_of_track(self): - self.core.playback._on_end_of_track() - - def set_current_tl_track(self, tl_track): - self.core.playback._set_current_tl_track(tl_track) - - def test_get_current_tl_track_none(self): - self.set_current_tl_track(None) - - self.assertEqual( - self.core.playback.get_current_tl_track(), None) - - def test_get_current_tl_track_play(self): - self.core.playback.play(self.tl_tracks[0]) - - self.assertEqual( - self.core.playback.get_current_tl_track(), self.tl_tracks[0]) - - def test_get_current_track_play(self): - self.core.playback.play(self.tl_tracks[0]) - - self.assertEqual( - self.core.playback.get_current_track(), self.tracks[0]) - - def test_get_current_tlid_none(self): - self.set_current_tl_track(None) - - self.assertEqual(self.core.playback.get_current_tlid(), None) - - def test_get_current_tlid_play(self): - self.core.playback.play(self.tl_tracks[0]) - - self.assertEqual( - self.core.playback.get_current_tlid(), self.tl_tracks[0].tlid) - - # TODO Test state - - def test_play_selects_dummy1_backend(self): - self.core.playback.play(self.tl_tracks[0]) - - self.playback1.prepare_change.assert_called_once_with() - self.playback1.change_track.assert_called_once_with(self.tracks[0]) - self.playback1.play.assert_called_once_with() - self.assertFalse(self.playback2.play.called) - - def test_play_selects_dummy2_backend(self): - self.core.playback.play(self.tl_tracks[1]) - - self.assertFalse(self.playback1.play.called) - self.playback2.prepare_change.assert_called_once_with() - self.playback2.change_track.assert_called_once_with(self.tracks[1]) - self.playback2.play.assert_called_once_with() - - def test_play_skips_to_next_on_unplayable_track(self): - """Checks that we handle backend.change_track failing.""" - self.playback2.change_track.return_value.get.return_value = False - - self.core.tracklist.clear() - self.core.tracklist.add(uris=self.uris[:2]) - tl_tracks = self.core.tracklist.tl_tracks + def test_play_when_stopped_emits_events(self, listener_mock): + tl_tracks = self.core.tracklist.get_tl_tracks() self.core.playback.play(tl_tracks[0]) - self.core.playback.play(tl_tracks[1]) - - # TODO: we really want to check that the track was marked unplayable - # and that next was called. This is just an indirect way of checking - # this :( - self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED) - - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) - def test_play_when_stopped_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + self.replay_events() self.assertListEqual( listener_mock.send.mock_calls, @@ -420,78 +322,66 @@ class CorePlaybackTest(unittest.TestCase): 'playback_state_changed', old_state='stopped', new_state='playing'), mock.call( - 'track_playback_started', tl_track=self.tl_tracks[0]), + 'track_playback_started', tl_track=tl_tracks[0]), ]) - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_play_when_paused_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + self.core.playback.pause() + self.replay_events() listener_mock.reset_mock() - self.core.playback.play(self.tl_tracks[1]) + self.core.playback.play(tl_tracks[1]) + self.replay_events() self.assertListEqual( listener_mock.send.mock_calls, [ + mock.call( + 'track_playback_ended', + tl_track=tl_tracks[0], time_position=mock.ANY), mock.call( 'playback_state_changed', old_state='paused', new_state='playing'), mock.call( - 'track_playback_started', tl_track=self.tl_tracks[1]), + 'track_playback_started', tl_track=tl_tracks[1]), ]) - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_play_when_playing_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() listener_mock.reset_mock() - self.core.playback.play(self.tl_tracks[3]) + self.core.playback.play(tl_tracks[2]) + self.replay_events() + # TODO: Do we want to emit playing->playing for this case? self.assertListEqual( listener_mock.send.mock_calls, [ - mock.call( - 'playback_state_changed', - old_state='playing', new_state='stopped'), mock.call( 'track_playback_ended', - tl_track=self.tl_tracks[0], time_position=1000), + tl_track=tl_tracks[0], time_position=mock.ANY), mock.call( - 'playback_state_changed', - old_state='stopped', new_state='playing'), + 'playback_state_changed', old_state='playing', + new_state='playing'), mock.call( - 'track_playback_started', tl_track=self.tl_tracks[3]), + 'track_playback_started', tl_track=tl_tracks[2]), ]) - def test_pause_selects_dummy1_backend(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.pause() - - self.playback1.pause.assert_called_once_with() - self.assertFalse(self.playback2.pause.called) - - def test_pause_selects_dummy2_backend(self): - self.core.playback.play(self.tl_tracks[1]) - self.core.playback.pause() - - self.assertFalse(self.playback1.pause.called) - self.playback2.pause.assert_called_once_with() - - def test_pause_changes_state_even_if_track_is_unplayable(self): - self.set_current_tl_track(self.unplayable_tl_track) - self.core.playback.pause() - - self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) - self.assertFalse(self.playback1.pause.called) - self.assertFalse(self.playback2.pause.called) - - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_pause_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.core.playback.seek(1000) listener_mock.reset_mock() self.core.playback.pause() @@ -504,39 +394,17 @@ class CorePlaybackTest(unittest.TestCase): old_state='playing', new_state='paused'), mock.call( 'track_playback_paused', - tl_track=self.tl_tracks[0], time_position=1000), + tl_track=tl_tracks[0], time_position=1000), ]) - def test_resume_selects_dummy1_backend(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.pause() - self.core.playback.resume() - - self.playback1.resume.assert_called_once_with() - self.assertFalse(self.playback2.resume.called) - - def test_resume_selects_dummy2_backend(self): - self.core.playback.play(self.tl_tracks[1]) - self.core.playback.pause() - self.core.playback.resume() - - self.assertFalse(self.playback1.resume.called) - self.playback2.resume.assert_called_once_with() - - def test_resume_does_nothing_if_track_is_unplayable(self): - self.set_current_tl_track(self.unplayable_tl_track) - self.core.playback.state = core.PlaybackState.PAUSED - self.core.playback.resume() - - self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) - self.assertFalse(self.playback1.resume.called) - self.assertFalse(self.playback2.resume.called) - - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_resume_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + self.core.playback.pause() + self.core.playback.seek(1000) listener_mock.reset_mock() self.core.playback.resume() @@ -549,39 +417,19 @@ class CorePlaybackTest(unittest.TestCase): old_state='paused', new_state='playing'), mock.call( 'track_playback_resumed', - tl_track=self.tl_tracks[0], time_position=1000), + tl_track=tl_tracks[0], time_position=1000), ]) - def test_stop_selects_dummy1_backend(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.stop() - - self.playback1.stop.assert_called_once_with() - self.assertFalse(self.playback2.stop.called) - - def test_stop_selects_dummy2_backend(self): - self.core.playback.play(self.tl_tracks[1]) - self.core.playback.stop() - - self.assertFalse(self.playback1.stop.called) - self.playback2.stop.assert_called_once_with() - - def test_stop_changes_state_even_if_track_is_unplayable(self): - self.set_current_tl_track(self.unplayable_tl_track) - self.core.playback.state = core.PlaybackState.PAUSED - self.core.playback.stop() - - self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED) - self.assertFalse(self.playback1.stop.called) - self.assertFalse(self.playback2.stop.called) - - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_stop_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + self.core.playback.seek(1000) listener_mock.reset_mock() self.core.playback.stop() + self.replay_events() self.assertListEqual( listener_mock.send.mock_calls, @@ -591,158 +439,55 @@ class CorePlaybackTest(unittest.TestCase): old_state='playing', new_state='stopped'), mock.call( 'track_playback_ended', - tl_track=self.tl_tracks[0], time_position=1000), + tl_track=tl_tracks[0], time_position=1000), ]) - # TODO Test next() more - - def test_next_keeps_finished_track_in_tracklist(self): - tl_track = self.tl_tracks[0] - self.core.playback.play(tl_track) - - self.core.playback.next() - - self.assertIn(tl_track, self.core.tracklist.tl_tracks) - - @unittest.skip('Currently tests wrong events, and nothing generates them.') - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_next_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + self.core.playback.seek(1000) listener_mock.reset_mock() self.core.playback.next() + self.replay_events() + # TODO: should we be emitting playing -> playing? self.assertListEqual( listener_mock.send.mock_calls, [ - mock.call( - 'playback_state_changed', - old_state='playing', new_state='stopped'), mock.call( 'track_playback_ended', - tl_track=self.tl_tracks[0], time_position=mock.ANY), + tl_track=tl_tracks[0], time_position=mock.ANY), mock.call( - 'playback_state_changed', - old_state='stopped', new_state='playing'), - mock.call( - 'track_playback_started', tl_track=self.tl_tracks[1]), + 'track_playback_started', tl_track=tl_tracks[1]), ]) - def test_on_about_to_finish_keeps_finished_track_in_tracklist(self): - tl_track = self.tl_tracks[0] - self.core.playback.play(tl_track) - - self.core.playback._on_about_to_finish() # TODO trigger_about_to.. - - self.assertIn(tl_track, self.core.tracklist.tl_tracks) - - @unittest.skip('Currently tests wrong events, and nothing generates them.') - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_on_end_of_track_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() listener_mock.reset_mock() - self.trigger_end_of_track() + self.trigger_about_to_finish() self.assertListEqual( listener_mock.send.mock_calls, [ - mock.call( - 'playback_state_changed', - old_state='playing', new_state='stopped'), mock.call( 'track_playback_ended', - tl_track=self.tl_tracks[0], time_position=mock.ANY), + tl_track=tl_tracks[0], time_position=mock.ANY), mock.call( - 'playback_state_changed', - old_state='stopped', new_state='playing'), - mock.call( - 'track_playback_started', tl_track=self.tl_tracks[1]), + 'track_playback_started', tl_track=tl_tracks[1]), ]) - @unittest.skip('Currently tests wrong events, and nothing generates them.') - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) - def test_seek_past_end_of_track_emits_events(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) - listener_mock.reset_mock() - - self.core.playback.seek(self.tracks[0].length * 5) - - self.assertListEqual( - listener_mock.send.mock_calls, - [ - mock.call( - 'playback_state_changed', - old_state='playing', new_state='stopped'), - mock.call( - 'track_playback_ended', - tl_track=self.tl_tracks[0], time_position=mock.ANY), - mock.call( - 'playback_state_changed', - old_state='stopped', new_state='playing'), - mock.call( - 'track_playback_started', tl_track=self.tl_tracks[1]), - ]) - - def test_seek_selects_dummy1_backend(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.seek(10000) - - self.playback1.seek.assert_called_once_with(10000) - self.assertFalse(self.playback2.seek.called) - - def test_seek_selects_dummy2_backend(self): - self.core.playback.play(self.tl_tracks[1]) - self.core.playback.seek(10000) - - self.assertFalse(self.playback1.seek.called) - self.playback2.seek.assert_called_once_with(10000) - - def test_seek_normalizes_negative_positions_to_zero(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.seek(-100) - - self.playback1.seek.assert_called_once_with(0) - - def test_seek_fails_for_unplayable_track(self): - self.set_current_tl_track(self.unplayable_tl_track) - self.core.playback.state = core.PlaybackState.PLAYING - success = self.core.playback.seek(1000) - - self.assertFalse(success) - self.assertFalse(self.playback1.seek.called) - self.assertFalse(self.playback2.seek.called) - - def test_seek_fails_for_track_without_duration(self): - self.set_current_tl_track(self.duration_less_tl_track) - self.core.playback.state = core.PlaybackState.PLAYING - success = self.core.playback.seek(1000) - - self.assertFalse(success) - self.assertFalse(self.playback1.seek.called) - self.assertFalse(self.playback2.seek.called) - - def test_seek_play_stay_playing(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.state = core.PlaybackState.PLAYING - self.core.playback.seek(1000) - - self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING) - - def test_seek_paused_stay_paused(self): - self.core.playback.play(self.tl_tracks[0]) - self.core.playback.state = core.PlaybackState.PAUSED - self.core.playback.seek(1000) - - self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) - - @mock.patch( - 'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener) def test_seek_emits_seeked_event(self, listener_mock): - self.core.playback.play(self.tl_tracks[0]) + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() listener_mock.reset_mock() self.core.playback.seek(1000) @@ -750,8 +495,313 @@ class CorePlaybackTest(unittest.TestCase): listener_mock.send.assert_called_once_with( 'seeked', time_position=1000) + def test_seek_past_end_of_track_emits_events(self, listener_mock): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + listener_mock.reset_mock() + + self.core.playback.seek(self.tracks[0].length * 5) + self.replay_events() + + self.assertListEqual( + listener_mock.send.mock_calls, + [ + mock.call( + 'track_playback_ended', + tl_track=tl_tracks[0], time_position=mock.ANY), + mock.call( + 'track_playback_started', tl_track=tl_tracks[1]), + ]) + + def test_previous_emits_events(self, listener_mock): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[1]) + self.replay_events() + listener_mock.reset_mock() + + self.core.playback.previous() + self.replay_events() + + self.assertListEqual( + listener_mock.send.mock_calls, + [ + mock.call( + 'track_playback_ended', + tl_track=tl_tracks[1], time_position=mock.ANY), + mock.call( + 'track_playback_started', tl_track=tl_tracks[0]), + ]) + + +class UnplayableURITest(BaseTest): + + def setUp(self): # noqa: N802 + super(UnplayableURITest, self).setUp() + self.core.tracklist.clear() + tl_tracks = self.core.tracklist.add([Track(uri='unplayable://')]) + self.core.playback._set_current_tl_track(tl_tracks[0]) + + def test_pause_changes_state_even_if_track_is_unplayable(self): + self.core.playback.pause() + self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) + + def test_resume_does_nothing_if_track_is_unplayable(self): + self.core.playback.state = core.PlaybackState.PAUSED + self.core.playback.resume() + + self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) + + def test_stop_changes_state_even_if_track_is_unplayable(self): + self.core.playback.state = core.PlaybackState.PAUSED + self.core.playback.stop() + + self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED) + + def test_time_position_returns_0_if_track_is_unplayable(self): + result = self.core.playback.time_position + + self.assertEqual(result, 0) + + def test_seek_fails_for_unplayable_track(self): + self.core.playback.state = core.PlaybackState.PLAYING + success = self.core.playback.seek(1000) + + self.assertFalse(success) + + +class SeekTest(BaseTest): + + def test_seek_normalizes_negative_positions_to_zero(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.core.playback.seek(-100) # Dummy audio doesn't progress time. + self.assertEqual(0, self.core.playback.get_time_position()) + + def test_seek_fails_for_track_without_duration(self): + track = self.tracks[0].replace(length=None) + self.core.tracklist.clear() + self.core.tracklist.add([track]) + + self.core.playback.play() + self.replay_events() + + self.assertFalse(self.core.playback.seek(1000)) + self.assertEqual(0, self.core.playback.get_time_position()) + + def test_seek_play_stay_playing(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.replay_events() + + self.core.playback.seek(1000) + self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING) + + def test_seek_paused_stay_paused(self): + tl_tracks = self.core.tracklist.get_tl_tracks() + + self.core.playback.play(tl_tracks[0]) + self.core.playback.pause() + self.replay_events() + + self.core.playback.seek(1000) + self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED) + + +class TestStream(BaseTest): + + def test_get_stream_title_before_playback(self): + self.assertEqual(self.playback.get_stream_title(), None) + + def test_get_stream_title_during_playback(self): + self.core.playback.play() + self.replay_events() + + self.assertEqual(self.playback.get_stream_title(), None) + + def test_get_stream_title_during_playback_with_tags_change(self): + self.core.playback.play() + self.audio.trigger_fake_tags_changed({'organization': ['baz']}) + self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() + self.replay_events() + + self.assertEqual(self.playback.get_stream_title(), 'foobar') + + def test_get_stream_title_after_next(self): + self.core.playback.play() + self.audio.trigger_fake_tags_changed({'organization': ['baz']}) + self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() + self.replay_events() + + self.core.playback.next() + self.replay_events() + + self.assertEqual(self.playback.get_stream_title(), None) + + def test_get_stream_title_after_next_with_tags_change(self): + self.core.playback.play() + self.audio.trigger_fake_tags_changed({'organization': ['baz']}) + self.audio.trigger_fake_tags_changed({'title': ['foo']}).get() + self.replay_events() + + self.core.playback.next() + self.audio.trigger_fake_tags_changed({'organization': ['baz']}) + self.audio.trigger_fake_tags_changed({'title': ['bar']}).get() + self.replay_events() + + self.assertEqual(self.playback.get_stream_title(), 'bar') + + def test_get_stream_title_after_stop(self): + self.core.playback.play() + self.audio.trigger_fake_tags_changed({'organization': ['baz']}) + self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() + self.replay_events() + + self.core.playback.stop() + self.replay_events() + self.assertEqual(self.playback.get_stream_title(), None) + + +class BackendSelectionTest(unittest.TestCase): + + def setUp(self): # noqa: N802 + config = { + 'core': { + 'max_tracklist_length': 10000, + } + } + + self.backend1 = mock.Mock() + self.backend1.uri_schemes.get.return_value = ['dummy1'] + self.playback1 = mock.Mock(spec=backend.PlaybackProvider) + self.backend1.playback = self.playback1 + + self.backend2 = mock.Mock() + self.backend2.uri_schemes.get.return_value = ['dummy2'] + self.playback2 = mock.Mock(spec=backend.PlaybackProvider) + self.backend2.playback = self.playback2 + + self.tracks = [ + Track(uri='dummy1:a', length=40000), + Track(uri='dummy2:a', length=40000), + ] + + self.core = core.Core(config, mixer=None, backends=[ + self.backend1, self.backend2]) + + self.tl_tracks = self.core.tracklist.add(self.tracks) + + def trigger_stream_changed(self): + pending = self.core.playback._pending_tl_track + if pending: + self.core.stream_changed(uri=pending.track.uri) + else: + self.core.stream_changed(uri=None) + + def test_play_selects_dummy1_backend(self): + self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + + self.playback1.prepare_change.assert_called_once_with() + self.playback1.change_track.assert_called_once_with(self.tracks[0]) + self.playback1.play.assert_called_once_with() + self.assertFalse(self.playback2.play.called) + + def test_play_selects_dummy2_backend(self): + self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + + self.assertFalse(self.playback1.play.called) + self.playback2.prepare_change.assert_called_once_with() + self.playback2.change_track.assert_called_once_with(self.tracks[1]) + self.playback2.play.assert_called_once_with() + + def test_pause_selects_dummy1_backend(self): + self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + + self.core.playback.pause() + + self.playback1.pause.assert_called_once_with() + self.assertFalse(self.playback2.pause.called) + + def test_pause_selects_dummy2_backend(self): + self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + + self.core.playback.pause() + + self.assertFalse(self.playback1.pause.called) + self.playback2.pause.assert_called_once_with() + + def test_resume_selects_dummy1_backend(self): + self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + + self.core.playback.pause() + self.core.playback.resume() + + self.playback1.resume.assert_called_once_with() + self.assertFalse(self.playback2.resume.called) + + def test_resume_selects_dummy2_backend(self): + self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + + self.core.playback.pause() + self.core.playback.resume() + + self.assertFalse(self.playback1.resume.called) + self.playback2.resume.assert_called_once_with() + + def test_stop_selects_dummy1_backend(self): + self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + + self.core.playback.stop() + self.trigger_stream_changed() + + self.playback1.stop.assert_called_once_with() + self.assertFalse(self.playback2.stop.called) + + def test_stop_selects_dummy2_backend(self): + self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + + self.core.playback.stop() + self.trigger_stream_changed() + + self.assertFalse(self.playback1.stop.called) + self.playback2.stop.assert_called_once_with() + + def test_seek_selects_dummy1_backend(self): + self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + + self.core.playback.seek(10000) + + self.playback1.seek.assert_called_once_with(10000) + self.assertFalse(self.playback2.seek.called) + + def test_seek_selects_dummy2_backend(self): + self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + + self.core.playback.seek(10000) + + self.assertFalse(self.playback1.seek.called) + self.playback2.seek.assert_called_once_with(10000) + def test_time_position_selects_dummy1_backend(self): self.core.playback.play(self.tl_tracks[0]) + self.trigger_stream_changed() + self.core.playback.seek(10000) self.core.playback.time_position @@ -760,113 +810,14 @@ class CorePlaybackTest(unittest.TestCase): def test_time_position_selects_dummy2_backend(self): self.core.playback.play(self.tl_tracks[1]) + self.trigger_stream_changed() + self.core.playback.seek(10000) self.core.playback.time_position self.assertFalse(self.playback1.get_time_position.called) self.playback2.get_time_position.assert_called_once_with() - def test_time_position_returns_0_if_track_is_unplayable(self): - self.set_current_tl_track(self.unplayable_tl_track) - - result = self.core.playback.time_position - - self.assertEqual(result, 0) - self.assertFalse(self.playback1.get_time_position.called) - self.assertFalse(self.playback2.get_time_position.called) - - # TODO Test on_tracklist_change - - -class TestStream(unittest.TestCase): - - def setUp(self): # noqa: N802 - config = { - 'core': { - 'max_tracklist_length': 10000, - } - } - self.audio = dummy_audio.DummyAudio.start().proxy() - self.backend = TestBackend.start(config={}, audio=self.audio).proxy() - self.core = core.Core( - config, audio=self.audio, backends=[self.backend]) - self.playback = self.core.playback - - self.tracks = [Track(uri='dummy:a', length=1234), - Track(uri='dummy:b', length=1234)] - - self.lookup_patcher = mock.patch.object(self.core.library, 'lookup') - self.lookup_mock = self.lookup_patcher.start() - self.lookup_mock.return_value = {t.uri: [t] for t in self.tracks} - - self.core.tracklist.add(uris=[t.uri for t in self.tracks]) - - self.events = [] - self.send_patcher = mock.patch( - 'mopidy.audio.listener.AudioListener.send') - self.send_mock = self.send_patcher.start() - - def send(event, **kwargs): - self.events.append((event, kwargs)) - - self.send_mock.side_effect = send - - def tearDown(self): # noqa: N802 - pykka.ActorRegistry.stop_all() - self.lookup_patcher.stop() - self.send_patcher.stop() - - def replay_audio_events(self): - while self.events: - event, kwargs = self.events.pop(0) - self.core.on_event(event, **kwargs) - - def test_get_stream_title_before_playback(self): - self.assertEqual(self.playback.get_stream_title(), None) - - def test_get_stream_title_during_playback(self): - self.core.playback.play() - - self.replay_audio_events() - self.assertEqual(self.playback.get_stream_title(), None) - - def test_get_stream_title_during_playback_with_tags_change(self): - self.core.playback.play() - self.audio.trigger_fake_tags_changed({'organization': ['baz']}) - self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() - - self.replay_audio_events() - self.assertEqual(self.playback.get_stream_title(), 'foobar') - - def test_get_stream_title_after_next(self): - self.core.playback.play() - self.audio.trigger_fake_tags_changed({'organization': ['baz']}) - self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() - self.core.playback.next() - - self.replay_audio_events() - self.assertEqual(self.playback.get_stream_title(), None) - - def test_get_stream_title_after_next_with_tags_change(self): - self.core.playback.play() - self.audio.trigger_fake_tags_changed({'organization': ['baz']}) - self.audio.trigger_fake_tags_changed({'title': ['foo']}).get() - self.core.playback.next() - self.audio.trigger_fake_tags_changed({'organization': ['baz']}) - self.audio.trigger_fake_tags_changed({'title': ['bar']}).get() - - self.replay_audio_events() - self.assertEqual(self.playback.get_stream_title(), 'bar') - - def test_get_stream_title_after_stop(self): - self.core.playback.play() - self.audio.trigger_fake_tags_changed({'organization': ['baz']}) - self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get() - self.core.playback.stop() - - self.replay_audio_events() - self.assertEqual(self.playback.get_stream_title(), None) - class CorePlaybackWithOldBackendTest(unittest.TestCase): @@ -891,31 +842,6 @@ class CorePlaybackWithOldBackendTest(unittest.TestCase): b.playback.play.assert_called_once_with() -class TestPlay(unittest.TestCase): - - def setUp(self): # noqa: N802 - config = { - 'core': { - 'max_tracklist_length': 10000, - } - } - - self.backend = mock.Mock() - self.backend.uri_schemes.get.return_value = ['dummy'] - self.core = core.Core(config, backends=[self.backend]) - - self.tracks = [Track(uri='dummy:a', length=1234), - Track(uri='dummy:b', length=1234)] - - with deprecation.ignore('core.tracklist.add:tracks_arg'): - self.tl_tracks = self.core.tracklist.add(tracks=self.tracks) - - def test_play_tlid(self): - self.core.playback.play(tlid=self.tl_tracks[1].tlid) - self.backend.playback.change_track.assert_called_once_with( - self.tl_tracks[1].track) - - class Bug1177RegressionTest(unittest.TestCase): def test(self): config = {