Merge pull request #127 from jodal/feature/mpris-frontend

feature/mpris-frontend
This commit is contained in:
Thomas Adamcik 2011-07-31 09:05:08 -07:00
commit a89d89d05f
18 changed files with 1746 additions and 70 deletions

View File

@ -8,3 +8,4 @@ TryExec=mopidy
Exec=mopidy
Terminal=true
Categories=AudioVideo;Audio;Player;ConsoleOnly;
StartupNotify=true

View File

@ -28,3 +28,4 @@ Frontend implementations
* :mod:`mopidy.frontends.lastfm`
* :mod:`mopidy.frontends.mpd`
* :mod:`mopidy.frontends.mpris`

View File

@ -28,6 +28,11 @@ v0.6.0 (in development)
- The MPD command ``idle`` is now supported by Mopidy for the following
subsystems: player, playlist, options, and mixer. (Fixes: :issue:`32`)
- A new frontend :mod:`mopidy.frontends.mpris` have been added. It exposes
Mopidy through the `MPRIS interface <http://www.mpris.org/>`_ over D-Bus. In
practice, this makes it possible to control Mopidy thorugh the `Ubuntu Sound
Menu <https://wiki.ubuntu.com/SoundMenu>`_.
**Changes**
- Replace :attr:`mopidy.backends.base.Backend.uri_handlers` with
@ -127,6 +132,18 @@ Please note that 0.5.0 requires some updated dependencies, as listed under
- Found and worked around strange WMA metadata behaviour.
- Backend API:
- Calling on :meth:`mopidy.backends.base.playback.PlaybackController.next`
and :meth:`mopidy.backends.base.playback.PlaybackController.previous` no
longer implies that playback should be started. The playback state--whether
playing, paused or stopped--will now be kept.
- The method
:meth:`mopidy.backends.base.playback.PlaybackController.change_track`
has been added. Like ``next()``, and ``prev()``, it changes the current
track without changing the playback state.
v0.4.1 (2011-05-06)
===================

View File

@ -9,26 +9,6 @@
:members:
MPD server
==========
.. inheritance-diagram:: mopidy.frontends.mpd.server
.. automodule:: mopidy.frontends.mpd.server
:synopsis: MPD server
:members:
MPD session
===========
.. inheritance-diagram:: mopidy.frontends.mpd.session
.. automodule:: mopidy.frontends.mpd.session
:synopsis: MPD client session
:members:
MPD dispatcher
==============

View File

@ -0,0 +1,7 @@
***********************************************
:mod:`mopidy.frontends.mpris` -- MPRIS frontend
***********************************************
.. automodule:: mopidy.frontends.mpris
:synopsis: MPRIS frontend
:members:

View File

@ -92,7 +92,6 @@ To make a ``tag_cache`` of your local music available for Mopidy:
.. _use_mpd_on_a_network:
Connecting from other machines on the network
=============================================
@ -120,6 +119,33 @@ file::
LASTFM_PASSWORD = u'mysecret'
.. _install_desktop_file:
Controlling Mopidy through the Ubuntu Sound Menu
================================================
If you are running Ubuntu and installed Mopidy using the Debian package from
APT you should be able to control Mopidy through the `Ubuntu Sound Menu
<https://wiki.ubuntu.com/SoundMenu>`_ without any changes.
If you installed Mopidy in any other way and want to control Mopidy through the
Ubuntu Sound Menu, you must install the ``mopidy.desktop`` file which can be
found in the ``data/`` dir of the Mopidy source into the
``/usr/share/applications`` dir by hand::
cd /path/to/mopidy/source
sudo cp data/mopidy.desktop /usr/share/applications/
After you have installed the file, start Mopidy in any way, and Mopidy should
appear in the Ubuntu Sound Menu. When you quit Mopidy, it will still be listed
in the Ubuntu Sound Menu, and may be restarted by selecting it there.
The Ubuntu Sound Menu interacts with Mopidy's MPRIS frontend,
:mod:`mopidy.frontends.mpris`. The MPRIS frontend supports the minimum
requirements of the `MPRIS specification <http://www.mpris.org/>`_. The
``TrackList`` and the ``Playlists`` interfaces of the spec are not supported.
Streaming audio through a SHOUTcast/Icecast server
==================================================

View File

