mpris: Move to external extension

This commit is contained in:
Stein Magnus Jodal 2013-10-07 23:41:15 +02:00
parent 509afdbb02
commit c589583b74
17 changed files with 31 additions and 1995 deletions

View File

@ -49,4 +49,3 @@ Frontend implementations
* :mod:`mopidy.frontends.http`
* :mod:`mopidy.frontends.mpd`
* :mod:`mopidy.frontends.mpris`

View File

@ -8,7 +8,8 @@ MPRIS clients
Specification. It's a spec that describes a standard D-Bus interface for making
media players available to other applications on the same system.
Mopidy's :ref:`MPRIS frontend <ext-mpris>` currently implements all required
The MPRIS frontend provided by the `Mopidy-MPRIS extension
<https://github.com/mopidy/mopidy-mpris>`_ currently implements all required
parts of the MPRIS spec, plus the optional playlist interface. It does not
implement the optional tracklist interface.

View File

@ -36,19 +36,21 @@ How to make Mopidy available as an UPnP MediaRenderer
=====================================================
With the help of `the Rygel project <https://live.gnome.org/Rygel>`_ Mopidy can
be made available as an UPnP MediaRenderer. Rygel will interface with Mopidy's
:ref:`MPRIS frontend <ext-mpris>`, and make Mopidy available as a MediaRenderer
on the local network. Since this depends on the MPRIS frontend, which again
depends on D-Bus being available, this will only work on Linux, and not OS X.
MPRIS/D-Bus is only available to other applications on the same host, so Rygel
must be running on the same machine as Mopidy.
be made available as an UPnP MediaRenderer. Rygel will interface with the MPRIS
interface provided by the `Mopidy-MPRIS extension
<https://github.com/mopidy/mopidy-mpris>`_, and make Mopidy available as a
MediaRenderer on the local network. Since this depends on the MPRIS frontend,
which again depends on D-Bus being available, this will only work on Linux, and
not OS X. MPRIS/D-Bus is only available to other applications on the same
host, so Rygel must be running on the same machine as Mopidy.
1. Start Mopidy and make sure the :ref:`MPRIS frontend <ext-mpris>` is working.
It is activated by default, but you may miss dependencies or be using OS X,
in which case it will not work. Check the console output when Mopidy is
started for any errors related to the MPRIS frontend. If you're unsure it is
working, there are instructions for how to test it on the :ref:`MPRIS
frontend <ext-mpris>` page.
1. Start Mopidy and make sure the MPRIS frontend is working. It is activated
by default when the Mopidy-MPRIS extension is installed, but you may miss
dependencies or be using OS X, in which case it will not work. Check the
console output when Mopidy is started for any errors related to the MPRIS
frontend. If you're unsure it is working, there are instructions for how to
test it on in the `Mopidy-MPRIS readme
<https://github.com/mopidy/mopidy-mpris>`_.
2. Install Rygel. On Debian/Ubuntu::

View File

@ -48,6 +48,15 @@ Provides a backend for playing music from `Google Play Music
<https://play.google.com/music/>`_.
Mopidy-MPRIS
------------
https://github.com/mopidy/mopidy-mpris
Extension for controlling Mopidy through the `MPRIS <http://www.mpris.org/>`_
D-Bus interface, for example using the Ubuntu Sound Menu.
Mopidy-NAD
----------
@ -67,7 +76,7 @@ Extension for scrobbling played tracks to Last.fm.
Mopidy-SomaFM
-------------
https://github.com/AlexandrePTJ/mopidy-somafm/
https://github.com/AlexandrePTJ/mopidy-somafm
Provides a backend for playing music from the `SomaFM <http://somafm.com/>`_
service.

View File

@ -1,105 +0,0 @@
.. _ext-mpris:
************
Mopidy-MPRIS
************
This extension lets you control Mopidy through the Media Player Remote
Interfacing Specification (`MPRIS <http://www.mpris.org/>`_) D-Bus interface.
An example of an MPRIS client is the :ref:`ubuntu-sound-menu`.
Dependencies
============
- D-Bus Python bindings. The package is named ``python-dbus`` in
Ubuntu/Debian.
- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
Ubuntu Sound Menu. The package is named ``python-indicate`` in
Ubuntu/Debian.
- An ``.desktop`` file for Mopidy installed at the path set in the
:confval:`mpris/desktop_file` config value. See usage section below for
details.
Default configuration
=====================
.. literalinclude:: ../../mopidy/frontends/mpris/ext.conf
:language: ini
Configuration values
====================
.. confval:: mpris/enabled
If the MPRIS extension should be enabled or not.
.. confval:: mpris/desktop_file
Location of the Mopidy ``.desktop`` file.
Usage
=====
The extension is enabled by default if all dependencies are available.
Controlling Mopidy through the Ubuntu Sound Menu
------------------------------------------------
If you are running Ubuntu and installed Mopidy using the Debian package from
APT you should be able to control Mopidy through the :ref:`ubuntu-sound-menu`
without any changes.
If you installed Mopidy in any other way and want to control Mopidy through the
Ubuntu Sound Menu, you must install the ``mopidy.desktop`` file which can be
found in the ``data/`` dir of the Mopidy source repo into the
``/usr/share/applications`` dir by hand::
cd /path/to/mopidy/source
sudo cp data/mopidy.desktop /usr/share/applications/
If the correct path to the installed ``mopidy.desktop`` file on your system
isn't ``/usr/share/applications/mopidy.conf``, you'll need to set the
:confval:`mpris/desktop_file` config value.
After you have installed the file, start Mopidy in any way, and Mopidy should
appear in the Ubuntu Sound Menu. When you quit Mopidy, it will still be listed
in the Ubuntu Sound Menu, and may be restarted by selecting it there.
The Ubuntu Sound Menu interacts with Mopidy's MPRIS frontend. The MPRIS
frontend supports the minimum requirements of the `MPRIS specification
<http://www.mpris.org/>`_. The ``TrackList`` interface of the spec is not
supported.
Testing the MPRIS API directly
------------------------------
To use the MPRIS API directly, start Mopidy, and then run the following in a
Python shell::
import dbus
bus = dbus.SessionBus()
player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
'/org/mpris/MediaPlayer2')
Now you can control Mopidy through the player object. Examples:
- To get some properties from Mopidy, run::
props = player.GetAll('org.mpris.MediaPlayer2',
dbus_interface='org.freedesktop.DBus.Properties')
- To quit Mopidy through D-Bus, run::
player.Quit(dbus_interface='org.mpris.MediaPlayer2')
For details on the API, please refer to the `MPRIS specification
<http://www.mpris.org/>`_.

