Split MPRIS frontend into multiple files. Fix some pylint warnings.

This commit is contained in:
Stein Magnus Jodal 2011-07-28 01:03:51 +02:00
parent 3c1ba51580
commit d14dbc5587
5 changed files with 214 additions and 203 deletions

View File

@ -0,0 +1,122 @@
import logging
logger = logging.getLogger('mopidy.frontends.mpris')
try:
import indicate
except ImportError as import_error:
indicate = None
logger.debug(u'Startup notification will not be sent (%s)', import_error)
from pykka.actor import ThreadingActor
from mopidy.frontends.mpris import objects
from mopidy.listeners import BackendListener
class MprisFrontend(ThreadingActor, BackendListener):
"""
Frontend which lets you control Mopidy through the Media Player Remote
Interfacing Specification (MPRIS) D-Bus interface.
An example of an MPRIS client is `Ubuntu's sound menu
<https://wiki.ubuntu.com/SoundMenu>`_.
**Dependencies:**
- ``dbus`` Python bindings. The package is named ``python-dbus`` in
Ubuntu/Debian.
- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
Ubuntu Sound Menu. The package is named ``python-indicate`` in
Ubuntu/Debian.
**Testing the frontend**
To test, start Mopidy, and then run the following in a Python shell::
import dbus
bus = dbus.SessionBus()
player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
'/org/mpris/MediaPlayer2')
Now you can control Mopidy through the player object. Examples:
- To get some properties from Mopidy, run::
props = player.GetAll('org.mpris.MediaPlayer2',
dbus_interface='org.freedesktop.DBus.Properties')
- To quit Mopidy through D-Bus, run::
player.Quit(dbus_interface='org.mpris.MediaPlayer2')
"""
def __init__(self):
self.indicate_server = None
self.mpris_object = None
def on_start(self):
self.mpris_object = objects.MprisObject()
self.send_startup_notification()
def on_stop(self):
logger.debug(u'Removing MPRIS object from D-Bus connection...')
self.mpris_object.remove_from_connection()
self.mpris_object = None
logger.debug(u'Removed MPRIS object from D-Bus connection')
def send_startup_notification(self):
"""
Send startup notification using libindicate to make Mopidy appear in
e.g. `Ubuntu's sound menu <https://wiki.ubuntu.com/SoundMenu>`_.
A reference to the libindicate server is kept for as long as Mopidy is
running. When Mopidy exits, the server will be unreferenced and Mopidy
will automatically be unregistered from e.g. the sound menu.
"""
if not indicate:
return
logger.debug(u'Sending startup notification...')
self.indicate_server = indicate.Server()
self.indicate_server.set_type('music.mopidy')
# FIXME Location of .desktop file shouldn't be hardcoded
self.indicate_server.set_desktop_file(
'/usr/share/applications/mopidy.desktop')
self.indicate_server.show()
logger.debug(u'Startup notification sent')
def _emit_properties_changed(self, *changed_properties):
if self.mpris_object is None:
return
props_with_new_values = [
(p, self.mpris_object.Get(objects.PLAYER_IFACE, p))
for p in changed_properties]
self.mpris_object.PropertiesChanged(objects.PLAYER_IFACE,
dict(props_with_new_values), [])
def track_playback_paused(self, track, time_position):
logger.debug(u'Received track playback paused event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_resumed(self, track, time_position):
logger.debug(u'Received track playback resumed event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_started(self, track):
logger.debug(u'Received track playback started event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def track_playback_ended(self, track, time_position):
logger.debug(u'Received track playback ended event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def volume_changed(self):
logger.debug(u'Received volume changed event')
self._emit_properties_changed('Volume')
def seeked(self):
logger.debug(u'Received seeked event')
if self.mpris_object is None:
return
self.mpris_object.Seeked(
self.mpris_object.Get(objects.PLAYER_IFACE, 'Position'))

View File

@ -10,18 +10,10 @@ except ImportError as import_error:
from mopidy import OptionalDependencyError
raise OptionalDependencyError(import_error)
try:
import indicate
except ImportError as import_error:
indicate = None
logger.debug(u'Startup notification will not be sent (%s)', import_error)
from pykka.actor import ThreadingActor
from pykka.registry import ActorRegistry
from mopidy.backends.base import Backend
from mopidy.backends.base.playback import PlaybackController
from mopidy.listeners import BackendListener
from mopidy.mixers.base import BaseMixer
from mopidy.utils.process import exit_process
@ -35,112 +27,6 @@ ROOT_IFACE = 'org.mpris.MediaPlayer2'
PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
class MprisFrontend(ThreadingActor, BackendListener):
"""
Frontend which lets you control Mopidy through the Media Player Remote
Interfacing Specification (MPRIS) D-Bus interface.
An example of an MPRIS client is `Ubuntu's sound menu
<https://wiki.ubuntu.com/SoundMenu>`_.
**Dependencies:**
- ``dbus`` Python bindings. The package is named ``python-dbus`` in
Ubuntu/Debian.
- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
Ubuntu Sound Menu. The package is named ``python-indicate`` in
Ubuntu/Debian.
**Testing the frontend**
To test, start Mopidy, and then run the following in a Python shell::
import dbus
bus = dbus.SessionBus()
player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
'/org/mpris/MediaPlayer2')
Now you can control Mopidy through the player object. Examples:
- To get some properties from Mopidy, run::
props = player.GetAll('org.mpris.MediaPlayer2',
dbus_interface='org.freedesktop.DBus.Properties')
- To quit Mopidy through D-Bus, run::
player.Quit(dbus_interface='org.mpris.MediaPlayer2')
"""
def __init__(self):
self.indicate_server = None
self.mpris_object = None
def on_start(self):
self.mpris_object = MprisObject()
self.send_startup_notification()
def on_stop(self):
logger.debug(u'Removing MPRIS object from D-Bus connection...')
self.mpris_object.remove_from_connection()
self.mpris_object = None
logger.debug(u'Removed MPRIS object from D-Bus connection')
def send_startup_notification(self):
"""
Send startup notification using libindicate to make Mopidy appear in
e.g. `Ubuntu's sound menu <https://wiki.ubuntu.com/SoundMenu>`_.
A reference to the libindicate server is kept for as long as Mopidy is
running. When Mopidy exits, the server will be unreferenced and Mopidy
will automatically be unregistered from e.g. the sound menu.
"""
if not indicate:
return
logger.debug(u'Sending startup notification...')
self.indicate_server = indicate.Server()
self.indicate_server.set_type('music.mopidy')
# FIXME Location of .desktop file shouldn't be hardcoded
self.indicate_server.set_desktop_file(
'/usr/share/applications/mopidy.desktop')
self.indicate_server.show()
logger.debug(u'Startup notification sent')
def _emit_properties_changed(self, *changed_properties):
if self.mpris_object is None:
return
props_with_new_values = [(p, self.mpris_object.Get(PLAYER_IFACE, p))
for p in changed_properties]
self.mpris_object.PropertiesChanged(PLAYER_IFACE,
dict(props_with_new_values), [])
def track_playback_paused(self, track, time_position):
logger.debug(u'Received track playback paused event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_resumed(self, track, time_position):
logger.debug(u'Received track playback resumed event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_started(self, track):
logger.debug(u'Received track playback started event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def track_playback_ended(self, track, time_position):
logger.debug(u'Received track playback ended event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def volume_changed(self):
logger.debug(u'Received volume changed event')
self._emit_properties_changed('Volume')
def seeked(self):
logger.debug(u'Received seeked event')
if self.mpris_object is None:
return
self.mpris_object.Seeked(PLAYER_IFACE, self.mpris_object.Get(PLAYER_IFACE, 'Position'))
class MprisObject(dbus.service.Object):
"""Implements http://www.mpris.org/2.1/spec/"""
@ -198,7 +84,8 @@ class MprisObject(dbus.service.Object):
def backend(self):
if self._backend is None:
backend_refs = ActorRegistry.get_by_class(Backend)
assert len(backend_refs) == 1, 'Expected exactly one running backend.'
assert len(backend_refs) == 1, \
'Expected exactly one running backend.'
self._backend = backend_refs[0].proxy()
return self._backend
@ -262,7 +149,7 @@ class MprisObject(dbus.service.Object):
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Raise(self):
logger.debug(u'%s.Raise called', ROOT_IFACE)
pass # We do not have a GUI
# Do nothing, as we do not have a GUI
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Quit(self):
@ -390,7 +277,7 @@ class MprisObject(dbus.service.Object):
@dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x')
def Seeked(self, position):
logger.debug(u'%s.Seeked signaled', PLAYER_IFACE)
pass
# Do nothing, as just calling the method is enough to emit the signal.
### Player interface properties

View File

@ -1,66 +1,68 @@
import mock
import unittest
from mopidy.frontends.mpris import MprisFrontend, MprisObject, PLAYER_IFACE
from mopidy.frontends.mpris import MprisFrontend, objects
from mopidy.models import Track
class BackendEventsTest(unittest.TestCase):
def setUp(self):
self.mpris_frontend = MprisFrontend() # As a plain class, not an actor
self.mpris_object = mock.Mock(spec=MprisObject)
self.mpris_object = mock.Mock(spec=objects.MprisObject)
self.mpris_frontend.mpris_object = self.mpris_object
def test_track_playback_paused_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Paused'
self.mpris_frontend.track_playback_paused(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
def test_track_playback_resumed_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Playing'
self.mpris_frontend.track_playback_resumed(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
def test_track_playback_started_event_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_started(Track())
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'PlaybackStatus'), {}),
((PLAYER_IFACE, 'Metadata'), {}),
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
PLAYER_IFACE, {'Metadata': '...', 'PlaybackStatus': '...'}, [])
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_track_playback_ended_event_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_ended(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'PlaybackStatus'), {}),
((PLAYER_IFACE, 'Metadata'), {}),
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
PLAYER_IFACE, {'Metadata': '...', 'PlaybackStatus': '...'}, [])
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_volume_changed_event_changes_volume(self):
self.mpris_object.Get.return_value = 1.0
self.mpris_frontend.volume_changed()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'Volume'), {}),
((objects.PLAYER_IFACE, 'Volume'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
PLAYER_IFACE, {'Volume': 1.0}, [])
objects.PLAYER_IFACE, {'Volume': 1.0}, [])
def test_seeked_event_causes_mpris_seeked_event(self):
self.mpris_object.Get.return_value = 31000000
self.mpris_frontend.seeked()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((PLAYER_IFACE, 'Position'), {}),
((objects.PLAYER_IFACE, 'Position'), {}),
])
self.mpris_object.Seeked.assert_called_with( PLAYER_IFACE, 31000000)
self.mpris_object.Seeked.assert_called_with(31000000)

View File

@ -3,7 +3,7 @@ import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.backends.base.playback import PlaybackController
from mopidy.frontends import mpris
from mopidy.frontends.mpris import objects
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Album, Artist, Track
@ -13,10 +13,10 @@ STOPPED = PlaybackController.STOPPED
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
mpris.MprisObject._connect_to_dbus = mock.Mock()
objects.MprisObject._connect_to_dbus = mock.Mock()
self.mixer = DummyMixer.start().proxy()
self.backend = DummyBackend.start().proxy()
self.mpris = mpris.MprisObject()
self.mpris = objects.MprisObject()
self.mpris._backend = self.backend
def tearDown(self):
@ -25,68 +25,68 @@ class PlayerInterfaceTest(unittest.TestCase):
def test_get_playback_status_is_playing_when_playing(self):
self.backend.playback.state = PLAYING
result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Playing', result)
def test_get_playback_status_is_paused_when_paused(self):
self.backend.playback.state = PAUSED
result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Paused', result)
def test_get_playback_status_is_stopped_when_stopped(self):
self.backend.playback.state = STOPPED
result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Stopped', result)
def test_get_loop_status_is_none_when_not_looping(self):
self.backend.playback.repeat = False
self.backend.playback.single = False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('None', result)
def test_get_loop_status_is_track_when_looping_a_single_track(self):
self.backend.playback.repeat = True
self.backend.playback.single = True
result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Track', result)
def test_get_loop_status_is_playlist_when_looping_the_current_playlist(self):
self.backend.playback.repeat = True
self.backend.playback.single = False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus')
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Playlist', result)
def test_set_loop_status_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.playback.repeat = True
self.backend.playback.single = True
self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'None')
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), True)
def test_set_loop_status_to_none_unsets_repeat_and_single(self):
self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'None')
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEquals(self.backend.playback.repeat.get(), False)
self.assertEquals(self.backend.playback.single.get(), False)
def test_set_loop_status_to_track_sets_repeat_and_single(self):
self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'Track')
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), True)
def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self):
self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'Playlist')
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), False)
def test_get_rate_is_greater_or_equal_than_minimum_rate(self):
rate = self.mpris.Get(mpris.PLAYER_IFACE, 'Rate')
minimum_rate = self.mpris.Get(mpris.PLAYER_IFACE, 'MinimumRate')
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assert_(rate >= minimum_rate)
def test_get_rate_is_less_or_equal_than_maximum_rate(self):
rate = self.mpris.Get(mpris.PLAYER_IFACE, 'Rate')
maximum_rate = self.mpris.Get(mpris.PLAYER_IFACE, 'MaximumRate')
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assert_(rate >= maximum_rate)
def test_set_rate_is_ignored_if_can_control_is_false(self):
@ -94,46 +94,46 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Set(mpris.PLAYER_IFACE, 'Rate', 0)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_set_rate_to_zero_pauses_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Set(mpris.PLAYER_IFACE, 'Rate', 0)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_get_shuffle_returns_true_if_random_is_active(self):
self.backend.playback.random = True
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Shuffle')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertTrue(result)
def test_get_shuffle_returns_false_if_random_is_inactive(self):
self.backend.playback.random = False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Shuffle')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertFalse(result)
def test_set_shuffle_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.playback.random = False
result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', True)
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertFalse(self.backend.playback.random.get())
def test_set_shuffle_to_true_activates_random_mode(self):
self.backend.playback.random = False
self.assertFalse(self.backend.playback.random.get())
result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', True)
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertTrue(self.backend.playback.random.get())
def test_set_shuffle_to_false_deactivates_random_mode(self):
self.backend.playback.random = True
self.assertTrue(self.backend.playback.random.get())
result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', False)
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False)
self.assertFalse(self.backend.playback.random.get())
def test_get_metadata_has_trackid_even_when_no_current_track(self):
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assert_('mpris:trackid' in result.keys())
self.assertEquals(result['mpris:trackid'], '')
@ -141,7 +141,7 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
(cpid, track) = self.backend.playback.current_cp_track.get()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEquals(result['mpris:trackid'],
'/com/mopidy/track/%d' % cpid)
@ -149,21 +149,21 @@ class PlayerInterfaceTest(unittest.TestCase):
def test_get_metadata_has_track_length(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:length', result.keys())
self.assertEquals(result['mpris:length'], 40000000)
def test_get_metadata_has_track_uri(self):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:url', result.keys())
self.assertEquals(result['xesam:url'], 'a')
def test_get_metadata_has_track_title(self):
self.backend.current_playlist.append([Track(name='a')])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:title', result.keys())
self.assertEquals(result['xesam:title'], 'a')
@ -171,14 +171,14 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)])])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:artist', result.keys())
self.assertEquals(result['xesam:artist'], ['a', 'b'])
def test_get_metadata_has_track_album(self):
self.backend.current_playlist.append([Track(album=Album(name='a'))])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:album', result.keys())
self.assertEquals(result['xesam:album'], 'a')
@ -186,75 +186,75 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(album=Album(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)]))])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:albumArtist', result.keys())
self.assertEquals(result['xesam:albumArtist'], ['a', 'b'])
def test_get_metadata_has_track_number_in_album(self):
self.backend.current_playlist.append([Track(track_no=7)])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:trackNumber', result.keys())
self.assertEquals(result['xesam:trackNumber'], 7)
def test_get_volume_should_return_volume_between_zero_and_one(self):
self.mixer.volume = 0
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 0)
self.mixer.volume = 50
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 0.5)
self.mixer.volume = 100
result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume')
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 1)
def test_set_volume_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.mixer.volume = 0
self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 1.0)
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEquals(self.mixer.volume.get(), 0)
def test_set_volume_to_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 1.0)
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEquals(self.mixer.volume.get(), 100)
def test_set_volume_to_anything_above_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 2.0)
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0)
self.assertEquals(self.mixer.volume.get(), 100)
def test_set_volume_to_anything_not_a_number_does_not_change_volume(self):
self.mixer.volume = 10
self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', None)
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None)
self.assertEquals(self.mixer.volume.get(), 10)
def test_get_position_returns_time_position_in_microseconds(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(10000)
result_in_microseconds = self.mpris.Get(mpris.PLAYER_IFACE, 'Position')
result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assert_(result_in_milliseconds >= 10000)
def test_get_position_when_no_current_track_should_be_zero(self):
result_in_microseconds = self.mpris.Get(mpris.PLAYER_IFACE, 'Position')
result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertEquals(result_in_milliseconds, 0)
def test_get_minimum_rate_is_one_or_less(self):
result = self.mpris.Get(mpris.PLAYER_IFACE, 'MinimumRate')
result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assert_(result <= 1.0)
def test_get_maximum_rate_is_one_or_more(self):
result = self.mpris.Get(mpris.PLAYER_IFACE, 'MaximumRate')
result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assert_(result >= 1.0)
def test_can_go_next_is_true_if_can_control_and_other_next_track(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertTrue(result)
def test_can_go_next_is_false_if_next_track_is_the_same(self):
@ -262,14 +262,14 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.repeat = True
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_next_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_previous_is_true_if_can_control_and_other_previous_track(self):
@ -277,7 +277,7 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertTrue(result)
def test_can_go_previous_is_false_if_previous_track_is_the_same(self):
@ -285,7 +285,7 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.repeat = True
self.backend.playback.play()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_go_previous_is_false_if_can_control_is_false(self):
@ -293,7 +293,7 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_play_is_true_if_can_control_and_current_track(self):
@ -301,42 +301,42 @@ class PlayerInterfaceTest(unittest.TestCase):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
self.assertTrue(self.backend.playback.current_track.get())
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertTrue(result)
def test_can_play_is_false_if_no_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.assertFalse(self.backend.playback.current_track.get())
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_play_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPause')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertTrue(result)
def test_can_pause_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPause')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertFalse(result)
def test_can_seek_is_true_if_can_control_is_true(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanSeek')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertTrue(result)
def test_can_seek_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanSeek')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertFalse(result)
def test_can_control_is_true(self):
result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanControl')
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl')
self.assertTrue(result)
def test_next_is_ignored_if_can_go_next_is_false(self):

View File

@ -2,14 +2,14 @@ import mock
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends import mpris
from mopidy.frontends.mpris import objects
class RootInterfaceTest(unittest.TestCase):
def setUp(self):
mpris.exit_process = mock.Mock()
mpris.MprisObject._connect_to_dbus = mock.Mock()
objects.exit_process = mock.Mock()
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = DummyBackend.start().proxy()
self.mpris = mpris.MprisObject()
self.mpris = objects.MprisObject()
def tearDown(self):
self.backend.stop()
@ -18,37 +18,37 @@ class RootInterfaceTest(unittest.TestCase):
self.assert_(self.mpris._connect_to_dbus.called)
def test_can_raise_returns_false(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'CanRaise')
result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise')
self.assertFalse(result)
def test_raise_does_nothing(self):
self.mpris.Raise()
def test_can_quit_returns_true(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'CanQuit')
result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit')
self.assertTrue(result)
def test_quit_should_stop_all_actors(self):
self.mpris.Quit()
self.assert_(mpris.exit_process.called)
self.assert_(objects.exit_process.called)
def test_has_track_list_returns_false(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'HasTrackList')
result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList')
self.assertFalse(result)
def test_identify_is_mopidy(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'Identity')
result = self.mpris.Get(objects.ROOT_IFACE, 'Identity')
self.assertEquals(result, 'Mopidy')
def test_desktop_entry_is_mopidy(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'DesktopEntry')
result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
self.assertEquals(result, 'mopidy')
def test_supported_uri_schemes_is_empty(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'SupportedUriSchemes')
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes')
self.assertEquals(len(result), 1)
self.assertEquals(result[0], 'dummy')
def test_supported_mime_types_is_empty(self):
result = self.mpris.Get(mpris.ROOT_IFACE, 'SupportedMimeTypes')
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes')
self.assertEquals(len(result), 0)