- Adds tests for new behaviors in core. - Adds stream name to MPD format (fixes #944) - Adds 'stream_changed' core event (needs a new name/event) - Adds 'get_stream_reference' (which I'm also unsure about) The bits I'm unsure about are mostly with respect to #270, but I'm going ahead with this commit so we can discuss the details in PR with this code as an example.
616 lines
22 KiB
Python
616 lines
22 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import unittest
|
|
|
|
import mock
|
|
|
|
import pykka
|
|
|
|
from mopidy import backend, core
|
|
from mopidy.models import Ref, Track
|
|
|
|
from tests import dummy_audio as audio
|
|
|
|
|
|
class CorePlaybackTest(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.backend1 = mock.Mock()
|
|
self.backend1.uri_schemes.get.return_value = ['dummy1']
|
|
self.playback1 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback1.get_time_position().get.return_value = 1000
|
|
self.playback1.reset_mock()
|
|
self.backend1.playback = self.playback1
|
|
|
|
self.backend2 = mock.Mock()
|
|
self.backend2.uri_schemes.get.return_value = ['dummy2']
|
|
self.playback2 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback2.get_time_position().get.return_value = 2000
|
|
self.playback2.reset_mock()
|
|
self.backend2.playback = self.playback2
|
|
|
|
# A backend without the optional playback provider
|
|
self.backend3 = mock.Mock()
|
|
self.backend3.uri_schemes.get.return_value = ['dummy3']
|
|
self.backend3.has_playback().get.return_value = False
|
|
|
|
self.tracks = [
|
|
Track(uri='dummy1:a', length=40000),
|
|
Track(uri='dummy2:a', length=40000),
|
|
Track(uri='dummy3:a', length=40000), # Unplayable
|
|
Track(uri='dummy1:b', length=40000),
|
|
]
|
|
|
|
self.core = core.Core(mixer=None, backends=[
|
|
self.backend1, self.backend2, self.backend3])
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.tl_tracks = self.core.tracklist.tl_tracks
|
|
self.unplayable_tl_track = self.tl_tracks[2]
|
|
|
|
def test_get_current_tl_track_none(self):
|
|
self.core.playback.set_current_tl_track(None)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), None)
|
|
|
|
def test_get_current_tl_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_tl_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[1])
|
|
|
|
def test_get_current_tl_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
def test_get_current_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[1])
|
|
|
|
def test_get_current_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
# TODO Test state
|
|
|
|
def test_play_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.playback1.play.assert_called_once_with(self.tracks[0])
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
def test_play_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertFalse(self.playback1.play.called)
|
|
self.playback2.play.assert_called_once_with(self.tracks[1])
|
|
|
|
def test_play_skips_to_next_on_unplayable_track(self):
|
|
self.core.playback.play(self.unplayable_tl_track)
|
|
|
|
self.playback1.play.assert_called_once_with(self.tracks[3])
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.current_tl_track, self.tl_tracks[3])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_stopped_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_paused_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_playing_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[3])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[3]),
|
|
])
|
|
|
|
def test_pause_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
|
|
self.playback1.pause.assert_called_once_with()
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
def test_pause_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.playback2.pause.assert_called_once_with()
|
|
|
|
def test_pause_changes_state_even_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.pause()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_pause_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.pause()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='paused'),
|
|
mock.call(
|
|
'track_playback_paused',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_resume_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.playback1.resume.assert_called_once_with()
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
def test_resume_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.playback2.resume.assert_called_once_with()
|
|
|
|
def test_resume_does_nothing_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.resume()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_resume_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.resume()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_resumed',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_stop_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.stop()
|
|
|
|
self.playback1.stop.assert_called_once_with()
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
def test_stop_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.stop()
|
|
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.playback2.stop.assert_called_once_with()
|
|
|
|
def test_stop_changes_state_even_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.stop()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_stop_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.stop()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
# TODO Test next() more
|
|
|
|
def test_next_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_next_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_next_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
# TODO Test previous() more
|
|
|
|
def test_previous_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_previous_keeps_finished_track_even_in_consume_mode(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_previous_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[1], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
# TODO Test on_end_of_track() more
|
|
|
|
def test_on_end_of_track_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_on_end_of_track_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_on_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.on_end_of_track()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_past_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(self.tracks[0].length * 5)
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
def test_seek_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.playback1.seek.assert_called_once_with(10000)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.playback2.seek.assert_called_once_with(10000)
|
|
|
|
def test_seek_fails_for_unplayable_track(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
success = self.core.playback.seek(1000)
|
|
|
|
self.assertFalse(success)
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_play_stay_playing(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING)
|
|
|
|
def test_seek_paused_stay_paused(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_emits_seeked_event(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(1000)
|
|
|
|
listener_mock.send.assert_called_once_with(
|
|
'seeked', time_position=1000)
|
|
|
|
def test_time_position_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.playback1.get_time_position.assert_called_once_with()
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
def test_time_position_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.playback2.get_time_position.assert_called_once_with()
|
|
|
|
def test_time_position_returns_0_if_track_is_unplayable(self):
|
|
self.core.playback.current_tl_track = self.unplayable_tl_track
|
|
|
|
result = self.core.playback.time_position
|
|
|
|
self.assertEqual(result, 0)
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
# TODO Test on_tracklist_change
|
|
|
|
|
|
# Since we rely on our DummyAudio to actually emit events we need a "real"
|
|
# backend and not a mock so the right calls make it through to audio.
|
|
class TestBackend(pykka.ThreadingActor, backend.Backend):
|
|
uri_schemes = ['dummy']
|
|
|
|
def __init__(self, config, audio):
|
|
super(TestBackend, self).__init__()
|
|
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
|
|
|
|
|
|
class TestStream(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.audio = audio.DummyAudio.start().proxy()
|
|
self.backend = TestBackend.start(config={}, audio=self.audio).proxy()
|
|
self.core = core.Core(audio=self.audio, backends=[self.backend])
|
|
self.playback = self.core.playback
|
|
|
|
self.tracks = [Track(uri='dummy:a', length=1234),
|
|
Track(uri='dummy:b', length=1234)]
|
|
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.events = []
|
|
self.patcher = mock.patch('mopidy.audio.listener.AudioListener.send')
|
|
self.send_mock = self.patcher.start()
|
|
|
|
def send(event, **kwargs):
|
|
self.events.append((event, kwargs))
|
|
|
|
self.send_mock.side_effect = send
|
|
|
|
def tearDown(self): # noqa: N802
|
|
pykka.ActorRegistry.stop_all()
|
|
self.patcher.stop()
|
|
|
|
def replay_audio_events(self):
|
|
while self.events:
|
|
event, kwargs = self.events.pop(0)
|
|
self.core.on_event(event, **kwargs)
|
|
|
|
def test_get_stream_reference_before_playback(self):
|
|
self.assertEqual(self.playback.get_stream_reference(), None)
|
|
|
|
def test_get_stream_reference_during_playback(self):
|
|
self.core.playback.play()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_reference(), None)
|
|
|
|
def test_get_stream_reference_during_playback_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
expected = Ref.track(name='foobar')
|
|
self.assertEqual(self.playback.get_stream_reference(), expected)
|
|
|
|
def test_get_stream_reference_after_next(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.next()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_reference(), None)
|
|
|
|
def test_get_stream_reference_after_next_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'title': ['foo']}).get()
|
|
self.core.playback.next()
|
|
self.audio.trigger_fake_tags_changed({'title': ['bar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
expected = Ref.track(name='bar')
|
|
self.assertEqual(self.playback.get_stream_reference(), expected)
|
|
|
|
def test_get_stream_reference_after_stop(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.stop()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_reference(), None)
|