While trying to remove traces of stop calls in core to get gapless working I found we had no way to switch to switch tracks without triggering a play. This change fixes this by changing the backends playback provider API. - play() now _only_ starts playback and does not take any arguments. - prepare_change() has been added, this could have been avoided with a kwarg to change_track(track), but that would break more backends. - core has been updated to call prepare_change+change_track+play as needed. - tests have been updated to handle this change. Longer term I hope to completely rework the playback API in backends, as 99% of our backends only use change_track(track) to translate URIs. So we should make simple case simple, and handle mopidy-spotify / appsrc in some other way. Cherry picked from the WIP gapless branch.
636 lines
23 KiB
Python
636 lines
23 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import unittest
|
|
|
|
import mock
|
|
|
|
import pykka
|
|
|
|
from mopidy import backend, core
|
|
from mopidy.models import Track
|
|
|
|
from tests import dummy_audio as audio
|
|
|
|
|
|
class CorePlaybackTest(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.backend1 = mock.Mock()
|
|
self.backend1.uri_schemes.get.return_value = ['dummy1']
|
|
self.playback1 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback1.get_time_position().get.return_value = 1000
|
|
self.playback1.reset_mock()
|
|
self.backend1.playback = self.playback1
|
|
|
|
self.backend2 = mock.Mock()
|
|
self.backend2.uri_schemes.get.return_value = ['dummy2']
|
|
self.playback2 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback2.get_time_position().get.return_value = 2000
|
|
self.playback2.reset_mock()
|
|
self.backend2.playback = self.playback2
|
|
|
|
# A backend without the optional playback provider
|
|
self.backend3 = mock.Mock()
|
|
self.backend3.uri_schemes.get.return_value = ['dummy3']
|
|
self.backend3.has_playback().get.return_value = False
|
|
|
|
self.tracks = [
|
|
Track(uri='dummy1:a', length=40000),
|
|
Track(uri='dummy2:a', length=40000),
|
|
Track(uri='dummy3:a', length=40000), # Unplayable
|
|
Track(uri='dummy1:b', length=40000),
|
|
Track(uri='dummy1:c', length=None), # No duration
|
|
]
|
|
|
|
self.core = core.Core(mixer=None, backends=[
|
|
self.backend1, self.backend2, self.backend3])
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.tl_tracks = self.core.tracklist.tl_tracks
|
|
self.unplayable_tl_track = self.tl_tracks[2]
|
|
self.duration_less_tl_track = self.tl_tracks[4]
|
|
|
|
def test_get_current_tl_track_none(self):
|
|
self.core.playback.set_current_tl_track(None)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), None)
|
|
|
|
def test_get_current_tl_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_tl_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[1])
|
|
|
|
def test_get_current_tl_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
def test_get_current_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[1])
|
|
|
|
def test_get_current_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
# TODO Test state
|
|
|
|
def test_play_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.playback1.prepare_change.assert_called_once_with()
|
|
self.playback1.change_track.assert_called_once_with(self.tracks[0])
|
|
self.playback1.play.assert_called_once_with()
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
def test_play_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertFalse(self.playback1.play.called)
|
|
self.playback2.prepare_change.assert_called_once_with()
|
|
self.playback2.change_track.assert_called_once_with(self.tracks[1])
|
|
self.playback2.play.assert_called_once_with()
|
|
|
|
def test_play_skips_to_next_on_unplayable_track(self):
|
|
self.core.playback.play(self.unplayable_tl_track)
|
|
|
|
self.playback1.prepare_change.assert_called_once_with()
|
|
self.playback1.change_track.assert_called_once_with(self.tracks[3])
|
|
self.playback1.play.assert_called_once_with()
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.current_tl_track, self.tl_tracks[3])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_stopped_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_paused_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_playing_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[3])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[3]),
|
|
])
|
|
|
|
def test_pause_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
|
|
self.playback1.pause.assert_called_once_with()
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
def test_pause_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.playback2.pause.assert_called_once_with()
|
|
|
|
def test_pause_changes_state_even_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.pause()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_pause_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.pause()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='paused'),
|
|
mock.call(
|
|
'track_playback_paused',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_resume_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.playback1.resume.assert_called_once_with()
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
def test_resume_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.playback2.resume.assert_called_once_with()
|
|
|
|
def test_resume_does_nothing_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.resume()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_resume_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.resume()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_resumed',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_stop_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.stop()
|
|
|
|
self.playback1.stop.assert_called_once_with()
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
def test_stop_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.stop()
|
|
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.playback2.stop.assert_called_once_with()
|
|
|
|
def test_stop_changes_state_even_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.stop()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_stop_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.stop()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
# TODO Test next() more
|
|
|
|
def test_next_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_next_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_next_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
# TODO Test previous() more
|
|
|
|
def test_previous_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_previous_keeps_finished_track_even_in_consume_mode(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_previous_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[1], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
# TODO Test on_end_of_track() more
|
|
|
|
def test_on_end_of_track_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_on_end_of_track_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_on_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_past_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(self.tracks[0].length * 5)
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
def test_seek_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.playback1.seek.assert_called_once_with(10000)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.playback2.seek.assert_called_once_with(10000)
|
|
|
|
def test_seek_fails_for_unplayable_track(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
success = self.core.playback.seek(1000)
|
|
|
|
self.assertFalse(success)
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_fails_for_track_without_duration(self):
|
|
self.core.playback.current_tl_track = self.duration_less_tl_track
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
success = self.core.playback.seek(1000)
|
|
|
|
self.assertFalse(success)
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_play_stay_playing(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING)
|
|
|
|
def test_seek_paused_stay_paused(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_emits_seeked_event(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(1000)
|
|
|
|
listener_mock.send.assert_called_once_with(
|
|
'seeked', time_position=1000)
|
|
|
|
def test_time_position_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.playback1.get_time_position.assert_called_once_with()
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
def test_time_position_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.playback2.get_time_position.assert_called_once_with()
|
|
|
|
def test_time_position_returns_0_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
|
|
result = self.core.playback.time_position
|
|
|
|
self.assertEqual(result, 0)
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
# TODO Test on_tracklist_change
|
|
|
|
|
|
# Since we rely on our DummyAudio to actually emit events we need a "real"
|
|
# backend and not a mock so the right calls make it through to audio.
|
|
class TestBackend(pykka.ThreadingActor, backend.Backend):
|
|
uri_schemes = ['dummy']
|
|
|
|
def __init__(self, config, audio):
|
|
super(TestBackend, self).__init__()
|
|
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
|
|
|
|
|
|
class TestStream(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.audio = audio.DummyAudio.start().proxy()
|
|
self.backend = TestBackend.start(config={}, audio=self.audio).proxy()
|
|
self.core = core.Core(audio=self.audio, backends=[self.backend])
|
|
self.playback = self.core.playback
|
|
|
|
self.tracks = [Track(uri='dummy:a', length=1234),
|
|
Track(uri='dummy:b', length=1234)]
|
|
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.events = []
|
|
self.patcher = mock.patch('mopidy.audio.listener.AudioListener.send')
|
|
self.send_mock = self.patcher.start()
|
|
|
|
def send(event, **kwargs):
|
|
self.events.append((event, kwargs))
|
|
|
|
self.send_mock.side_effect = send
|
|
|
|
def tearDown(self): # noqa: N802
|
|
pykka.ActorRegistry.stop_all()
|
|
self.patcher.stop()
|
|
|
|
def replay_audio_events(self):
|
|
while self.events:
|
|
event, kwargs = self.events.pop(0)
|
|
self.core.on_event(event, **kwargs)
|
|
|
|
def test_get_stream_title_before_playback(self):
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_during_playback(self):
|
|
self.core.playback.play()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_during_playback_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), 'foobar')
|
|
|
|
def test_get_stream_title_after_next(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.next()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_after_next_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foo']}).get()
|
|
self.core.playback.next()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['bar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), 'bar')
|
|
|
|
def test_get_stream_title_after_stop(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.stop()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|