mopidy/tests/core/test_playback.py
2015-04-22 20:55:51 +02:00

721 lines
26 KiB
Python

from __future__ import absolute_import, unicode_literals
import unittest
import mock
import pykka
from mopidy import backend, core
from mopidy.models import Track
from mopidy.utils import deprecation
from tests import dummy_audio as audio
# TODO: split into smaller easier to follow tests. setup is way to complex.
# TODO: just mock tracklist?
class CorePlaybackTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.backend1 = mock.Mock()
self.backend1.uri_schemes.get.return_value = ['dummy1']
self.playback1 = mock.Mock(spec=backend.PlaybackProvider)
self.playback1.get_time_position().get.return_value = 1000
self.playback1.reset_mock()
self.backend1.playback = self.playback1
self.backend2 = mock.Mock()
self.backend2.uri_schemes.get.return_value = ['dummy2']
self.playback2 = mock.Mock(spec=backend.PlaybackProvider)
self.playback2.get_time_position().get.return_value = 2000
self.playback2.reset_mock()
self.backend2.playback = self.playback2
# A backend without the optional playback provider
self.backend3 = mock.Mock()
self.backend3.uri_schemes.get.return_value = ['dummy3']
self.backend3.has_playback().get.return_value = False
self.tracks = [
Track(uri='dummy1:a', length=40000),
Track(uri='dummy2:a', length=40000),
Track(uri='dummy3:a', length=40000), # Unplayable
Track(uri='dummy1:b', length=40000),
Track(uri='dummy1:c', length=None), # No duration
]
self.uris = [
'dummy1:a', 'dummy2:a', 'dummy3:a', 'dummy1:b', 'dummy1:c']
self.core = core.Core(mixer=None, backends=[
self.backend1, self.backend2, self.backend3])
def lookup(uris):
result = {uri: [] for uri in uris}
for track in self.tracks:
if track.uri in result:
result[track.uri].append(track)
return result
self.lookup_patcher = mock.patch.object(self.core.library, 'lookup')
self.lookup_mock = self.lookup_patcher.start()
self.lookup_mock.side_effect = lookup
self.core.tracklist.add(uris=self.uris)
self.tl_tracks = self.core.tracklist.tl_tracks
self.unplayable_tl_track = self.tl_tracks[2]
self.duration_less_tl_track = self.tl_tracks[4]
def tearDown(self): # noqa: N802
self.lookup_patcher.stop()
def trigger_end_of_track(self):
self.core.playback._on_end_of_track()
def set_current_tl_track(self, tl_track):
self.core.playback._set_current_tl_track(tl_track)
def test_get_current_tl_track_none(self):
self.set_current_tl_track(None)
self.assertEqual(
self.core.playback.get_current_tl_track(), None)
def test_get_current_tl_track_play(self):
self.core.playback.play(self.tl_tracks[0])
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
def test_get_current_tl_track_next(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.next()
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[1])
def test_get_current_tl_track_prev(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.previous()
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
def test_get_current_track_play(self):
self.core.playback.play(self.tl_tracks[0])
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[0])
def test_get_current_track_next(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.next()
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[1])
def test_get_current_track_prev(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.previous()
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[0])
# TODO Test state
def test_play_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.playback1.prepare_change.assert_called_once_with()
self.playback1.change_track.assert_called_once_with(self.tracks[0])
self.playback1.play.assert_called_once_with()
self.assertFalse(self.playback2.play.called)
def test_play_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.assertFalse(self.playback1.play.called)
self.playback2.prepare_change.assert_called_once_with()
self.playback2.change_track.assert_called_once_with(self.tracks[1])
self.playback2.play.assert_called_once_with()
def test_play_skips_to_next_on_track_without_playback_backend(self):
self.core.playback.play(self.unplayable_tl_track)
self.playback1.prepare_change.assert_called_once_with()
self.playback1.change_track.assert_called_once_with(self.tracks[3])
self.playback1.play.assert_called_once_with()
self.assertFalse(self.playback2.play.called)
self.assertEqual(
self.core.playback.current_tl_track, self.tl_tracks[3])
def test_play_skips_to_next_on_unplayable_track(self):
"""Checks that we handle backend.change_track failing."""
self.playback2.change_track.return_value.get.return_value = False
self.core.tracklist.clear()
self.core.tracklist.add(uris=self.uris[:2])
tl_tracks = self.core.tracklist.tl_tracks
self.core.playback.play(tl_tracks[0])
self.core.playback.play(tl_tracks[1])
# TODO: we really want to check that the track was marked unplayable
# and that next was called. This is just an indirect way of checking
# this :(
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_stopped_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[0]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_paused_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
listener_mock.reset_mock()
self.core.playback.play(self.tl_tracks[1])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='paused', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_playing_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.play(self.tl_tracks[3])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=1000),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[3]),
])
def test_pause_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
self.playback1.pause.assert_called_once_with()
self.assertFalse(self.playback2.pause.called)
def test_pause_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.pause()
self.assertFalse(self.playback1.pause.called)
self.playback2.pause.assert_called_once_with()
def test_pause_changes_state_even_if_track_is_unplayable(self):
self.set_current_tl_track(self.unplayable_tl_track)
self.core.playback.pause()
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
self.assertFalse(self.playback1.pause.called)
self.assertFalse(self.playback2.pause.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_pause_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.pause()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='paused'),
mock.call(
'track_playback_paused',
tl_track=self.tl_tracks[0], time_position=1000),
])
def test_resume_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
self.core.playback.resume()
self.playback1.resume.assert_called_once_with()
self.assertFalse(self.playback2.resume.called)
def test_resume_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.pause()
self.core.playback.resume()
self.assertFalse(self.playback1.resume.called)
self.playback2.resume.assert_called_once_with()
def test_resume_does_nothing_if_track_is_unplayable(self):
self.set_current_tl_track(self.unplayable_tl_track)
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.resume()
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
self.assertFalse(self.playback1.resume.called)
self.assertFalse(self.playback2.resume.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_resume_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
listener_mock.reset_mock()
self.core.playback.resume()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='paused', new_state='playing'),
mock.call(
'track_playback_resumed',
tl_track=self.tl_tracks[0], time_position=1000),
])
def test_stop_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.stop()
self.playback1.stop.assert_called_once_with()
self.assertFalse(self.playback2.stop.called)
def test_stop_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.stop()
self.assertFalse(self.playback1.stop.called)
self.playback2.stop.assert_called_once_with()
def test_stop_changes_state_even_if_track_is_unplayable(self):
self.set_current_tl_track(self.unplayable_tl_track)
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.stop()
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
self.assertFalse(self.playback1.stop.called)
self.assertFalse(self.playback2.stop.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_stop_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.stop()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=1000),
])
# TODO Test next() more
def test_next_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.playback.next()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_next_in_consume_mode_removes_finished_track(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.core.playback.next()
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_next_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.next()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
# TODO Test previous() more
def test_previous_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[1]
self.core.playback.play(tl_track)
self.core.playback.previous()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_previous_keeps_finished_track_even_in_consume_mode(self):
tl_track = self.tl_tracks[1]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.core.playback.previous()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_previous_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[1])
listener_mock.reset_mock()
self.core.playback.previous()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[1], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[0]),
])
# TODO Test on_end_of_track() more
def test_on_end_of_track_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.trigger_end_of_track()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_on_end_of_track_in_consume_mode_removes_finished_track(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.trigger_end_of_track()
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_on_end_of_track_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.trigger_end_of_track()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_seek_past_end_of_track_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.seek(self.tracks[0].length * 5)
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
def test_seek_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.seek(10000)
self.playback1.seek.assert_called_once_with(10000)
self.assertFalse(self.playback2.seek.called)
def test_seek_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.seek(10000)
self.assertFalse(self.playback1.seek.called)
self.playback2.seek.assert_called_once_with(10000)
def test_seek_fails_for_unplayable_track(self):
self.set_current_tl_track(self.unplayable_tl_track)
self.core.playback.state = core.PlaybackState.PLAYING
success = self.core.playback.seek(1000)
self.assertFalse(success)
self.assertFalse(self.playback1.seek.called)
self.assertFalse(self.playback2.seek.called)
def test_seek_fails_for_track_without_duration(self):
self.set_current_tl_track(self.duration_less_tl_track)
self.core.playback.state = core.PlaybackState.PLAYING
success = self.core.playback.seek(1000)
self.assertFalse(success)
self.assertFalse(self.playback1.seek.called)
self.assertFalse(self.playback2.seek.called)
def test_seek_play_stay_playing(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.state = core.PlaybackState.PLAYING
self.core.playback.seek(1000)
self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING)
def test_seek_paused_stay_paused(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.seek(1000)
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_seek_emits_seeked_event(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.seek(1000)
listener_mock.send.assert_called_once_with(
'seeked', time_position=1000)
def test_time_position_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.seek(10000)
self.core.playback.time_position
self.playback1.get_time_position.assert_called_once_with()
self.assertFalse(self.playback2.get_time_position.called)
def test_time_position_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.seek(10000)
self.core.playback.time_position
self.assertFalse(self.playback1.get_time_position.called)
self.playback2.get_time_position.assert_called_once_with()
def test_time_position_returns_0_if_track_is_unplayable(self):
self.set_current_tl_track(self.unplayable_tl_track)
result = self.core.playback.time_position
self.assertEqual(result, 0)
self.assertFalse(self.playback1.get_time_position.called)
self.assertFalse(self.playback2.get_time_position.called)
# TODO Test on_tracklist_change
# Since we rely on our DummyAudio to actually emit events we need a "real"
# backend and not a mock so the right calls make it through to audio.
class TestBackend(pykka.ThreadingActor, backend.Backend):
uri_schemes = ['dummy']
def __init__(self, config, audio):
super(TestBackend, self).__init__()
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
class TestStream(unittest.TestCase):
def setUp(self): # noqa: N802
self.audio = audio.DummyAudio.start().proxy()
self.backend = TestBackend.start(config={}, audio=self.audio).proxy()
self.core = core.Core(audio=self.audio, backends=[self.backend])
self.playback = self.core.playback
self.tracks = [Track(uri='dummy:a', length=1234),
Track(uri='dummy:b', length=1234)]
self.lookup_patcher = mock.patch.object(self.core.library, 'lookup')
self.lookup_mock = self.lookup_patcher.start()
self.lookup_mock.return_value = {t.uri: [t] for t in self.tracks}
self.core.tracklist.add(uris=[t.uri for t in self.tracks])
self.events = []
self.send_patcher = mock.patch(
'mopidy.audio.listener.AudioListener.send')
self.send_mock = self.send_patcher.start()
def send(event, **kwargs):
self.events.append((event, kwargs))
self.send_mock.side_effect = send
def tearDown(self): # noqa: N802
pykka.ActorRegistry.stop_all()
self.lookup_patcher.stop()
self.send_patcher.stop()
def replay_audio_events(self):
while self.events:
event, kwargs = self.events.pop(0)
self.core.on_event(event, **kwargs)
def test_get_stream_title_before_playback(self):
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_during_playback(self):
self.core.playback.play()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_during_playback_with_tags_change(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), 'foobar')
def test_get_stream_title_after_next(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.core.playback.next()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_after_next_with_tags_change(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
self.audio.trigger_fake_tags_changed({'title': ['foo']}).get()
self.core.playback.next()
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
self.audio.trigger_fake_tags_changed({'title': ['bar']}).get()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), 'bar')
def test_get_stream_title_after_stop(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.core.playback.stop()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)
class CorePlaybackWithOldBackendTest(unittest.TestCase):
def test_type_error_from_old_backend_does_not_crash_core(self):
b = mock.Mock()
b.uri_schemes.get.return_value = ['dummy1']
b.playback = mock.Mock(spec=backend.PlaybackProvider)
b.playback.play.side_effect = TypeError
b.library.lookup.return_value.get.return_value = [
Track(uri='dummy1:a', length=40000)]
c = core.Core(mixer=None, backends=[b])
c.tracklist.add(uris=['dummy1:a'])
c.playback.play() # No TypeError == test passed.
class TestPlay(unittest.TestCase):
def setUp(self): # noqa: N802
self.backend = mock.Mock()
self.backend.uri_schemes.get.return_value = ['dummy']
self.core = core.Core(backends=[self.backend])
self.tracks = [Track(uri='dummy:a', length=1234),
Track(uri='dummy:b', length=1234)]
with deprecation.ignore('core.tracklist.add:tracks_arg'):
self.tl_tracks = self.core.tracklist.add(tracks=self.tracks)
def test_play_tlid(self):
self.core.playback.play(tlid=self.tl_tracks[1].tlid)
self.backend.playback.change_track.assert_called_once_with(
self.tl_tracks[1].track)