@ -327,6 +327,26 @@ class PlaybackController(object):
def _current_wall_time(self):
return int(time.time() * 1000)
def change_track(self, cp_track, on_error_step=1):
"""
Change to the given track, keeping the current playback state.
:param cp_track: track to change to
:type cp_track: two-tuple (CPID integer, :class:`mopidy.models.Track`)
or :class:`None`
:param on_error_step: direction to step at play error, 1 for next
track (default), -1 for previous track
:type on_error_step: int, -1 or 1
"""
old_state = self.state
self.stop()
self.current_cp_track = cp_track
if old_state == self.PLAYING:
self.play(on_error_step=on_error_step)
elif old_state == self.PAUSED:
self.pause()
def on_end_of_track(self):
"""
Tell the playback controller that end of track is reached.
@ -363,20 +383,23 @@ class PlaybackController(object):
self.stop(clear_current_track=True)
def next(self):
"""Play the next track."""
if self.state == self.STOPPED:
return
"""
Change to the next track.
The current playback state will be kept. If it was playing, playing
will continue. If it was paused, it will still be paused, etc.
"""
if self.cp_track_at_next:
self._trigger_track_playback_ended()
self.play(self.cp_track_at_next)
self.change_track(self.cp_track_at_next)
else:
self.stop(clear_current_track=True)
def pause(self):
"""Pause playback."""
if self.state == self.PLAYING and self.provider.pause():
if self.provider.pause():
self.state = self.PAUSED
self._trigger_track_playback_paused()
def play(self, cp_track=None, on_error_step=1):
"""
@ -393,12 +416,15 @@ class PlaybackController(object):
if cp_track is not None:
assert cp_track in self.backend.current_playlist.cp_tracks
if cp_track is None and self.current_cp_track is None:
cp_track = self.cp_track_at_next
if cp_track is None and self.state == self.PAUSED:
self.resume()
elif cp_track is None:
if self.state == self.PAUSED:
return self.resume()
elif self.current_cp_track is not None:
cp_track = self.current_cp_track
elif self.current_cp_track is None and on_error_step == 1:
cp_track = self.cp_track_at_next
elif self.current_cp_track is None and on_error_step == -1:
cp_track = self.cp_track_at_previous
if cp_track is not None:
self.current_cp_track = cp_track
@ -418,18 +444,20 @@ class PlaybackController(object):
self._trigger_track_playback_started()
def previous(self):
"""Play the previous track."""
if self.cp_track_at_previous is None:
return
if self.state == self.STOPPED:
return
"""
Change to the previous track.
The current playback state will be kept. If it was playing, playing
will continue. If it was paused, it will still be paused, etc.
"""
self._trigger_track_playback_ended()
self.play(self.cp_track_at_previous, on_error_step=-1)
self.change_track(self.cp_track_at_previous, on_error_step=-1)
def resume(self):
"""If paused, resume playing the current track."""
if self.state == self.PAUSED and self.provider.resume():
self.state = self.PLAYING
self._trigger_track_playback_resumed()
def seek(self, time_position):
"""
@ -456,7 +484,10 @@ class PlaybackController(object):
self.play_time_started = self._current_wall_time
self.play_time_accumulated = time_position
return self.provider.seek(time_position)
success = self.provider.seek(time_position)
if success:
self._trigger_seeked()
return success
def stop(self, clear_current_track=False):
"""
@ -473,6 +504,22 @@ class PlaybackController(object):
if clear_current_track:
self.current_cp_track = None
def _trigger_track_playback_paused(self):
logger.debug(u'Triggering track playback paused event')
if self.current_track is None:
return
BackendListener.send('track_playback_paused',
track=self.current_track,
time_position=self.time_position)
def _trigger_track_playback_resumed(self):
logger.debug(u'Triggering track playback resumed event')
if self.current_track is None:
return
BackendListener.send('track_playback_resumed',
track=self.current_track,
time_position=self.time_position)
def _trigger_track_playback_started(self):
logger.debug(u'Triggering track playback started event')
if self.current_track is None:
@ -496,6 +543,10 @@ class PlaybackController(object):
logger.debug(u'Triggering options changed event')
BackendListener.send('options_changed')
def _trigger_seeked(self):
logger.debug(u'Triggering seeked event')
BackendListener.send('seeked')
class BasePlaybackProvider(object):
"""

View File

@ -0,0 +1,130 @@
import logging
logger = logging.getLogger('mopidy.frontends.mpris')
try:
import indicate
except ImportError as import_error:
indicate = None
logger.debug(u'Startup notification will not be sent (%s)', import_error)
from pykka.actor import ThreadingActor
from mopidy import settings
from mopidy.frontends.mpris import objects
from mopidy.listeners import BackendListener
class MprisFrontend(ThreadingActor, BackendListener):
"""
Frontend which lets you control Mopidy through the Media Player Remote
Interfacing Specification (`MPRIS <http://www.mpris.org/>`_) D-Bus
interface.
An example of an MPRIS client is the `Ubuntu Sound Menu
<https://wiki.ubuntu.com/SoundMenu>`_.
**Dependencies:**
- D-Bus Python bindings. The package is named ``python-dbus`` in
Ubuntu/Debian.
- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
Ubuntu Sound Menu. The package is named ``python-indicate`` in
Ubuntu/Debian.
- An ``.desktop`` file for Mopidy installed at the path set in
:attr:`mopidy.settings.DESKTOP_FILE`. See :ref:`install_desktop_file` for
details.
**Testing the frontend**
To test, start Mopidy, and then run the following in a Python shell::
import dbus
bus = dbus.SessionBus()
player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
'/org/mpris/MediaPlayer2')
Now you can control Mopidy through the player object. Examples:
- To get some properties from Mopidy, run::
props = player.GetAll('org.mpris.MediaPlayer2',
dbus_interface='org.freedesktop.DBus.Properties')
- To quit Mopidy through D-Bus, run::
player.Quit(dbus_interface='org.mpris.MediaPlayer2')
"""
def __init__(self):
self.indicate_server = None
self.mpris_object = None
def on_start(self):
try:
self.mpris_object = objects.MprisObject()
self._send_startup_notification()
except Exception as e:
logger.error(u'MPRIS frontend setup failed (%s)', e)
self.stop()
def on_stop(self):
logger.debug(u'Removing MPRIS object from D-Bus connection...')
if self.mpris_object:
self.mpris_object.remove_from_connection()
self.mpris_object = None
logger.debug(u'Removed MPRIS object from D-Bus connection')
def _send_startup_notification(self):
"""
Send startup notification using libindicate to make Mopidy appear in
e.g. `Ubuntu's sound menu <https://wiki.ubuntu.com/SoundMenu>`_.
A reference to the libindicate server is kept for as long as Mopidy is
running. When Mopidy exits, the server will be unreferenced and Mopidy
will automatically be unregistered from e.g. the sound menu.
"""
if not indicate:
return
logger.debug(u'Sending startup notification...')
self.indicate_server = indicate.Server()
self.indicate_server.set_type('music.mopidy')
self.indicate_server.set_desktop_file(settings.DESKTOP_FILE)
self.indicate_server.show()
logger.debug(u'Startup notification sent')
def _emit_properties_changed(self, *changed_properties):
if self.mpris_object is None:
return
props_with_new_values = [
(p, self.mpris_object.Get(objects.PLAYER_IFACE, p))
for p in changed_properties]
self.mpris_object.PropertiesChanged(objects.PLAYER_IFACE,
dict(props_with_new_values), [])
def track_playback_paused(self, track, time_position):
logger.debug(u'Received track playback paused event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_resumed(self, track, time_position):
logger.debug(u'Received track playback resumed event')
self._emit_properties_changed('PlaybackStatus')
def track_playback_started(self, track):
logger.debug(u'Received track playback started event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def track_playback_ended(self, track, time_position):
logger.debug(u'Received track playback ended event')
self._emit_properties_changed('PlaybackStatus', 'Metadata')
def volume_changed(self):
logger.debug(u'Received volume changed event')
self._emit_properties_changed('Volume')
def seeked(self):
logger.debug(u'Received seeked event')
if self.mpris_object is None:
return
self.mpris_object.Seeked(
self.mpris_object.Get(objects.PLAYER_IFACE, 'Position'))

View File

@ -0,0 +1,436 @@
import logging
import os
logger = logging.getLogger('mopidy.frontends.mpris')
try:
import dbus
import dbus.mainloop.glib
import dbus.service
import gobject
except ImportError as import_error:
from mopidy import OptionalDependencyError
raise OptionalDependencyError(import_error)
from pykka.registry import ActorRegistry
from mopidy import settings
from mopidy.backends.base import Backend
from mopidy.backends.base.playback import PlaybackController
from mopidy.mixers.base import BaseMixer
from mopidy.utils.process import exit_process
# Must be done before dbus.SessionBus() is called
gobject.threads_init()
dbus.mainloop.glib.threads_init()
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
BUS_NAME = 'org.mpris.MediaPlayer2.mopidy'
OBJECT_PATH = '/org/mpris/MediaPlayer2'
ROOT_IFACE = 'org.mpris.MediaPlayer2'
PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
class MprisObject(dbus.service.Object):
"""Implements http://www.mpris.org/2.1/spec/"""
properties = None
def __init__(self):
self._backend = None
self._mixer = None
self.properties = {
ROOT_IFACE: self._get_root_iface_properties(),
PLAYER_IFACE: self._get_player_iface_properties(),
}
bus_name = self._connect_to_dbus()
super(MprisObject, self).__init__(bus_name, OBJECT_PATH)
def _get_root_iface_properties(self):
return {
'CanQuit': (True, None),
'CanRaise': (False, None),
# NOTE Change if adding optional track list support
'HasTrackList': (False, None),
'Identity': ('Mopidy', None),
'DesktopEntry': (self.get_DesktopEntry, None),
'SupportedUriSchemes': (self.get_SupportedUriSchemes, None),
# NOTE Return MIME types supported by local backend if support for
# reporting supported MIME types is added
'SupportedMimeTypes': (dbus.Array([], signature='s'), None),
}
def _get_player_iface_properties(self):
return {
'PlaybackStatus': (self.get_PlaybackStatus, None),
'LoopStatus': (self.get_LoopStatus, self.set_LoopStatus),
'Rate': (1.0, self.set_Rate),
'Shuffle': (self.get_Shuffle, self.set_Shuffle),
'Metadata': (self.get_Metadata, None),
'Volume': (self.get_Volume, self.set_Volume),
'Position': (self.get_Position, None),
'MinimumRate': (1.0, None),
'MaximumRate': (1.0, None),
'CanGoNext': (self.get_CanGoNext, None),
'CanGoPrevious': (self.get_CanGoPrevious, None),
'CanPlay': (self.get_CanPlay, None),
'CanPause': (self.get_CanPause, None),
'CanSeek': (self.get_CanSeek, None),
'CanControl': (self.get_CanControl, None),
}
def _connect_to_dbus(self):
logger.debug(u'Connecting to D-Bus...')
bus_name = dbus.service.BusName(BUS_NAME, dbus.SessionBus())
logger.info(u'Connected to D-Bus')
return bus_name
@property
def backend(self):
if self._backend is None:
backend_refs = ActorRegistry.get_by_class(Backend)
assert len(backend_refs) == 1, \
'Expected exactly one running backend.'
self._backend = backend_refs[0].proxy()
return self._backend
@property
def mixer(self):
if self._mixer is None:
mixer_refs = ActorRegistry.get_by_class(BaseMixer)
assert len(mixer_refs) == 1, 'Expected exactly one running mixer.'
self._mixer = mixer_refs[0].proxy()
return self._mixer
def _get_track_id(self, cp_track):
return '/com/mopidy/track/%d' % cp_track.cpid
def _get_cpid(self, track_id):
assert track_id.startswith('/com/mopidy/track/')
return track_id.split('/')[-1]
### Properties interface
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ss', out_signature='v')
def Get(self, interface, prop):
logger.debug(u'%s.Get(%s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop))
(getter, setter) = self.properties[interface][prop]
if callable(getter):
return getter()
else:
return getter
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
logger.debug(u'%s.GetAll(%s) called',
dbus.PROPERTIES_IFACE, repr(interface))
getters = {}
for key, (getter, setter) in self.properties[interface].iteritems():
getters[key] = getter() if callable(getter) else getter
return getters
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ssv', out_signature='')
def Set(self, interface, prop, value):
logger.debug(u'%s.Set(%s, %s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop), repr(value))
getter, setter = self.properties[interface][prop]
if setter is not None:
setter(value)
self.PropertiesChanged(interface,
{prop: self.Get(interface, prop)}, [])
@dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed_properties,
invalidated_properties):
logger.debug(u'%s.PropertiesChanged(%s, %s, %s) signaled',
dbus.PROPERTIES_IFACE, interface, changed_properties,
invalidated_properties)
### Root interface methods
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Raise(self):
logger.debug(u'%s.Raise called', ROOT_IFACE)
# Do nothing, as we do not have a GUI
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Quit(self):
logger.debug(u'%s.Quit called', ROOT_IFACE)
exit_process()
### Root interface properties
def get_DesktopEntry(self):
return os.path.splitext(os.path.basename(settings.DESKTOP_FILE))[0]
def get_SupportedUriSchemes(self):
return dbus.Array(self.backend.uri_schemes.get(), signature='s')
### Player interface methods
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Next(self):
logger.debug(u'%s.Next called', PLAYER_IFACE)
if not self.get_CanGoNext():
logger.debug(u'%s.Next not allowed', PLAYER_IFACE)
return
self.backend.playback.next().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Previous(self):
logger.debug(u'%s.Previous called', PLAYER_IFACE)
if not self.get_CanGoPrevious():
logger.debug(u'%s.Previous not allowed', PLAYER_IFACE)
return
self.backend.playback.previous().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Pause(self):
logger.debug(u'%s.Pause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug(u'%s.Pause not allowed', PLAYER_IFACE)
return
self.backend.playback.pause().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def PlayPause(self):
logger.debug(u'%s.PlayPause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug(u'%s.PlayPause not allowed', PLAYER_IFACE)
return
state = self.backend.playback.state.get()
if state == PlaybackController.PLAYING:
self.backend.playback.pause().get()
elif state == PlaybackController.PAUSED:
self.backend.playback.resume().get()
elif state == PlaybackController.STOPPED:
self.backend.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Stop(self):
logger.debug(u'%s.Stop called', PLAYER_IFACE)
if not self.get_CanControl():
logger.debug(u'%s.Stop not allowed', PLAYER_IFACE)
return
self.backend.playback.stop().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Play(self):
logger.debug(u'%s.Play called', PLAYER_IFACE)
if not self.get_CanPlay():
logger.debug(u'%s.Play not allowed', PLAYER_IFACE)
return
state = self.backend.playback.state.get()
if state == PlaybackController.PAUSED:
self.backend.playback.resume().get()
else:
self.backend.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Seek(self, offset):
logger.debug(u'%s.Seek called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug(u'%s.Seek not allowed', PLAYER_IFACE)
return
offset_in_milliseconds = offset // 1000
current_position = self.backend.playback.time_position.get()
new_position = current_position + offset_in_milliseconds
self.backend.playback.seek(new_position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def SetPosition(self, track_id, position):
logger.debug(u'%s.SetPosition called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug(u'%s.SetPosition not allowed', PLAYER_IFACE)
return
position = position // 1000
current_cp_track = self.backend.playback.current_cp_track.get()
if current_cp_track is None:
return
if track_id != self._get_track_id(current_cp_track):
return
if position < 0:
return
if current_cp_track.track.length < position:
return
self.backend.playback.seek(position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def OpenUri(self, uri):
logger.debug(u'%s.OpenUri called', PLAYER_IFACE)
if not self.get_CanPlay():
# NOTE The spec does not explictly require this check, but guarding
# the other methods doesn't help much if OpenUri is open for use.
logger.debug(u'%s.Play not allowed', PLAYER_IFACE)
return
# NOTE Check if URI has MIME type known to the backend, if MIME support
# is added to the backend.
uri_schemes = self.backend.uri_schemes.get()
if not any([uri.startswith(uri_scheme) for uri_scheme in uri_schemes]):
return
track = self.backend.library.lookup(uri).get()
if track is not None:
cp_track = self.backend.current_playlist.add(track).get()
self.backend.playback.play(cp_track)
else:
logger.debug(u'Track with URI "%s" not found in library.', uri)
### Player interface signals
@dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x')
def Seeked(self, position):
logger.debug(u'%s.Seeked signaled', PLAYER_IFACE)
# Do nothing, as just calling the method is enough to emit the signal.
### Player interface properties
def get_PlaybackStatus(self):
state = self.backend.playback.state.get()
if state == PlaybackController.PLAYING:
return 'Playing'
elif state == PlaybackController.PAUSED:
return 'Paused'
elif state == PlaybackController.STOPPED:
return 'Stopped'
def get_LoopStatus(self):
repeat = self.backend.playback.repeat.get()
single = self.backend.playback.single.get()
if not repeat:
return 'None'
else:
if single:
return 'Track'
else:
return 'Playlist'
def set_LoopStatus(self, value):
if not self.get_CanControl():
logger.debug(u'Setting %s.LoopStatus not allowed', PLAYER_IFACE)
return
if value == 'None':
self.backend.playback.repeat = False
self.backend.playback.single = False
elif value == 'Track':
self.backend.playback.repeat = True
self.backend.playback.single = True
elif value == 'Playlist':
self.backend.playback.repeat = True
self.backend.playback.single = False
def set_Rate(self, value):
if not self.get_CanControl():
# NOTE The spec does not explictly require this check, but it was
# added to be consistent with all the other property setters.
logger.debug(u'Setting %s.Rate not allowed', PLAYER_IFACE)
return
if value == 0:
self.Pause()
def get_Shuffle(self):
return self.backend.playback.random.get()
def set_Shuffle(self, value):
if not self.get_CanControl():
logger.debug(u'Setting %s.Shuffle not allowed', PLAYER_IFACE)
return
if value:
self.backend.playback.random = True
else:
self.backend.playback.random = False
def get_Metadata(self):
current_cp_track = self.backend.playback.current_cp_track.get()
if current_cp_track is None:
return {'mpris:trackid': ''}
else:
(cpid, track) = current_cp_track
metadata = {'mpris:trackid': self._get_track_id(current_cp_track)}
if track.length:
metadata['mpris:length'] = track.length * 1000
if track.uri:
metadata['xesam:url'] = track.uri
if track.name:
metadata['xesam:title'] = track.name
if track.artists:
artists = list(track.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:artist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.album and track.album.name:
metadata['xesam:album'] = track.album.name
if track.album and track.album.artists:
artists = list(track.album.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:albumArtist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.track_no:
metadata['xesam:trackNumber'] = track.track_no
return dbus.Dictionary(metadata, signature='sv')
def get_Volume(self):
volume = self.mixer.volume.get()
if volume is not None:
return volume / 100.0
def set_Volume(self, value):
if not self.get_CanControl():
logger.debug(u'Setting %s.Volume not allowed', PLAYER_IFACE)
return
if value is None:
return
elif value < 0:
self.mixer.volume = 0
elif value > 1:
self.mixer.volume = 100
elif 0 <= value <= 1:
self.mixer.volume = int(value * 100)
def get_Position(self):
return self.backend.playback.time_position.get() * 1000
def get_CanGoNext(self):
if not self.get_CanControl():
return False
return (self.backend.playback.cp_track_at_next.get() !=
self.backend.playback.current_cp_track.get())
def get_CanGoPrevious(self):
if not self.get_CanControl():
return False
return (self.backend.playback.cp_track_at_previous.get() !=
self.backend.playback.current_cp_track.get())
def get_CanPlay(self):
if not self.get_CanControl():
return False
return (self.backend.playback.current_track.get() is not None
or self.backend.playback.track_at_next.get() is not None)
def get_CanPause(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanSeek(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanControl(self):
# NOTE This could be a setting for the end user to change.
return True

View File

@ -14,8 +14,8 @@ class BackendListener(object):
@staticmethod
def send(event, **kwargs):
"""Helper to allow calling of backend listener events"""
# FIXME this should be updated once pykka supports non-blocking calls
# on proxies or some similar solution
# FIXME this should be updated once Pykka supports non-blocking calls
# on proxies or some similar solution.
registry.ActorRegistry.broadcast({
'command': 'pykka_call',
'attr_path': (event,),
@ -23,6 +23,33 @@ class BackendListener(object):
'kwargs': kwargs,
}, target_class=BackendListener)
def track_playback_paused(self, track, time_position):
"""
Called whenever track playback is paused.
*MAY* be implemented by actor.
:param track: the track that was playing when playback paused
:type track: :class:`mopidy.models.Track`
:param time_position: the time position in milliseconds
:type time_position: int
"""
pass
def track_playback_resumed(self, track, time_position):
"""
Called whenever track playback is resumed.
*MAY* be implemented by actor.
:param track: the track that was playing when playback resumed
:type track: :class:`mopidy.models.Track`
:param time_position: the time position in milliseconds
:type time_position: int
"""
pass
def track_playback_started(self, track):
"""
Called whenever a new track starts playing.
@ -78,3 +105,12 @@ class BackendListener(object):
*MAY* be implemented by actor.
"""
pass
def seeked(self):
"""
Called whenever the time position changes by an unexpected amount, e.g.
at seek to a new time position.
*MAY* be implemented by actor.
"""
pass

View File

@ -49,6 +49,15 @@ DEBUG_LOG_FORMAT = u'%(levelname)-8s %(asctime)s' + \
#: DEBUG_LOG_FILENAME = u'mopidy.log'
DEBUG_LOG_FILENAME = u'mopidy.log'
#: Location of the Mopidy .desktop file.
#:
#: Used by :mod:`mopidy.frontends.mpris`.
#:
#: Default::
#:
#: DESKTOP_FILE = u'/usr/share/applications/mopidy.desktop'
DESKTOP_FILE = u'/usr/share/applications/mopidy.desktop'
#: List of server frontends to use.
#:
#: Default::
@ -56,10 +65,12 @@ DEBUG_LOG_FILENAME = u'mopidy.log'
#: FRONTENDS = (
#: u'mopidy.frontends.mpd.MpdFrontend',
#: u'mopidy.frontends.lastfm.LastfmFrontend',
#: u'mopidy.frontends.mpris.MprisFrontend',
#: )
FRONTENDS = (
u'mopidy.frontends.mpd.MpdFrontend',
u'mopidy.frontends.lastfm.LastfmFrontend',
u'mopidy.frontends.mpris.MprisFrontend',
)
#: Your `Last.fm <http://www.last.fm/>`_ username.

View File

@ -555,7 +555,7 @@ class PlaybackControllerTest(object):
@populate_playlist
def test_pause_when_stopped(self):
self.playback.pause()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.state, self.playback.PAUSED)
@populate_playlist
def test_pause_when_playing(self):

View File

@ -1,45 +1,51 @@
import threading
import mock
import unittest
from pykka.actor import ThreadingActor
from pykka.registry import ActorRegistry
from mopidy.backends.dummy import DummyBackend
from mopidy.listeners import BackendListener
from mopidy.models import Track
@mock.patch.object(BackendListener, 'send')
class BackendEventsTest(unittest.TestCase):
def setUp(self):
self.events = {
'track_playback_started': threading.Event(),
'track_playback_ended': threading.Event(),
}
self.backend = DummyBackend.start().proxy()
self.listener = DummyBackendListener.start(self.events).proxy()
def tearDown(self):
ActorRegistry.stop_all()
def test_play_sends_track_playback_started_event(self):
self.backend.current_playlist.add([Track(uri='a')])
def test_pause_sends_track_playback_paused_event(self, send):
self.backend.current_playlist.add(Track(uri='a'))
self.backend.playback.play().get()
send.reset_mock()
self.backend.playback.pause().get()
self.assertEqual(send.call_args[0][0], 'track_playback_paused')
def test_resume_sends_track_playback_resumed(self, send):
self.backend.current_playlist.add(Track(uri='a'))
self.backend.playback.play()
self.events['track_playback_started'].wait(timeout=1)
self.assertTrue(self.events['track_playback_started'].is_set())
self.backend.playback.pause().get()
send.reset_mock()
self.backend.playback.resume().get()
self.assertEqual(send.call_args[0][0], 'track_playback_resumed')
def test_stop_sends_track_playback_ended_event(self):
self.backend.current_playlist.add([Track(uri='a')])
self.backend.playback.play()
self.backend.playback.stop()
self.events['track_playback_ended'].wait(timeout=1)
self.assertTrue(self.events['track_playback_ended'].is_set())
def test_play_sends_track_playback_started_event(self, send):
self.backend.current_playlist.add(Track(uri='a'))
send.reset_mock()
self.backend.playback.play().get()
self.assertEqual(send.call_args[0][0], 'track_playback_started')
def test_stop_sends_track_playback_ended_event(self, send):
self.backend.current_playlist.add(Track(uri='a'))
self.backend.playback.play().get()
send.reset_mock()
self.backend.playback.stop().get()
self.assertEqual(send.call_args_list[0][0][0], 'track_playback_ended')
class DummyBackendListener(ThreadingActor, BackendListener):
def __init__(self, events):
self.events = events
def track_playback_started(self, track):
self.events['track_playback_started'].set()
def track_playback_ended(self, track, time_position):
self.events['track_playback_ended'].set()
def test_seek_sends_seeked_event(self, send):
self.backend.current_playlist.add(Track(uri='a', length=40000))
self.backend.playback.play().get()
send.reset_mock()
self.backend.playback.seek(1000).get()
self.assertEqual(send.call_args[0][0], 'seeked')

View File

View File

@ -0,0 +1,68 @@
import mock
import unittest
from mopidy.frontends.mpris import MprisFrontend, objects
from mopidy.models import Track
class BackendEventsTest(unittest.TestCase):
def setUp(self):
self.mpris_frontend = MprisFrontend() # As a plain class, not an actor
self.mpris_object = mock.Mock(spec=objects.MprisObject)
self.mpris_frontend.mpris_object = self.mpris_object
def test_track_playback_paused_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Paused'
self.mpris_frontend.track_playback_paused(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
def test_track_playback_resumed_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Playing'
self.mpris_frontend.track_playback_resumed(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
def test_track_playback_started_event_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_started(Track())
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_track_playback_ended_event_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_ended(Track(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_volume_changed_event_changes_volume(self):
self.mpris_object.Get.return_value = 1.0
self.mpris_frontend.volume_changed()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'Volume'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'Volume': 1.0}, [])
def test_seeked_event_causes_mpris_seeked_event(self):
self.mpris_object.Get.return_value = 31000000
self.mpris_frontend.seeked()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'Position'), {}),
])
self.mpris_object.Seeked.assert_called_with(31000000)

View File

@ -0,0 +1,824 @@
import mock
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.backends.base.playback import PlaybackController
from mopidy.frontends.mpris import objects
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Album, Artist, Track
PLAYING = PlaybackController.PLAYING
PAUSED = PlaybackController.PAUSED
STOPPED = PlaybackController.STOPPED
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
objects.MprisObject._connect_to_dbus = mock.Mock()
self.mixer = DummyMixer.start().proxy()
self.backend = DummyBackend.start().proxy()
self.mpris = objects.MprisObject()
self.mpris._backend = self.backend
def tearDown(self):
self.backend.stop()
self.mixer.stop()
def test_get_playback_status_is_playing_when_playing(self):
self.backend.playback.state = PLAYING
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Playing', result)
def test_get_playback_status_is_paused_when_paused(self):
self.backend.playback.state = PAUSED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Paused', result)
def test_get_playback_status_is_stopped_when_stopped(self):
self.backend.playback.state = STOPPED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Stopped', result)
def test_get_loop_status_is_none_when_not_looping(self):
self.backend.playback.repeat = False
self.backend.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('None', result)
def test_get_loop_status_is_track_when_looping_a_single_track(self):
self.backend.playback.repeat = True
self.backend.playback.single = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Track', result)
def test_get_loop_status_is_playlist_when_looping_the_current_playlist(self):
self.backend.playback.repeat = True
self.backend.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Playlist', result)
def test_set_loop_status_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.playback.repeat = True
self.backend.playback.single = True
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), True)
def test_set_loop_status_to_none_unsets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEquals(self.backend.playback.repeat.get(), False)
self.assertEquals(self.backend.playback.single.get(), False)
def test_set_loop_status_to_track_sets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), True)
def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist')
self.assertEquals(self.backend.playback.repeat.get(), True)
self.assertEquals(self.backend.playback.single.get(), False)
def test_get_rate_is_greater_or_equal_than_minimum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assert_(rate >= minimum_rate)
def test_get_rate_is_less_or_equal_than_maximum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assert_(rate >= maximum_rate)
def test_set_rate_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_set_rate_to_zero_pauses_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_get_shuffle_returns_true_if_random_is_active(self):
self.backend.playback.random = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertTrue(result)
def test_get_shuffle_returns_false_if_random_is_inactive(self):
self.backend.playback.random = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertFalse(result)
def test_set_shuffle_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.playback.random = False
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertFalse(self.backend.playback.random.get())
def test_set_shuffle_to_true_activates_random_mode(self):
self.backend.playback.random = False
self.assertFalse(self.backend.playback.random.get())
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertTrue(self.backend.playback.random.get())
def test_set_shuffle_to_false_deactivates_random_mode(self):
self.backend.playback.random = True
self.assertTrue(self.backend.playback.random.get())
result = self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False)
self.assertFalse(self.backend.playback.random.get())
def test_get_metadata_has_trackid_even_when_no_current_track(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assert_('mpris:trackid' in result.keys())
self.assertEquals(result['mpris:trackid'], '')
def test_get_metadata_has_trackid_based_on_cpid(self):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
(cpid, track) = self.backend.playback.current_cp_track.get()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEquals(result['mpris:trackid'],
'/com/mopidy/track/%d' % cpid)
def test_get_metadata_has_track_length(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:length', result.keys())
self.assertEquals(result['mpris:length'], 40000000)
def test_get_metadata_has_track_uri(self):
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:url', result.keys())
self.assertEquals(result['xesam:url'], 'a')
def test_get_metadata_has_track_title(self):
self.backend.current_playlist.append([Track(name='a')])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:title', result.keys())
self.assertEquals(result['xesam:title'], 'a')
def test_get_metadata_has_track_artists(self):
self.backend.current_playlist.append([Track(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)])])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:artist', result.keys())
self.assertEquals(result['xesam:artist'], ['a', 'b'])
def test_get_metadata_has_track_album(self):
self.backend.current_playlist.append([Track(album=Album(name='a'))])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:album', result.keys())
self.assertEquals(result['xesam:album'], 'a')
def test_get_metadata_has_track_album_artists(self):
self.backend.current_playlist.append([Track(album=Album(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)]))])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:albumArtist', result.keys())
self.assertEquals(result['xesam:albumArtist'], ['a', 'b'])
def test_get_metadata_has_track_number_in_album(self):
self.backend.current_playlist.append([Track(track_no=7)])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:trackNumber', result.keys())
self.assertEquals(result['xesam:trackNumber'], 7)
def test_get_volume_should_return_volume_between_zero_and_one(self):
self.mixer.volume = 0
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 0)
self.mixer.volume = 50
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 0.5)
self.mixer.volume = 100
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEquals(result, 1)
def test_set_volume_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.mixer.volume = 0
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEquals(self.mixer.volume.get(), 0)
def test_set_volume_to_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEquals(self.mixer.volume.get(), 100)
def test_set_volume_to_anything_above_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0)
self.assertEquals(self.mixer.volume.get(), 100)
def test_set_volume_to_anything_not_a_number_does_not_change_volume(self):
self.mixer.volume = 10
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None)
self.assertEquals(self.mixer.volume.get(), 10)
def test_get_position_returns_time_position_in_microseconds(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(10000)
result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assert_(result_in_milliseconds >= 10000)
def test_get_position_when_no_current_track_should_be_zero(self):
result_in_microseconds = self.mpris.Get(objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertEquals(result_in_milliseconds, 0)
def test_get_minimum_rate_is_one_or_less(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assert_(result <= 1.0)
def test_get_maximum_rate_is_one_or_more(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assert_(result >= 1.0)
def test_can_go_next_is_true_if_can_control_and_other_next_track(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertTrue(result)
def test_can_go_next_is_false_if_next_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.repeat = True
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_next_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_previous_is_true_if_can_control_and_other_previous_track(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertTrue(result)
def test_can_go_previous_is_false_if_previous_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.repeat = True
self.backend.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_go_previous_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_play_is_true_if_can_control_and_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.backend.current_playlist.append([Track(uri='a')])
self.backend.playback.play()
self.assertTrue(self.backend.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertTrue(result)
def test_can_play_is_false_if_no_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.assertFalse(self.backend.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_play_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertTrue(result)
def test_can_pause_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertFalse(result)
def test_can_seek_is_true_if_can_control_is_true(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertTrue(result)
def test_can_seek_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertFalse(result)
def test_can_control_is_true(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl')
self.assertTrue(result)
def test_next_is_ignored_if_can_go_next_is_false(self):
self.mpris.get_CanGoNext = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.mpris.Next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
def test_next_when_playing_should_skip_to_next_track_and_keep_playing(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_next_when_at_end_of_list_should_stop_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_next_when_paused_should_skip_to_next_track_and_stay_paused(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), PAUSED)
self.mpris.Next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_next_when_stopped_should_skip_to_next_track_and_stay_stopped(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.stop()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.Next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_previous_is_ignored_if_can_go_previous_is_false(self):
self.mpris.get_CanGoPrevious = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.mpris.Previous()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
def test_previous_when_playing_should_skip_to_prev_track_and_keep_playing(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_previous_when_at_start_of_list_should_stop_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_previous_when_paused_should_skip_to_previous_track_and_stay_paused(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), PAUSED)
self.mpris.Previous()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_previous_when_stopped_should_skip_to_previous_track_and_stay_stopped(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.next()
self.backend.playback.stop()
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.Previous()
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_pause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_pause_when_playing_should_pause_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_pause_when_paused_has_no_effect(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
self.mpris.Pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_playpause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_playpause_when_playing_should_pause_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
def test_playpause_when_paused_should_resume_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
at_pause = self.backend.playback.time_position.get()
self.assert_(at_pause >= 0)
self.mpris.PlayPause()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_pause = self.backend.playback.time_position.get()
self.assert_(after_pause >= at_pause)
def test_playpause_when_stopped_should_start_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.PlayPause()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_stop_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_stop_when_playing_should_stop_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_stop_when_paused_should_stop_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
self.mpris.Stop()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_play_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_play_when_stopped_starts_playback(self):
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
def test_play_after_pause_resumes_from_same_position(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
before_pause = self.backend.playback.time_position.get()
self.assert_(before_pause >= 0)
self.mpris.Pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
at_pause = self.backend.playback.time_position.get()
self.assert_(at_pause >= before_pause)
self.mpris.Play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_pause = self.backend.playback.time_position.get()
self.assert_(after_pause >= at_pause)
def test_play_when_there_is_no_track_has_no_effect(self):
self.backend.current_playlist.clear()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEquals(self.backend.playback.state.get(), STOPPED)
def test_seek_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
before_seek = self.backend.playback.time_position.get()
self.assert_(before_seek >= 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
after_seek = self.backend.playback.time_position.get()
self.assert_(before_seek <= after_seek < (
before_seek + milliseconds_to_seek))
def test_seek_seeks_given_microseconds_forward_in_the_current_track(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
before_seek = self.backend.playback.time_position.get()
self.assert_(before_seek >= 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_seek = self.backend.playback.time_position.get()
self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
def test_seek_seeks_given_microseconds_backward_if_negative(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_seek = self.backend.playback.time_position.get()
self.assert_(before_seek >= 20000)
milliseconds_to_seek = -10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_seek = self.backend.playback.time_position.get()
self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
self.assert_(after_seek < before_seek)
def test_seek_seeks_to_start_of_track_if_new_position_is_negative(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_seek = self.backend.playback.time_position.get()
self.assert_(before_seek >= 20000)
milliseconds_to_seek = -30000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_seek = self.backend.playback.time_position.get()
self.assert_(after_seek >= (before_seek + milliseconds_to_seek))
self.assert_(after_seek < before_seek)
self.assert_(after_seek >= 0)
def test_seek_skips_to_next_track_if_new_position_larger_than_track_length(self):
self.backend.current_playlist.append([Track(uri='a', length=40000),
Track(uri='b')])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_seek = self.backend.playback.time_position.get()
self.assert_(before_seek >= 20000)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
milliseconds_to_seek = 50000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'b')
after_seek = self.backend.playback.time_position.get()
self.assert_(after_seek >= 0)
self.assert_(after_seek < before_seek)
def test_set_position_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
before_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position <= 5000)
track_id = 'a'
position_to_set_in_milliseconds = 20000
position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
after_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position <= after_set_position <
position_to_set_in_milliseconds)
def test_set_position_sets_the_current_track_position_in_microsecs(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
before_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position <= 5000)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
track_id = '/com/mopidy/track/0'
position_to_set_in_milliseconds = 20000
position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
after_set_position = self.backend.playback.time_position.get()
self.assert_(after_set_position >= position_to_set_in_milliseconds)
def test_set_position_does_nothing_if_the_position_is_negative(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position >= 20000)
self.assert_(before_set_position <= 25000)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
track_id = '/com/mopidy/track/0'
position_to_set_in_milliseconds = -1000
position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
after_set_position = self.backend.playback.time_position.get()
self.assert_(after_set_position >= before_set_position)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
def test_set_position_does_nothing_if_position_is_larger_than_track_length(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position >= 20000)
self.assert_(before_set_position <= 25000)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
track_id = 'a'
position_to_set_in_milliseconds = 50000
position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
after_set_position = self.backend.playback.time_position.get()
self.assert_(after_set_position >= before_set_position)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
def test_set_position_does_nothing_if_track_id_does_not_match_current_track(self):
self.backend.current_playlist.append([Track(uri='a', length=40000)])
self.backend.playback.play()
self.backend.playback.seek(20000)
before_set_position = self.backend.playback.time_position.get()
self.assert_(before_set_position >= 20000)
self.assert_(before_set_position <= 25000)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
track_id = 'b'
position_to_set_in_milliseconds = 0
position_to_set_in_microseconds = position_to_set_in_milliseconds * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microseconds)
after_set_position = self.backend.playback.time_position.get()
self.assert_(after_set_position >= before_set_position)
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
def test_open_uri_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.backend.library.provider.dummy_library = [
Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEquals(len(self.backend.current_playlist.tracks.get()), 0)
def test_open_uri_ignores_uris_with_unknown_uri_scheme(self):
self.assertListEqual(self.backend.uri_schemes.get(), ['dummy'])
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.provider.dummy_library = [
Track(uri='notdummy:/test/uri')]
self.mpris.OpenUri('notdummy:/test/uri')
self.assertEquals(len(self.backend.current_playlist.tracks.get()), 0)
def test_open_uri_adds_uri_to_current_playlist(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.provider.dummy_library = [
Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEquals(self.backend.current_playlist.tracks.get()[0].uri,
'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_stopped(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.provider.dummy_library = [
Track(uri='dummy:/test/uri')]
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.assertEquals(self.backend.playback.state.get(), STOPPED)
self.mpris.OpenUri('dummy:/test/uri')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri,
'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_paused(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.provider.dummy_library = [
Track(uri='dummy:/test/uri')]
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.backend.playback.pause()
self.assertEquals(self.backend.playback.state.get(), PAUSED)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri,
'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_playing(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.provider.dummy_library = [
Track(uri='dummy:/test/uri')]
self.backend.current_playlist.append([Track(uri='a'), Track(uri='b')])
self.backend.playback.play()
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri, 'a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEquals(self.backend.playback.state.get(), PLAYING)
self.assertEquals(self.backend.playback.current_track.get().uri,
'dummy:/test/uri')

View File

@ -0,0 +1,61 @@
import mock
import unittest
from mopidy import settings
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpris import objects
class RootInterfaceTest(unittest.TestCase):
def setUp(self):
objects.exit_process = mock.Mock()
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = DummyBackend.start().proxy()
self.mpris = objects.MprisObject()
def tearDown(self):
self.backend.stop()
def test_constructor_connects_to_dbus(self):
self.assert_(self.mpris._connect_to_dbus.called)
def test_can_raise_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise')
self.assertFalse(result)
def test_raise_does_nothing(self):
self.mpris.Raise()
def test_can_quit_returns_true(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit')
self.assertTrue(result)
def test_quit_should_stop_all_actors(self):
self.mpris.Quit()
self.assert_(objects.exit_process.called)
def test_has_track_list_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList')
self.assertFalse(result)
def test_identify_is_mopidy(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'Identity')
self.assertEquals(result, 'Mopidy')
def test_desktop_entry_is_mopidy(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
self.assertEquals(result, 'mopidy')
def test_desktop_entry_is_based_on_DESKTOP_FILE_setting(self):
settings.runtime['DESKTOP_FILE'] = '/tmp/foo.desktop'
result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
self.assertEquals(result, 'foo')
settings.runtime.clear()
def test_supported_uri_schemes_is_empty(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes')
self.assertEquals(len(result), 1)
self.assertEquals(result[0], 'dummy')
def test_supported_mime_types_is_empty(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes')
self.assertEquals(len(result), 0)

View File

@ -7,8 +7,29 @@ class BackendListenerTest(unittest.TestCase):
def setUp(self):
self.listener = BackendListener()
def test_listener_has_default_impl_for_the_track_playback_started(self):
def test_listener_has_default_impl_for_track_playback_paused(self):
self.listener.track_playback_paused(Track(), 0)
def test_listener_has_default_impl_for_track_playback_resumed(self):
self.listener.track_playback_resumed(Track(), 0)
def test_listener_has_default_impl_for_track_playback_started(self):
self.listener.track_playback_started(Track())
def test_listener_has_default_impl_for_the_track_playback_ended(self):
def test_listener_has_default_impl_for_track_playback_ended(self):
self.listener.track_playback_ended(Track(), 0)
def test_listener_has_default_impl_for_playback_state_changed(self):
self.listener.playback_state_changed()
def test_listener_has_default_impl_for_playlist_changed(self):
self.listener.playlist_changed()
def test_listener_has_default_impl_for_options_changed(self):
self.listener.options_changed()
def test_listener_has_default_impl_for_volume_changed(self):
self.listener.volume_changed()
def test_listener_has_default_impl_for_seeked(self):
self.listener.seeked()