Make core.playback select backend based on track URI

This commit is contained in:
Stein Magnus Jodal 2012-10-27 11:27:54 +02:00
parent 7b9c682e95
commit c47cec9e65
2 changed files with 140 additions and 6 deletions

View File

@ -78,6 +78,11 @@ class PlaybackController(object):
self.audio = audio
self.backends = backends
uri_schemes_by_backend = {backend: backend.uri_schemes.get()
for backend in backends}
self.backends_by_uri_scheme = {uri_scheme: backend
for backend, uri_schemes in uri_schemes_by_backend.items()
for uri_scheme in uri_schemes}
self.core = core
@ -86,6 +91,13 @@ class PlaybackController(object):
self._first_shuffle = True
self._volume = None
def _get_backend(self):
if self.current_cp_track is None:
return None
track = self.current_cp_track.track
uri_scheme = track.uri.split(':', 1)[0]
return self.backends_by_uri_scheme[uri_scheme]
def _get_cpid(self, cp_track):
if cp_track is None:
return None
@ -291,7 +303,10 @@ class PlaybackController(object):
@property
def time_position(self):
"""Time position in milliseconds."""
return self.backends[0].playback.get_time_position().get()
backend = self._get_backend()
if backend is None:
return 0
return backend.playback.get_time_position().get()
@property
def volume(self):
@ -377,7 +392,8 @@ class PlaybackController(object):
def pause(self):
"""Pause playback."""
if self.backends[0].playback.pause().get():
backend = self._get_backend()
if backend is None or backend.playback.pause().get():
self.state = PlaybackState.PAUSED
self._trigger_track_playback_paused()
@ -409,7 +425,7 @@ class PlaybackController(object):
if cp_track is not None:
self.current_cp_track = cp_track
self.state = PlaybackState.PLAYING
if not self.backends[0].playback.play(cp_track.track).get():
if not self._get_backend().playback.play(cp_track.track).get():
# Track is not playable
if self.random and self._shuffled:
self._shuffled.remove(cp_track)
@ -436,7 +452,7 @@ class PlaybackController(object):
def resume(self):
"""If paused, resume playing the current track."""
if (self.state == PlaybackState.PAUSED and
self.backends[0].playback.resume().get()):
self._get_backend().playback.resume().get()):
self.state = PlaybackState.PLAYING
self._trigger_track_playback_resumed()
@ -462,7 +478,7 @@ class PlaybackController(object):
self.next()
return True
success = self.backends[0].playback.seek(time_position).get()
success = self._get_backend().playback.seek(time_position).get()
if success:
self._trigger_seeked(time_position)
return success
@ -476,7 +492,7 @@ class PlaybackController(object):
:type clear_current_track: boolean
"""
if self.state != PlaybackState.STOPPED:
if self.backends[0].playback.stop().get():
if self._get_backend().playback.stop().get():
self._trigger_track_playback_ended()
self.state = PlaybackState.STOPPED
if clear_current_track:

118
tests/core/playback_test.py Normal file
View File

@ -0,0 +1,118 @@
import mock
from mopidy.backends import base
from mopidy.core import Core
from mopidy.models import Track
from tests import unittest
class CorePlaybackTest(unittest.TestCase):
def setUp(self):
self.backend1 = mock.Mock()
self.backend1.uri_schemes.get.return_value = ['dummy1']
self.playback1 = mock.Mock(spec=base.BasePlaybackProvider)
self.backend1.playback = self.playback1
self.backend2 = mock.Mock()
self.backend2.uri_schemes.get.return_value = ['dummy2']
self.playback2 = mock.Mock(spec=base.BasePlaybackProvider)
self.backend2.playback = self.playback2
self.tracks = [
Track(uri='dummy1://foo', length=40000),
Track(uri='dummy1://bar', length=40000),
Track(uri='dummy2://foo', length=40000),
Track(uri='dummy2://bar', length=40000),
]
self.core = Core(audio=None, backends=[self.backend1, self.backend2])
self.core.current_playlist.append(self.tracks)
self.cp_tracks = self.core.current_playlist.cp_tracks
def test_play_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.playback1.play.assert_called_once_with(self.tracks[0])
self.assertFalse(self.playback2.play.called)
def test_play_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.assertFalse(self.playback1.play.called)
self.playback2.play.assert_called_once_with(self.tracks[2])
def test_pause_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.core.playback.pause()
self.playback1.pause.assert_called_once_with()
self.assertFalse(self.playback2.pause.called)
def test_pause_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.core.playback.pause()
self.assertFalse(self.playback1.pause.called)
self.playback2.pause.assert_called_once_with()
def test_resume_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.core.playback.pause()
self.core.playback.resume()
self.playback1.resume.assert_called_once_with()
self.assertFalse(self.playback2.resume.called)
def test_resume_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.core.playback.pause()
self.core.playback.resume()
self.assertFalse(self.playback1.resume.called)
self.playback2.resume.assert_called_once_with()
def test_stop_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.core.playback.stop()
self.playback1.stop.assert_called_once_with()
self.assertFalse(self.playback2.stop.called)
def test_stop_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.core.playback.stop()
self.assertFalse(self.playback1.stop.called)
self.playback2.stop.assert_called_once_with()
def test_seek_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.core.playback.seek(10000)
self.playback1.seek.assert_called_once_with(10000)
self.assertFalse(self.playback2.seek.called)
def test_seek_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.core.playback.seek(10000)
self.assertFalse(self.playback1.seek.called)
self.playback2.seek.assert_called_once_with(10000)
def test_time_position_selects_dummy1_backend(self):
self.core.playback.play(self.cp_tracks[0])
self.core.playback.seek(10000)
self.core.playback.time_position
self.playback1.get_time_position.assert_called_once_with()
self.assertFalse(self.playback2.get_time_position.called)
def test_time_position_selects_dummy2_backend(self):
self.core.playback.play(self.cp_tracks[2])
self.core.playback.seek(10000)
self.core.playback.time_position
self.assertFalse(self.playback1.get_time_position.called)
self.playback2.get_time_position.assert_called_once_with()