diff --git a/mopidy/frontends/mpris/__init__.py b/mopidy/frontends/mpris/__init__.py new file mode 100644 index 00000000..7e979152 --- /dev/null +++ b/mopidy/frontends/mpris/__init__.py @@ -0,0 +1,122 @@ +import logging + +logger = logging.getLogger('mopidy.frontends.mpris') + +try: + import indicate +except ImportError as import_error: + indicate = None + logger.debug(u'Startup notification will not be sent (%s)', import_error) + +from pykka.actor import ThreadingActor + +from mopidy.frontends.mpris import objects +from mopidy.listeners import BackendListener + + +class MprisFrontend(ThreadingActor, BackendListener): + """ + Frontend which lets you control Mopidy through the Media Player Remote + Interfacing Specification (MPRIS) D-Bus interface. + + An example of an MPRIS client is `Ubuntu's sound menu + `_. + + **Dependencies:** + + - ``dbus`` Python bindings. The package is named ``python-dbus`` in + Ubuntu/Debian. + - ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the + Ubuntu Sound Menu. The package is named ``python-indicate`` in + Ubuntu/Debian. + + **Testing the frontend** + + To test, start Mopidy, and then run the following in a Python shell:: + + import dbus + bus = dbus.SessionBus() + player = bus.get_object('org.mpris.MediaPlayer2.mopidy', + '/org/mpris/MediaPlayer2') + + Now you can control Mopidy through the player object. Examples: + + - To get some properties from Mopidy, run:: + + props = player.GetAll('org.mpris.MediaPlayer2', + dbus_interface='org.freedesktop.DBus.Properties') + + - To quit Mopidy through D-Bus, run:: + + player.Quit(dbus_interface='org.mpris.MediaPlayer2') + """ + + def __init__(self): + self.indicate_server = None + self.mpris_object = None + + def on_start(self): + self.mpris_object = objects.MprisObject() + self.send_startup_notification() + + def on_stop(self): + logger.debug(u'Removing MPRIS object from D-Bus connection...') + self.mpris_object.remove_from_connection() + self.mpris_object = None + logger.debug(u'Removed MPRIS object from D-Bus connection') + + def send_startup_notification(self): + """ + Send startup notification using libindicate to make Mopidy appear in + e.g. `Ubuntu's sound menu `_. + + A reference to the libindicate server is kept for as long as Mopidy is + running. When Mopidy exits, the server will be unreferenced and Mopidy + will automatically be unregistered from e.g. the sound menu. + """ + if not indicate: + return + logger.debug(u'Sending startup notification...') + self.indicate_server = indicate.Server() + self.indicate_server.set_type('music.mopidy') + # FIXME Location of .desktop file shouldn't be hardcoded + self.indicate_server.set_desktop_file( + '/usr/share/applications/mopidy.desktop') + self.indicate_server.show() + logger.debug(u'Startup notification sent') + + def _emit_properties_changed(self, *changed_properties): + if self.mpris_object is None: + return + props_with_new_values = [ + (p, self.mpris_object.Get(objects.PLAYER_IFACE, p)) + for p in changed_properties] + self.mpris_object.PropertiesChanged(objects.PLAYER_IFACE, + dict(props_with_new_values), []) + + def track_playback_paused(self, track, time_position): + logger.debug(u'Received track playback paused event') + self._emit_properties_changed('PlaybackStatus') + + def track_playback_resumed(self, track, time_position): + logger.debug(u'Received track playback resumed event') + self._emit_properties_changed('PlaybackStatus') + + def track_playback_started(self, track): + logger.debug(u'Received track playback started event') + self._emit_properties_changed('PlaybackStatus', 'Metadata') + + def track_playback_ended(self, track, time_position): + logger.debug(u'Received track playback ended event') + self._emit_properties_changed('PlaybackStatus', 'Metadata') + + def volume_changed(self): + logger.debug(u'Received volume changed event') + self._emit_properties_changed('Volume') + + def seeked(self): + logger.debug(u'Received seeked event') + if self.mpris_object is None: + return + self.mpris_object.Seeked( + self.mpris_object.Get(objects.PLAYER_IFACE, 'Position')) diff --git a/mopidy/frontends/mpris.py b/mopidy/frontends/mpris/objects.py similarity index 78% rename from mopidy/frontends/mpris.py rename to mopidy/frontends/mpris/objects.py index d932b853..3721e119 100644 --- a/mopidy/frontends/mpris.py +++ b/mopidy/frontends/mpris/objects.py @@ -10,18 +10,10 @@ except ImportError as import_error: from mopidy import OptionalDependencyError raise OptionalDependencyError(import_error) -try: - import indicate -except ImportError as import_error: - indicate = None - logger.debug(u'Startup notification will not be sent (%s)', import_error) - -from pykka.actor import ThreadingActor from pykka.registry import ActorRegistry from mopidy.backends.base import Backend from mopidy.backends.base.playback import PlaybackController -from mopidy.listeners import BackendListener from mopidy.mixers.base import BaseMixer from mopidy.utils.process import exit_process @@ -35,112 +27,6 @@ ROOT_IFACE = 'org.mpris.MediaPlayer2' PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player' -class MprisFrontend(ThreadingActor, BackendListener): - """ - Frontend which lets you control Mopidy through the Media Player Remote - Interfacing Specification (MPRIS) D-Bus interface. - - An example of an MPRIS client is `Ubuntu's sound menu - `_. - - **Dependencies:** - - - ``dbus`` Python bindings. The package is named ``python-dbus`` in - Ubuntu/Debian. - - ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the - Ubuntu Sound Menu. The package is named ``python-indicate`` in - Ubuntu/Debian. - - **Testing the frontend** - - To test, start Mopidy, and then run the following in a Python shell:: - - import dbus - bus = dbus.SessionBus() - player = bus.get_object('org.mpris.MediaPlayer2.mopidy', - '/org/mpris/MediaPlayer2') - - Now you can control Mopidy through the player object. Examples: - - - To get some properties from Mopidy, run:: - - props = player.GetAll('org.mpris.MediaPlayer2', - dbus_interface='org.freedesktop.DBus.Properties') - - - To quit Mopidy through D-Bus, run:: - - player.Quit(dbus_interface='org.mpris.MediaPlayer2') - """ - - def __init__(self): - self.indicate_server = None - self.mpris_object = None - - def on_start(self): - self.mpris_object = MprisObject() - self.send_startup_notification() - - def on_stop(self): - logger.debug(u'Removing MPRIS object from D-Bus connection...') - self.mpris_object.remove_from_connection() - self.mpris_object = None - logger.debug(u'Removed MPRIS object from D-Bus connection') - - def send_startup_notification(self): - """ - Send startup notification using libindicate to make Mopidy appear in - e.g. `Ubuntu's sound menu `_. - - A reference to the libindicate server is kept for as long as Mopidy is - running. When Mopidy exits, the server will be unreferenced and Mopidy - will automatically be unregistered from e.g. the sound menu. - """ - if not indicate: - return - logger.debug(u'Sending startup notification...') - self.indicate_server = indicate.Server() - self.indicate_server.set_type('music.mopidy') - # FIXME Location of .desktop file shouldn't be hardcoded - self.indicate_server.set_desktop_file( - '/usr/share/applications/mopidy.desktop') - self.indicate_server.show() - logger.debug(u'Startup notification sent') - - def _emit_properties_changed(self, *changed_properties): - if self.mpris_object is None: - return - props_with_new_values = [(p, self.mpris_object.Get(PLAYER_IFACE, p)) - for p in changed_properties] - self.mpris_object.PropertiesChanged(PLAYER_IFACE, - dict(props_with_new_values), []) - - def track_playback_paused(self, track, time_position): - logger.debug(u'Received track playback paused event') - self._emit_properties_changed('PlaybackStatus') - - def track_playback_resumed(self, track, time_position): - logger.debug(u'Received track playback resumed event') - self._emit_properties_changed('PlaybackStatus') - - def track_playback_started(self, track): - logger.debug(u'Received track playback started event') - self._emit_properties_changed('PlaybackStatus', 'Metadata') - - def track_playback_ended(self, track, time_position): - logger.debug(u'Received track playback ended event') - self._emit_properties_changed('PlaybackStatus', 'Metadata') - - def volume_changed(self): - logger.debug(u'Received volume changed event') - self._emit_properties_changed('Volume') - - def seeked(self): - logger.debug(u'Received seeked event') - if self.mpris_object is None: - return - self.mpris_object.Seeked(PLAYER_IFACE, self.mpris_object.Get(PLAYER_IFACE, 'Position')) - - class MprisObject(dbus.service.Object): """Implements http://www.mpris.org/2.1/spec/""" @@ -198,7 +84,8 @@ class MprisObject(dbus.service.Object): def backend(self): if self._backend is None: backend_refs = ActorRegistry.get_by_class(Backend) - assert len(backend_refs) == 1, 'Expected exactly one running backend.' + assert len(backend_refs) == 1, \ + 'Expected exactly one running backend.' self._backend = backend_refs[0].proxy() return self._backend @@ -262,7 +149,7 @@ class MprisObject(dbus.service.Object): @dbus.service.method(dbus_interface=ROOT_IFACE) def Raise(self): logger.debug(u'%s.Raise called', ROOT_IFACE) - pass # We do not have a GUI + # Do nothing, as we do not have a GUI @dbus.service.method(dbus_interface=ROOT_IFACE) def Quit(self): @@ -390,7 +277,7 @@ class MprisObject(dbus.service.Object): @dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x') def Seeked(self, position): logger.debug(u'%s.Seeked signaled', PLAYER_IFACE) - pass + # Do nothing, as just calling the method is enough to emit the signal. ### Player interface properties diff --git a/tests/frontends/mpris/events_test.py b/tests/frontends/mpris/events_test.py index 803094a8..2f737744 100644 --- a/tests/frontends/mpris/events_test.py +++ b/tests/frontends/mpris/events_test.py @@ -1,66 +1,68 @@ import mock import unittest -from mopidy.frontends.mpris import MprisFrontend, MprisObject, PLAYER_IFACE +from mopidy.frontends.mpris import MprisFrontend, objects from mopidy.models import Track class BackendEventsTest(unittest.TestCase): def setUp(self): self.mpris_frontend = MprisFrontend() # As a plain class, not an actor - self.mpris_object = mock.Mock(spec=MprisObject) + self.mpris_object = mock.Mock(spec=objects.MprisObject) self.mpris_frontend.mpris_object = self.mpris_object def test_track_playback_paused_event_changes_playback_status(self): self.mpris_object.Get.return_value = 'Paused' self.mpris_frontend.track_playback_paused(Track(), 0) self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'PlaybackStatus'), {}), + ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), ]) self.mpris_object.PropertiesChanged.assert_called_with( - PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, []) + objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, []) def test_track_playback_resumed_event_changes_playback_status(self): self.mpris_object.Get.return_value = 'Playing' self.mpris_frontend.track_playback_resumed(Track(), 0) self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'PlaybackStatus'), {}), + ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), ]) self.mpris_object.PropertiesChanged.assert_called_with( - PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, []) + objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, []) def test_track_playback_started_event_changes_playback_status_and_metadata(self): self.mpris_object.Get.return_value = '...' self.mpris_frontend.track_playback_started(Track()) self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'PlaybackStatus'), {}), - ((PLAYER_IFACE, 'Metadata'), {}), + ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), + ((objects.PLAYER_IFACE, 'Metadata'), {}), ]) self.mpris_object.PropertiesChanged.assert_called_with( - PLAYER_IFACE, {'Metadata': '...', 'PlaybackStatus': '...'}, []) + objects.PLAYER_IFACE, + {'Metadata': '...', 'PlaybackStatus': '...'}, []) def test_track_playback_ended_event_changes_playback_status_and_metadata(self): self.mpris_object.Get.return_value = '...' self.mpris_frontend.track_playback_ended(Track(), 0) self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'PlaybackStatus'), {}), - ((PLAYER_IFACE, 'Metadata'), {}), + ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), + ((objects.PLAYER_IFACE, 'Metadata'), {}), ]) self.mpris_object.PropertiesChanged.assert_called_with( - PLAYER_IFACE, {'Metadata': '...', 'PlaybackStatus': '...'}, []) + objects.PLAYER_IFACE, + {'Metadata': '...', 'PlaybackStatus': '...'}, []) def test_volume_changed_event_changes_volume(self): self.mpris_object.Get.return_value = 1.0 self.mpris_frontend.volume_changed() self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'Volume'), {}), + ((objects.PLAYER_IFACE, 'Volume'), {}), ]) self.mpris_object.PropertiesChanged.assert_called_with( - PLAYER_IFACE, {'Volume': 1.0}, []) + objects.PLAYER_IFACE, {'Volume': 1.0}, []) def test_seeked_event_causes_mpris_seeked_event(self): self.mpris_object.Get.return_value = 31000000 self.mpris_frontend.seeked() self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((PLAYER_IFACE, 'Position'), {}), + ((objects.PLAYER_IFACE, 'Position'), {}), ]) - self.mpris_object.Seeked.assert_called_with( PLAYER_IFACE, 31000000) + self.mpris_object.Seeked.assert_called_with(31000000) diff --git a/tests/frontends/mpris/player_interface_test.py b/tests/frontends/mpris/player_interface_test.py index 1ddd23fe..ee668a33 100644 --- a/tests/frontends/mpris/player_interface_test.py +++ b/tests/frontends/mpris/player_interface_test.py @@ -3,7 +3,7 @@ import unittest from mopidy.backends.dummy import DummyBackend from mopidy.backends.base.playback import PlaybackController -from mopidy.frontends import mpris +from mopidy.frontends.mpris import objects from mopidy.mixers.dummy import DummyMixer from mopidy.models import Album, Artist, Track @@ -13,10 +13,10 @@ STOPPED = PlaybackController.STOPPED class PlayerInterfaceTest(unittest.TestCase): def setUp(self): - mpris.MprisObject._connect_to_dbus = mock.Mock() + objects.MprisObject._connect_to_dbus = mock.Mock() self.mixer = DummyMixer.start().proxy() self.backend = DummyBackend.start().proxy() - self.mpris = mpris.MprisObject() + self.mpris = objects.MprisObject() self.mpris._backend = self.backend def tearDown(self): @@ -25,68 +25,68 @@ class PlayerInterfaceTest(unittest.TestCase): def test_get_playback_status_is_playing_when_playing(self): self.backend.playback.state = PLAYING - result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') self.assertEqual('Playing', result) def test_get_playback_status_is_paused_when_paused(self): self.backend.playback.state = PAUSED - result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') self.assertEqual('Paused', result) def test_get_playback_status_is_stopped_when_stopped(self): self.backend.playback.state = STOPPED - result = self.mpris.Get(mpris.PLAYER_IFACE, 'PlaybackStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') self.assertEqual('Stopped', result) def test_get_loop_status_is_none_when_not_looping(self): self.backend.playback.repeat = False self.backend.playback.single = False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') self.assertEqual('None', result) def test_get_loop_status_is_track_when_looping_a_single_track(self): self.backend.playback.repeat = True self.backend.playback.single = True - result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') self.assertEqual('Track', result) def test_get_loop_status_is_playlist_when_looping_the_current_playlist(self): self.backend.playback.repeat = True self.backend.playback.single = False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'LoopStatus') + result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') self.assertEqual('Playlist', result) def test_set_loop_status_is_ignored_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False self.backend.playback.repeat = True self.backend.playback.single = True - self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'None') + self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None') self.assertEquals(self.backend.playback.repeat.get(), True) self.assertEquals(self.backend.playback.single.get(), True) def test_set_loop_status_to_none_unsets_repeat_and_single(self): - self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'None') + self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None') self.assertEquals(self.backend.playback.repeat.get(), False) self.assertEquals(self.backend.playback.single.get(), False) def test_set_loop_status_to_track_sets_repeat_and_single(self): - self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'Track') + self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track') self.assertEquals(self.backend.playback.repeat.get(), True) self.assertEquals(self.backend.playback.single.get(), True) def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self): - self.mpris.Set(mpris.PLAYER_IFACE, 'LoopStatus', 'Playlist') + self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist') self.assertEquals(self.backend.playback.repeat.get(), True) self.assertEquals(self.backend.playback.single.get(), False) def test_get_rate_is_greater_or_equal_than_minimum_rate(self): - rate = self.mpris.Get(mpris.PLAYER_IFACE, 'Rate') - minimum_rate = self.mpris.Get(mpris.PLAYER_IFACE, 'MinimumRate') + rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate') + minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate') self.assert_(rate >= minimum_rate) def test_get_rate_is_less_or_equal_than_maximum_rate(self): - rate = self.mpris.Get(mpris.PLAYER_IFACE, 'Rate') - maximum_rate = self.mpris.Get(mpris.PLAYER_IFACE, 'MaximumRate') + rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate') + maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate') self.assert_(rate >= maximum_rate) def test_set_rate_is_ignored_if_can_control_is_false(self): @@ -94,46 +94,46 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() self.assertEquals(self.backend.playback.state.get(), PLAYING) - self.mpris.Set(mpris.PLAYER_IFACE, 'Rate', 0) + self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0) self.assertEquals(self.backend.playback.state.get(), PLAYING) def test_set_rate_to_zero_pauses_playback(self): self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() self.assertEquals(self.backend.playback.state.get(), PLAYING) - self.mpris.Set(mpris.PLAYER_IFACE, 'Rate', 0) + self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0) self.assertEquals(self.backend.playback.state.get(), PAUSED) def test_get_shuffle_returns_true_if_random_is_active(self): self.backend.playback.random = True - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Shuffle') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle') self.assertTrue(result) def test_get_shuffle_returns_false_if_random_is_inactive(self): self.backend.playback.random = False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Shuffle') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle') self.assertFalse(result) def test_set_shuffle_is_ignored_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False self.backend.playback.random = False - result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', True) + result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True) self.assertFalse(self.backend.playback.random.get()) def test_set_shuffle_to_true_activates_random_mode(self): self.backend.playback.random = False self.assertFalse(self.backend.playback.random.get()) - result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', True) + result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True) self.assertTrue(self.backend.playback.random.get()) def test_set_shuffle_to_false_deactivates_random_mode(self): self.backend.playback.random = True self.assertTrue(self.backend.playback.random.get()) - result = self.mpris.Set(mpris.PLAYER_IFACE, 'Shuffle', False) + result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False) self.assertFalse(self.backend.playback.random.get()) def test_get_metadata_has_trackid_even_when_no_current_track(self): - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assert_('mpris:trackid' in result.keys()) self.assertEquals(result['mpris:trackid'], '') @@ -141,7 +141,7 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a')]) self.backend.playback.play() (cpid, track) = self.backend.playback.current_cp_track.get() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('mpris:trackid', result.keys()) self.assertEquals(result['mpris:trackid'], '/com/mopidy/track/%d' % cpid) @@ -149,21 +149,21 @@ class PlayerInterfaceTest(unittest.TestCase): def test_get_metadata_has_track_length(self): self.backend.current_playlist.append([Track(uri='a', length=40000)]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('mpris:length', result.keys()) self.assertEquals(result['mpris:length'], 40000000) def test_get_metadata_has_track_uri(self): self.backend.current_playlist.append([Track(uri='a')]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:url', result.keys()) self.assertEquals(result['xesam:url'], 'a') def test_get_metadata_has_track_title(self): self.backend.current_playlist.append([Track(name='a')]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:title', result.keys()) self.assertEquals(result['xesam:title'], 'a') @@ -171,14 +171,14 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(artists=[ Artist(name='a'), Artist(name='b'), Artist(name=None)])]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:artist', result.keys()) self.assertEquals(result['xesam:artist'], ['a', 'b']) def test_get_metadata_has_track_album(self): self.backend.current_playlist.append([Track(album=Album(name='a'))]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:album', result.keys()) self.assertEquals(result['xesam:album'], 'a') @@ -186,75 +186,75 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(album=Album(artists=[ Artist(name='a'), Artist(name='b'), Artist(name=None)]))]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:albumArtist', result.keys()) self.assertEquals(result['xesam:albumArtist'], ['a', 'b']) def test_get_metadata_has_track_number_in_album(self): self.backend.current_playlist.append([Track(track_no=7)]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Metadata') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') self.assertIn('xesam:trackNumber', result.keys()) self.assertEquals(result['xesam:trackNumber'], 7) def test_get_volume_should_return_volume_between_zero_and_one(self): self.mixer.volume = 0 - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') self.assertEquals(result, 0) self.mixer.volume = 50 - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') self.assertEquals(result, 0.5) self.mixer.volume = 100 - result = self.mpris.Get(mpris.PLAYER_IFACE, 'Volume') + result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') self.assertEquals(result, 1) def test_set_volume_is_ignored_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False self.mixer.volume = 0 - self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 1.0) + self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0) self.assertEquals(self.mixer.volume.get(), 0) def test_set_volume_to_one_should_set_mixer_volume_to_100(self): - self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 1.0) + self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0) self.assertEquals(self.mixer.volume.get(), 100) def test_set_volume_to_anything_above_one_should_set_mixer_volume_to_100(self): - self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', 2.0) + self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0) self.assertEquals(self.mixer.volume.get(), 100) def test_set_volume_to_anything_not_a_number_does_not_change_volume(self): self.mixer.volume = 10 - self.mpris.Set(mpris.PLAYER_IFACE, 'Volume', None) + self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None) self.assertEquals(self.mixer.volume.get(), 10) def test_get_position_returns_time_position_in_microseconds(self): self.backend.current_playlist.append([Track(uri='a', length=40000)]) self.backend.playback.play() self.backend.playback.seek(10000) - result_in_microseconds = self.mpris.Get(mpris.PLAYER_IFACE, 'Position') + result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position') result_in_milliseconds = result_in_microseconds // 1000 self.assert_(result_in_milliseconds >= 10000) def test_get_position_when_no_current_track_should_be_zero(self): - result_in_microseconds = self.mpris.Get(mpris.PLAYER_IFACE, 'Position') + result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position') result_in_milliseconds = result_in_microseconds // 1000 self.assertEquals(result_in_milliseconds, 0) def test_get_minimum_rate_is_one_or_less(self): - result = self.mpris.Get(mpris.PLAYER_IFACE, 'MinimumRate') + result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate') self.assert_(result <= 1.0) def test_get_maximum_rate_is_one_or_more(self): - result = self.mpris.Get(mpris.PLAYER_IFACE, 'MaximumRate') + result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate') self.assert_(result >= 1.0) def test_can_go_next_is_true_if_can_control_and_other_next_track(self): self.mpris.get_CanControl = lambda *_: True self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') self.assertTrue(result) def test_can_go_next_is_false_if_next_track_is_the_same(self): @@ -262,14 +262,14 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a')]) self.backend.playback.repeat = True self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') self.assertFalse(result) def test_can_go_next_is_false_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoNext') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') self.assertFalse(result) def test_can_go_previous_is_true_if_can_control_and_other_previous_track(self): @@ -277,7 +277,7 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() self.backend.playback.next() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') self.assertTrue(result) def test_can_go_previous_is_false_if_previous_track_is_the_same(self): @@ -285,7 +285,7 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a')]) self.backend.playback.repeat = True self.backend.playback.play() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') self.assertFalse(result) def test_can_go_previous_is_false_if_can_control_is_false(self): @@ -293,7 +293,7 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')]) self.backend.playback.play() self.backend.playback.next() - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanGoPrevious') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') self.assertFalse(result) def test_can_play_is_true_if_can_control_and_current_track(self): @@ -301,42 +301,42 @@ class PlayerInterfaceTest(unittest.TestCase): self.backend.current_playlist.append([Track(uri='a')]) self.backend.playback.play() self.assertTrue(self.backend.playback.current_track.get()) - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') self.assertTrue(result) def test_can_play_is_false_if_no_current_track(self): self.mpris.get_CanControl = lambda *_: True self.assertFalse(self.backend.playback.current_track.get()) - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') self.assertFalse(result) def test_can_play_if_false_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPlay') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') self.assertFalse(result) def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self): self.mpris.get_CanControl = lambda *_: True - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPause') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause') self.assertTrue(result) def test_can_pause_if_false_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanPause') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause') self.assertFalse(result) def test_can_seek_is_true_if_can_control_is_true(self): self.mpris.get_CanControl = lambda *_: True - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanSeek') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek') self.assertTrue(result) def test_can_seek_is_false_if_can_control_is_false(self): self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanSeek') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek') self.assertFalse(result) def test_can_control_is_true(self): - result = self.mpris.Get(mpris.PLAYER_IFACE, 'CanControl') + result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl') self.assertTrue(result) def test_next_is_ignored_if_can_go_next_is_false(self): diff --git a/tests/frontends/mpris/root_interface_test.py b/tests/frontends/mpris/root_interface_test.py index f781d261..72800f64 100644 --- a/tests/frontends/mpris/root_interface_test.py +++ b/tests/frontends/mpris/root_interface_test.py @@ -2,14 +2,14 @@ import mock import unittest from mopidy.backends.dummy import DummyBackend -from mopidy.frontends import mpris +from mopidy.frontends.mpris import objects class RootInterfaceTest(unittest.TestCase): def setUp(self): - mpris.exit_process = mock.Mock() - mpris.MprisObject._connect_to_dbus = mock.Mock() + objects.exit_process = mock.Mock() + objects.MprisObject._connect_to_dbus = mock.Mock() self.backend = DummyBackend.start().proxy() - self.mpris = mpris.MprisObject() + self.mpris = objects.MprisObject() def tearDown(self): self.backend.stop() @@ -18,37 +18,37 @@ class RootInterfaceTest(unittest.TestCase): self.assert_(self.mpris._connect_to_dbus.called) def test_can_raise_returns_false(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'CanRaise') + result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise') self.assertFalse(result) def test_raise_does_nothing(self): self.mpris.Raise() def test_can_quit_returns_true(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'CanQuit') + result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit') self.assertTrue(result) def test_quit_should_stop_all_actors(self): self.mpris.Quit() - self.assert_(mpris.exit_process.called) + self.assert_(objects.exit_process.called) def test_has_track_list_returns_false(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'HasTrackList') + result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList') self.assertFalse(result) def test_identify_is_mopidy(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'Identity') + result = self.mpris.Get(objects.ROOT_IFACE, 'Identity') self.assertEquals(result, 'Mopidy') def test_desktop_entry_is_mopidy(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'DesktopEntry') + result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry') self.assertEquals(result, 'mopidy') def test_supported_uri_schemes_is_empty(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'SupportedUriSchemes') + result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes') self.assertEquals(len(result), 1) self.assertEquals(result[0], 'dummy') def test_supported_mime_types_is_empty(self): - result = self.mpris.Get(mpris.ROOT_IFACE, 'SupportedMimeTypes') + result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes') self.assertEquals(len(result), 0)