695 lines
25 KiB
Python
695 lines
25 KiB
Python
from __future__ import absolute_import, unicode_literals
|
|
|
|
import unittest
|
|
|
|
import mock
|
|
|
|
import pykka
|
|
|
|
from mopidy import backend, core
|
|
from mopidy.models import Track
|
|
|
|
from tests import dummy_audio as audio
|
|
|
|
|
|
# TODO: split into smaller easier to follow tests. setup is way to complex.
|
|
class CorePlaybackTest(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.backend1 = mock.Mock()
|
|
self.backend1.uri_schemes.get.return_value = ['dummy1']
|
|
self.playback1 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback1.get_time_position().get.return_value = 1000
|
|
self.playback1.reset_mock()
|
|
self.backend1.playback = self.playback1
|
|
|
|
self.backend2 = mock.Mock()
|
|
self.backend2.uri_schemes.get.return_value = ['dummy2']
|
|
self.playback2 = mock.Mock(spec=backend.PlaybackProvider)
|
|
self.playback2.get_time_position().get.return_value = 2000
|
|
self.playback2.reset_mock()
|
|
self.backend2.playback = self.playback2
|
|
|
|
# A backend without the optional playback provider
|
|
self.backend3 = mock.Mock()
|
|
self.backend3.uri_schemes.get.return_value = ['dummy3']
|
|
self.backend3.has_playback().get.return_value = False
|
|
|
|
self.tracks = [
|
|
Track(uri='dummy1:a', length=40000),
|
|
Track(uri='dummy2:a', length=40000),
|
|
Track(uri='dummy3:a', length=40000), # Unplayable
|
|
Track(uri='dummy1:b', length=40000),
|
|
Track(uri='dummy1:c', length=None), # No duration
|
|
]
|
|
|
|
self.core = core.Core(mixer=None, backends=[
|
|
self.backend1, self.backend2, self.backend3])
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.tl_tracks = self.core.tracklist.tl_tracks
|
|
self.unplayable_tl_track = self.tl_tracks[2]
|
|
self.duration_less_tl_track = self.tl_tracks[4]
|
|
|
|
def trigger_end_of_track(self):
|
|
self.core.playback._on_end_of_track()
|
|
|
|
def set_current_tl_track(self, tl_track):
|
|
self.core.playback._set_current_tl_track(tl_track)
|
|
|
|
def test_get_current_tl_track_none(self):
|
|
self.set_current_tl_track(None)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), None)
|
|
|
|
def test_get_current_tl_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_tl_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[1])
|
|
|
|
def test_get_current_tl_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_tl_track(), self.tl_tracks[0])
|
|
|
|
def test_get_current_track_play(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
def test_get_current_track_next(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.next()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[1])
|
|
|
|
def test_get_current_track_prev(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.previous()
|
|
|
|
self.assertEqual(
|
|
self.core.playback.get_current_track(), self.tracks[0])
|
|
|
|
# TODO Test state
|
|
|
|
def test_play_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.playback1.prepare_change.assert_called_once_with()
|
|
self.playback1.change_track.assert_called_once_with(self.tracks[0])
|
|
self.playback1.play.assert_called_once_with()
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
def test_play_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertFalse(self.playback1.play.called)
|
|
self.playback2.prepare_change.assert_called_once_with()
|
|
self.playback2.change_track.assert_called_once_with(self.tracks[1])
|
|
self.playback2.play.assert_called_once_with()
|
|
|
|
def test_play_skips_to_next_on_track_without_playback_backend(self):
|
|
self.core.playback.play(self.unplayable_tl_track)
|
|
|
|
self.playback1.prepare_change.assert_called_once_with()
|
|
self.playback1.change_track.assert_called_once_with(self.tracks[3])
|
|
self.playback1.play.assert_called_once_with()
|
|
self.assertFalse(self.playback2.play.called)
|
|
|
|
self.assertEqual(
|
|
self.core.playback.current_tl_track, self.tl_tracks[3])
|
|
|
|
def test_play_skips_to_next_on_unplayable_track(self):
|
|
"""Checks that we handle backend.change_track failing."""
|
|
self.playback2.change_track.return_value.get.return_value = False
|
|
|
|
self.core.tracklist.clear()
|
|
self.core.tracklist.add(self.tracks[:2])
|
|
tl_tracks = self.core.tracklist.tl_tracks
|
|
|
|
self.core.playback.play(tl_tracks[0])
|
|
self.core.playback.play(tl_tracks[1])
|
|
|
|
# TODO: we really want to check that the track was marked unplayable
|
|
# and that next was called. This is just an indirect way of checking
|
|
# this :(
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_stopped_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_paused_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_play_when_playing_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.play(self.tl_tracks[3])
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[3]),
|
|
])
|
|
|
|
def test_pause_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
|
|
self.playback1.pause.assert_called_once_with()
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
def test_pause_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.playback2.pause.assert_called_once_with()
|
|
|
|
def test_pause_changes_state_even_if_track_is_unplayable(self):
|
|
self.set_current_tl_track(self.unplayable_tl_track)
|
|
self.core.playback.pause()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.pause.called)
|
|
self.assertFalse(self.playback2.pause.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_pause_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.pause()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='paused'),
|
|
mock.call(
|
|
'track_playback_paused',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_resume_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.playback1.resume.assert_called_once_with()
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
def test_resume_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.pause()
|
|
self.core.playback.resume()
|
|
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.playback2.resume.assert_called_once_with()
|
|
|
|
def test_resume_does_nothing_if_track_is_unplayable(self):
|
|
self.set_current_tl_track(self.unplayable_tl_track)
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.resume()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
self.assertFalse(self.playback1.resume.called)
|
|
self.assertFalse(self.playback2.resume.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_resume_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.pause()
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.resume()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='paused', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_resumed',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
def test_stop_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.stop()
|
|
|
|
self.playback1.stop.assert_called_once_with()
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
def test_stop_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.stop()
|
|
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.playback2.stop.assert_called_once_with()
|
|
|
|
def test_stop_changes_state_even_if_track_is_unplayable(self):
|
|
self.set_current_tl_track(self.unplayable_tl_track)
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.stop()
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.STOPPED)
|
|
self.assertFalse(self.playback1.stop.called)
|
|
self.assertFalse(self.playback2.stop.called)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_stop_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.stop()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=1000),
|
|
])
|
|
|
|
# TODO Test next() more
|
|
|
|
def test_next_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_next_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_next_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.next()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
# TODO Test previous() more
|
|
|
|
def test_previous_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_previous_keeps_finished_track_even_in_consume_mode(self):
|
|
tl_track = self.tl_tracks[1]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_previous_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.previous()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[1], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[0]),
|
|
])
|
|
|
|
# TODO Test on_end_of_track() more
|
|
|
|
def test_on_end_of_track_keeps_finished_track_in_tracklist(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
|
|
self.trigger_end_of_track()
|
|
|
|
self.assertIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
def test_on_end_of_track_in_consume_mode_removes_finished_track(self):
|
|
tl_track = self.tl_tracks[0]
|
|
self.core.playback.play(tl_track)
|
|
self.core.tracklist.consume = True
|
|
|
|
self.trigger_end_of_track()
|
|
|
|
self.assertNotIn(tl_track, self.core.tracklist.tl_tracks)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_on_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.trigger_end_of_track()
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_past_end_of_track_emits_events(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(self.tracks[0].length * 5)
|
|
|
|
self.assertListEqual(
|
|
listener_mock.send.mock_calls,
|
|
[
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='playing', new_state='stopped'),
|
|
mock.call(
|
|
'track_playback_ended',
|
|
tl_track=self.tl_tracks[0], time_position=mock.ANY),
|
|
mock.call(
|
|
'playback_state_changed',
|
|
old_state='stopped', new_state='playing'),
|
|
mock.call(
|
|
'track_playback_started', tl_track=self.tl_tracks[1]),
|
|
])
|
|
|
|
def test_seek_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.playback1.seek.assert_called_once_with(10000)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.playback2.seek.assert_called_once_with(10000)
|
|
|
|
def test_seek_fails_for_unplayable_track(self):
|
|
self.set_current_tl_track(self.unplayable_tl_track)
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
success = self.core.playback.seek(1000)
|
|
|
|
self.assertFalse(success)
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_fails_for_track_without_duration(self):
|
|
self.set_current_tl_track(self.duration_less_tl_track)
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
success = self.core.playback.seek(1000)
|
|
|
|
self.assertFalse(success)
|
|
self.assertFalse(self.playback1.seek.called)
|
|
self.assertFalse(self.playback2.seek.called)
|
|
|
|
def test_seek_play_stay_playing(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PLAYING
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PLAYING)
|
|
|
|
def test_seek_paused_stay_paused(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.state = core.PlaybackState.PAUSED
|
|
self.core.playback.seek(1000)
|
|
|
|
self.assertEqual(self.core.playback.state, core.PlaybackState.PAUSED)
|
|
|
|
@mock.patch(
|
|
'mopidy.core.playback.listener.CoreListener', spec=core.CoreListener)
|
|
def test_seek_emits_seeked_event(self, listener_mock):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
listener_mock.reset_mock()
|
|
|
|
self.core.playback.seek(1000)
|
|
|
|
listener_mock.send.assert_called_once_with(
|
|
'seeked', time_position=1000)
|
|
|
|
def test_time_position_selects_dummy1_backend(self):
|
|
self.core.playback.play(self.tl_tracks[0])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.playback1.get_time_position.assert_called_once_with()
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
def test_time_position_selects_dummy2_backend(self):
|
|
self.core.playback.play(self.tl_tracks[1])
|
|
self.core.playback.seek(10000)
|
|
self.core.playback.time_position
|
|
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.playback2.get_time_position.assert_called_once_with()
|
|
|
|
def test_time_position_returns_0_if_track_is_unplayable(self):
|
|
self.set_current_tl_track(self.unplayable_tl_track)
|
|
|
|
result = self.core.playback.time_position
|
|
|
|
self.assertEqual(result, 0)
|
|
self.assertFalse(self.playback1.get_time_position.called)
|
|
self.assertFalse(self.playback2.get_time_position.called)
|
|
|
|
# TODO Test on_tracklist_change
|
|
|
|
|
|
# Since we rely on our DummyAudio to actually emit events we need a "real"
|
|
# backend and not a mock so the right calls make it through to audio.
|
|
class TestBackend(pykka.ThreadingActor, backend.Backend):
|
|
uri_schemes = ['dummy']
|
|
|
|
def __init__(self, config, audio):
|
|
super(TestBackend, self).__init__()
|
|
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
|
|
|
|
|
|
class TestStream(unittest.TestCase):
|
|
def setUp(self): # noqa: N802
|
|
self.audio = audio.DummyAudio.start().proxy()
|
|
self.backend = TestBackend.start(config={}, audio=self.audio).proxy()
|
|
self.core = core.Core(audio=self.audio, backends=[self.backend])
|
|
self.playback = self.core.playback
|
|
|
|
self.tracks = [Track(uri='dummy:a', length=1234),
|
|
Track(uri='dummy:b', length=1234)]
|
|
|
|
self.core.tracklist.add(self.tracks)
|
|
|
|
self.events = []
|
|
self.patcher = mock.patch('mopidy.audio.listener.AudioListener.send')
|
|
self.send_mock = self.patcher.start()
|
|
|
|
def send(event, **kwargs):
|
|
self.events.append((event, kwargs))
|
|
|
|
self.send_mock.side_effect = send
|
|
|
|
def tearDown(self): # noqa: N802
|
|
pykka.ActorRegistry.stop_all()
|
|
self.patcher.stop()
|
|
|
|
def replay_audio_events(self):
|
|
while self.events:
|
|
event, kwargs = self.events.pop(0)
|
|
self.core.on_event(event, **kwargs)
|
|
|
|
def test_get_stream_title_before_playback(self):
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_during_playback(self):
|
|
self.core.playback.play()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_during_playback_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), 'foobar')
|
|
|
|
def test_get_stream_title_after_next(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.next()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
def test_get_stream_title_after_next_with_tags_change(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foo']}).get()
|
|
self.core.playback.next()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['bar']}).get()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), 'bar')
|
|
|
|
def test_get_stream_title_after_stop(self):
|
|
self.core.playback.play()
|
|
self.audio.trigger_fake_tags_changed({'organization': ['baz']})
|
|
self.audio.trigger_fake_tags_changed({'title': ['foobar']}).get()
|
|
self.core.playback.stop()
|
|
|
|
self.replay_audio_events()
|
|
self.assertEqual(self.playback.get_stream_title(), None)
|
|
|
|
|
|
class CorePlaybackWithOldBackendTest(unittest.TestCase):
|
|
def test_type_error_from_old_backend_does_not_crash_core(self):
|
|
b = mock.Mock()
|
|
b.uri_schemes.get.return_value = ['dummy1']
|
|
b.playback = mock.Mock(spec=backend.PlaybackProvider)
|
|
b.playback.play.side_effect = TypeError
|
|
|
|
c = core.Core(mixer=None, backends=[b])
|
|
c.tracklist.add([Track(uri='dummy1:a', length=40000)])
|
|
c.playback.play() # No TypeError == test passed.
|
|
b.playback.play.assert_called_once_with()
|
|
|
|
|
|
class Bug1177RegressionTest(unittest.TestCase):
|
|
def test(self):
|
|
b = mock.Mock()
|
|
b.uri_schemes.get.return_value = ['dummy']
|
|
b.playback = mock.Mock(spec=backend.PlaybackProvider)
|
|
b.playback.change_track.return_value.get.return_value = True
|
|
b.playback.play.return_value.get.return_value = True
|
|
|
|
track1 = Track(uri='dummy:a', length=40000)
|
|
track2 = Track(uri='dummy:b', length=40000)
|
|
|
|
c = core.Core(mixer=None, backends=[b])
|
|
c.tracklist.add([track1, track2])
|
|
|
|
c.playback.play()
|
|
b.playback.change_track.assert_called_once_with(track1)
|
|
b.playback.change_track.reset_mock()
|
|
|
|
c.playback.pause()
|
|
c.playback.next()
|
|
b.playback.change_track.assert_called_once_with(track2)
|