View File

@ -24,10 +24,9 @@ Glossary
frontend
A part of Mopidy *using* the :term:`core` API. Existing frontends
include the :ref:`MPD server <ext-mpd>`, the :ref:`MPRIS/D-Bus
integration <ext-mpris>`, the Last.fm scrobbler, and the :ref:`HTTP
server <ext-http>` with JavaScript API. See :ref:`frontend-api` for
details.
include the :ref:`MPD server <ext-mpd>`, the MPRIS/D-Bus integration,
the Last.fm scrobbler, and the :ref:`HTTP server <ext-http>` with
JavaScript API. See :ref:`frontend-api` for details.
mixer
A GStreamer element that controls audio volume.

View File

@ -250,8 +250,8 @@ can install Mopidy from PyPI using Pip.
sudo pip-python install -U cherrypy ws4py
#. Optional: To use MPRIS, e.g. for controlling Mopidy from the Ubuntu Sound
Menu or from an UPnP client via Rygel, you need some additional
#. Optional: To use Mopidy-MPRIS, e.g. for controlling Mopidy from the Ubuntu
Sound Menu or from an UPnP client via Rygel, you need some additional
dependencies: the Python bindings for libindicate, and the Python bindings
for libdbus, the reference D-Bus library.

View File

@ -1,36 +0,0 @@
from __future__ import unicode_literals
import os
import mopidy
from mopidy import config, exceptions, ext
class Extension(ext.Extension):
dist_name = 'Mopidy-MPRIS'
ext_name = 'mpris'
version = mopidy.__version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
return config.read(conf_file)
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['desktop_file'] = config.Path()
return schema
def validate_environment(self):
if 'DISPLAY' not in os.environ:
raise exceptions.ExtensionError(
'An X11 $DISPLAY is needed to use D-Bus')
try:
import dbus # noqa
except ImportError as e:
raise exceptions.ExtensionError('dbus library not found', e)
def get_frontend_classes(self):
from .actor import MprisFrontend
return [MprisFrontend]

View File

