diff --git a/docs/api/frontends.rst b/docs/api/frontends.rst index 96d8266e..70bd73cf 100644 --- a/docs/api/frontends.rst +++ b/docs/api/frontends.rst @@ -49,4 +49,3 @@ Frontend implementations * :mod:`mopidy.frontends.http` * :mod:`mopidy.frontends.mpd` -* :mod:`mopidy.frontends.mpris` diff --git a/docs/clients/mpris.rst b/docs/clients/mpris.rst index 141a2371..e1bd4bff 100644 --- a/docs/clients/mpris.rst +++ b/docs/clients/mpris.rst @@ -8,7 +8,8 @@ MPRIS clients Specification. It's a spec that describes a standard D-Bus interface for making media players available to other applications on the same system. -Mopidy's :ref:`MPRIS frontend ` currently implements all required +The MPRIS frontend provided by the `Mopidy-MPRIS extension +`_ currently implements all required parts of the MPRIS spec, plus the optional playlist interface. It does not implement the optional tracklist interface. diff --git a/docs/clients/upnp.rst b/docs/clients/upnp.rst index 9f30bd1c..7f21a6c6 100644 --- a/docs/clients/upnp.rst +++ b/docs/clients/upnp.rst @@ -36,19 +36,21 @@ How to make Mopidy available as an UPnP MediaRenderer ===================================================== With the help of `the Rygel project `_ Mopidy can -be made available as an UPnP MediaRenderer. Rygel will interface with Mopidy's -:ref:`MPRIS frontend `, and make Mopidy available as a MediaRenderer -on the local network. Since this depends on the MPRIS frontend, which again -depends on D-Bus being available, this will only work on Linux, and not OS X. -MPRIS/D-Bus is only available to other applications on the same host, so Rygel -must be running on the same machine as Mopidy. +be made available as an UPnP MediaRenderer. Rygel will interface with the MPRIS +interface provided by the `Mopidy-MPRIS extension +`_, and make Mopidy available as a +MediaRenderer on the local network. Since this depends on the MPRIS frontend, +which again depends on D-Bus being available, this will only work on Linux, and +not OS X. MPRIS/D-Bus is only available to other applications on the same +host, so Rygel must be running on the same machine as Mopidy. -1. Start Mopidy and make sure the :ref:`MPRIS frontend ` is working. - It is activated by default, but you may miss dependencies or be using OS X, - in which case it will not work. Check the console output when Mopidy is - started for any errors related to the MPRIS frontend. If you're unsure it is - working, there are instructions for how to test it on the :ref:`MPRIS - frontend ` page. +1. Start Mopidy and make sure the MPRIS frontend is working. It is activated + by default when the Mopidy-MPRIS extension is installed, but you may miss + dependencies or be using OS X, in which case it will not work. Check the + console output when Mopidy is started for any errors related to the MPRIS + frontend. If you're unsure it is working, there are instructions for how to + test it on in the `Mopidy-MPRIS readme + `_. 2. Install Rygel. On Debian/Ubuntu:: diff --git a/docs/ext/index.rst b/docs/ext/index.rst index 3163a6c0..bdc1efe8 100644 --- a/docs/ext/index.rst +++ b/docs/ext/index.rst @@ -48,6 +48,15 @@ Provides a backend for playing music from `Google Play Music `_. +Mopidy-MPRIS +------------ + +https://github.com/mopidy/mopidy-mpris + +Extension for controlling Mopidy through the `MPRIS `_ +D-Bus interface, for example using the Ubuntu Sound Menu. + + Mopidy-NAD ---------- @@ -67,7 +76,7 @@ Extension for scrobbling played tracks to Last.fm. Mopidy-SomaFM ------------- -https://github.com/AlexandrePTJ/mopidy-somafm/ +https://github.com/AlexandrePTJ/mopidy-somafm Provides a backend for playing music from the `SomaFM `_ service. diff --git a/docs/ext/mpris.rst b/docs/ext/mpris.rst deleted file mode 100644 index 125f8fec..00000000 --- a/docs/ext/mpris.rst +++ /dev/null @@ -1,105 +0,0 @@ -.. _ext-mpris: - -************ -Mopidy-MPRIS -************ - -This extension lets you control Mopidy through the Media Player Remote -Interfacing Specification (`MPRIS `_) D-Bus interface. - -An example of an MPRIS client is the :ref:`ubuntu-sound-menu`. - - -Dependencies -============ - -- D-Bus Python bindings. The package is named ``python-dbus`` in - Ubuntu/Debian. - -- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the - Ubuntu Sound Menu. The package is named ``python-indicate`` in - Ubuntu/Debian. - -- An ``.desktop`` file for Mopidy installed at the path set in the - :confval:`mpris/desktop_file` config value. See usage section below for - details. - - -Default configuration -===================== - -.. literalinclude:: ../../mopidy/frontends/mpris/ext.conf - :language: ini - - -Configuration values -==================== - -.. confval:: mpris/enabled - - If the MPRIS extension should be enabled or not. - -.. confval:: mpris/desktop_file - - Location of the Mopidy ``.desktop`` file. - - -Usage -===== - -The extension is enabled by default if all dependencies are available. - - -Controlling Mopidy through the Ubuntu Sound Menu ------------------------------------------------- - -If you are running Ubuntu and installed Mopidy using the Debian package from -APT you should be able to control Mopidy through the :ref:`ubuntu-sound-menu` -without any changes. - -If you installed Mopidy in any other way and want to control Mopidy through the -Ubuntu Sound Menu, you must install the ``mopidy.desktop`` file which can be -found in the ``data/`` dir of the Mopidy source repo into the -``/usr/share/applications`` dir by hand:: - - cd /path/to/mopidy/source - sudo cp data/mopidy.desktop /usr/share/applications/ - -If the correct path to the installed ``mopidy.desktop`` file on your system -isn't ``/usr/share/applications/mopidy.conf``, you'll need to set the -:confval:`mpris/desktop_file` config value. - -After you have installed the file, start Mopidy in any way, and Mopidy should -appear in the Ubuntu Sound Menu. When you quit Mopidy, it will still be listed -in the Ubuntu Sound Menu, and may be restarted by selecting it there. - -The Ubuntu Sound Menu interacts with Mopidy's MPRIS frontend. The MPRIS -frontend supports the minimum requirements of the `MPRIS specification -`_. The ``TrackList`` interface of the spec is not -supported. - - -Testing the MPRIS API directly ------------------------------- - -To use the MPRIS API directly, start Mopidy, and then run the following in a -Python shell:: - - import dbus - bus = dbus.SessionBus() - player = bus.get_object('org.mpris.MediaPlayer2.mopidy', - '/org/mpris/MediaPlayer2') - -Now you can control Mopidy through the player object. Examples: - -- To get some properties from Mopidy, run:: - - props = player.GetAll('org.mpris.MediaPlayer2', - dbus_interface='org.freedesktop.DBus.Properties') - -- To quit Mopidy through D-Bus, run:: - - player.Quit(dbus_interface='org.mpris.MediaPlayer2') - -For details on the API, please refer to the `MPRIS specification -`_. diff --git a/docs/glossary.rst b/docs/glossary.rst index 102af3b6..2acb9981 100644 --- a/docs/glossary.rst +++ b/docs/glossary.rst @@ -24,10 +24,9 @@ Glossary frontend A part of Mopidy *using* the :term:`core` API. Existing frontends - include the :ref:`MPD server `, the :ref:`MPRIS/D-Bus - integration `, the Last.fm scrobbler, and the :ref:`HTTP - server ` with JavaScript API. See :ref:`frontend-api` for - details. + include the :ref:`MPD server `, the MPRIS/D-Bus integration, + the Last.fm scrobbler, and the :ref:`HTTP server ` with + JavaScript API. See :ref:`frontend-api` for details. mixer A GStreamer element that controls audio volume. diff --git a/docs/installation/index.rst b/docs/installation/index.rst index 85e07c9d..369e3e29 100644 --- a/docs/installation/index.rst +++ b/docs/installation/index.rst @@ -250,8 +250,8 @@ can install Mopidy from PyPI using Pip. sudo pip-python install -U cherrypy ws4py -#. Optional: To use MPRIS, e.g. for controlling Mopidy from the Ubuntu Sound - Menu or from an UPnP client via Rygel, you need some additional +#. Optional: To use Mopidy-MPRIS, e.g. for controlling Mopidy from the Ubuntu + Sound Menu or from an UPnP client via Rygel, you need some additional dependencies: the Python bindings for libindicate, and the Python bindings for libdbus, the reference D-Bus library. diff --git a/mopidy/frontends/mpris/__init__.py b/mopidy/frontends/mpris/__init__.py deleted file mode 100644 index 1fd258b5..00000000 --- a/mopidy/frontends/mpris/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import unicode_literals - -import os - -import mopidy -from mopidy import config, exceptions, ext - - -class Extension(ext.Extension): - - dist_name = 'Mopidy-MPRIS' - ext_name = 'mpris' - version = mopidy.__version__ - - def get_default_config(self): - conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf') - return config.read(conf_file) - - def get_config_schema(self): - schema = super(Extension, self).get_config_schema() - schema['desktop_file'] = config.Path() - return schema - - def validate_environment(self): - if 'DISPLAY' not in os.environ: - raise exceptions.ExtensionError( - 'An X11 $DISPLAY is needed to use D-Bus') - - try: - import dbus # noqa - except ImportError as e: - raise exceptions.ExtensionError('dbus library not found', e) - - def get_frontend_classes(self): - from .actor import MprisFrontend - return [MprisFrontend] diff --git a/mopidy/frontends/mpris/actor.py b/mopidy/frontends/mpris/actor.py deleted file mode 100644 index d44e9262..00000000 --- a/mopidy/frontends/mpris/actor.py +++ /dev/null @@ -1,110 +0,0 @@ -from __future__ import unicode_literals - -import logging -import os - -import pykka - -from mopidy.core import CoreListener -from mopidy.frontends.mpris import objects - -logger = logging.getLogger('mopidy.frontends.mpris') - -try: - indicate = None - if 'DISPLAY' in os.environ: - import indicate -except ImportError: - pass - -if indicate is None: - logger.debug('Startup notification will not be sent') - - -class MprisFrontend(pykka.ThreadingActor, CoreListener): - def __init__(self, config, core): - super(MprisFrontend, self).__init__() - self.config = config - self.core = core - self.indicate_server = None - self.mpris_object = None - - def on_start(self): - try: - self.mpris_object = objects.MprisObject(self.config, self.core) - self._send_startup_notification() - except Exception as e: - logger.warning('MPRIS frontend setup failed (%s)', e) - self.stop() - - def on_stop(self): - logger.debug('Removing MPRIS object from D-Bus connection...') - if self.mpris_object: - self.mpris_object.remove_from_connection() - self.mpris_object = None - logger.debug('Removed MPRIS object from D-Bus connection') - - def _send_startup_notification(self): - """ - Send startup notification using libindicate to make Mopidy appear in - e.g. `Ubunt's sound menu `_. - - A reference to the libindicate server is kept for as long as Mopidy is - running. When Mopidy exits, the server will be unreferenced and Mopidy - will automatically be unregistered from e.g. the sound menu. - """ - if not indicate: - return - logger.debug('Sending startup notification...') - self.indicate_server = indicate.Server() - self.indicate_server.set_type('music.mopidy') - self.indicate_server.set_desktop_file( - self.config['mpris']['desktop_file']) - self.indicate_server.show() - logger.debug('Startup notification sent') - - def _emit_properties_changed(self, interface, changed_properties): - if self.mpris_object is None: - return - props_with_new_values = [ - (p, self.mpris_object.Get(interface, p)) - for p in changed_properties] - self.mpris_object.PropertiesChanged( - interface, dict(props_with_new_values), []) - - def track_playback_paused(self, tl_track, time_position): - logger.debug('Received track_playback_paused event') - self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus']) - - def track_playback_resumed(self, tl_track, time_position): - logger.debug('Received track_playback_resumed event') - self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus']) - - def track_playback_started(self, tl_track): - logger.debug('Received track_playback_started event') - self._emit_properties_changed( - objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata']) - - def track_playback_ended(self, tl_track, time_position): - logger.debug('Received track_playback_ended event') - self._emit_properties_changed( - objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata']) - - def volume_changed(self, volume): - logger.debug('Received volume_changed event') - self._emit_properties_changed(objects.PLAYER_IFACE, ['Volume']) - - def seeked(self, time_position_in_ms): - logger.debug('Received seeked event') - self.mpris_object.Seeked(time_position_in_ms * 1000) - - def playlists_loaded(self): - logger.debug('Received playlists_loaded event') - self._emit_properties_changed( - objects.PLAYLISTS_IFACE, ['PlaylistCount']) - - def playlist_changed(self, playlist): - logger.debug('Received playlist_changed event') - playlist_id = self.mpris_object.get_playlist_id(playlist.uri) - playlist = (playlist_id, playlist.name, '') - self.mpris_object.PlaylistChanged(playlist) diff --git a/mopidy/frontends/mpris/ext.conf b/mopidy/frontends/mpris/ext.conf deleted file mode 100644 index b83411c2..00000000 --- a/mopidy/frontends/mpris/ext.conf +++ /dev/null @@ -1,3 +0,0 @@ -[mpris] -enabled = true -desktop_file = /usr/share/applications/mopidy.desktop diff --git a/mopidy/frontends/mpris/objects.py b/mopidy/frontends/mpris/objects.py deleted file mode 100644 index 15be1eea..00000000 --- a/mopidy/frontends/mpris/objects.py +++ /dev/null @@ -1,498 +0,0 @@ -from __future__ import unicode_literals - -import base64 -import logging -import os - -import dbus -import dbus.mainloop.glib -import dbus.service -import gobject - -from mopidy.core import PlaybackState -from mopidy.utils.process import exit_process - - -logger = logging.getLogger('mopidy.frontends.mpris') - -# Must be done before dbus.SessionBus() is called -gobject.threads_init() -dbus.mainloop.glib.threads_init() - -BUS_NAME = 'org.mpris.MediaPlayer2.mopidy' -OBJECT_PATH = '/org/mpris/MediaPlayer2' -ROOT_IFACE = 'org.mpris.MediaPlayer2' -PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player' -PLAYLISTS_IFACE = 'org.mpris.MediaPlayer2.Playlists' - - -class MprisObject(dbus.service.Object): - """Implements http://www.mpris.org/2.2/spec/""" - - properties = None - - def __init__(self, config, core): - self.config = config - self.core = core - self.properties = { - ROOT_IFACE: self._get_root_iface_properties(), - PLAYER_IFACE: self._get_player_iface_properties(), - PLAYLISTS_IFACE: self._get_playlists_iface_properties(), - } - bus_name = self._connect_to_dbus() - dbus.service.Object.__init__(self, bus_name, OBJECT_PATH) - - def _get_root_iface_properties(self): - return { - 'CanQuit': (True, None), - 'Fullscreen': (False, None), - 'CanSetFullscreen': (False, None), - 'CanRaise': (False, None), - # NOTE Change if adding optional track list support - 'HasTrackList': (False, None), - 'Identity': ('Mopidy', None), - 'DesktopEntry': (self.get_DesktopEntry, None), - 'SupportedUriSchemes': (self.get_SupportedUriSchemes, None), - # NOTE Return MIME types supported by local backend if support for - # reporting supported MIME types is added - 'SupportedMimeTypes': (dbus.Array([], signature='s'), None), - } - - def _get_player_iface_properties(self): - return { - 'PlaybackStatus': (self.get_PlaybackStatus, None), - 'LoopStatus': (self.get_LoopStatus, self.set_LoopStatus), - 'Rate': (1.0, self.set_Rate), - 'Shuffle': (self.get_Shuffle, self.set_Shuffle), - 'Metadata': (self.get_Metadata, None), - 'Volume': (self.get_Volume, self.set_Volume), - 'Position': (self.get_Position, None), - 'MinimumRate': (1.0, None), - 'MaximumRate': (1.0, None), - 'CanGoNext': (self.get_CanGoNext, None), - 'CanGoPrevious': (self.get_CanGoPrevious, None), - 'CanPlay': (self.get_CanPlay, None), - 'CanPause': (self.get_CanPause, None), - 'CanSeek': (self.get_CanSeek, None), - 'CanControl': (self.get_CanControl, None), - } - - def _get_playlists_iface_properties(self): - return { - 'PlaylistCount': (self.get_PlaylistCount, None), - 'Orderings': (self.get_Orderings, None), - 'ActivePlaylist': (self.get_ActivePlaylist, None), - } - - def _connect_to_dbus(self): - logger.debug('Connecting to D-Bus...') - mainloop = dbus.mainloop.glib.DBusGMainLoop() - bus_name = dbus.service.BusName( - BUS_NAME, dbus.SessionBus(mainloop=mainloop)) - logger.info('MPRIS server connected to D-Bus') - return bus_name - - def get_playlist_id(self, playlist_uri): - # Only A-Za-z0-9_ is allowed, which is 63 chars, so we can't use - # base64. Luckily, D-Bus does not limit the length of object paths. - # Since base32 pads trailing bytes with "=" chars, we need to replace - # them with an allowed character such as "_". - encoded_uri = base64.b32encode(playlist_uri).replace('=', '_') - return '/com/mopidy/playlist/%s' % encoded_uri - - def get_playlist_uri(self, playlist_id): - encoded_uri = playlist_id.split('/')[-1].replace('_', '=') - return base64.b32decode(encoded_uri) - - def get_track_id(self, tl_track): - return '/com/mopidy/track/%d' % tl_track.tlid - - def get_track_tlid(self, track_id): - assert track_id.startswith('/com/mopidy/track/') - return track_id.split('/')[-1] - - ### Properties interface - - @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, - in_signature='ss', out_signature='v') - def Get(self, interface, prop): - logger.debug( - '%s.Get(%s, %s) called', - dbus.PROPERTIES_IFACE, repr(interface), repr(prop)) - (getter, _) = self.properties[interface][prop] - if callable(getter): - return getter() - else: - return getter - - @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, - in_signature='s', out_signature='a{sv}') - def GetAll(self, interface): - logger.debug( - '%s.GetAll(%s) called', dbus.PROPERTIES_IFACE, repr(interface)) - getters = {} - for key, (getter, _) in self.properties[interface].iteritems(): - getters[key] = getter() if callable(getter) else getter - return getters - - @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, - in_signature='ssv', out_signature='') - def Set(self, interface, prop, value): - logger.debug( - '%s.Set(%s, %s, %s) called', - dbus.PROPERTIES_IFACE, repr(interface), repr(prop), repr(value)) - _, setter = self.properties[interface][prop] - if setter is not None: - setter(value) - self.PropertiesChanged( - interface, {prop: self.Get(interface, prop)}, []) - - @dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE, - signature='sa{sv}as') - def PropertiesChanged(self, interface, changed_properties, - invalidated_properties): - logger.debug( - '%s.PropertiesChanged(%s, %s, %s) signaled', - dbus.PROPERTIES_IFACE, interface, changed_properties, - invalidated_properties) - - ### Root interface methods - - @dbus.service.method(dbus_interface=ROOT_IFACE) - def Raise(self): - logger.debug('%s.Raise called', ROOT_IFACE) - # Do nothing, as we do not have a GUI - - @dbus.service.method(dbus_interface=ROOT_IFACE) - def Quit(self): - logger.debug('%s.Quit called', ROOT_IFACE) - exit_process() - - ### Root interface properties - - def get_DesktopEntry(self): - return os.path.splitext(os.path.basename( - self.config['mpris']['desktop_file']))[0] - - def get_SupportedUriSchemes(self): - return dbus.Array(self.core.uri_schemes.get(), signature='s') - - ### Player interface methods - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Next(self): - logger.debug('%s.Next called', PLAYER_IFACE) - if not self.get_CanGoNext(): - logger.debug('%s.Next not allowed', PLAYER_IFACE) - return - self.core.playback.next().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Previous(self): - logger.debug('%s.Previous called', PLAYER_IFACE) - if not self.get_CanGoPrevious(): - logger.debug('%s.Previous not allowed', PLAYER_IFACE) - return - self.core.playback.previous().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Pause(self): - logger.debug('%s.Pause called', PLAYER_IFACE) - if not self.get_CanPause(): - logger.debug('%s.Pause not allowed', PLAYER_IFACE) - return - self.core.playback.pause().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def PlayPause(self): - logger.debug('%s.PlayPause called', PLAYER_IFACE) - if not self.get_CanPause(): - logger.debug('%s.PlayPause not allowed', PLAYER_IFACE) - return - state = self.core.playback.state.get() - if state == PlaybackState.PLAYING: - self.core.playback.pause().get() - elif state == PlaybackState.PAUSED: - self.core.playback.resume().get() - elif state == PlaybackState.STOPPED: - self.core.playback.play().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Stop(self): - logger.debug('%s.Stop called', PLAYER_IFACE) - if not self.get_CanControl(): - logger.debug('%s.Stop not allowed', PLAYER_IFACE) - return - self.core.playback.stop().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Play(self): - logger.debug('%s.Play called', PLAYER_IFACE) - if not self.get_CanPlay(): - logger.debug('%s.Play not allowed', PLAYER_IFACE) - return - state = self.core.playback.state.get() - if state == PlaybackState.PAUSED: - self.core.playback.resume().get() - else: - self.core.playback.play().get() - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def Seek(self, offset): - logger.debug('%s.Seek called', PLAYER_IFACE) - if not self.get_CanSeek(): - logger.debug('%s.Seek not allowed', PLAYER_IFACE) - return - offset_in_milliseconds = offset // 1000 - current_position = self.core.playback.time_position.get() - new_position = current_position + offset_in_milliseconds - self.core.playback.seek(new_position) - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def SetPosition(self, track_id, position): - logger.debug('%s.SetPosition called', PLAYER_IFACE) - if not self.get_CanSeek(): - logger.debug('%s.SetPosition not allowed', PLAYER_IFACE) - return - position = position // 1000 - current_tl_track = self.core.playback.current_tl_track.get() - if current_tl_track is None: - return - if track_id != self.get_track_id(current_tl_track): - return - if position < 0: - return - if current_tl_track.track.length < position: - return - self.core.playback.seek(position) - - @dbus.service.method(dbus_interface=PLAYER_IFACE) - def OpenUri(self, uri): - logger.debug('%s.OpenUri called', PLAYER_IFACE) - if not self.get_CanPlay(): - # NOTE The spec does not explictly require this check, but guarding - # the other methods doesn't help much if OpenUri is open for use. - logger.debug('%s.Play not allowed', PLAYER_IFACE) - return - # NOTE Check if URI has MIME type known to the backend, if MIME support - # is added to the backend. - tl_tracks = self.core.tracklist.add(uri=uri).get() - if tl_tracks: - self.core.playback.play(tl_tracks[0]) - else: - logger.debug('Track with URI "%s" not found in library.', uri) - - ### Player interface signals - - @dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x') - def Seeked(self, position): - logger.debug('%s.Seeked signaled', PLAYER_IFACE) - # Do nothing, as just calling the method is enough to emit the signal. - - ### Player interface properties - - def get_PlaybackStatus(self): - state = self.core.playback.state.get() - if state == PlaybackState.PLAYING: - return 'Playing' - elif state == PlaybackState.PAUSED: - return 'Paused' - elif state == PlaybackState.STOPPED: - return 'Stopped' - - def get_LoopStatus(self): - repeat = self.core.playback.repeat.get() - single = self.core.playback.single.get() - if not repeat: - return 'None' - else: - if single: - return 'Track' - else: - return 'Playlist' - - def set_LoopStatus(self, value): - if not self.get_CanControl(): - logger.debug('Setting %s.LoopStatus not allowed', PLAYER_IFACE) - return - if value == 'None': - self.core.playback.repeat = False - self.core.playback.single = False - elif value == 'Track': - self.core.playback.repeat = True - self.core.playback.single = True - elif value == 'Playlist': - self.core.playback.repeat = True - self.core.playback.single = False - - def set_Rate(self, value): - if not self.get_CanControl(): - # NOTE The spec does not explictly require this check, but it was - # added to be consistent with all the other property setters. - logger.debug('Setting %s.Rate not allowed', PLAYER_IFACE) - return - if value == 0: - self.Pause() - - def get_Shuffle(self): - return self.core.playback.random.get() - - def set_Shuffle(self, value): - if not self.get_CanControl(): - logger.debug('Setting %s.Shuffle not allowed', PLAYER_IFACE) - return - if value: - self.core.playback.random = True - else: - self.core.playback.random = False - - def get_Metadata(self): - current_tl_track = self.core.playback.current_tl_track.get() - if current_tl_track is None: - return {'mpris:trackid': ''} - else: - (_, track) = current_tl_track - metadata = {'mpris:trackid': self.get_track_id(current_tl_track)} - if track.length: - metadata['mpris:length'] = track.length * 1000 - if track.uri: - metadata['xesam:url'] = track.uri - if track.name: - metadata['xesam:title'] = track.name - if track.artists: - artists = list(track.artists) - artists.sort(key=lambda a: a.name) - metadata['xesam:artist'] = dbus.Array( - [a.name for a in artists if a.name], signature='s') - if track.album and track.album.name: - metadata['xesam:album'] = track.album.name - if track.album and track.album.artists: - artists = list(track.album.artists) - artists.sort(key=lambda a: a.name) - metadata['xesam:albumArtist'] = dbus.Array( - [a.name for a in artists if a.name], signature='s') - if track.album and track.album.images: - url = list(track.album.images)[0] - if url: - metadata['mpris:artUrl'] = url - if track.disc_no: - metadata['xesam:discNumber'] = track.disc_no - if track.track_no: - metadata['xesam:trackNumber'] = track.track_no - return dbus.Dictionary(metadata, signature='sv') - - def get_Volume(self): - volume = self.core.playback.volume.get() - if volume is None: - return 0 - return volume / 100.0 - - def set_Volume(self, value): - if not self.get_CanControl(): - logger.debug('Setting %s.Volume not allowed', PLAYER_IFACE) - return - if value is None: - return - elif value < 0: - self.core.playback.volume = 0 - elif value > 1: - self.core.playback.volume = 100 - elif 0 <= value <= 1: - self.core.playback.volume = int(value * 100) - - def get_Position(self): - return self.core.playback.time_position.get() * 1000 - - def get_CanGoNext(self): - if not self.get_CanControl(): - return False - return ( - self.core.playback.tl_track_at_next.get() != - self.core.playback.current_tl_track.get()) - - def get_CanGoPrevious(self): - if not self.get_CanControl(): - return False - return ( - self.core.playback.tl_track_at_previous.get() != - self.core.playback.current_tl_track.get()) - - def get_CanPlay(self): - if not self.get_CanControl(): - return False - return ( - self.core.playback.current_tl_track.get() is not None or - self.core.playback.tl_track_at_next.get() is not None) - - def get_CanPause(self): - if not self.get_CanControl(): - return False - # NOTE Should be changed to vary based on capabilities of the current - # track if Mopidy starts supporting non-seekable media, like streams. - return True - - def get_CanSeek(self): - if not self.get_CanControl(): - return False - # NOTE Should be changed to vary based on capabilities of the current - # track if Mopidy starts supporting non-seekable media, like streams. - return True - - def get_CanControl(self): - # NOTE This could be a setting for the end user to change. - return True - - ### Playlists interface methods - - @dbus.service.method(dbus_interface=PLAYLISTS_IFACE) - def ActivatePlaylist(self, playlist_id): - logger.debug( - '%s.ActivatePlaylist(%r) called', PLAYLISTS_IFACE, playlist_id) - playlist_uri = self.get_playlist_uri(playlist_id) - playlist = self.core.playlists.lookup(playlist_uri).get() - if playlist and playlist.tracks: - tl_tracks = self.core.tracklist.add(playlist.tracks).get() - self.core.playback.play(tl_tracks[0]) - - @dbus.service.method(dbus_interface=PLAYLISTS_IFACE) - def GetPlaylists(self, index, max_count, order, reverse): - logger.debug( - '%s.GetPlaylists(%r, %r, %r, %r) called', - PLAYLISTS_IFACE, index, max_count, order, reverse) - playlists = self.core.playlists.playlists.get() - if order == 'Alphabetical': - playlists.sort(key=lambda p: p.name, reverse=reverse) - elif order == 'Modified': - playlists.sort(key=lambda p: p.last_modified, reverse=reverse) - elif order == 'User' and reverse: - playlists.reverse() - slice_end = index + max_count - playlists = playlists[index:slice_end] - results = [ - (self.get_playlist_id(p.uri), p.name, '') - for p in playlists] - return dbus.Array(results, signature='(oss)') - - ### Playlists interface signals - - @dbus.service.signal(dbus_interface=PLAYLISTS_IFACE, signature='(oss)') - def PlaylistChanged(self, playlist): - logger.debug('%s.PlaylistChanged signaled', PLAYLISTS_IFACE) - # Do nothing, as just calling the method is enough to emit the signal. - - ### Playlists interface properties - - def get_PlaylistCount(self): - return len(self.core.playlists.playlists.get()) - - def get_Orderings(self): - return [ - 'Alphabetical', # Order by playlist.name - 'Modified', # Order by playlist.last_modified - 'User', # Don't change order - ] - - def get_ActivePlaylist(self): - playlist_is_valid = False - playlist = ('/', 'None', '') - return (playlist_is_valid, playlist) diff --git a/setup.py b/setup.py index 7cfb7409..ff6d49de 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,6 @@ setup( 'http = mopidy.frontends.http:Extension [http]', 'local = mopidy.backends.local:Extension', 'mpd = mopidy.frontends.mpd:Extension', - 'mpris = mopidy.frontends.mpris:Extension', 'spotify = mopidy.backends.spotify:Extension [spotify]', 'stream = mopidy.backends.stream:Extension', ], diff --git a/tests/frontends/mpris/__init__.py b/tests/frontends/mpris/__init__.py deleted file mode 100644 index baffc488..00000000 --- a/tests/frontends/mpris/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from __future__ import unicode_literals diff --git a/tests/frontends/mpris/events_test.py b/tests/frontends/mpris/events_test.py deleted file mode 100644 index 0a4bc79f..00000000 --- a/tests/frontends/mpris/events_test.py +++ /dev/null @@ -1,92 +0,0 @@ -from __future__ import unicode_literals - -import mock -import unittest - -try: - import dbus -except ImportError: - dbus = False - -from mopidy.models import Playlist, TlTrack - -if dbus: - from mopidy.frontends.mpris import actor, objects - - -@unittest.skipUnless(dbus, 'dbus not found') -class BackendEventsTest(unittest.TestCase): - def setUp(self): - # As a plain class, not an actor: - self.mpris_frontend = actor.MprisFrontend(config=None, core=None) - self.mpris_object = mock.Mock(spec=objects.MprisObject) - self.mpris_frontend.mpris_object = self.mpris_object - - def test_track_playback_paused_event_changes_playback_status(self): - self.mpris_object.Get.return_value = 'Paused' - self.mpris_frontend.track_playback_paused(TlTrack(), 0) - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, []) - - def test_track_playback_resumed_event_changes_playback_status(self): - self.mpris_object.Get.return_value = 'Playing' - self.mpris_frontend.track_playback_resumed(TlTrack(), 0) - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, []) - - def test_track_playback_started_changes_playback_status_and_metadata(self): - self.mpris_object.Get.return_value = '...' - self.mpris_frontend.track_playback_started(TlTrack()) - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), - ((objects.PLAYER_IFACE, 'Metadata'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYER_IFACE, - {'Metadata': '...', 'PlaybackStatus': '...'}, []) - - def test_track_playback_ended_changes_playback_status_and_metadata(self): - self.mpris_object.Get.return_value = '...' - self.mpris_frontend.track_playback_ended(TlTrack(), 0) - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}), - ((objects.PLAYER_IFACE, 'Metadata'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYER_IFACE, - {'Metadata': '...', 'PlaybackStatus': '...'}, []) - - def test_volume_changed_event_changes_volume(self): - self.mpris_object.Get.return_value = 1.0 - self.mpris_frontend.volume_changed(volume=100) - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYER_IFACE, 'Volume'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYER_IFACE, {'Volume': 1.0}, []) - - def test_seeked_event_causes_mpris_seeked_event(self): - self.mpris_frontend.seeked(31000) - self.mpris_object.Seeked.assert_called_with(31000000) - - def test_playlists_loaded_event_changes_playlist_count(self): - self.mpris_object.Get.return_value = 17 - self.mpris_frontend.playlists_loaded() - self.assertListEqual(self.mpris_object.Get.call_args_list, [ - ((objects.PLAYLISTS_IFACE, 'PlaylistCount'), {}), - ]) - self.mpris_object.PropertiesChanged.assert_called_with( - objects.PLAYLISTS_IFACE, {'PlaylistCount': 17}, []) - - def test_playlist_changed_event_causes_mpris_playlist_changed_event(self): - self.mpris_object.get_playlist_id.return_value = 'id-for-dummy:foo' - playlist = Playlist(uri='dummy:foo', name='foo') - self.mpris_frontend.playlist_changed(playlist) - self.mpris_object.PlaylistChanged.assert_called_with( - ('id-for-dummy:foo', 'foo', '')) diff --git a/tests/frontends/mpris/player_interface_test.py b/tests/frontends/mpris/player_interface_test.py deleted file mode 100644 index 52cd964b..00000000 --- a/tests/frontends/mpris/player_interface_test.py +++ /dev/null @@ -1,869 +0,0 @@ -from __future__ import unicode_literals - -import mock -import unittest - -import pykka - -try: - import dbus -except ImportError: - dbus = False - -from mopidy import core -from mopidy.backends import dummy -from mopidy.core import PlaybackState -from mopidy.models import Album, Artist, Track - -if dbus: - from mopidy.frontends.mpris import objects - -PLAYING = PlaybackState.PLAYING -PAUSED = PlaybackState.PAUSED -STOPPED = PlaybackState.STOPPED - - -@unittest.skipUnless(dbus, 'dbus not found') -class PlayerInterfaceTest(unittest.TestCase): - def setUp(self): - objects.MprisObject._connect_to_dbus = mock.Mock() - self.backend = dummy.create_dummy_backend_proxy() - self.core = core.Core.start(backends=[self.backend]).proxy() - self.mpris = objects.MprisObject(config={}, core=self.core) - - def tearDown(self): - pykka.ActorRegistry.stop_all() - - def test_get_playback_status_is_playing_when_playing(self): - self.core.playback.state = PLAYING - result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') - self.assertEqual('Playing', result) - - def test_get_playback_status_is_paused_when_paused(self): - self.core.playback.state = PAUSED - result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') - self.assertEqual('Paused', result) - - def test_get_playback_status_is_stopped_when_stopped(self): - self.core.playback.state = STOPPED - result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus') - self.assertEqual('Stopped', result) - - def test_get_loop_status_is_none_when_not_looping(self): - self.core.playback.repeat = False - self.core.playback.single = False - result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') - self.assertEqual('None', result) - - def test_get_loop_status_is_track_when_looping_a_single_track(self): - self.core.playback.repeat = True - self.core.playback.single = True - result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') - self.assertEqual('Track', result) - - def test_get_loop_status_is_playlist_when_looping_tracklist(self): - self.core.playback.repeat = True - self.core.playback.single = False - result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus') - self.assertEqual('Playlist', result) - - def test_set_loop_status_is_ignored_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.playback.repeat = True - self.core.playback.single = True - self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None') - self.assertEqual(self.core.playback.repeat.get(), True) - self.assertEqual(self.core.playback.single.get(), True) - - def test_set_loop_status_to_none_unsets_repeat_and_single(self): - self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None') - self.assertEqual(self.core.playback.repeat.get(), False) - self.assertEqual(self.core.playback.single.get(), False) - - def test_set_loop_status_to_track_sets_repeat_and_single(self): - self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track') - self.assertEqual(self.core.playback.repeat.get(), True) - self.assertEqual(self.core.playback.single.get(), True) - - def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self): - self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist') - self.assertEqual(self.core.playback.repeat.get(), True) - self.assertEqual(self.core.playback.single.get(), False) - - def test_get_rate_is_greater_or_equal_than_minimum_rate(self): - rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate') - minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate') - self.assertGreaterEqual(rate, minimum_rate) - - def test_get_rate_is_less_or_equal_than_maximum_rate(self): - rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate') - maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate') - self.assertGreaterEqual(rate, maximum_rate) - - def test_set_rate_is_ignored_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0) - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_set_rate_to_zero_pauses_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0) - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_get_shuffle_returns_true_if_random_is_active(self): - self.core.playback.random = True - result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle') - self.assertTrue(result) - - def test_get_shuffle_returns_false_if_random_is_inactive(self): - self.core.playback.random = False - result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle') - self.assertFalse(result) - - def test_set_shuffle_is_ignored_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.playback.random = False - self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True) - self.assertFalse(self.core.playback.random.get()) - - def test_set_shuffle_to_true_activates_random_mode(self): - self.core.playback.random = False - self.assertFalse(self.core.playback.random.get()) - self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True) - self.assertTrue(self.core.playback.random.get()) - - def test_set_shuffle_to_false_deactivates_random_mode(self): - self.core.playback.random = True - self.assertTrue(self.core.playback.random.get()) - self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False) - self.assertFalse(self.core.playback.random.get()) - - def test_get_metadata_has_trackid_even_when_no_current_track(self): - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('mpris:trackid', result.keys()) - self.assertEqual(result['mpris:trackid'], '') - - def test_get_metadata_has_trackid_based_on_tlid(self): - self.core.tracklist.add([Track(uri='dummy:a')]) - self.core.playback.play() - (tlid, track) = self.core.playback.current_tl_track.get() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('mpris:trackid', result.keys()) - self.assertEqual( - result['mpris:trackid'], '/com/mopidy/track/%d' % tlid) - - def test_get_metadata_has_track_length(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('mpris:length', result.keys()) - self.assertEqual(result['mpris:length'], 40000000) - - def test_get_metadata_has_track_uri(self): - self.core.tracklist.add([Track(uri='dummy:a')]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:url', result.keys()) - self.assertEqual(result['xesam:url'], 'dummy:a') - - def test_get_metadata_has_track_title(self): - self.core.tracklist.add([Track(name='a')]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:title', result.keys()) - self.assertEqual(result['xesam:title'], 'a') - - def test_get_metadata_has_track_artists(self): - self.core.tracklist.add([Track(artists=[ - Artist(name='a'), Artist(name='b'), Artist(name=None)])]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:artist', result.keys()) - self.assertEqual(result['xesam:artist'], ['a', 'b']) - - def test_get_metadata_has_track_album(self): - self.core.tracklist.add([Track(album=Album(name='a'))]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:album', result.keys()) - self.assertEqual(result['xesam:album'], 'a') - - def test_get_metadata_has_track_album_artists(self): - self.core.tracklist.add([Track(album=Album(artists=[ - Artist(name='a'), Artist(name='b'), Artist(name=None)]))]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:albumArtist', result.keys()) - self.assertEqual(result['xesam:albumArtist'], ['a', 'b']) - - def test_get_metadata_use_first_album_image_as_art_url(self): - # XXX Currently, the album image order isn't preserved because they - # are stored as a frozenset(). We pick the first in the set, which is - # sorted alphabetically, thus we get 'bar.jpg', not 'foo.jpg', which - # would probably make more sense. - self.core.tracklist.add([Track(album=Album(images=[ - 'http://example.com/foo.jpg', 'http://example.com/bar.jpg']))]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('mpris:artUrl', result.keys()) - self.assertEqual(result['mpris:artUrl'], 'http://example.com/bar.jpg') - - def test_get_metadata_has_no_art_url_if_no_album(self): - self.core.tracklist.add([Track()]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertNotIn('mpris:artUrl', result.keys()) - - def test_get_metadata_has_no_art_url_if_no_album_images(self): - self.core.tracklist.add([Track(Album(images=[]))]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertNotIn('mpris:artUrl', result.keys()) - - def test_get_metadata_has_disc_number_in_album(self): - self.core.tracklist.add([Track(disc_no=2)]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:discNumber', result.keys()) - self.assertEqual(result['xesam:discNumber'], 2) - - def test_get_metadata_has_track_number_in_album(self): - self.core.tracklist.add([Track(track_no=7)]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata') - self.assertIn('xesam:trackNumber', result.keys()) - self.assertEqual(result['xesam:trackNumber'], 7) - - def test_get_volume_should_return_volume_between_zero_and_one(self): - self.core.playback.volume = None - result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') - self.assertEqual(result, 0) - - self.core.playback.volume = 0 - result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') - self.assertEqual(result, 0) - - self.core.playback.volume = 50 - result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') - self.assertEqual(result, 0.5) - - self.core.playback.volume = 100 - result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume') - self.assertEqual(result, 1) - - def test_set_volume_is_ignored_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.playback.volume = 0 - self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0) - self.assertEqual(self.core.playback.volume.get(), 0) - - def test_set_volume_to_one_should_set_mixer_volume_to_100(self): - self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0) - self.assertEqual(self.core.playback.volume.get(), 100) - - def test_set_volume_to_anything_above_one_sets_mixer_volume_to_100(self): - self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0) - self.assertEqual(self.core.playback.volume.get(), 100) - - def test_set_volume_to_anything_not_a_number_does_not_change_volume(self): - self.core.playback.volume = 10 - self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None) - self.assertEqual(self.core.playback.volume.get(), 10) - - def test_get_position_returns_time_position_in_microseconds(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(10000) - result_in_microseconds = self.mpris.Get( - objects.PLAYER_IFACE, 'Position') - result_in_milliseconds = result_in_microseconds // 1000 - self.assertGreaterEqual(result_in_milliseconds, 10000) - - def test_get_position_when_no_current_track_should_be_zero(self): - result_in_microseconds = self.mpris.Get( - objects.PLAYER_IFACE, 'Position') - result_in_milliseconds = result_in_microseconds // 1000 - self.assertEqual(result_in_milliseconds, 0) - - def test_get_minimum_rate_is_one_or_less(self): - result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate') - self.assertLessEqual(result, 1.0) - - def test_get_maximum_rate_is_one_or_more(self): - result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate') - self.assertGreaterEqual(result, 1.0) - - def test_can_go_next_is_true_if_can_control_and_other_next_track(self): - self.mpris.get_CanControl = lambda *_: True - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') - self.assertTrue(result) - - def test_can_go_next_is_false_if_next_track_is_the_same(self): - self.mpris.get_CanControl = lambda *_: True - self.core.tracklist.add([Track(uri='dummy:a')]) - self.core.playback.repeat = True - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') - self.assertFalse(result) - - def test_can_go_next_is_false_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext') - self.assertFalse(result) - - def test_can_go_previous_is_true_if_can_control_and_previous_track(self): - self.mpris.get_CanControl = lambda *_: True - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') - self.assertTrue(result) - - def test_can_go_previous_is_false_if_previous_track_is_the_same(self): - self.mpris.get_CanControl = lambda *_: True - self.core.tracklist.add([Track(uri='dummy:a')]) - self.core.playback.repeat = True - self.core.playback.play() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') - self.assertFalse(result) - - def test_can_go_previous_is_false_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious') - self.assertFalse(result) - - def test_can_play_is_true_if_can_control_and_current_track(self): - self.mpris.get_CanControl = lambda *_: True - self.core.tracklist.add([Track(uri='dummy:a')]) - self.core.playback.play() - self.assertTrue(self.core.playback.current_track.get()) - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') - self.assertTrue(result) - - def test_can_play_is_false_if_no_current_track(self): - self.mpris.get_CanControl = lambda *_: True - self.assertFalse(self.core.playback.current_track.get()) - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') - self.assertFalse(result) - - def test_can_play_if_false_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay') - self.assertFalse(result) - - def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self): - self.mpris.get_CanControl = lambda *_: True - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause') - self.assertTrue(result) - - def test_can_pause_if_false_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause') - self.assertFalse(result) - - def test_can_seek_is_true_if_can_control_is_true(self): - self.mpris.get_CanControl = lambda *_: True - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek') - self.assertTrue(result) - - def test_can_seek_is_false_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek') - self.assertFalse(result) - - def test_can_control_is_true(self): - result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl') - self.assertTrue(result) - - def test_next_is_ignored_if_can_go_next_is_false(self): - self.mpris.get_CanGoNext = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.mpris.Next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - def test_next_when_playing_skips_to_next_track_and_keep_playing(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_next_when_at_end_of_list_should_stop_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Next() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_next_when_paused_should_skip_to_next_track_and_stay_paused(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.pause() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), PAUSED) - self.mpris.Next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_next_when_stopped_skips_to_next_track_and_stay_stopped(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.stop() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.Next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_previous_is_ignored_if_can_go_previous_is_false(self): - self.mpris.get_CanGoPrevious = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.mpris.Previous() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - - def test_previous_when_playing_skips_to_prev_track_and_keep_playing(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Previous() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_previous_when_at_start_of_list_should_stop_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Previous() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_previous_when_paused_skips_to_previous_track_and_pause(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - self.core.playback.pause() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), PAUSED) - self.mpris.Previous() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_previous_when_stopped_skips_to_previous_track_and_stops(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.next() - self.core.playback.stop() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.Previous() - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_pause_is_ignored_if_can_pause_is_false(self): - self.mpris.get_CanPause = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Pause() - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_pause_when_playing_should_pause_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_pause_when_paused_has_no_effect(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - self.mpris.Pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_playpause_is_ignored_if_can_pause_is_false(self): - self.mpris.get_CanPause = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.PlayPause() - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_playpause_when_playing_should_pause_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.PlayPause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - - def test_playpause_when_paused_should_resume_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.pause() - - self.assertEqual(self.core.playback.state.get(), PAUSED) - at_pause = self.core.playback.time_position.get() - self.assertGreaterEqual(at_pause, 0) - - self.mpris.PlayPause() - - self.assertEqual(self.core.playback.state.get(), PLAYING) - after_pause = self.core.playback.time_position.get() - self.assertGreaterEqual(after_pause, at_pause) - - def test_playpause_when_stopped_should_start_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.PlayPause() - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_stop_is_ignored_if_can_control_is_false(self): - self.mpris.get_CanControl = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Stop() - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_stop_when_playing_should_stop_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.mpris.Stop() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_stop_when_paused_should_stop_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - self.mpris.Stop() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_play_is_ignored_if_can_play_is_false(self): - self.mpris.get_CanPlay = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.Play() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_play_when_stopped_starts_playback(self): - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.Play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - - def test_play_after_pause_resumes_from_same_position(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - - before_pause = self.core.playback.time_position.get() - self.assertGreaterEqual(before_pause, 0) - - self.mpris.Pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - at_pause = self.core.playback.time_position.get() - self.assertGreaterEqual(at_pause, before_pause) - - self.mpris.Play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - after_pause = self.core.playback.time_position.get() - self.assertGreaterEqual(after_pause, at_pause) - - def test_play_when_there_is_no_track_has_no_effect(self): - self.core.tracklist.clear() - self.assertEqual(self.core.playback.state.get(), STOPPED) - self.mpris.Play() - self.assertEqual(self.core.playback.state.get(), STOPPED) - - def test_seek_is_ignored_if_can_seek_is_false(self): - self.mpris.get_CanSeek = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - - before_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(before_seek, 0) - - milliseconds_to_seek = 10000 - microseconds_to_seek = milliseconds_to_seek * 1000 - - self.mpris.Seek(microseconds_to_seek) - - after_seek = self.core.playback.time_position.get() - self.assertLessEqual(before_seek, after_seek) - self.assertLess(after_seek, before_seek + milliseconds_to_seek) - - def test_seek_seeks_given_microseconds_forward_in_the_current_track(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - - before_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(before_seek, 0) - - milliseconds_to_seek = 10000 - microseconds_to_seek = milliseconds_to_seek * 1000 - - self.mpris.Seek(microseconds_to_seek) - - self.assertEqual(self.core.playback.state.get(), PLAYING) - - after_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek) - - def test_seek_seeks_given_microseconds_backward_if_negative(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(before_seek, 20000) - - milliseconds_to_seek = -10000 - microseconds_to_seek = milliseconds_to_seek * 1000 - - self.mpris.Seek(microseconds_to_seek) - - self.assertEqual(self.core.playback.state.get(), PLAYING) - - after_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek) - self.assertLess(after_seek, before_seek) - - def test_seek_seeks_to_start_of_track_if_new_position_is_negative(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(before_seek, 20000) - - milliseconds_to_seek = -30000 - microseconds_to_seek = milliseconds_to_seek * 1000 - - self.mpris.Seek(microseconds_to_seek) - - self.assertEqual(self.core.playback.state.get(), PLAYING) - - after_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek) - self.assertLess(after_seek, before_seek) - self.assertGreaterEqual(after_seek, 0) - - def test_seek_skips_to_next_track_if_new_position_gt_track_length(self): - self.core.tracklist.add([ - Track(uri='dummy:a', length=40000), - Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(before_seek, 20000) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - milliseconds_to_seek = 50000 - microseconds_to_seek = milliseconds_to_seek * 1000 - - self.mpris.Seek(microseconds_to_seek) - - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b') - - after_seek = self.core.playback.time_position.get() - self.assertGreaterEqual(after_seek, 0) - self.assertLess(after_seek, before_seek) - - def test_set_position_is_ignored_if_can_seek_is_false(self): - self.mpris.get_CanSeek = lambda *_: False - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - - before_set_position = self.core.playback.time_position.get() - self.assertLessEqual(before_set_position, 5000) - - track_id = 'a' - - position_to_set_in_millisec = 20000 - position_to_set_in_microsec = position_to_set_in_millisec * 1000 - - self.mpris.SetPosition(track_id, position_to_set_in_microsec) - - after_set_position = self.core.playback.time_position.get() - self.assertLessEqual(before_set_position, after_set_position) - self.assertLess(after_set_position, position_to_set_in_millisec) - - def test_set_position_sets_the_current_track_position_in_microsecs(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - - before_set_position = self.core.playback.time_position.get() - self.assertLessEqual(before_set_position, 5000) - self.assertEqual(self.core.playback.state.get(), PLAYING) - - track_id = '/com/mopidy/track/0' - - position_to_set_in_millisec = 20000 - position_to_set_in_microsec = position_to_set_in_millisec * 1000 - - self.mpris.SetPosition(track_id, position_to_set_in_microsec) - - self.assertEqual(self.core.playback.state.get(), PLAYING) - - after_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual( - after_set_position, position_to_set_in_millisec) - - def test_set_position_does_nothing_if_the_position_is_negative(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(before_set_position, 20000) - self.assertLessEqual(before_set_position, 25000) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - track_id = '/com/mopidy/track/0' - - position_to_set_in_millisec = -1000 - position_to_set_in_microsec = position_to_set_in_millisec * 1000 - - self.mpris.SetPosition(track_id, position_to_set_in_microsec) - - after_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(after_set_position, before_set_position) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - def test_set_position_does_nothing_if_position_is_gt_track_length(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(before_set_position, 20000) - self.assertLessEqual(before_set_position, 25000) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - track_id = 'a' - - position_to_set_in_millisec = 50000 - position_to_set_in_microsec = position_to_set_in_millisec * 1000 - - self.mpris.SetPosition(track_id, position_to_set_in_microsec) - - after_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(after_set_position, before_set_position) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - def test_set_position_is_noop_if_track_id_isnt_current_track(self): - self.core.tracklist.add([Track(uri='dummy:a', length=40000)]) - self.core.playback.play() - self.core.playback.seek(20000) - - before_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(before_set_position, 20000) - self.assertLessEqual(before_set_position, 25000) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - track_id = 'b' - - position_to_set_in_millisec = 0 - position_to_set_in_microsec = position_to_set_in_millisec * 1000 - - self.mpris.SetPosition(track_id, position_to_set_in_microsec) - - after_set_position = self.core.playback.time_position.get() - self.assertGreaterEqual(after_set_position, before_set_position) - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - def test_open_uri_is_ignored_if_can_play_is_false(self): - self.mpris.get_CanPlay = lambda *_: False - self.backend.library.dummy_library = [ - Track(uri='dummy:/test/uri')] - self.mpris.OpenUri('dummy:/test/uri') - self.assertEqual(len(self.core.tracklist.tracks.get()), 0) - - def test_open_uri_ignores_uris_with_unknown_uri_scheme(self): - self.assertListEqual(self.core.uri_schemes.get(), ['dummy']) - self.mpris.get_CanPlay = lambda *_: True - self.backend.library.dummy_library = [Track(uri='notdummy:/test/uri')] - self.mpris.OpenUri('notdummy:/test/uri') - self.assertEqual(len(self.core.tracklist.tracks.get()), 0) - - def test_open_uri_adds_uri_to_tracklist(self): - self.mpris.get_CanPlay = lambda *_: True - self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')] - self.mpris.OpenUri('dummy:/test/uri') - self.assertEqual( - self.core.tracklist.tracks.get()[0].uri, 'dummy:/test/uri') - - def test_open_uri_starts_playback_of_new_track_if_stopped(self): - self.mpris.get_CanPlay = lambda *_: True - self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')] - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.assertEqual(self.core.playback.state.get(), STOPPED) - - self.mpris.OpenUri('dummy:/test/uri') - - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual( - self.core.playback.current_track.get().uri, 'dummy:/test/uri') - - def test_open_uri_starts_playback_of_new_track_if_paused(self): - self.mpris.get_CanPlay = lambda *_: True - self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')] - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.core.playback.pause() - self.assertEqual(self.core.playback.state.get(), PAUSED) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - self.mpris.OpenUri('dummy:/test/uri') - - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual( - self.core.playback.current_track.get().uri, 'dummy:/test/uri') - - def test_open_uri_starts_playback_of_new_track_if_playing(self): - self.mpris.get_CanPlay = lambda *_: True - self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')] - self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')]) - self.core.playback.play() - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a') - - self.mpris.OpenUri('dummy:/test/uri') - - self.assertEqual(self.core.playback.state.get(), PLAYING) - self.assertEqual( - self.core.playback.current_track.get().uri, 'dummy:/test/uri') diff --git a/tests/frontends/mpris/playlists_interface_test.py b/tests/frontends/mpris/playlists_interface_test.py deleted file mode 100644 index f8e2cf3e..00000000 --- a/tests/frontends/mpris/playlists_interface_test.py +++ /dev/null @@ -1,172 +0,0 @@ -from __future__ import unicode_literals - -import datetime -import mock -import unittest - -import pykka - -try: - import dbus -except ImportError: - dbus = False - -from mopidy import core -from mopidy.audio import PlaybackState -from mopidy.backends import dummy -from mopidy.models import Track - -if dbus: - from mopidy.frontends.mpris import objects - - -@unittest.skipUnless(dbus, 'dbus not found') -class PlayerInterfaceTest(unittest.TestCase): - def setUp(self): - objects.MprisObject._connect_to_dbus = mock.Mock() - self.backend = dummy.create_dummy_backend_proxy() - self.core = core.Core.start(backends=[self.backend]).proxy() - self.mpris = objects.MprisObject(config={}, core=self.core) - - foo = self.core.playlists.create('foo').get() - foo = foo.copy(last_modified=datetime.datetime(2012, 3, 1, 6, 0, 0)) - foo = self.core.playlists.save(foo).get() - - bar = self.core.playlists.create('bar').get() - bar = bar.copy(last_modified=datetime.datetime(2012, 2, 1, 6, 0, 0)) - bar = self.core.playlists.save(bar).get() - - baz = self.core.playlists.create('baz').get() - baz = baz.copy(last_modified=datetime.datetime(2012, 1, 1, 6, 0, 0)) - baz = self.core.playlists.save(baz).get() - self.playlist = baz - - def tearDown(self): - pykka.ActorRegistry.stop_all() - - def test_activate_playlist_appends_tracks_to_tracklist(self): - self.core.tracklist.add([ - Track(uri='dummy:old-a'), - Track(uri='dummy:old-b'), - ]) - self.playlist = self.playlist.copy(tracks=[ - Track(uri='dummy:baz-a'), - Track(uri='dummy:baz-b'), - Track(uri='dummy:baz-c'), - ]) - self.playlist = self.core.playlists.save(self.playlist).get() - - self.assertEqual(2, self.core.tracklist.length.get()) - - playlists = self.mpris.GetPlaylists(0, 100, 'User', False) - playlist_id = playlists[2][0] - self.mpris.ActivatePlaylist(playlist_id) - - self.assertEqual(5, self.core.tracklist.length.get()) - self.assertEqual( - PlaybackState.PLAYING, self.core.playback.state.get()) - self.assertEqual( - self.playlist.tracks[0], self.core.playback.current_track.get()) - - def test_activate_empty_playlist_is_harmless(self): - self.assertEqual(0, self.core.tracklist.length.get()) - - playlists = self.mpris.GetPlaylists(0, 100, 'User', False) - playlist_id = playlists[2][0] - self.mpris.ActivatePlaylist(playlist_id) - - self.assertEqual(0, self.core.tracklist.length.get()) - self.assertEqual( - PlaybackState.STOPPED, self.core.playback.state.get()) - self.assertIsNone(self.core.playback.current_track.get()) - - def test_get_playlists_in_alphabetical_order(self): - result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', False) - - self.assertEqual(3, len(result)) - - self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC4Q_', result[0][0]) - self.assertEqual('bar', result[0][1]) - - self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC6Q_', result[1][0]) - self.assertEqual('baz', result[1][1]) - - self.assertEqual('/com/mopidy/playlist/MR2W23LZHJTG63Y_', result[2][0]) - self.assertEqual('foo', result[2][1]) - - def test_get_playlists_in_reverse_alphabetical_order(self): - result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', True) - - self.assertEqual(3, len(result)) - self.assertEqual('foo', result[0][1]) - self.assertEqual('baz', result[1][1]) - self.assertEqual('bar', result[2][1]) - - def test_get_playlists_in_modified_order(self): - result = self.mpris.GetPlaylists(0, 100, 'Modified', False) - - self.assertEqual(3, len(result)) - self.assertEqual('baz', result[0][1]) - self.assertEqual('bar', result[1][1]) - self.assertEqual('foo', result[2][1]) - - def test_get_playlists_in_reverse_modified_order(self): - result = self.mpris.GetPlaylists(0, 100, 'Modified', True) - - self.assertEqual(3, len(result)) - self.assertEqual('foo', result[0][1]) - self.assertEqual('bar', result[1][1]) - self.assertEqual('baz', result[2][1]) - - def test_get_playlists_in_user_order(self): - result = self.mpris.GetPlaylists(0, 100, 'User', False) - - self.assertEqual(3, len(result)) - self.assertEqual('foo', result[0][1]) - self.assertEqual('bar', result[1][1]) - self.assertEqual('baz', result[2][1]) - - def test_get_playlists_in_reverse_user_order(self): - result = self.mpris.GetPlaylists(0, 100, 'User', True) - - self.assertEqual(3, len(result)) - self.assertEqual('baz', result[0][1]) - self.assertEqual('bar', result[1][1]) - self.assertEqual('foo', result[2][1]) - - def test_get_playlists_slice_on_start_of_list(self): - result = self.mpris.GetPlaylists(0, 2, 'User', False) - - self.assertEqual(2, len(result)) - self.assertEqual('foo', result[0][1]) - self.assertEqual('bar', result[1][1]) - - def test_get_playlists_slice_later_in_list(self): - result = self.mpris.GetPlaylists(2, 2, 'User', False) - - self.assertEqual(1, len(result)) - self.assertEqual('baz', result[0][1]) - - def test_get_playlist_count_returns_number_of_playlists(self): - result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'PlaylistCount') - - self.assertEqual(3, result) - - def test_get_orderings_includes_alpha_modified_and_user(self): - result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'Orderings') - - self.assertIn('Alphabetical', result) - self.assertNotIn('Created', result) - self.assertIn('Modified', result) - self.assertNotIn('Played', result) - self.assertIn('User', result) - - def test_get_active_playlist_does_not_return_a_playlist(self): - result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'ActivePlaylist') - valid, playlist = result - playlist_id, playlist_name, playlist_icon_uri = playlist - - self.assertEqual(False, valid) - self.assertEqual('/', playlist_id) - self.assertEqual('None', playlist_name) - self.assertEqual('', playlist_icon_uri) diff --git a/tests/frontends/mpris/root_interface_test.py b/tests/frontends/mpris/root_interface_test.py deleted file mode 100644 index f95f0969..00000000 --- a/tests/frontends/mpris/root_interface_test.py +++ /dev/null @@ -1,87 +0,0 @@ -from __future__ import unicode_literals - -import mock -import unittest - -import pykka - -try: - import dbus -except ImportError: - dbus = False - -from mopidy import core -from mopidy.backends import dummy - -if dbus: - from mopidy.frontends.mpris import objects - - -@unittest.skipUnless(dbus, 'dbus not found') -class RootInterfaceTest(unittest.TestCase): - def setUp(self): - config = { - 'mpris': { - 'desktop_file': '/tmp/foo.desktop', - } - } - - objects.exit_process = mock.Mock() - objects.MprisObject._connect_to_dbus = mock.Mock() - self.backend = dummy.create_dummy_backend_proxy() - self.core = core.Core.start(backends=[self.backend]).proxy() - self.mpris = objects.MprisObject(config=config, core=self.core) - - def tearDown(self): - pykka.ActorRegistry.stop_all() - - def test_constructor_connects_to_dbus(self): - self.assert_(self.mpris._connect_to_dbus.called) - - def test_fullscreen_returns_false(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'Fullscreen') - self.assertFalse(result) - - def test_setting_fullscreen_fails_and_returns_none(self): - result = self.mpris.Set(objects.ROOT_IFACE, 'Fullscreen', 'True') - self.assertIsNone(result) - - def test_can_set_fullscreen_returns_false(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'CanSetFullscreen') - self.assertFalse(result) - - def test_can_raise_returns_false(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise') - self.assertFalse(result) - - def test_raise_does_nothing(self): - self.mpris.Raise() - - def test_can_quit_returns_true(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit') - self.assertTrue(result) - - def test_quit_should_stop_all_actors(self): - self.mpris.Quit() - self.assert_(objects.exit_process.called) - - def test_has_track_list_returns_false(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList') - self.assertFalse(result) - - def test_identify_is_mopidy(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'Identity') - self.assertEquals(result, 'Mopidy') - - def test_desktop_entry_is_based_on_DESKTOP_FILE_setting(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry') - self.assertEquals(result, 'foo') - - def test_supported_uri_schemes_includes_backend_uri_schemes(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes') - self.assertEquals(len(result), 1) - self.assertEquals(result[0], 'dummy') - - def test_supported_mime_types_is_empty(self): - result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes') - self.assertEquals(len(result), 0)