diff --git a/data/mopidy.desktop b/data/mopidy.desktop
index 70257d58..88dd5ae4 100644
--- a/data/mopidy.desktop
+++ b/data/mopidy.desktop
@@ -8,3 +8,4 @@ TryExec=mopidy
Exec=mopidy
Terminal=true
Categories=AudioVideo;Audio;Player;ConsoleOnly;
+StartupNotify=true
diff --git a/docs/api/frontends.rst b/docs/api/frontends.rst
index 792e4bc9..dc53cca2 100644
--- a/docs/api/frontends.rst
+++ b/docs/api/frontends.rst
@@ -28,3 +28,4 @@ Frontend implementations
* :mod:`mopidy.frontends.lastfm`
* :mod:`mopidy.frontends.mpd`
+* :mod:`mopidy.frontends.mpris`
diff --git a/docs/changes.rst b/docs/changes.rst
index 8179bcbd..f2211dab 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -28,6 +28,11 @@ v0.6.0 (in development)
- The MPD command ``idle`` is now supported by Mopidy for the following
subsystems: player, playlist, options, and mixer. (Fixes: :issue:`32`)
+- A new frontend :mod:`mopidy.frontends.mpris` have been added. It exposes
+ Mopidy through the `MPRIS interface `_ over D-Bus. In
+ practice, this makes it possible to control Mopidy thorugh the `Ubuntu Sound
+ Menu `_.
+
**Changes**
- Replace :attr:`mopidy.backends.base.Backend.uri_handlers` with
@@ -127,6 +132,18 @@ Please note that 0.5.0 requires some updated dependencies, as listed under
- Found and worked around strange WMA metadata behaviour.
+- Backend API:
+
+ - Calling on :meth:`mopidy.backends.base.playback.PlaybackController.next`
+ and :meth:`mopidy.backends.base.playback.PlaybackController.previous` no
+ longer implies that playback should be started. The playback state--whether
+ playing, paused or stopped--will now be kept.
+
+ - The method
+ :meth:`mopidy.backends.base.playback.PlaybackController.change_track`
+ has been added. Like ``next()``, and ``prev()``, it changes the current
+ track without changing the playback state.
+
v0.4.1 (2011-05-06)
===================
diff --git a/docs/modules/frontends/mpd.rst b/docs/modules/frontends/mpd.rst
index 6f69b2a9..b0c7e3c5 100644
--- a/docs/modules/frontends/mpd.rst
+++ b/docs/modules/frontends/mpd.rst
@@ -9,26 +9,6 @@
:members:
-MPD server
-==========
-
-.. inheritance-diagram:: mopidy.frontends.mpd.server
-
-.. automodule:: mopidy.frontends.mpd.server
- :synopsis: MPD server
- :members:
-
-
-MPD session
-===========
-
-.. inheritance-diagram:: mopidy.frontends.mpd.session
-
-.. automodule:: mopidy.frontends.mpd.session
- :synopsis: MPD client session
- :members:
-
-
MPD dispatcher
==============
diff --git a/docs/modules/frontends/mpris.rst b/docs/modules/frontends/mpris.rst
new file mode 100644
index 00000000..05a6e287
--- /dev/null
+++ b/docs/modules/frontends/mpris.rst
@@ -0,0 +1,7 @@
+***********************************************
+:mod:`mopidy.frontends.mpris` -- MPRIS frontend
+***********************************************
+
+.. automodule:: mopidy.frontends.mpris
+ :synopsis: MPRIS frontend
+ :members:
diff --git a/docs/settings.rst b/docs/settings.rst
index 68adfd55..76eb6315 100644
--- a/docs/settings.rst
+++ b/docs/settings.rst
@@ -92,7 +92,6 @@ To make a ``tag_cache`` of your local music available for Mopidy:
.. _use_mpd_on_a_network:
-
Connecting from other machines on the network
=============================================
@@ -120,6 +119,33 @@ file::
LASTFM_PASSWORD = u'mysecret'
+.. _install_desktop_file:
+
+Controlling Mopidy through the Ubuntu Sound Menu
+================================================
+
+If you are running Ubuntu and installed Mopidy using the Debian package from
+APT you should be able to control Mopidy through the `Ubuntu Sound Menu
+`_ without any changes.
+
+If you installed Mopidy in any other way and want to control Mopidy through the
+Ubuntu Sound Menu, you must install the ``mopidy.desktop`` file which can be
+found in the ``data/`` dir of the Mopidy source into the
+``/usr/share/applications`` dir by hand::
+
+ cd /path/to/mopidy/source
+ sudo cp data/mopidy.desktop /usr/share/applications/
+
+After you have installed the file, start Mopidy in any way, and Mopidy should
+appear in the Ubuntu Sound Menu. When you quit Mopidy, it will still be listed
+in the Ubuntu Sound Menu, and may be restarted by selecting it there.
+
+The Ubuntu Sound Menu interacts with Mopidy's MPRIS frontend,
+:mod:`mopidy.frontends.mpris`. The MPRIS frontend supports the minimum
+requirements of the `MPRIS specification `_. The
+``TrackList`` and the ``Playlists`` interfaces of the spec are not supported.
+
+
Streaming audio through a SHOUTcast/Icecast server
==================================================
diff --git a/mopidy/backends/base/playback.py b/mopidy/backends/base/playback.py
index 5cab8229..57a7ad85 100644
--- a/mopidy/backends/base/playback.py
+++ b/mopidy/backends/base/playback.py
@@ -327,6 +327,26 @@ class PlaybackController(object):
def _current_wall_time(self):
return int(time.time() * 1000)
+ def change_track(self, cp_track, on_error_step=1):
+ """
+ Change to the given track, keeping the current playback state.
+
+ :param cp_track: track to change to
+ :type cp_track: two-tuple (CPID integer, :class:`mopidy.models.Track`)
+ or :class:`None`
+ :param on_error_step: direction to step at play error, 1 for next
+ track (default), -1 for previous track
+ :type on_error_step: int, -1 or 1
+
+ """
+ old_state = self.state
+ self.stop()
+ self.current_cp_track = cp_track
+ if old_state == self.PLAYING:
+ self.play(on_error_step=on_error_step)
+ elif old_state == self.PAUSED:
+ self.pause()
+
def on_end_of_track(self):
"""
Tell the playback controller that end of track is reached.
@@ -363,20 +383,23 @@ class PlaybackController(object):
self.stop(clear_current_track=True)
def next(self):
- """Play the next track."""
- if self.state == self.STOPPED:
- return
+ """
+ Change to the next track.
+ The current playback state will be kept. If it was playing, playing
+ will continue. If it was paused, it will still be paused, etc.
+ """
if self.cp_track_at_next:
self._trigger_track_playback_ended()
- self.play(self.cp_track_at_next)
+ self.change_track(self.cp_track_at_next)
else:
self.stop(clear_current_track=True)
def pause(self):
"""Pause playback."""
- if self.state == self.PLAYING and self.provider.pause():
+ if self.provider.pause():
self.state = self.PAUSED
+ self._trigger_track_playback_paused()
def play(self, cp_track=None, on_error_step=1):
"""
@@ -393,12 +416,15 @@ class PlaybackController(object):
if cp_track is not None:
assert cp_track in self.backend.current_playlist.cp_tracks
-
- if cp_track is None and self.current_cp_track is None:
- cp_track = self.cp_track_at_next
-
- if cp_track is None and self.state == self.PAUSED:
- self.resume()
+ elif cp_track is None:
+ if self.state == self.PAUSED:
+ return self.resume()
+ elif self.current_cp_track is not None:
+ cp_track = self.current_cp_track
+ elif self.current_cp_track is None and on_error_step == 1:
+ cp_track = self.cp_track_at_next
+ elif self.current_cp_track is None and on_error_step == -1:
+ cp_track = self.cp_track_at_previous
if cp_track is not None:
self.current_cp_track = cp_track
@@ -418,18 +444,20 @@ class PlaybackController(object):
self._trigger_track_playback_started()
def previous(self):
- """Play the previous track."""
- if self.cp_track_at_previous is None:
- return
- if self.state == self.STOPPED:
- return
+ """
+ Change to the previous track.
+
+ The current playback state will be kept. If it was playing, playing
+ will continue. If it was paused, it will still be paused, etc.
+ """
self._trigger_track_playback_ended()
- self.play(self.cp_track_at_previous, on_error_step=-1)
+ self.change_track(self.cp_track_at_previous, on_error_step=-1)
def resume(self):
"""If paused, resume playing the current track."""
if self.state == self.PAUSED and self.provider.resume():
self.state = self.PLAYING
+ self._trigger_track_playback_resumed()
def seek(self, time_position):
"""
@@ -456,7 +484,10 @@ class PlaybackController(object):
self.play_time_started = self._current_wall_time
self.play_time_accumulated = time_position
- return self.provider.seek(time_position)
+ success = self.provider.seek(time_position)
+ if success:
+ self._trigger_seeked()
+ return success
def stop(self, clear_current_track=False):
"""
@@ -473,6 +504,22 @@ class PlaybackController(object):
if clear_current_track:
self.current_cp_track = None
+ def _trigger_track_playback_paused(self):
+ logger.debug(u'Triggering track playback paused event')
+ if self.current_track is None:
+ return
+ BackendListener.send('track_playback_paused',
+ track=self.current_track,
+ time_position=self.time_position)
+
+ def _trigger_track_playback_resumed(self):
+ logger.debug(u'Triggering track playback resumed event')
+ if self.current_track is None:
+ return
+ BackendListener.send('track_playback_resumed',
+ track=self.current_track,
+ time_position=self.time_position)
+
def _trigger_track_playback_started(self):
logger.debug(u'Triggering track playback started event')
if self.current_track is None:
@@ -496,6 +543,10 @@ class PlaybackController(object):
logger.debug(u'Triggering options changed event')
BackendListener.send('options_changed')
+ def _trigger_seeked(self):
+ logger.debug(u'Triggering seeked event')
+ BackendListener.send('seeked')
+
class BasePlaybackProvider(object):
"""
diff --git a/mopidy/frontends/mpris/__init__.py b/mopidy/frontends/mpris/__init__.py
new file mode 100644
index 00000000..579038ca
--- /dev/null
+++ b/mopidy/frontends/mpris/__init__.py
@@ -0,0 +1,130 @@
+import logging
+
+logger = logging.getLogger('mopidy.frontends.mpris')
+
+try:
+ import indicate
+except ImportError as import_error:
+ indicate = None
+ logger.debug(u'Startup notification will not be sent (%s)', import_error)
+
+from pykka.actor import ThreadingActor
+
+from mopidy import settings
+from mopidy.frontends.mpris import objects
+from mopidy.listeners import BackendListener
+
+
+class MprisFrontend(ThreadingActor, BackendListener):
+ """
+ Frontend which lets you control Mopidy through the Media Player Remote
+ Interfacing Specification (`MPRIS `_) D-Bus
+ interface.
+
+ An example of an MPRIS client is the `Ubuntu Sound Menu
+ `_.
+
+ **Dependencies:**
+
+ - D-Bus Python bindings. The package is named ``python-dbus`` in
+ Ubuntu/Debian.
+ - ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
+ Ubuntu Sound Menu. The package is named ``python-indicate`` in
+ Ubuntu/Debian.
+ - An ``.desktop`` file for Mopidy installed at the path set in
+ :attr:`mopidy.settings.DESKTOP_FILE`. See :ref:`install_desktop_file` for
+ details.
+
+ **Testing the frontend**
+
+ To test, start Mopidy, and then run the following in a Python shell::
+
+ import dbus
+ bus = dbus.SessionBus()
+ player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
+ '/org/mpris/MediaPlayer2')
+
+ Now you can control Mopidy through the player object. Examples:
+
+ - To get some properties from Mopidy, run::
+
+ props = player.GetAll('org.mpris.MediaPlayer2',
+ dbus_interface='org.freedesktop.DBus.Properties')
+
+ - To quit Mopidy through D-Bus, run::
+
+ player.Quit(dbus_interface='org.mpris.MediaPlayer2')
+ """
+
+ def __init__(self):
+ self.indicate_server = None
+ self.mpris_object = None
+
+ def on_start(self):
+ try:
+ self.mpris_object = objects.MprisObject()
+ self._send_startup_notification()
+ except Exception as e:
+ logger.error(u'MPRIS frontend setup failed (%s)', e)
+ self.stop()
+
+ def on_stop(self):
+ logger.debug(u'Removing MPRIS object from D-Bus connection...')
+ if self.mpris_object:
+ self.mpris_object.remove_from_connection()
+ self.mpris_object = None
+ logger.debug(u'Removed MPRIS object from D-Bus connection')
+
+ def _send_startup_notification(self):
+ """
+ Send startup notification using libindicate to make Mopidy appear in
+ e.g. `Ubuntu's sound menu `_.
+
+ A reference to the libindicate server is kept for as long as Mopidy is
+ running. When Mopidy exits, the server will be unreferenced and Mopidy
+ will automatically be unregistered from e.g. the sound menu.
+ """
+ if not indicate:
+ return
+ logger.debug(u'Sending startup notification...')
+ self.indicate_server = indicate.Server()
+ self.indicate_server.set_type('music.mopidy')
+ self.indicate_server.set_desktop_file(settings.DESKTOP_FILE)
+ self.indicate_server.show()
+ logger.debug(u'Startup notification sent')
+
+ def _emit_properties_changed(self, *changed_properties):
+ if self.mpris_object is None:
+ return
+ props_with_new_values = [
+ (p, self.mpris_object.Get(objects.PLAYER_IFACE, p))
+ for p in changed_properties]
+ self.mpris_object.PropertiesChanged(objects.PLAYER_IFACE,
+ dict(props_with_new_values), [])
+
+ def track_playback_paused(self, track, time_position):
+ logger.debug(u'Received track playback paused event')
+ self._emit_properties_changed('PlaybackStatus')
+
+ def track_playback_resumed(self, track, time_position):
+ logger.debug(u'Received track playback resumed event')
+ self._emit_properties_changed('PlaybackStatus')
+
+ def track_playback_started(self, track):
+ logger.debug(u'Received track playback started event')
+ self._emit_properties_changed('PlaybackStatus', 'Metadata')
+
+ def track_playback_ended(self, track, time_position):
+ logger.debug(u'Received track playback ended event')
+ self._emit_properties_changed('PlaybackStatus', 'Metadata')
+
+ def volume_changed(self):
+ logger.debug(u'Received volume changed event')
+ self._emit_properties_changed('Volume')
+
+ def seeked(self):
+ logger.debug(u'Received seeked event')
+ if self.mpris_object is None:
+ return
+ self.mpris_object.Seeked(
+ self.mpris_object.Get(objects.PLAYER_IFACE, 'Position'))
diff --git a/mopidy/frontends/mpris/objects.py b/mopidy/frontends/mpris/objects.py
new file mode 100644
index 00000000..77278778
--- /dev/null
+++ b/mopidy/frontends/mpris/objects.py
@@ -0,0 +1,436 @@
+import logging
+import os
+
+logger = logging.getLogger('mopidy.frontends.mpris')
+
+try:
+ import dbus
+ import dbus.mainloop.glib
+ import dbus.service
+ import gobject
+except ImportError as import_error:
+ from mopidy import OptionalDependencyError
+ raise OptionalDependencyError(import_error)
+
+from pykka.registry import ActorRegistry
+
+from mopidy import settings
+from mopidy.backends.base import Backend
+from mopidy.backends.base.playback import PlaybackController
+from mopidy.mixers.base import BaseMixer
+from mopidy.utils.process import exit_process
+
+# Must be done before dbus.SessionBus() is called
+gobject.threads_init()
+dbus.mainloop.glib.threads_init()
+dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+BUS_NAME = 'org.mpris.MediaPlayer2.mopidy'
+OBJECT_PATH = '/org/mpris/MediaPlayer2'
+ROOT_IFACE = 'org.mpris.MediaPlayer2'
+PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
+
+
+class MprisObject(dbus.service.Object):
+ """Implements http://www.mpris.org/2.1/spec/"""
+
+ properties = None
+
+ def __init__(self):
+ self._backend = None
+ self._mixer = None
+ self.properties = {
+ ROOT_IFACE: self._get_root_iface_properties(),
+ PLAYER_IFACE: self._get_player_iface_properties(),
+ }
+ bus_name = self._connect_to_dbus()
+ super(MprisObject, self).__init__(bus_name, OBJECT_PATH)
+
+ def _get_root_iface_properties(self):
+ return {
+ 'CanQuit': (True, None),
+ 'CanRaise': (False, None),
+ # NOTE Change if adding optional track list support
+ 'HasTrackList': (False, None),
+ 'Identity': ('Mopidy', None),
+ 'DesktopEntry': (self.get_DesktopEntry, None),
+ 'SupportedUriSchemes': (self.get_SupportedUriSchemes, None),
+ # NOTE Return MIME types supported by local backend if support for
+ # reporting supported MIME types is added
+ 'SupportedMimeTypes': (dbus.Array([], signature='s'), None),
+ }
+
+ def _get_player_iface_properties(self):
+ return {
+ 'PlaybackStatus': (self.get_PlaybackStatus, None),
+ 'LoopStatus': (self.get_LoopStatus, self.set_LoopStatus),
+ 'Rate': (1.0, self.set_Rate),
+ 'Shuffle': (self.get_Shuffle, self.set_Shuffle),
+ 'Metadata': (self.get_Metadata, None),
+ 'Volume': (self.get_Volume, self.set_Volume),
+ 'Position': (self.get_Position, None),
+ 'MinimumRate': (1.0, None),
+ 'MaximumRate': (1.0, None),
+ 'CanGoNext': (self.get_CanGoNext, None),
+ 'CanGoPrevious': (self.get_CanGoPrevious, None),
+ 'CanPlay': (self.get_CanPlay, None),
+ 'CanPause': (self.get_CanPause, None),
+ 'CanSeek': (self.get_CanSeek, None),
+ 'CanControl': (self.get_CanControl, None),
+ }
+
+ def _connect_to_dbus(self):
+ logger.debug(u'Connecting to D-Bus...')
+ bus_name = dbus.service.BusName(BUS_NAME, dbus.SessionBus())
+ logger.info(u'Connected to D-Bus')
+ return bus_name
+
+ @property
+ def backend(self):
+ if self._backend is None:
+ backend_refs = ActorRegistry.get_by_class(Backend)
+ assert len(backend_refs) == 1, \
+ 'Expected exactly one running backend.'
+ self._backend = backend_refs[0].proxy()
+ return self._backend
+
+ @property
+ def mixer(self):
+ if self._mixer is None:
+ mixer_refs = ActorRegistry.get_by_class(BaseMixer)
+ assert len(mixer_refs) == 1, 'Expected exactly one running mixer.'
+ self._mixer = mixer_refs[0].proxy()
+ return self._mixer
+
+ def _get_track_id(self, cp_track):
+ return '/com/mopidy/track/%d' % cp_track.cpid
+
+ def _get_cpid(self, track_id):
+ assert track_id.startswith('/com/mopidy/track/')
+ return track_id.split('/')[-1]
+
+ ### Properties interface
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
+ in_signature='ss', out_signature='v')
+ def Get(self, interface, prop):
+ logger.debug(u'%s.Get(%s, %s) called',
+ dbus.PROPERTIES_IFACE, repr(interface), repr(prop))
+ (getter, setter) = self.properties[interface][prop]
+ if callable(getter):
+ return getter()
+ else:
+ return getter
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
+ in_signature='s', out_signature='a{sv}')
+ def GetAll(self, interface):
+ logger.debug(u'%s.GetAll(%s) called',
+ dbus.PROPERTIES_IFACE, repr(interface))
+ getters = {}
+ for key, (getter, setter) in self.properties[interface].iteritems():
+ getters[key] = getter() if callable(getter) else getter
+ return getters
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
+ in_signature='ssv', out_signature='')
+ def Set(self, interface, prop, value):
+ logger.debug(u'%s.Set(%s, %s, %s) called',
+ dbus.PROPERTIES_IFACE, repr(interface), repr(prop), repr(value))
+ getter, setter = self.properties[interface][prop]
+ if setter is not None:
+ setter(value)
+ self.PropertiesChanged(interface,
+ {prop: self.Get(interface, prop)}, [])
+
+ @dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE,
+ signature='sa{sv}as')
+ def PropertiesChanged(self, interface, changed_properties,
+ invalidated_properties):
+ logger.debug(u'%s.PropertiesChanged(%s, %s, %s) signaled',
+ dbus.PROPERTIES_IFACE, interface, changed_properties,
+ invalidated_properties)
+
+
+ ### Root interface methods
+
+ @dbus.service.method(dbus_interface=ROOT_IFACE)
+ def Raise(self):
+ logger.debug(u'%s.Raise called', ROOT_IFACE)
+ # Do nothing, as we do not have a GUI
+
+ @dbus.service.method(dbus_interface=ROOT_IFACE)
+ def Quit(self):
+ logger.debug(u'%s.Quit called', ROOT_IFACE)
+ exit_process()
+
+
+ ### Root interface properties
+
+ def get_DesktopEntry(self):
+ return os.path.splitext(os.path.basename(settings.DESKTOP_FILE))[0]
+
+ def get_SupportedUriSchemes(self):
+ return dbus.Array(self.backend.uri_schemes.get(), signature='s')
+
+
+ ### Player interface methods
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Next(self):
+ logger.debug(u'%s.Next called', PLAYER_IFACE)
+ if not self.get_CanGoNext():
+ logger.debug(u'%s.Next not allowed', PLAYER_IFACE)
+ return
+ self.backend.playback.next().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Previous(self):
+ logger.debug(u'%s.Previous called', PLAYER_IFACE)
+ if not self.get_CanGoPrevious():
+ logger.debug(u'%s.Previous not allowed', PLAYER_IFACE)
+ return
+ self.backend.playback.previous().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Pause(self):
+ logger.debug(u'%s.Pause called', PLAYER_IFACE)
+ if not self.get_CanPause():
+ logger.debug(u'%s.Pause not allowed', PLAYER_IFACE)
+ return
+ self.backend.playback.pause().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def PlayPause(self):
+ logger.debug(u'%s.PlayPause called', PLAYER_IFACE)
+ if not self.get_CanPause():
+ logger.debug(u'%s.PlayPause not allowed', PLAYER_IFACE)
+ return
+ state = self.backend.playback.state.get()
+ if state == PlaybackController.PLAYING:
+ self.backend.playback.pause().get()
+ elif state == PlaybackController.PAUSED:
+ self.backend.playback.resume().get()
+ elif state == PlaybackController.STOPPED:
+ self.backend.playback.play().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Stop(self):
+ logger.debug(u'%s.Stop called', PLAYER_IFACE)
+ if not self.get_CanControl():
+ logger.debug(u'%s.Stop not allowed', PLAYER_IFACE)
+ return
+ self.backend.playback.stop().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Play(self):
+ logger.debug(u'%s.Play called', PLAYER_IFACE)
+ if not self.get_CanPlay():
+ logger.debug(u'%s.Play not allowed', PLAYER_IFACE)
+ return
+ state = self.backend.playback.state.get()
+ if state == PlaybackController.PAUSED:
+ self.backend.playback.resume().get()
+ else:
+ self.backend.playback.play().get()
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def Seek(self, offset):
+ logger.debug(u'%s.Seek called', PLAYER_IFACE)
+ if not self.get_CanSeek():
+ logger.debug(u'%s.Seek not allowed', PLAYER_IFACE)
+ return
+ offset_in_milliseconds = offset // 1000
+ current_position = self.backend.playback.time_position.get()
+ new_position = current_position + offset_in_milliseconds
+ self.backend.playback.seek(new_position)
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def SetPosition(self, track_id, position):
+ logger.debug(u'%s.SetPosition called', PLAYER_IFACE)
+ if not self.get_CanSeek():
+ logger.debug(u'%s.SetPosition not allowed', PLAYER_IFACE)
+ return
+ position = position // 1000
+ current_cp_track = self.backend.playback.current_cp_track.get()
+ if current_cp_track is None:
+ return
+ if track_id != self._get_track_id(current_cp_track):
+ return
+ if position < 0:
+ return
+ if current_cp_track.track.length < position:
+ return
+ self.backend.playback.seek(position)
+
+ @dbus.service.method(dbus_interface=PLAYER_IFACE)
+ def OpenUri(self, uri):
+ logger.debug(u'%s.OpenUri called', PLAYER_IFACE)
+ if not self.get_CanPlay():
+ # NOTE The spec does not explictly require this check, but guarding
+ # the other methods doesn't help much if OpenUri is open for use.
+ logger.debug(u'%s.Play not allowed', PLAYER_IFACE)
+ return
+ # NOTE Check if URI has MIME type known to the backend, if MIME support
+ # is added to the backend.
+ uri_schemes = self.backend.uri_schemes.get()
+ if not any([uri.startswith(uri_scheme) for uri_scheme in uri_schemes]):
+ return
+ track = self.backend.library.lookup(uri).get()
+ if track is not None:
+ cp_track = self.backend.current_playlist.add(track).get()
+ self.backend.playback.play(cp_track)
+ else:
+ logger.debug(u'Track with URI "%s" not found in library.', uri)
+
+
+ ### Player interface signals
+
+ @dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x')
+ def Seeked(self, position):
+ logger.debug(u'%s.Seeked signaled', PLAYER_IFACE)
+ # Do nothing, as just calling the method is enough to emit the signal.
+
+
+ ### Player interface properties
+
+ def get_PlaybackStatus(self):
+ state = self.backend.playback.state.get()
+ if state == PlaybackController.PLAYING:
+ return 'Playing'
+ elif state == PlaybackController.PAUSED:
+ return 'Paused'
+ elif state == PlaybackController.STOPPED:
+ return 'Stopped'
+
+ def get_LoopStatus(self):
+ repeat = self.backend.playback.repeat.get()
+ single = self.backend.playback.single.get()
+ if not repeat:
+ return 'None'
+ else:
+ if single:
+ return 'Track'
+ else:
+ return 'Playlist'
+
+ def set_LoopStatus(self, value):
+ if not self.get_CanControl():
+ logger.debug(u'Setting %s.LoopStatus not allowed', PLAYER_IFACE)
+ return
+ if value == 'None':
+ self.backend.playback.repeat = False
+ self.backend.playback.single = False
+ elif value == 'Track':
+ self.backend.playback.repeat = True
+ self.backend.playback.single = True
+ elif value == 'Playlist':
+ self.backend.playback.repeat = True
+ self.backend.playback.single = False
+
+ def set_Rate(self, value):
+ if not self.get_CanControl():
+ # NOTE The spec does not explictly require this check, but it was
+ # added to be consistent with all the other property setters.
+ logger.debug(u'Setting %s.Rate not allowed', PLAYER_IFACE)
+ return
+ if value == 0:
+ self.Pause()
+
+ def get_Shuffle(self):
+ return self.backend.playback.random.get()
+
+ def set_Shuffle(self, value):
+ if not self.get_CanControl():
+ logger.debug(u'Setting %s.Shuffle not allowed', PLAYER_IFACE)
+ return
+ if value:
+ self.backend.playback.random = True
+ else:
+ self.backend.playback.random = False
+
+ def get_Metadata(self):
+ current_cp_track = self.backend.playback.current_cp_track.get()
+ if current_cp_track is None:
+ return {'mpris:trackid': ''}
+ else:
+ (cpid, track) = current_cp_track
+ metadata = {'mpris:trackid': self._get_track_id(current_cp_track)}
+ if track.length:
+ metadata['mpris:length'] = track.length * 1000
+ if track.uri:
+ metadata['xesam:url'] = track.uri
+ if track.name:
+ metadata['xesam:title'] = track.name
+ if track.artists:
+ artists = list(track.artists)
+ artists.sort(key=lambda a: a.name)
+ metadata['xesam:artist'] = dbus.Array(
+ [a.name for a in artists if a.name], signature='s')
+ if track.album and track.album.name:
+ metadata['xesam:album'] = track.album.name
+ if track.album and track.album.artists:
+ artists = list(track.album.artists)
+ artists.sort(key=lambda a: a.name)
+ metadata['xesam:albumArtist'] = dbus.Array(
+ [a.name for a in artists if a.name], signature='s')
+ if track.track_no:
+ metadata['xesam:trackNumber'] = track.track_no
+ return dbus.Dictionary(metadata, signature='sv')
+
+ def get_Volume(self):
+ volume = self.mixer.volume.get()
+ if volume is not None:
+ return volume / 100.0
+
+ def set_Volume(self, value):
+ if not self.get_CanControl():
+ logger.debug(u'Setting %s.Volume not allowed', PLAYER_IFACE)
+ return
+ if value is None:
+ return
+ elif value < 0:
+ self.mixer.volume = 0
+ elif value > 1:
+ self.mixer.volume = 100
+ elif 0 <= value <= 1:
+ self.mixer.volume = int(value * 100)
+
+ def get_Position(self):
+ return self.backend.playback.time_position.get() * 1000
+
+ def get_CanGoNext(self):
+ if not self.get_CanControl():
+ return False
+ return (self.backend.playback.cp_track_at_next.get() !=
+ self.backend.playback.current_cp_track.get())
+
+ def get_CanGoPrevious(self):
+ if not self.get_CanControl():
+ return False
+ return (self.backend.playback.cp_track_at_previous.get() !=
+ self.backend.playback.current_cp_track.get())
+
+ def get_CanPlay(self):
+ if not self.get_CanControl():
+ return False
+ return (self.backend.playback.current_track.get() is not None
+ or self.backend.playback.track_at_next.get() is not None)
+
+ def get_CanPause(self):
+ if not self.get_CanControl():
+ return False
+ # NOTE Should be changed to vary based on capabilities of the current
+ # track if Mopidy starts supporting non-seekable media, like streams.
+ return True
+
+ def get_CanSeek(self):
+ if not self.get_CanControl():
+ return False
+ # NOTE Should be changed to vary based on capabilities of the current
+ # track if Mopidy starts supporting non-seekable media, like streams.
+ return True
+
+ def get_CanControl(self):
+ # NOTE This could be a setting for the end user to change.
+ return True
diff --git a/mopidy/listeners.py b/mopidy/listeners.py
index 590f0ad0..ee360bf3 100644
--- a/mopidy/listeners.py
+++ b/mopidy/listeners.py
@@ -14,8 +14,8 @@ class BackendListener(object):
@staticmethod
def send(event, **kwargs):
"""Helper to allow calling of backend listener events"""
- # FIXME this should be updated once pykka supports non-blocking calls
- # on proxies or some similar solution
+ # FIXME this should be updated once Pykka supports non-blocking calls
+ # on proxies or some similar solution.
registry.ActorRegistry.broadcast({
'command': 'pykka_call',
'attr_path': (event,),
@@ -23,6 +23,33 @@ class BackendListener(object):
'kwargs': kwargs,
}, target_class=BackendListener)
+ def track_playback_paused(self, track, time_position):
+ """
+ Called whenever track playback is paused.
+
+ *MAY* be implemented by actor.
+
+ :param track: the track that was playing when playback paused
+ :type track: :class:`mopidy.models.Track`
+ :param time_position: the time position in milliseconds
+ :type time_position: int
+ """
+ pass
+
+ def track_playback_resumed(self, track, time_position):
+ """
+ Called whenever track playback is resumed.
+
+ *MAY* be implemented by actor.
+
+ :param track: the track that was playing when playback resumed
+ :type track: :class:`mopidy.models.Track`
+ :param time_position: the time position in milliseconds
+ :type time_position: int
+ """
+ pass
+
+
def track_playback_started(self, track):
"""
Called whenever a new track starts playing.
@@ -78,3 +105,12 @@ class BackendListener(object):
*MAY* be implemented by actor.
"""
pass
+
+ def seeked(self):
+ """
+ Called whenever the time position changes by an unexpected amount, e.g.
+ at seek to a new time position.
+
+ *MAY* be implemented by actor.
+ """
+ pass
diff --git a/mopidy/settings.py b/mopidy/settings.py
index 9909973e..b1e0c791 100644
--- a/mopidy/settings.py
+++ b/mopidy/settings.py
@@ -49,6 +49,15 @@ DEBUG_LOG_FORMAT = u'%(levelname)-8s %(asctime)s' + \
#: DEBUG_LOG_FILENAME = u'mopidy.log'
DEBUG_LOG_FILENAME = u'mopidy.log'
+#: Location of the Mopidy .desktop file.
+#:
+#: Used by :mod:`mopidy.frontends.mpris`.
+#:
+#: Default::
+#:
+#: DESKTOP_FILE = u'/usr/share/applications/mopidy.desktop'
+DESKTOP_FILE = u'/usr/share/applications/mopidy.desktop'
+
#: List of server frontends to use.
#:
#: Default::
@@ -56,10 +65,12 @@ DEBUG_LOG_FILENAME = u'mopidy.log'
#: FRONTENDS = (
#: u'mopidy.frontends.mpd.MpdFrontend',
#: u'mopidy.frontends.lastfm.LastfmFrontend',
+#: u'mopidy.frontends.mpris.MprisFrontend',
#: )
FRONTENDS = (
u'mopidy.frontends.mpd.MpdFrontend',
u'mopidy.frontends.lastfm.LastfmFrontend',
+ u'mopidy.frontends.mpris.MprisFrontend',
)
#: Your `Last.fm `_ username.
diff --git a/tests/backends/base/playback.py b/tests/backends/base/playback.py
index 2d455225..47a14e3c 100644
--- a/tests/backends/base/playback.py
+++ b/tests/backends/base/playback.py
@@ -555,7 +555,7 @@ class PlaybackControllerTest(object):
@populate_playlist
def test_pause_when_stopped(self):
self.playback.pause()
- self.assertEqual(self.playback.state, self.playback.STOPPED)
+ self.assertEqual(self.playback.state, self.playback.PAUSED)
@populate_playlist
def test_pause_when_playing(self):
diff --git a/tests/backends/events_test.py b/tests/backends/events_test.py
index bc39ac00..88429166 100644
--- a/tests/backends/events_test.py
+++ b/tests/backends/events_test.py
@@ -1,45 +1,51 @@
-import threading
+import mock
import unittest
-from pykka.actor import ThreadingActor
from pykka.registry import ActorRegistry
from mopidy.backends.dummy import DummyBackend
from mopidy.listeners import BackendListener
from mopidy.models import Track
+@mock.patch.object(BackendListener, 'send')
class BackendEventsTest(unittest.TestCase):
def setUp(self):
- self.events = {
- 'track_playback_started': threading.Event(),
- 'track_playback_ended': threading.Event(),
- }
self.backend = DummyBackend.start().proxy()
- self.listener = DummyBackendListener.start(self.events).proxy()
def tearDown(self):
ActorRegistry.stop_all()
- def test_play_sends_track_playback_started_event(self):
- self.backend.current_playlist.add([Track(uri='a')])
+ def test_pause_sends_track_playback_paused_event(self, send):
+ self.backend.current_playlist.add(Track(uri='a'))
+ self.backend.playback.play().get()
+ send.reset_mock()
+ self.backend.playback.pause().get()
+ self.assertEqual(send.call_args[0][0], 'track_playback_paused')
+
+ def test_resume_sends_track_playback_resumed(self, send):
+ self.backend.current_playlist.add(Track(uri='a'))
self.backend.playback.play()
- self.events['track_playback_started'].wait(timeout=1)
- self.assertTrue(self.events['track_playback_started'].is_set())
+ self.backend.playback.pause().get()
+ send.reset_mock()
+ self.backend.playback.resume().get()
+ self.assertEqual(send.call_args[0][0], 'track_playback_resumed')
- def test_stop_sends_track_playback_ended_event(self):
- self.backend.current_playlist.add([Track(uri='a')])
- self.backend.playback.play()
- self.backend.playback.stop()
- self.events['track_playback_ended'].wait(timeout=1)
- self.assertTrue(self.events['track_playback_ended'].is_set())
+ def test_play_sends_track_playback_started_event(self, send):
+ self.backend.current_playlist.add(Track(uri='a'))
+ send.reset_mock()
+ self.backend.playback.play().get()
+ self.assertEqual(send.call_args[0][0], 'track_playback_started')
+ def test_stop_sends_track_playback_ended_event(self, send):
+ self.backend.current_playlist.add(Track(uri='a'))
+ self.backend.playback.play().get()
+ send.reset_mock()
+ self.backend.playback.stop().get()
+ self.assertEqual(send.call_args_list[0][0][0], 'track_playback_ended')
-class DummyBackendListener(ThreadingActor, BackendListener):
- def __init__(self, events):
- self.events = events
-
- def track_playback_started(self, track):
- self.events['track_playback_started'].set()
-
- def track_playback_ended(self, track, time_position):
- self.events['track_playback_ended'].set()
+ def test_seek_sends_seeked_event(self, send):
+ self.backend.current_playlist.add(Track(uri='a', length=40000))
+ self.backend.playback.play().get()
+ send.reset_mock()
+ self.backend.playback.seek(1000).get()
+ self.assertEqual(send.call_args[0][0], 'seeked')
diff --git a/tests/frontends/mpris/__init__.py b/tests/frontends/mpris/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/frontends/mpris/events_test.py b/tests/frontends/mpris/events_test.py
new file mode 100644
index 00000000..2f737744
--- /dev/null
+++ b/tests/frontends/mpris/events_test.py
@@ -0,0 +1,68 @@
+import mock
+import unittest
+
+from mopidy.frontends.mpris import MprisFrontend, objects
+from mopidy.models import Track
+
+class BackendEventsTest(unittest.TestCase):
+ def setUp(self):
+ self.mpris_frontend = MprisFrontend() # As a plain class, not an actor
+ self.mpris_object = mock.Mock(spec=objects.MprisObject)
+ self.mpris_frontend.mpris_object = self.mpris_object
+
+ def test_track_playback_paused_event_changes_playback_status(self):
+ self.mpris_object.Get.return_value = 'Paused'
+ self.mpris_frontend.track_playback_paused(Track(), 0)
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
+ ])
+ self.mpris_object.PropertiesChanged.assert_called_with(
+ objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
+
+ def test_track_playback_resumed_event_changes_playback_status(self):
+ self.mpris_object.Get.return_value = 'Playing'
+ self.mpris_frontend.track_playback_resumed(Track(), 0)
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
+ ])
+ self.mpris_object.PropertiesChanged.assert_called_with(
+ objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
+
+ def test_track_playback_started_event_changes_playback_status_and_metadata(self):
+ self.mpris_object.Get.return_value = '...'
+ self.mpris_frontend.track_playback_started(Track())
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
+ ((objects.PLAYER_IFACE, 'Metadata'), {}),
+ ])
+ self.mpris_object.PropertiesChanged.assert_called_with(
+ objects.PLAYER_IFACE,
+ {'Metadata': '...', 'PlaybackStatus': '...'}, [])
+
+ def test_track_playback_ended_event_changes_playback_status_and_metadata(self):
+ self.mpris_object.Get.return_value = '...'
+ self.mpris_frontend.track_playback_ended(Track(), 0)
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
+ ((objects.PLAYER_IFACE, 'Metadata'), {}),
+ ])
+ self.mpris_object.PropertiesChanged.assert_called_with(
+ objects.PLAYER_IFACE,
+ {'Metadata': '...', 'PlaybackStatus': '...'}, [])
+
+ def test_volume_changed_event_changes_volume(self):
+ self.mpris_object.Get.return_value = 1.0
+ self.mpris_frontend.volume_changed()
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'Volume'), {}),
+ ])
+ self.mpris_object.PropertiesChanged.assert_called_with(
+ objects.PLAYER_IFACE, {'Volume': 1.0}, [])
+
+ def test_seeked_event_causes_mpris_seeked_event(self):
+ self.mpris_object.Get.return_value = 31000000
+ self.mpris_frontend.seeked()
+ self.assertListEqual(self.mpris_object.Get.call_args_list, [
+ ((objects.PLAYER_IFACE, 'Position'), {}),
+ ])
+ self.mpris_object.Seeked.assert_called_with(31000000)
diff --git a/tests/frontends/mpris/player_interface_test.py b/tests/frontends/mpris/player_interface_test.py
new file mode 100644
index 00000000..ee668a33
--- /dev/null
+++ b/tests/frontends/mpris/player_interface_test.py
@@ -0,0 +1,824 @@
+import mock
+import unittest
+
+from mopidy.backends.dummy import DummyBackend
+from mopidy.backends.base.playback import PlaybackController
+from mopidy.frontends.mpris import objects
+from mopidy.mixers.dummy import DummyMixer
+from mopidy.models import Album, Artist, Track
+
+PLAYING = PlaybackController.PLAYING
+PAUSED = PlaybackController.PAUSED
+STOPPED = PlaybackController.STOPPED
+
+class PlayerInterfaceTest(unittest.TestCase):
+ def setUp(self):
+ objects.MprisObject._connect_to_dbus = mock.Mock()
+ self.mixer = DummyMixer.start().proxy()
+ self.backend = DummyBackend.start().proxy()
+ self.mpris = objects.MprisObject()
+ self.mpris._backend = self.backend
+
+ def tearDown(self):
+ self.backend.stop()
+ self.mixer.stop()
+
+ def test_get_playback_status_is_playing_when_playing(self):
+ self.backend.playback.state = PLAYING
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
+ self.assertEqual('Playing', result)
+
+ def test_get_playback_status_is_paused_when_paused(self):
+ self.backend.playback.state = PAUSED
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
+ self.assertEqual('Paused', result)
+
+ def test_get_playback_status_is_stopped_when_stopped(self):
+ self.backend.playback.state = STOPPED
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
+ self.assertEqual('Stopped', result)
+
+ def test_get_loop_status_is_none_when_not_looping(self):
+ self.backend.playback.repeat = False
+ self.backend.playback.single = False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
+ self.assertEqual('None', result)
+
+ def test_get_loop_status_is_track_when_looping_a_single_track(self):
+ self.backend.playback.repeat = True
+ self.backend.playback.single = True
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
+ self.assertEqual('Track', result)
+
+ def test_get_loop_status_is_playlist_when_looping_the_current_playlist(self):
+ self.backend.playback.repeat = True
+ self.backend.playback.single = False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
+ self.assertEqual('Playlist', result)
+
+ def test_set_loop_status_is_ignored_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.playback.repeat = True
+ self.backend.playback.single = True
+ self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
+ self.assertEquals(self.backend.playback.repeat.get(), True)
+ self.assertEquals(self.backend.playback.single.get(), True)
+
+ def test_set_loop_status_to_none_unsets_repeat_and_single(self):
+ self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
+ self.assertEquals(self.backend.playback.repeat.get(), False)
+ self.assertEquals(self.backend.playback.single.get(), False)
+
+ def test_set_loop_status_to_track_sets_repeat_and_single(self):
+ self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track')
+ self.assertEquals(self.backend.playback.repeat.get(), True)
+ self.assertEquals(self.backend.playback.single.get(), True)
+
+ def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self):
+ self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist')
+ self.assertEquals(self.backend.playback.repeat.get(), True)
+ self.assertEquals(self.backend.playback.single.get(), False)
+
+ def test_get_rate_is_greater_or_equal_than_minimum_rate(self):
+ rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
+ minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
+ self.assert_(rate >= minimum_rate)
+
+ def test_get_rate_is_less_or_equal_than_maximum_rate(self):
+ rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
+ maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
+ self.assert_(rate >= maximum_rate)
+
+ def test_set_rate_is_ignored_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_set_rate_to_zero_pauses_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_get_shuffle_returns_true_if_random_is_active(self):
+ self.backend.playback.random = True
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
+ self.assertTrue(result)
+
+ def test_get_shuffle_returns_false_if_random_is_inactive(self):
+ self.backend.playback.random = False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
+ self.assertFalse(result)
+
+ def test_set_shuffle_is_ignored_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.playback.random = False
+ result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
+ self.assertFalse(self.backend.playback.random.get())
+
+ def test_set_shuffle_to_true_activates_random_mode(self):
+ self.backend.playback.random = False
+ self.assertFalse(self.backend.playback.random.get())
+ result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
+ self.assertTrue(self.backend.playback.random.get())
+
+ def test_set_shuffle_to_false_deactivates_random_mode(self):
+ self.backend.playback.random = True
+ self.assertTrue(self.backend.playback.random.get())
+ result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False)
+ self.assertFalse(self.backend.playback.random.get())
+
+ def test_get_metadata_has_trackid_even_when_no_current_track(self):
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assert_('mpris:trackid' in result.keys())
+ self.assertEquals(result['mpris:trackid'], '')
+
+ def test_get_metadata_has_trackid_based_on_cpid(self):
+ self.backend.current_playlist.append([Track(uri='a')])
+ self.backend.playback.play()
+ (cpid, track) = self.backend.playback.current_cp_track.get()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('mpris:trackid', result.keys())
+ self.assertEquals(result['mpris:trackid'],
+ '/com/mopidy/track/%d' % cpid)
+
+ def test_get_metadata_has_track_length(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('mpris:length', result.keys())
+ self.assertEquals(result['mpris:length'], 40000000)
+
+ def test_get_metadata_has_track_uri(self):
+ self.backend.current_playlist.append([Track(uri='a')])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:url', result.keys())
+ self.assertEquals(result['xesam:url'], 'a')
+
+ def test_get_metadata_has_track_title(self):
+ self.backend.current_playlist.append([Track(name='a')])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:title', result.keys())
+ self.assertEquals(result['xesam:title'], 'a')
+
+ def test_get_metadata_has_track_artists(self):
+ self.backend.current_playlist.append([Track(artists=[
+ Artist(name='a'), Artist(name='b'), Artist(name=None)])])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:artist', result.keys())
+ self.assertEquals(result['xesam:artist'], ['a', 'b'])
+
+ def test_get_metadata_has_track_album(self):
+ self.backend.current_playlist.append([Track(album=Album(name='a'))])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:album', result.keys())
+ self.assertEquals(result['xesam:album'], 'a')
+
+ def test_get_metadata_has_track_album_artists(self):
+ self.backend.current_playlist.append([Track(album=Album(artists=[
+ Artist(name='a'), Artist(name='b'), Artist(name=None)]))])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:albumArtist', result.keys())
+ self.assertEquals(result['xesam:albumArtist'], ['a', 'b'])
+
+ def test_get_metadata_has_track_number_in_album(self):
+ self.backend.current_playlist.append([Track(track_no=7)])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
+ self.assertIn('xesam:trackNumber', result.keys())
+ self.assertEquals(result['xesam:trackNumber'], 7)
+
+ def test_get_volume_should_return_volume_between_zero_and_one(self):
+ self.mixer.volume = 0
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
+ self.assertEquals(result, 0)
+
+ self.mixer.volume = 50
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
+ self.assertEquals(result, 0.5)
+
+ self.mixer.volume = 100
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
+ self.assertEquals(result, 1)
+
+ def test_set_volume_is_ignored_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.mixer.volume = 0
+ self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
+ self.assertEquals(self.mixer.volume.get(), 0)
+
+ def test_set_volume_to_one_should_set_mixer_volume_to_100(self):
+ self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
+ self.assertEquals(self.mixer.volume.get(), 100)
+
+ def test_set_volume_to_anything_above_one_should_set_mixer_volume_to_100(self):
+ self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0)
+ self.assertEquals(self.mixer.volume.get(), 100)
+
+ def test_set_volume_to_anything_not_a_number_does_not_change_volume(self):
+ self.mixer.volume = 10
+ self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None)
+ self.assertEquals(self.mixer.volume.get(), 10)
+
+ def test_get_position_returns_time_position_in_microseconds(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(10000)
+ result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
+ result_in_milliseconds = result_in_microseconds // 1000
+ self.assert_(result_in_milliseconds >= 10000)
+
+ def test_get_position_when_no_current_track_should_be_zero(self):
+ result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
+ result_in_milliseconds = result_in_microseconds // 1000
+ self.assertEquals(result_in_milliseconds, 0)
+
+ def test_get_minimum_rate_is_one_or_less(self):
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
+ self.assert_(result <= 1.0)
+
+ def test_get_maximum_rate_is_one_or_more(self):
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
+ self.assert_(result >= 1.0)
+
+ def test_can_go_next_is_true_if_can_control_and_other_next_track(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
+ self.assertTrue(result)
+
+ def test_can_go_next_is_false_if_next_track_is_the_same(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.backend.current_playlist.append([Track(uri='a')])
+ self.backend.playback.repeat = True
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
+ self.assertFalse(result)
+
+ def test_can_go_next_is_false_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
+ self.assertFalse(result)
+
+ def test_can_go_previous_is_true_if_can_control_and_other_previous_track(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
+ self.assertTrue(result)
+
+ def test_can_go_previous_is_false_if_previous_track_is_the_same(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.backend.current_playlist.append([Track(uri='a')])
+ self.backend.playback.repeat = True
+ self.backend.playback.play()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
+ self.assertFalse(result)
+
+ def test_can_go_previous_is_false_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
+ self.assertFalse(result)
+
+ def test_can_play_is_true_if_can_control_and_current_track(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.backend.current_playlist.append([Track(uri='a')])
+ self.backend.playback.play()
+ self.assertTrue(self.backend.playback.current_track.get())
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
+ self.assertTrue(result)
+
+ def test_can_play_is_false_if_no_current_track(self):
+ self.mpris.get_CanControl = lambda *_: True
+ self.assertFalse(self.backend.playback.current_track.get())
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
+ self.assertFalse(result)
+
+ def test_can_play_if_false_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
+ self.assertFalse(result)
+
+ def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self):
+ self.mpris.get_CanControl = lambda *_: True
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
+ self.assertTrue(result)
+
+ def test_can_pause_if_false_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
+ self.assertFalse(result)
+
+ def test_can_seek_is_true_if_can_control_is_true(self):
+ self.mpris.get_CanControl = lambda *_: True
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
+ self.assertTrue(result)
+
+ def test_can_seek_is_false_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
+ self.assertFalse(result)
+
+ def test_can_control_is_true(self):
+ result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl')
+ self.assertTrue(result)
+
+ def test_next_is_ignored_if_can_go_next_is_false(self):
+ self.mpris.get_CanGoNext = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.mpris.Next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ def test_next_when_playing_should_skip_to_next_track_and_keep_playing(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_next_when_at_end_of_list_should_stop_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Next()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_next_when_paused_should_skip_to_next_track_and_stay_paused(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.pause()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ self.mpris.Next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_next_when_stopped_should_skip_to_next_track_and_stay_stopped(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.stop()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.Next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_previous_is_ignored_if_can_go_previous_is_false(self):
+ self.mpris.get_CanGoPrevious = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.mpris.Previous()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+
+ def test_previous_when_playing_should_skip_to_prev_track_and_keep_playing(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Previous()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_previous_when_at_start_of_list_should_stop_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Previous()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_previous_when_paused_should_skip_to_previous_track_and_stay_paused(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ self.backend.playback.pause()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ self.mpris.Previous()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_previous_when_stopped_should_skip_to_previous_track_and_stay_stopped(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.next()
+ self.backend.playback.stop()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.Previous()
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_pause_is_ignored_if_can_pause_is_false(self):
+ self.mpris.get_CanPause = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Pause()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_pause_when_playing_should_pause_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_pause_when_paused_has_no_effect(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ self.mpris.Pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_playpause_is_ignored_if_can_pause_is_false(self):
+ self.mpris.get_CanPause = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.PlayPause()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_playpause_when_playing_should_pause_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.PlayPause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+
+ def test_playpause_when_paused_should_resume_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.pause()
+
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ at_pause = self.backend.playback.time_position.get()
+ self.assert_(at_pause >= 0)
+
+ self.mpris.PlayPause()
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ after_pause = self.backend.playback.time_position.get()
+ self.assert_(after_pause >= at_pause)
+
+ def test_playpause_when_stopped_should_start_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.PlayPause()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_stop_is_ignored_if_can_control_is_false(self):
+ self.mpris.get_CanControl = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Stop()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_stop_when_playing_should_stop_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.mpris.Stop()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_stop_when_paused_should_stop_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ self.mpris.Stop()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_play_is_ignored_if_can_play_is_false(self):
+ self.mpris.get_CanPlay = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.Play()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_play_when_stopped_starts_playback(self):
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.Play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ def test_play_after_pause_resumes_from_same_position(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+
+ before_pause = self.backend.playback.time_position.get()
+ self.assert_(before_pause >= 0)
+
+ self.mpris.Pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ at_pause = self.backend.playback.time_position.get()
+ self.assert_(at_pause >= before_pause)
+
+ self.mpris.Play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ after_pause = self.backend.playback.time_position.get()
+ self.assert_(after_pause >= at_pause)
+
+ def test_play_when_there_is_no_track_has_no_effect(self):
+ self.backend.current_playlist.clear()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+ self.mpris.Play()
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ def test_seek_is_ignored_if_can_seek_is_false(self):
+ self.mpris.get_CanSeek = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+
+ before_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek >= 0)
+
+ milliseconds_to_seek = 10000
+ microseconds_to_seek = milliseconds_to_seek * 1000
+
+ self.mpris.Seek(microseconds_to_seek)
+
+ after_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek <= after_seek < (
+ before_seek + milliseconds_to_seek))
+
+ def test_seek_seeks_given_microseconds_forward_in_the_current_track(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+
+ before_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek >= 0)
+
+ milliseconds_to_seek = 10000
+ microseconds_to_seek = milliseconds_to_seek * 1000
+
+ self.mpris.Seek(microseconds_to_seek)
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ after_seek = self.backend.playback.time_position.get()
+ self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
+
+ def test_seek_seeks_given_microseconds_backward_if_negative(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek >= 20000)
+
+ milliseconds_to_seek = -10000
+ microseconds_to_seek = milliseconds_to_seek * 1000
+
+ self.mpris.Seek(microseconds_to_seek)
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ after_seek = self.backend.playback.time_position.get()
+ self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
+ self.assert_(after_seek < before_seek)
+
+ def test_seek_seeks_to_start_of_track_if_new_position_is_negative(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek >= 20000)
+
+ milliseconds_to_seek = -30000
+ microseconds_to_seek = milliseconds_to_seek * 1000
+
+ self.mpris.Seek(microseconds_to_seek)
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ after_seek = self.backend.playback.time_position.get()
+ self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
+ self.assert_(after_seek < before_seek)
+ self.assert_(after_seek >= 0)
+
+ def test_seek_skips_to_next_track_if_new_position_larger_than_track_length(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000),
+ Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_seek = self.backend.playback.time_position.get()
+ self.assert_(before_seek >= 20000)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ milliseconds_to_seek = 50000
+ microseconds_to_seek = milliseconds_to_seek * 1000
+
+ self.mpris.Seek(microseconds_to_seek)
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
+
+ after_seek = self.backend.playback.time_position.get()
+ self.assert_(after_seek >= 0)
+ self.assert_(after_seek < before_seek)
+
+ def test_set_position_is_ignored_if_can_seek_is_false(self):
+ self.mpris.get_CanSeek = lambda *_: False
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+
+ before_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position <= 5000)
+
+ track_id = 'a'
+
+ position_to_set_in_milliseconds = 20000
+ position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
+
+ self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
+
+ after_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position <= after_set_position <
+ position_to_set_in_milliseconds)
+
+ def test_set_position_sets_the_current_track_position_in_microsecs(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+
+ before_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position <= 5000)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ track_id = '/com/mopidy/track/0'
+
+ position_to_set_in_milliseconds = 20000
+ position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
+
+ self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+
+ after_set_position = self.backend.playback.time_position.get()
+ self.assert_(after_set_position >= position_to_set_in_milliseconds)
+
+ def test_set_position_does_nothing_if_the_position_is_negative(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position >= 20000)
+ self.assert_(before_set_position <= 25000)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ track_id = '/com/mopidy/track/0'
+
+ position_to_set_in_milliseconds = -1000
+ position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
+
+ self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
+
+ after_set_position = self.backend.playback.time_position.get()
+ self.assert_(after_set_position >= before_set_position)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ def test_set_position_does_nothing_if_position_is_larger_than_track_length(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position >= 20000)
+ self.assert_(before_set_position <= 25000)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ track_id = 'a'
+
+ position_to_set_in_milliseconds = 50000
+ position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
+
+ self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
+
+ after_set_position = self.backend.playback.time_position.get()
+ self.assert_(after_set_position >= before_set_position)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ def test_set_position_does_nothing_if_track_id_does_not_match_current_track(self):
+ self.backend.current_playlist.append([Track(uri='a', length=40000)])
+ self.backend.playback.play()
+ self.backend.playback.seek(20000)
+
+ before_set_position = self.backend.playback.time_position.get()
+ self.assert_(before_set_position >= 20000)
+ self.assert_(before_set_position <= 25000)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ track_id = 'b'
+
+ position_to_set_in_milliseconds = 0
+ position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
+
+ self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
+
+ after_set_position = self.backend.playback.time_position.get()
+ self.assert_(after_set_position >= before_set_position)
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ def test_open_uri_is_ignored_if_can_play_is_false(self):
+ self.mpris.get_CanPlay = lambda *_: False
+ self.backend.library.provider.dummy_library = [
+ Track(uri='dummy:/test/uri')]
+ self.mpris.OpenUri('dummy:/test/uri')
+ self.assertEquals(len(self.backend.current_playlist.tracks.get()), 0)
+
+ def test_open_uri_ignores_uris_with_unknown_uri_scheme(self):
+ self.assertListEqual(self.backend.uri_schemes.get(), ['dummy'])
+ self.mpris.get_CanPlay = lambda *_: True
+ self.backend.library.provider.dummy_library = [
+ Track(uri='notdummy:/test/uri')]
+ self.mpris.OpenUri('notdummy:/test/uri')
+ self.assertEquals(len(self.backend.current_playlist.tracks.get()), 0)
+
+ def test_open_uri_adds_uri_to_current_playlist(self):
+ self.mpris.get_CanPlay = lambda *_: True
+ self.backend.library.provider.dummy_library = [
+ Track(uri='dummy:/test/uri')]
+ self.mpris.OpenUri('dummy:/test/uri')
+ self.assertEquals(self.backend.current_playlist.tracks.get()[0].uri,
+ 'dummy:/test/uri')
+
+ def test_open_uri_starts_playback_of_new_track_if_stopped(self):
+ self.mpris.get_CanPlay = lambda *_: True
+ self.backend.library.provider.dummy_library = [
+ Track(uri='dummy:/test/uri')]
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.assertEquals(self.backend.playback.state.get(), STOPPED)
+
+ self.mpris.OpenUri('dummy:/test/uri')
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri,
+ 'dummy:/test/uri')
+
+ def test_open_uri_starts_playback_of_new_track_if_paused(self):
+ self.mpris.get_CanPlay = lambda *_: True
+ self.backend.library.provider.dummy_library = [
+ Track(uri='dummy:/test/uri')]
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.backend.playback.pause()
+ self.assertEquals(self.backend.playback.state.get(), PAUSED)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ self.mpris.OpenUri('dummy:/test/uri')
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri,
+ 'dummy:/test/uri')
+
+ def test_open_uri_starts_playback_of_new_track_if_playing(self):
+ self.mpris.get_CanPlay = lambda *_: True
+ self.backend.library.provider.dummy_library = [
+ Track(uri='dummy:/test/uri')]
+ self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
+ self.backend.playback.play()
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
+
+ self.mpris.OpenUri('dummy:/test/uri')
+
+ self.assertEquals(self.backend.playback.state.get(), PLAYING)
+ self.assertEquals(self.backend.playback.current_track.get().uri,
+ 'dummy:/test/uri')
diff --git a/tests/frontends/mpris/root_interface_test.py b/tests/frontends/mpris/root_interface_test.py
new file mode 100644
index 00000000..52a1c66e
--- /dev/null
+++ b/tests/frontends/mpris/root_interface_test.py
@@ -0,0 +1,61 @@
+import mock
+import unittest
+
+from mopidy import settings
+from mopidy.backends.dummy import DummyBackend
+from mopidy.frontends.mpris import objects
+
+class RootInterfaceTest(unittest.TestCase):
+ def setUp(self):
+ objects.exit_process = mock.Mock()
+ objects.MprisObject._connect_to_dbus = mock.Mock()
+ self.backend = DummyBackend.start().proxy()
+ self.mpris = objects.MprisObject()
+
+ def tearDown(self):
+ self.backend.stop()
+
+ def test_constructor_connects_to_dbus(self):
+ self.assert_(self.mpris._connect_to_dbus.called)
+
+ def test_can_raise_returns_false(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise')
+ self.assertFalse(result)
+
+ def test_raise_does_nothing(self):
+ self.mpris.Raise()
+
+ def test_can_quit_returns_true(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit')
+ self.assertTrue(result)
+
+ def test_quit_should_stop_all_actors(self):
+ self.mpris.Quit()
+ self.assert_(objects.exit_process.called)
+
+ def test_has_track_list_returns_false(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList')
+ self.assertFalse(result)
+
+ def test_identify_is_mopidy(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'Identity')
+ self.assertEquals(result, 'Mopidy')
+
+ def test_desktop_entry_is_mopidy(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
+ self.assertEquals(result, 'mopidy')
+
+ def test_desktop_entry_is_based_on_DESKTOP_FILE_setting(self):
+ settings.runtime['DESKTOP_FILE'] = '/tmp/foo.desktop'
+ result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
+ self.assertEquals(result, 'foo')
+ settings.runtime.clear()
+
+ def test_supported_uri_schemes_is_empty(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes')
+ self.assertEquals(len(result), 1)
+ self.assertEquals(result[0], 'dummy')
+
+ def test_supported_mime_types_is_empty(self):
+ result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes')
+ self.assertEquals(len(result), 0)
diff --git a/tests/listeners_test.py b/tests/listeners_test.py
index 2c31efdb..d67da692 100644
--- a/tests/listeners_test.py
+++ b/tests/listeners_test.py
@@ -7,8 +7,29 @@ class BackendListenerTest(unittest.TestCase):
def setUp(self):
self.listener = BackendListener()
- def test_listener_has_default_impl_for_the_track_playback_started(self):
+ def test_listener_has_default_impl_for_track_playback_paused(self):
+ self.listener.track_playback_paused(Track(), 0)
+
+ def test_listener_has_default_impl_for_track_playback_resumed(self):
+ self.listener.track_playback_resumed(Track(), 0)
+
+ def test_listener_has_default_impl_for_track_playback_started(self):
self.listener.track_playback_started(Track())
- def test_listener_has_default_impl_for_the_track_playback_ended(self):
+ def test_listener_has_default_impl_for_track_playback_ended(self):
self.listener.track_playback_ended(Track(), 0)
+
+ def test_listener_has_default_impl_for_playback_state_changed(self):
+ self.listener.playback_state_changed()
+
+ def test_listener_has_default_impl_for_playlist_changed(self):
+ self.listener.playlist_changed()
+
+ def test_listener_has_default_impl_for_options_changed(self):
+ self.listener.options_changed()
+
+ def test_listener_has_default_impl_for_volume_changed(self):
+ self.listener.volume_changed()
+
+ def test_listener_has_default_impl_for_seeked(self):
+ self.listener.seeked()