mopidy/tests/core/test_playback.py
Thomas Adamcik 6adeea6009 core: Correctly handle missing duration in seek.
Seeks will now fail when the duration is None, this is an approximation to if
the track is seekable or not. This check is need as otherwise seeking a radio
stream will trigger the next track.

If the track truly isn't seekable despite having a duration we should still
fail as GStreamer will reject the seek.
2015-03-15 11:29:07 +01:00

625 lines
22 KiB
Python

from __future__ import absolute_import, unicode_literals
import unittest
import mock
import pykka
from mopidy import backend, core
from mopidy.models import Track
from tests import dummy_audio as audio
class CorePlaybackTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.backend1 = mock.Mock()
self.backend1.uri_schemes.get.return_value = ['dummy1']
self.playback1 = mock.Mock(spec=backend.PlaybackProvider)
self.playback1.get_time_position().get.return_value = 1000
self.playback1.reset_mock()
self.backend1.playback = self.playback1
self.backend2 = mock.Mock()
self.backend2.uri_schemes.get.return_value = ['dummy2']
self.playback2 = mock.Mock(spec=backend.PlaybackProvider)
self.playback2.get_time_position().get.return_value = 2000
self.playback2.reset_mock()
self.backend2.playback = self.playback2
# A backend without the optional playback provider
self.backend3 = mock.Mock()
self.backend3.uri_schemes.get.return_value = ['dummy3']
self.backend3.has_playback().get.return_value = False
self.tracks = [
Track(uri='dummy1:a', length=40000),
Track(uri='dummy2:a', length=40000),
Track(uri='dummy3:a', length=40000), # Unplayable
Track(uri='dummy1:b', length=40000),
Track(uri='dummy1:c', length=None), # No duration
]
self.core = core.Core(mixer=None, backends=[
self.backend1, self.backend2, self.backend3])
self.core.tracklist.add(self.tracks)
self.tl_tracks = self.core.tracklist.tl_tracks
self.unplayable_tl_track = self.tl_tracks[2]
self.duration_less_tl_track = self.tl_tracks[4]
def test_get_current_tl_track_none(self):
self.core.playback.set_current_tl_track(None)
self.assertEqual(
self.core.playback.get_current_tl_track(), None)
def test_get_current_tl_track_play(self):
self.core.playback.play(self.tl_tracks[0])
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
def test_get_current_tl_track_next(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.next()
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[1])
def test_get_current_tl_track_prev(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.previous()
self.assertEqual(
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
def test_get_current_track_play(self):
self.core.playback.play(self.tl_tracks[0])
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[0])
def test_get_current_track_next(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.next()
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[1])
def test_get_current_track_prev(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.previous()
self.assertEqual(
self.core.playback.get_current_track(), self.tracks[0])
# TODO Test state
def test_play_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.playback1.play.assert_called_once_with(self.tracks[0])
self.assertFalse(self.playback2.play.called)
def test_play_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.assertFalse(self.playback1.play.called)
self.playback2.play.assert_called_once_with(self.tracks[1])
def test_play_skips_to_next_on_unplayable_track(self):
self.core.playback.play(self.unplayable_tl_track)
self.playback1.play.assert_called_once_with(self.tracks[3])
self.assertFalse(self.playback2.play.called)
self.assertEqual(
self.core.playback.current_tl_track, self.tl_tracks[3])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_stopped_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[0]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_paused_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
listener_mock.reset_mock()
self.core.playback.play(self.tl_tracks[1])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='paused', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_play_when_playing_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.play(self.tl_tracks[3])
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=1000),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[3]),
])
def test_pause_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
self.playback1.pause.assert_called_once_with()
self.assertFalse(self.playback2.pause.called)
def test_pause_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.pause()
self.assertFalse(self.playback1.pause.called)
self.playback2.pause.assert_called_once_with()
def test_pause_changes_state_even_if_track_is_unplayable(self):
self.core.playback.current_tl_track = self.unplayable_tl_track
self.core.playback.pause()
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
self.assertFalse(self.playback1.pause.called)
self.assertFalse(self.playback2.pause.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_pause_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.pause()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='paused'),
mock.call(
'track_playback_paused',
tl_track=self.tl_tracks[0], time_position=1000),
])
def test_resume_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
self.core.playback.resume()
self.playback1.resume.assert_called_once_with()
self.assertFalse(self.playback2.resume.called)
def test_resume_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.pause()
self.core.playback.resume()
self.assertFalse(self.playback1.resume.called)
self.playback2.resume.assert_called_once_with()
def test_resume_does_nothing_if_track_is_unplayable(self):
self.core.playback.current_tl_track = self.unplayable_tl_track
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.resume()
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
self.assertFalse(self.playback1.resume.called)
self.assertFalse(self.playback2.resume.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_resume_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.pause()
listener_mock.reset_mock()
self.core.playback.resume()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='paused', new_state='playing'),
mock.call(
'track_playback_resumed',
tl_track=self.tl_tracks[0], time_position=1000),
])
def test_stop_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.stop()
self.playback1.stop.assert_called_once_with()
self.assertFalse(self.playback2.stop.called)
def test_stop_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.stop()
self.assertFalse(self.playback1.stop.called)
self.playback2.stop.assert_called_once_with()
def test_stop_changes_state_even_if_track_is_unplayable(self):
self.core.playback.current_tl_track = self.unplayable_tl_track
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.stop()
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
self.assertFalse(self.playback1.stop.called)
self.assertFalse(self.playback2.stop.called)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_stop_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.stop()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=1000),
])
# TODO Test next() more
def test_next_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.playback.next()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_next_in_consume_mode_removes_finished_track(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.core.playback.next()
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_next_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.next()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
# TODO Test previous() more
def test_previous_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[1]
self.core.playback.play(tl_track)
self.core.playback.previous()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_previous_keeps_finished_track_even_in_consume_mode(self):
tl_track = self.tl_tracks[1]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.core.playback.previous()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_previous_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[1])
listener_mock.reset_mock()
self.core.playback.previous()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[1], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[0]),
])
# TODO Test on_end_of_track() more
def test_on_end_of_track_keeps_finished_track_in_tracklist(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.playback.on_end_of_track()
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
def test_on_end_of_track_in_consume_mode_removes_finished_track(self):
tl_track = self.tl_tracks[0]
self.core.playback.play(tl_track)
self.core.tracklist.consume = True
self.core.playback.on_end_of_track()
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_on_end_of_track_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.on_end_of_track()
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_seek_past_end_of_track_emits_events(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.seek(self.tracks[0].length * 5)
self.assertListEqual(
listener_mock.send.mock_calls,
[
mock.call(
'playback_state_changed',
old_state='playing', new_state='stopped'),
mock.call(
'track_playback_ended',
tl_track=self.tl_tracks[0], time_position=mock.ANY),
mock.call(
'playback_state_changed',
old_state='stopped', new_state='playing'),
mock.call(
'track_playback_started', tl_track=self.tl_tracks[1]),
])
def test_seek_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.seek(10000)
self.playback1.seek.assert_called_once_with(10000)
self.assertFalse(self.playback2.seek.called)
def test_seek_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.seek(10000)
self.assertFalse(self.playback1.seek.called)
self.playback2.seek.assert_called_once_with(10000)
def test_seek_fails_for_unplayable_track(self):
self.core.playback.current_tl_track = self.unplayable_tl_track
self.core.playback.state = core.PlaybackState.PLAYING
success = self.core.playback.seek(1000)
self.assertFalse(success)
self.assertFalse(self.playback1.seek.called)
self.assertFalse(self.playback2.seek.called)
def test_seek_fails_for_track_without_duration(self):
self.core.playback.current_tl_track = self.duration_less_tl_track
self.core.playback.state = core.PlaybackState.PLAYING
success = self.core.playback.seek(1000)
self.assertFalse(success)
self.assertFalse(self.playback1.seek.called)
self.assertFalse(self.playback2.seek.called)
def test_seek_play_stay_playing(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.state = core.PlaybackState.PLAYING
self.core.playback.seek(1000)
self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING)
def test_seek_paused_stay_paused(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.state = core.PlaybackState.PAUSED
self.core.playback.seek(1000)
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
@mock.patch(
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
def test_seek_emits_seeked_event(self, listener_mock):
self.core.playback.play(self.tl_tracks[0])
listener_mock.reset_mock()
self.core.playback.seek(1000)
listener_mock.send.assert_called_once_with(
'seeked', time_position=1000)
def test_time_position_selects_dummy1_backend(self):
self.core.playback.play(self.tl_tracks[0])
self.core.playback.seek(10000)
self.core.playback.time_position
self.playback1.get_time_position.assert_called_once_with()
self.assertFalse(self.playback2.get_time_position.called)
def test_time_position_selects_dummy2_backend(self):
self.core.playback.play(self.tl_tracks[1])
self.core.playback.seek(10000)
self.core.playback.time_position
self.assertFalse(self.playback1.get_time_position.called)
self.playback2.get_time_position.assert_called_once_with()
def test_time_position_returns_0_if_track_is_unplayable(self):
self.core.playback.current_tl_track = self.unplayable_tl_track
result = self.core.playback.time_position
self.assertEqual(result, 0)
self.assertFalse(self.playback1.get_time_position.called)
self.assertFalse(self.playback2.get_time_position.called)
# TODO Test on_tracklist_change
# Since we rely on our DummyAudio to actually emit events we need a "real"
# backend and not a mock so the right calls make it through to audio.
class TestBackend(pykka.ThreadingActor, backend.Backend):
uri_schemes = ['dummy']
def __init__(self, config, audio):
super(TestBackend, self).__init__()
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
class TestStream(unittest.TestCase):
def setUp(self): # noqa: N802
self.audio = audio.DummyAudio.start().proxy()
self.backend = TestBackend.start(config={}, audio=self.audio).proxy()
self.core = core.Core(audio=self.audio, backends=[self.backend])
self.playback = self.core.playback
self.tracks = [Track(uri='dummy:a', length=1234),
Track(uri='dummy:b', length=1234)]
self.core.tracklist.add(self.tracks)
self.events = []
self.patcher = mock.patch('mopidy.audio.listener.AudioListener.send')
self.send_mock = self.patcher.start()
def send(event, **kwargs):
self.events.append((event, kwargs))
self.send_mock.side_effect = send
def tearDown(self): # noqa: N802
pykka.ActorRegistry.stop_all()
self.patcher.stop()
def replay_audio_events(self):
while self.events:
event, kwargs = self.events.pop(0)
self.core.on_event(event, **kwargs)
def test_get_stream_title_before_playback(self):
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_during_playback(self):
self.core.playback.play()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_during_playback_with_tags_change(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), 'foobar')
def test_get_stream_title_after_next(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.core.playback.next()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)
def test_get_stream_title_after_next_with_tags_change(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'title': ['foo']}).get()
self.core.playback.next()
self.audio.trigger_fake_tags_changed({'title': ['bar']}).get()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), 'bar')
def test_get_stream_title_after_stop(self):
self.core.playback.play()
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
self.core.playback.stop()
self.replay_audio_events()
self.assertEqual(self.playback.get_stream_title(), None)