@ -1,110 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import pykka
from mopidy.core import CoreListener
from mopidy.frontends.mpris import objects
logger = logging.getLogger('mopidy.frontends.mpris')
try:
indicate = None
if 'DISPLAY' in os.environ:
import indicate
except ImportError:
pass
if indicate is None:
logger.debug('Startup notification will not be sent')
class MprisFrontend(pykka.ThreadingActor, CoreListener):
def __init__(self, config, core):
super(MprisFrontend, self).__init__()
self.config = config
self.core = core
self.indicate_server = None
self.mpris_object = None
def on_start(self):
try:
self.mpris_object = objects.MprisObject(self.config, self.core)
self._send_startup_notification()
except Exception as e:
logger.warning('MPRIS frontend setup failed (%s)', e)
self.stop()
def on_stop(self):
logger.debug('Removing MPRIS object from D-Bus connection...')
if self.mpris_object:
self.mpris_object.remove_from_connection()
self.mpris_object = None
logger.debug('Removed MPRIS object from D-Bus connection')
def _send_startup_notification(self):
"""
Send startup notification using libindicate to make Mopidy appear in
e.g. `Ubunt's sound menu <https://wiki.ubuntu.com/SoundMenu>`_.
A reference to the libindicate server is kept for as long as Mopidy is
running. When Mopidy exits, the server will be unreferenced and Mopidy
will automatically be unregistered from e.g. the sound menu.
"""
if not indicate:
return
logger.debug('Sending startup notification...')
self.indicate_server = indicate.Server()
self.indicate_server.set_type('music.mopidy')
self.indicate_server.set_desktop_file(
self.config['mpris']['desktop_file'])
self.indicate_server.show()
logger.debug('Startup notification sent')
def _emit_properties_changed(self, interface, changed_properties):
if self.mpris_object is None:
return
props_with_new_values = [
(p, self.mpris_object.Get(interface, p))
for p in changed_properties]
self.mpris_object.PropertiesChanged(
interface, dict(props_with_new_values), [])
def track_playback_paused(self, tl_track, time_position):
logger.debug('Received track_playback_paused event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus'])
def track_playback_resumed(self, tl_track, time_position):
logger.debug('Received track_playback_resumed event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus'])
def track_playback_started(self, tl_track):
logger.debug('Received track_playback_started event')
self._emit_properties_changed(
objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata'])
def track_playback_ended(self, tl_track, time_position):
logger.debug('Received track_playback_ended event')
self._emit_properties_changed(
objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata'])
def volume_changed(self, volume):
logger.debug('Received volume_changed event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['Volume'])
def seeked(self, time_position_in_ms):
logger.debug('Received seeked event')
self.mpris_object.Seeked(time_position_in_ms * 1000)
def playlists_loaded(self):
logger.debug('Received playlists_loaded event')
self._emit_properties_changed(
objects.PLAYLISTS_IFACE, ['PlaylistCount'])
def playlist_changed(self, playlist):
logger.debug('Received playlist_changed event')
playlist_id = self.mpris_object.get_playlist_id(playlist.uri)
playlist = (playlist_id, playlist.name, '')
self.mpris_object.PlaylistChanged(playlist)

View File

@ -1,3 +0,0 @@
[mpris]
enabled = true
desktop_file = /usr/share/applications/mopidy.desktop

View File

@ -1,498 +0,0 @@
from __future__ import unicode_literals
import base64
import logging
import os
import dbus
import dbus.mainloop.glib
import dbus.service
import gobject
from mopidy.core import PlaybackState
from mopidy.utils.process import exit_process
logger = logging.getLogger('mopidy.frontends.mpris')
# Must be done before dbus.SessionBus() is called
gobject.threads_init()
dbus.mainloop.glib.threads_init()
BUS_NAME = 'org.mpris.MediaPlayer2.mopidy'
OBJECT_PATH = '/org/mpris/MediaPlayer2'
ROOT_IFACE = 'org.mpris.MediaPlayer2'
PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
PLAYLISTS_IFACE = 'org.mpris.MediaPlayer2.Playlists'
class MprisObject(dbus.service.Object):
"""Implements http://www.mpris.org/2.2/spec/"""
properties = None
def __init__(self, config, core):
self.config = config
self.core = core
self.properties = {
ROOT_IFACE: self._get_root_iface_properties(),
PLAYER_IFACE: self._get_player_iface_properties(),
PLAYLISTS_IFACE: self._get_playlists_iface_properties(),
}
bus_name = self._connect_to_dbus()
dbus.service.Object.__init__(self, bus_name, OBJECT_PATH)
def _get_root_iface_properties(self):
return {
'CanQuit': (True, None),
'Fullscreen': (False, None),
'CanSetFullscreen': (False, None),
'CanRaise': (False, None),
# NOTE Change if adding optional track list support
'HasTrackList': (False, None),
'Identity': ('Mopidy', None),
'DesktopEntry': (self.get_DesktopEntry, None),
'SupportedUriSchemes': (self.get_SupportedUriSchemes, None),
# NOTE Return MIME types supported by local backend if support for
# reporting supported MIME types is added
'SupportedMimeTypes': (dbus.Array([], signature='s'), None),
}
def _get_player_iface_properties(self):
return {
'PlaybackStatus': (self.get_PlaybackStatus, None),
'LoopStatus': (self.get_LoopStatus, self.set_LoopStatus),
'Rate': (1.0, self.set_Rate),
'Shuffle': (self.get_Shuffle, self.set_Shuffle),
'Metadata': (self.get_Metadata, None),
'Volume': (self.get_Volume, self.set_Volume),
'Position': (self.get_Position, None),
'MinimumRate': (1.0, None),
'MaximumRate': (1.0, None),
'CanGoNext': (self.get_CanGoNext, None),
'CanGoPrevious': (self.get_CanGoPrevious, None),
'CanPlay': (self.get_CanPlay, None),
'CanPause': (self.get_CanPause, None),
'CanSeek': (self.get_CanSeek, None),
'CanControl': (self.get_CanControl, None),
}
def _get_playlists_iface_properties(self):
return {
'PlaylistCount': (self.get_PlaylistCount, None),
'Orderings': (self.get_Orderings, None),
'ActivePlaylist': (self.get_ActivePlaylist, None),
}
def _connect_to_dbus(self):
logger.debug('Connecting to D-Bus...')
mainloop = dbus.mainloop.glib.DBusGMainLoop()
bus_name = dbus.service.BusName(
BUS_NAME, dbus.SessionBus(mainloop=mainloop))
logger.info('MPRIS server connected to D-Bus')
return bus_name
def get_playlist_id(self, playlist_uri):
# Only A-Za-z0-9_ is allowed, which is 63 chars, so we can't use
# base64. Luckily, D-Bus does not limit the length of object paths.
# Since base32 pads trailing bytes with "=" chars, we need to replace
# them with an allowed character such as "_".
encoded_uri = base64.b32encode(playlist_uri).replace('=', '_')
return '/com/mopidy/playlist/%s' % encoded_uri
def get_playlist_uri(self, playlist_id):
encoded_uri = playlist_id.split('/')[-1].replace('_', '=')
return base64.b32decode(encoded_uri)
def get_track_id(self, tl_track):
return '/com/mopidy/track/%d' % tl_track.tlid
def get_track_tlid(self, track_id):
assert track_id.startswith('/com/mopidy/track/')
return track_id.split('/')[-1]
### Properties interface
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ss', out_signature='v')
def Get(self, interface, prop):
logger.debug(
'%s.Get(%s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop))
(getter, _) = self.properties[interface][prop]
if callable(getter):
return getter()
else:
return getter
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
logger.debug(
'%s.GetAll(%s) called', dbus.PROPERTIES_IFACE, repr(interface))
getters = {}
for key, (getter, _) in self.properties[interface].iteritems():
getters[key] = getter() if callable(getter) else getter
return getters
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ssv', out_signature='')
def Set(self, interface, prop, value):
logger.debug(
'%s.Set(%s, %s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop), repr(value))
_, setter = self.properties[interface][prop]
if setter is not None:
setter(value)
self.PropertiesChanged(
interface, {prop: self.Get(interface, prop)}, [])
@dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed_properties,
invalidated_properties):
logger.debug(
'%s.PropertiesChanged(%s, %s, %s) signaled',
dbus.PROPERTIES_IFACE, interface, changed_properties,
invalidated_properties)
### Root interface methods
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Raise(self):
logger.debug('%s.Raise called', ROOT_IFACE)
# Do nothing, as we do not have a GUI
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Quit(self):
logger.debug('%s.Quit called', ROOT_IFACE)
exit_process()
### Root interface properties
def get_DesktopEntry(self):
return os.path.splitext(os.path.basename(
self.config['mpris']['desktop_file']))[0]
def get_SupportedUriSchemes(self):
return dbus.Array(self.core.uri_schemes.get(), signature='s')
### Player interface methods
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Next(self):
logger.debug('%s.Next called', PLAYER_IFACE)
if not self.get_CanGoNext():
logger.debug('%s.Next not allowed', PLAYER_IFACE)
return
self.core.playback.next().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Previous(self):
logger.debug('%s.Previous called', PLAYER_IFACE)
if not self.get_CanGoPrevious():
logger.debug('%s.Previous not allowed', PLAYER_IFACE)
return
self.core.playback.previous().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Pause(self):
logger.debug('%s.Pause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug('%s.Pause not allowed', PLAYER_IFACE)
return
self.core.playback.pause().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def PlayPause(self):
logger.debug('%s.PlayPause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug('%s.PlayPause not allowed', PLAYER_IFACE)
return
state = self.core.playback.state.get()
if state == PlaybackState.PLAYING:
self.core.playback.pause().get()
elif state == PlaybackState.PAUSED:
self.core.playback.resume().get()
elif state == PlaybackState.STOPPED:
self.core.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Stop(self):
logger.debug('%s.Stop called', PLAYER_IFACE)
if not self.get_CanControl():
logger.debug('%s.Stop not allowed', PLAYER_IFACE)
return
self.core.playback.stop().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Play(self):
logger.debug('%s.Play called', PLAYER_IFACE)
if not self.get_CanPlay():
logger.debug('%s.Play not allowed', PLAYER_IFACE)
return
state = self.core.playback.state.get()
if state == PlaybackState.PAUSED:
self.core.playback.resume().get()
else:
self.core.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Seek(self, offset):
logger.debug('%s.Seek called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug('%s.Seek not allowed', PLAYER_IFACE)
return
offset_in_milliseconds = offset // 1000
current_position = self.core.playback.time_position.get()
new_position = current_position + offset_in_milliseconds
self.core.playback.seek(new_position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def SetPosition(self, track_id, position):
logger.debug('%s.SetPosition called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug('%s.SetPosition not allowed', PLAYER_IFACE)
return
position = position // 1000
current_tl_track = self.core.playback.current_tl_track.get()
if current_tl_track is None:
return
if track_id != self.get_track_id(current_tl_track):
return
if position < 0:
return
if current_tl_track.track.length < position:
return
self.core.playback.seek(position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def OpenUri(self, uri):
logger.debug('%s.OpenUri called', PLAYER_IFACE)
if not self.get_CanPlay():
# NOTE The spec does not explictly require this check, but guarding
# the other methods doesn't help much if OpenUri is open for use.
logger.debug('%s.Play not allowed', PLAYER_IFACE)
return
# NOTE Check if URI has MIME type known to the backend, if MIME support
# is added to the backend.
tl_tracks = self.core.tracklist.add(uri=uri).get()
if tl_tracks:
self.core.playback.play(tl_tracks[0])
else:
logger.debug('Track with URI "%s" not found in library.', uri)
### Player interface signals
@dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x')
def Seeked(self, position):
logger.debug('%s.Seeked signaled', PLAYER_IFACE)
# Do nothing, as just calling the method is enough to emit the signal.
### Player interface properties
def get_PlaybackStatus(self):
state = self.core.playback.state.get()
if state == PlaybackState.PLAYING:
return 'Playing'
elif state == PlaybackState.PAUSED:
return 'Paused'
elif state == PlaybackState.STOPPED:
return 'Stopped'
def get_LoopStatus(self):
repeat = self.core.playback.repeat.get()
single = self.core.playback.single.get()
if not repeat:
return 'None'
else:
if single:
return 'Track'
else:
return 'Playlist'
def set_LoopStatus(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.LoopStatus not allowed', PLAYER_IFACE)
return
if value == 'None':
self.core.playback.repeat = False
self.core.playback.single = False
elif value == 'Track':
self.core.playback.repeat = True
self.core.playback.single = True
elif value == 'Playlist':
self.core.playback.repeat = True
self.core.playback.single = False
def set_Rate(self, value):
if not self.get_CanControl():
# NOTE The spec does not explictly require this check, but it was
# added to be consistent with all the other property setters.
logger.debug('Setting %s.Rate not allowed', PLAYER_IFACE)
return
if value == 0:
self.Pause()
def get_Shuffle(self):
return self.core.playback.random.get()
def set_Shuffle(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.Shuffle not allowed', PLAYER_IFACE)
return
if value:
self.core.playback.random = True
else:
self.core.playback.random = False
def get_Metadata(self):
current_tl_track = self.core.playback.current_tl_track.get()
if current_tl_track is None:
return {'mpris:trackid': ''}
else:
(_, track) = current_tl_track
metadata = {'mpris:trackid': self.get_track_id(current_tl_track)}
if track.length:
metadata['mpris:length'] = track.length * 1000
if track.uri:
metadata['xesam:url'] = track.uri
if track.name:
metadata['xesam:title'] = track.name
if track.artists:
artists = list(track.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:artist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.album and track.album.name:
metadata['xesam:album'] = track.album.name
if track.album and track.album.artists:
artists = list(track.album.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:albumArtist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.album and track.album.images:
url = list(track.album.images)[0]
if url:
metadata['mpris:artUrl'] = url
if track.disc_no:
metadata['xesam:discNumber'] = track.disc_no
if track.track_no:
metadata['xesam:trackNumber'] = track.track_no
return dbus.Dictionary(metadata, signature='sv')
def get_Volume(self):
volume = self.core.playback.volume.get()
if volume is None:
return 0
return volume / 100.0
def set_Volume(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.Volume not allowed', PLAYER_IFACE)
return
if value is None:
return
elif value < 0:
self.core.playback.volume = 0
elif value > 1:
self.core.playback.volume = 100
elif 0 <= value <= 1:
self.core.playback.volume = int(value * 100)
def get_Position(self):
return self.core.playback.time_position.get() * 1000
def get_CanGoNext(self):
if not self.get_CanControl():
return False
return (
self.core.playback.tl_track_at_next.get() !=
self.core.playback.current_tl_track.get())
def get_CanGoPrevious(self):
if not self.get_CanControl():
return False
return (
self.core.playback.tl_track_at_previous.get() !=
self.core.playback.current_tl_track.get())
def get_CanPlay(self):
if not self.get_CanControl():
return False
return (
self.core.playback.current_tl_track.get() is not None or
self.core.playback.tl_track_at_next.get() is not None)
def get_CanPause(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanSeek(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanControl(self):
# NOTE This could be a setting for the end user to change.
return True
### Playlists interface methods
@dbus.service.method(dbus_interface=PLAYLISTS_IFACE)
def ActivatePlaylist(self, playlist_id):
logger.debug(
'%s.ActivatePlaylist(%r) called', PLAYLISTS_IFACE, playlist_id)
playlist_uri = self.get_playlist_uri(playlist_id)
playlist = self.core.playlists.lookup(playlist_uri).get()
if playlist and playlist.tracks:
tl_tracks = self.core.tracklist.add(playlist.tracks).get()
self.core.playback.play(tl_tracks[0])
@dbus.service.method(dbus_interface=PLAYLISTS_IFACE)
def GetPlaylists(self, index, max_count, order, reverse):
logger.debug(
'%s.GetPlaylists(%r, %r, %r, %r) called',
PLAYLISTS_IFACE, index, max_count, order, reverse)
playlists = self.core.playlists.playlists.get()
if order == 'Alphabetical':
playlists.sort(key=lambda p: p.name, reverse=reverse)
elif order == 'Modified':
playlists.sort(key=lambda p: p.last_modified, reverse=reverse)
elif order == 'User' and reverse:
playlists.reverse()
slice_end = index + max_count
playlists = playlists[index:slice_end]
results = [
(self.get_playlist_id(p.uri), p.name, '')
for p in playlists]
return dbus.Array(results, signature='(oss)')
### Playlists interface signals
@dbus.service.signal(dbus_interface=PLAYLISTS_IFACE, signature='(oss)')
def PlaylistChanged(self, playlist):
logger.debug('%s.PlaylistChanged signaled', PLAYLISTS_IFACE)
# Do nothing, as just calling the method is enough to emit the signal.
### Playlists interface properties
def get_PlaylistCount(self):
return len(self.core.playlists.playlists.get())
def get_Orderings(self):
return [
'Alphabetical', # Order by playlist.name
'Modified', # Order by playlist.last_modified
'User', # Don't change order
]
def get_ActivePlaylist(self):
playlist_is_valid = False
playlist = ('/', 'None', '')
return (playlist_is_valid, playlist)

View File

@ -47,7 +47,6 @@ setup(
'http = mopidy.frontends.http:Extension [http]',
'local = mopidy.backends.local:Extension',
'mpd = mopidy.frontends.mpd:Extension',
'mpris = mopidy.frontends.mpris:Extension',
'spotify = mopidy.backends.spotify:Extension [spotify]',
'stream = mopidy.backends.stream:Extension',
],

View File

@ -1 +0,0 @@
from __future__ import unicode_literals

View File

@ -1,92 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
try:
import dbus
except ImportError:
dbus = False
from mopidy.models import Playlist, TlTrack
if dbus:
from mopidy.frontends.mpris import actor, objects
@unittest.skipUnless(dbus, 'dbus not found')
class BackendEventsTest(unittest.TestCase):
def setUp(self):
# As a plain class, not an actor:
self.mpris_frontend = actor.MprisFrontend(config=None, core=None)
self.mpris_object = mock.Mock(spec=objects.MprisObject)
self.mpris_frontend.mpris_object = self.mpris_object
def test_track_playback_paused_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Paused'
self.mpris_frontend.track_playback_paused(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
def test_track_playback_resumed_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Playing'
self.mpris_frontend.track_playback_resumed(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
def test_track_playback_started_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_started(TlTrack())
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_track_playback_ended_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_ended(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_volume_changed_event_changes_volume(self):
self.mpris_object.Get.return_value = 1.0
self.mpris_frontend.volume_changed(volume=100)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'Volume'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'Volume': 1.0}, [])
def test_seeked_event_causes_mpris_seeked_event(self):
self.mpris_frontend.seeked(31000)
self.mpris_object.Seeked.assert_called_with(31000000)
def test_playlists_loaded_event_changes_playlist_count(self):
self.mpris_object.Get.return_value = 17
self.mpris_frontend.playlists_loaded()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYLISTS_IFACE, 'PlaylistCount'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYLISTS_IFACE, {'PlaylistCount': 17}, [])
def test_playlist_changed_event_causes_mpris_playlist_changed_event(self):
self.mpris_object.get_playlist_id.return_value = 'id-for-dummy:foo'
playlist = Playlist(uri='dummy:foo', name='foo')
self.mpris_frontend.playlist_changed(playlist)
self.mpris_object.PlaylistChanged.assert_called_with(
('id-for-dummy:foo', 'foo', ''))

View File

@ -1,869 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.backends import dummy
from mopidy.core import PlaybackState
from mopidy.models import Album, Artist, Track
if dbus:
from mopidy.frontends.mpris import objects
PLAYING = PlaybackState.PLAYING
PAUSED = PlaybackState.PAUSED
STOPPED = PlaybackState.STOPPED
@unittest.skipUnless(dbus, 'dbus not found')
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config={}, core=self.core)
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_get_playback_status_is_playing_when_playing(self):
self.core.playback.state = PLAYING
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Playing', result)
def test_get_playback_status_is_paused_when_paused(self):
self.core.playback.state = PAUSED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Paused', result)
def test_get_playback_status_is_stopped_when_stopped(self):
self.core.playback.state = STOPPED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Stopped', result)
def test_get_loop_status_is_none_when_not_looping(self):
self.core.playback.repeat = False
self.core.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('None', result)
def test_get_loop_status_is_track_when_looping_a_single_track(self):
self.core.playback.repeat = True
self.core.playback.single = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Track', result)
def test_get_loop_status_is_playlist_when_looping_tracklist(self):
self.core.playback.repeat = True
self.core.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Playlist', result)
def test_set_loop_status_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.repeat = True
self.core.playback.single = True
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), True)
def test_set_loop_status_to_none_unsets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEqual(self.core.playback.repeat.get(), False)
self.assertEqual(self.core.playback.single.get(), False)
def test_set_loop_status_to_track_sets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), True)
def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), False)
def test_get_rate_is_greater_or_equal_than_minimum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assertGreaterEqual(rate, minimum_rate)
def test_get_rate_is_less_or_equal_than_maximum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assertGreaterEqual(rate, maximum_rate)
def test_set_rate_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_set_rate_to_zero_pauses_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_get_shuffle_returns_true_if_random_is_active(self):
self.core.playback.random = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertTrue(result)
def test_get_shuffle_returns_false_if_random_is_inactive(self):
self.core.playback.random = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertFalse(result)
def test_set_shuffle_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.random = False
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertFalse(self.core.playback.random.get())
def test_set_shuffle_to_true_activates_random_mode(self):
self.core.playback.random = False
self.assertFalse(self.core.playback.random.get())
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertTrue(self.core.playback.random.get())
def test_set_shuffle_to_false_deactivates_random_mode(self):
self.core.playback.random = True
self.assertTrue(self.core.playback.random.get())
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False)
self.assertFalse(self.core.playback.random.get())
def test_get_metadata_has_trackid_even_when_no_current_track(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEqual(result['mpris:trackid'], '')
def test_get_metadata_has_trackid_based_on_tlid(self):
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
(tlid, track) = self.core.playback.current_tl_track.get()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEqual(
result['mpris:trackid'], '/com/mopidy/track/%d' % tlid)
def test_get_metadata_has_track_length(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:length', result.keys())
self.assertEqual(result['mpris:length'], 40000000)
def test_get_metadata_has_track_uri(self):
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:url', result.keys())
self.assertEqual(result['xesam:url'], 'dummy:a')
def test_get_metadata_has_track_title(self):
self.core.tracklist.add([Track(name='a')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:title', result.keys())
self.assertEqual(result['xesam:title'], 'a')
def test_get_metadata_has_track_artists(self):
self.core.tracklist.add([Track(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)])])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:artist', result.keys())
self.assertEqual(result['xesam:artist'], ['a', 'b'])
def test_get_metadata_has_track_album(self):
self.core.tracklist.add([Track(album=Album(name='a'))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:album', result.keys())
self.assertEqual(result['xesam:album'], 'a')
def test_get_metadata_has_track_album_artists(self):
self.core.tracklist.add([Track(album=Album(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)]))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:albumArtist', result.keys())
self.assertEqual(result['xesam:albumArtist'], ['a', 'b'])
def test_get_metadata_use_first_album_image_as_art_url(self):
# XXX Currently, the album image order isn't preserved because they
# are stored as a frozenset(). We pick the first in the set, which is
# sorted alphabetically, thus we get 'bar.jpg', not 'foo.jpg', which
# would probably make more sense.
self.core.tracklist.add([Track(album=Album(images=[
'http://example.com/foo.jpg', 'http://example.com/bar.jpg']))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:artUrl', result.keys())
self.assertEqual(result['mpris:artUrl'], 'http://example.com/bar.jpg')
def test_get_metadata_has_no_art_url_if_no_album(self):
self.core.tracklist.add([Track()])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertNotIn('mpris:artUrl', result.keys())
def test_get_metadata_has_no_art_url_if_no_album_images(self):
self.core.tracklist.add([Track(Album(images=[]))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertNotIn('mpris:artUrl', result.keys())
def test_get_metadata_has_disc_number_in_album(self):
self.core.tracklist.add([Track(disc_no=2)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:discNumber', result.keys())
self.assertEqual(result['xesam:discNumber'], 2)
def test_get_metadata_has_track_number_in_album(self):
self.core.tracklist.add([Track(track_no=7)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:trackNumber', result.keys())
self.assertEqual(result['xesam:trackNumber'], 7)
def test_get_volume_should_return_volume_between_zero_and_one(self):
self.core.playback.volume = None
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0)
self.core.playback.volume = 0
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0)
self.core.playback.volume = 50
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0.5)
self.core.playback.volume = 100
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 1)
def test_set_volume_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.volume = 0
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEqual(self.core.playback.volume.get(), 0)
def test_set_volume_to_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEqual(self.core.playback.volume.get(), 100)
def test_set_volume_to_anything_above_one_sets_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0)
self.assertEqual(self.core.playback.volume.get(), 100)
def test_set_volume_to_anything_not_a_number_does_not_change_volume(self):
self.core.playback.volume = 10
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None)
self.assertEqual(self.core.playback.volume.get(), 10)
def test_get_position_returns_time_position_in_microseconds(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(10000)
result_in_microseconds = self.mpris.Get(
objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertGreaterEqual(result_in_milliseconds, 10000)
def test_get_position_when_no_current_track_should_be_zero(self):
result_in_microseconds = self.mpris.Get(
objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertEqual(result_in_milliseconds, 0)
def test_get_minimum_rate_is_one_or_less(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assertLessEqual(result, 1.0)
def test_get_maximum_rate_is_one_or_more(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assertGreaterEqual(result, 1.0)
def test_can_go_next_is_true_if_can_control_and_other_next_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertTrue(result)
def test_can_go_next_is_false_if_next_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.repeat = True
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_next_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_previous_is_true_if_can_control_and_previous_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertTrue(result)
def test_can_go_previous_is_false_if_previous_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.repeat = True
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_go_previous_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_play_is_true_if_can_control_and_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
self.assertTrue(self.core.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertTrue(result)
def test_can_play_is_false_if_no_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.assertFalse(self.core.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_play_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertTrue(result)
def test_can_pause_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertFalse(result)
def test_can_seek_is_true_if_can_control_is_true(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertTrue(result)
def test_can_seek_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertFalse(result)
def test_can_control_is_true(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl')
self.assertTrue(result)
def test_next_is_ignored_if_can_go_next_is_false(self):
self.mpris.get_CanGoNext = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_next_when_playing_skips_to_next_track_and_keep_playing(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_next_when_at_end_of_list_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_next_when_paused_should_skip_to_next_track_and_stay_paused(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_next_when_stopped_skips_to_next_track_and_stay_stopped(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.stop()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_previous_is_ignored_if_can_go_previous_is_false(self):
self.mpris.get_CanGoPrevious = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
def test_previous_when_playing_skips_to_prev_track_and_keep_playing(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_previous_when_at_start_of_list_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_previous_when_paused_skips_to_previous_track_and_pause(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.core.playback.pause()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_previous_when_stopped_skips_to_previous_track_and_stops(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.core.playback.stop()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_pause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_pause_when_playing_should_pause_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_pause_when_paused_has_no_effect(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_playpause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_playpause_when_playing_should_pause_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_playpause_when_paused_should_resume_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
at_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(at_pause, 0)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(after_pause, at_pause)
def test_playpause_when_stopped_should_start_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_stop_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_stop_when_playing_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_stop_when_paused_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_play_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_play_when_stopped_starts_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_play_after_pause_resumes_from_same_position(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(before_pause, 0)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
at_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(at_pause, before_pause)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(after_pause, at_pause)
def test_play_when_there_is_no_track_has_no_effect(self):
self.core.tracklist.clear()
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_seek_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
after_seek = self.core.playback.time_position.get()
self.assertLessEqual(before_seek, after_seek)
self.assertLess(after_seek, before_seek + milliseconds_to_seek)
def test_seek_seeks_given_microseconds_forward_in_the_current_track(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
def test_seek_seeks_given_microseconds_backward_if_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
milliseconds_to_seek = -10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
self.assertLess(after_seek, before_seek)
def test_seek_seeks_to_start_of_track_if_new_position_is_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
milliseconds_to_seek = -30000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
self.assertLess(after_seek, before_seek)
self.assertGreaterEqual(after_seek, 0)
def test_seek_skips_to_next_track_if_new_position_gt_track_length(self):
self.core.tracklist.add([
Track(uri='dummy:a', length=40000),
Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
milliseconds_to_seek = 50000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, 0)
self.assertLess(after_seek, before_seek)
def test_set_position_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, 5000)
track_id = 'a'
position_to_set_in_millisec = 20000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, after_set_position)
self.assertLess(after_set_position, position_to_set_in_millisec)
def test_set_position_sets_the_current_track_position_in_microsecs(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, 5000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
track_id = '/com/mopidy/track/0'
position_to_set_in_millisec = 20000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(
after_set_position, position_to_set_in_millisec)
def test_set_position_does_nothing_if_the_position_is_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = '/com/mopidy/track/0'
position_to_set_in_millisec = -1000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_set_position_does_nothing_if_position_is_gt_track_length(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = 'a'
position_to_set_in_millisec = 50000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_set_position_is_noop_if_track_id_isnt_current_track(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = 'b'
position_to_set_in_millisec = 0
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_open_uri_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.backend.library.dummy_library = [
Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(len(self.core.tracklist.tracks.get()), 0)
def test_open_uri_ignores_uris_with_unknown_uri_scheme(self):
self.assertListEqual(self.core.uri_schemes.get(), ['dummy'])
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='notdummy:/test/uri')]
self.mpris.OpenUri('notdummy:/test/uri')
self.assertEqual(len(self.core.tracklist.tracks.get()), 0)
def test_open_uri_adds_uri_to_tracklist(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(
self.core.tracklist.tracks.get()[0].uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_stopped(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_paused(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_playing(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')

View File

@ -1,172 +0,0 @@
from __future__ import unicode_literals
import datetime
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.audio import PlaybackState
from mopidy.backends import dummy
from mopidy.models import Track
if dbus:
from mopidy.frontends.mpris import objects
@unittest.skipUnless(dbus, 'dbus not found')
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config={}, core=self.core)
foo = self.core.playlists.create('foo').get()
foo = foo.copy(last_modified=datetime.datetime(2012, 3, 1, 6, 0, 0))
foo = self.core.playlists.save(foo).get()
bar = self.core.playlists.create('bar').get()
bar = bar.copy(last_modified=datetime.datetime(2012, 2, 1, 6, 0, 0))
bar = self.core.playlists.save(bar).get()
baz = self.core.playlists.create('baz').get()
baz = baz.copy(last_modified=datetime.datetime(2012, 1, 1, 6, 0, 0))
baz = self.core.playlists.save(baz).get()
self.playlist = baz
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_activate_playlist_appends_tracks_to_tracklist(self):
self.core.tracklist.add([
Track(uri='dummy:old-a'),
Track(uri='dummy:old-b'),
])
self.playlist = self.playlist.copy(tracks=[
Track(uri='dummy:baz-a'),
Track(uri='dummy:baz-b'),
Track(uri='dummy:baz-c'),
])
self.playlist = self.core.playlists.save(self.playlist).get()
self.assertEqual(2, self.core.tracklist.length.get())
playlists = self.mpris.GetPlaylists(0, 100, 'User', False)
playlist_id = playlists[2][0]
self.mpris.ActivatePlaylist(playlist_id)
self.assertEqual(5, self.core.tracklist.length.get())
self.assertEqual(
PlaybackState.PLAYING, self.core.playback.state.get())
self.assertEqual(
self.playlist.tracks[0], self.core.playback.current_track.get())
def test_activate_empty_playlist_is_harmless(self):
self.assertEqual(0, self.core.tracklist.length.get())
playlists = self.mpris.GetPlaylists(0, 100, 'User', False)
playlist_id = playlists[2][0]
self.mpris.ActivatePlaylist(playlist_id)
self.assertEqual(0, self.core.tracklist.length.get())
self.assertEqual(
PlaybackState.STOPPED, self.core.playback.state.get())
self.assertIsNone(self.core.playback.current_track.get())
def test_get_playlists_in_alphabetical_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', False)
self.assertEqual(3, len(result))
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC4Q_', result[0][0])
self.assertEqual('bar', result[0][1])
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC6Q_', result[1][0])
self.assertEqual('baz', result[1][1])
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJTG63Y_', result[2][0])
self.assertEqual('foo', result[2][1])
def test_get_playlists_in_reverse_alphabetical_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', True)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('baz', result[1][1])
self.assertEqual('bar', result[2][1])
def test_get_playlists_in_modified_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Modified', False)
self.assertEqual(3, len(result))
self.assertEqual('baz', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('foo', result[2][1])
def test_get_playlists_in_reverse_modified_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Modified', True)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('baz', result[2][1])
def test_get_playlists_in_user_order(self):
result = self.mpris.GetPlaylists(0, 100, 'User', False)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('baz', result[2][1])
def test_get_playlists_in_reverse_user_order(self):
result = self.mpris.GetPlaylists(0, 100, 'User', True)
self.assertEqual(3, len(result))
self.assertEqual('baz', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('foo', result[2][1])
def test_get_playlists_slice_on_start_of_list(self):
result = self.mpris.GetPlaylists(0, 2, 'User', False)
self.assertEqual(2, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
def test_get_playlists_slice_later_in_list(self):
result = self.mpris.GetPlaylists(2, 2, 'User', False)
self.assertEqual(1, len(result))
self.assertEqual('baz', result[0][1])
def test_get_playlist_count_returns_number_of_playlists(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'PlaylistCount')
self.assertEqual(3, result)
def test_get_orderings_includes_alpha_modified_and_user(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'Orderings')
self.assertIn('Alphabetical', result)
self.assertNotIn('Created', result)
self.assertIn('Modified', result)
self.assertNotIn('Played', result)
self.assertIn('User', result)
def test_get_active_playlist_does_not_return_a_playlist(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'ActivePlaylist')
valid, playlist = result
playlist_id, playlist_name, playlist_icon_uri = playlist
self.assertEqual(False, valid)
self.assertEqual('/', playlist_id)
self.assertEqual('None', playlist_name)
self.assertEqual('', playlist_icon_uri)

View File

@ -1,87 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.backends import dummy
if dbus:
from mopidy.frontends.mpris import objects
@unittest.skipUnless(dbus, 'dbus not found')
class RootInterfaceTest(unittest.TestCase):
def setUp(self):
config = {
'mpris': {
'desktop_file': '/tmp/foo.desktop',
}
}
objects.exit_process = mock.Mock()
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config=config, core=self.core)
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_constructor_connects_to_dbus(self):
self.assert_(self.mpris._connect_to_dbus.called)
def test_fullscreen_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'Fullscreen')
self.assertFalse(result)
def test_setting_fullscreen_fails_and_returns_none(self):
result = self.mpris.Set(objects.ROOT_IFACE, 'Fullscreen', 'True')
self.assertIsNone(result)
def test_can_set_fullscreen_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanSetFullscreen')
self.assertFalse(result)
def test_can_raise_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise')
self.assertFalse(result)
def test_raise_does_nothing(self):
self.mpris.Raise()
def test_can_quit_returns_true(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit')
self.assertTrue(result)
def test_quit_should_stop_all_actors(self):
self.mpris.Quit()
self.assert_(objects.exit_process.called)
def test_has_track_list_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList')
self.assertFalse(result)
def test_identify_is_mopidy(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'Identity')
self.assertEquals(result, 'Mopidy')
def test_desktop_entry_is_based_on_DESKTOP_FILE_setting(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
self.assertEquals(result, 'foo')
def test_supported_uri_schemes_includes_backend_uri_schemes(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes')
self.assertEquals(len(result), 1)
self.assertEquals(result[0], 'dummy')
def test_supported_mime_types_is_empty(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes')
self.assertEquals(len(result), 0)