Merge branch 'develop' into feature/audio-mute

This commit is contained in:
Stein Magnus Jodal 2013-10-09 22:57:16 +02:00
commit 56b1d6390c
51 changed files with 750 additions and 3213 deletions

5
.coveragerc Normal file
View File

@ -0,0 +1,5 @@
[report]
omit =
*/pyshared/*
*/python?.?/*
*/site-packages/nose/*

View File

@ -5,14 +5,17 @@ install:
- "sudo wget -O /etc/apt/sources.list.d/mopidy.list http://apt.mopidy.com/mopidy.list"
- "sudo apt-get update || true"
- "sudo apt-get install $(apt-cache depends mopidy | awk '$2 !~ /mopidy/ {print $2}')"
- "pip install flake8"
- "pip install coveralls flake8"
before_script:
- "rm $VIRTUAL_ENV/lib/python$TRAVIS_PYTHON_VERSION/no-global-site-packages.txt"
script:
- "flake8 $(find . -iname '*.py')"
- "nosetests"
- "nosetests --with-coverage --cover-package=mopidy"
after_success:
- "coveralls"
notifications:
irc:

View File

@ -24,3 +24,5 @@
- Alli Witheford <alzeih@gmail.com>
- Alexandre Petitjean <alpetitjean@gmail.com>
- Pavol Babincak <scroolik@gmail.com>
- Javier Domingo <javierdo1@gmail.com>
- Lasse Bigum <lasse@bigum.org>

View File

@ -2,7 +2,6 @@ include *.rst
include LICENSE
include MANIFEST.in
include data/mopidy.desktop
include mopidy/backends/spotify/spotify_appkey.key
include pylintrc
recursive-include docs *

View File

@ -25,5 +25,18 @@ To get started with Mopidy, check out `the docs <http://docs.mopidy.com/>`_.
- Mailing list: `mopidy@googlegroups.com <https://groups.google.com/forum/?fromgroups=#!forum/mopidy>`_
- Twitter: `@mopidy <https://twitter.com/mopidy/>`_
.. image:: https://pypip.in/v/Mopidy/badge.png
:target: https://crate.io/packages/Mopidy/
:alt: Latest PyPI version
.. image:: https://pypip.in/d/Mopidy/badge.png
:target: https://crate.io/packages/Mopidy/
:alt: Number of PyPI downloads
.. image:: https://travis-ci.org/mopidy/mopidy.png?branch=develop
:target: https://travis-ci.org/mopidy/mopidy
:alt: Travis CI build status
.. image:: https://coveralls.io/repos/mopidy/mopidy/badge.png?branch=develop
:target: https://coveralls.io/r/mopidy/mopidy?branch=develop
:alt: Test coverage

View File

@ -49,5 +49,3 @@ Frontend implementations
* :mod:`mopidy.frontends.http`
* :mod:`mopidy.frontends.mpd`
* :mod:`mopidy.frontends.mpris`
* :mod:`mopidy.frontends.scrobbler`

View File

@ -4,6 +4,43 @@ Changelog
This changelog is used to track all major changes to Mopidy.
v0.16.0 (UNRELEASED)
====================
**Dependencies**
Parts of Mopidy have been moved to their own external extensions. If you want
Mopidy to continue to work like it used to, you may have to install one or more
of the following extensions as well:
- The Spotify backend has been moved to
`Mopidy-Scrobbler <https://github.com/mopidy/mopidy-spotify>`_.
- The Last.fm scrobbler has been moved to
`Mopidy-Scrobbler <https://github.com/mopidy/mopidy-scrobbler>`_.
- The MPRIS frontend has been moved to
`Mopidy-MPRIS <https://github.com/mopidy/mopidy-mpris>`_.
**Audio**
- Added support for parsing and playback of playlists in GStreamer. For end
users this basically means that you can now add a radio playlist to Mopidy
and we will automatically download it and play the stream inside it.
Currently we support M3U, PLS, XSPF and ASX files. Also note that we can
currently only play the first stream in the playlist.
- We now handle the rare case where an audio track has max volume equal to min.
This was causing divide by zero errors when scaling volumes to a zero to
hundred scale. (Fixes: :issue:`525`)
**Extension support**
- A cookiecutter project for quickly creating new Mopidy extensions have been
created. You can find it at `cookiecutter-mopidy-ext
<https://github.com/mopidy/cookiecutter-mopidy-ext>`_. (Fixes: :issue:`522`)
v0.15.0 (2013-09-19)
====================

View File

@ -8,7 +8,8 @@ MPRIS clients
Specification. It's a spec that describes a standard D-Bus interface for making
media players available to other applications on the same system.
Mopidy's :ref:`MPRIS frontend <ext-mpris>` currently implements all required
The MPRIS frontend provided by the `Mopidy-MPRIS extension
<https://github.com/mopidy/mopidy-mpris>`_ currently implements all required
parts of the MPRIS spec, plus the optional playlist interface. It does not
implement the optional tracklist interface.

View File

@ -36,19 +36,21 @@ How to make Mopidy available as an UPnP MediaRenderer
=====================================================
With the help of `the Rygel project <https://live.gnome.org/Rygel>`_ Mopidy can
be made available as an UPnP MediaRenderer. Rygel will interface with Mopidy's
:ref:`MPRIS frontend <ext-mpris>`, and make Mopidy available as a MediaRenderer
on the local network. Since this depends on the MPRIS frontend, which again
depends on D-Bus being available, this will only work on Linux, and not OS X.
MPRIS/D-Bus is only available to other applications on the same host, so Rygel
must be running on the same machine as Mopidy.
be made available as an UPnP MediaRenderer. Rygel will interface with the MPRIS
interface provided by the `Mopidy-MPRIS extension
<https://github.com/mopidy/mopidy-mpris>`_, and make Mopidy available as a
MediaRenderer on the local network. Since this depends on the MPRIS frontend,
which again depends on D-Bus being available, this will only work on Linux, and
not OS X. MPRIS/D-Bus is only available to other applications on the same
host, so Rygel must be running on the same machine as Mopidy.
1. Start Mopidy and make sure the :ref:`MPRIS frontend <ext-mpris>` is working.
It is activated by default, but you may miss dependencies or be using OS X,
in which case it will not work. Check the console output when Mopidy is
started for any errors related to the MPRIS frontend. If you're unsure it is
working, there are instructions for how to test it on the :ref:`MPRIS
frontend <ext-mpris>` page.
1. Start Mopidy and make sure the MPRIS frontend is working. It is activated
by default when the Mopidy-MPRIS extension is installed, but you may miss
dependencies or be using OS X, in which case it will not work. Check the
console output when Mopidy is started for any errors related to the MPRIS
frontend. If you're unsure it is working, there are instructions for how to
test it on in the `Mopidy-MPRIS readme
<https://github.com/mopidy/mopidy-mpris>`_.
2. Install Rygel. On Debian/Ubuntu::

View File

@ -35,6 +35,8 @@ class Mock(object):
elif (name[0] == name[0].upper()
# gst.interfaces.MIXER_TRACK_*
and not name.startswith('MIXER_TRACK_')
# gst.PadTemplate
and not name.startswith('PadTemplate')
# dbus.String()
and not name == 'String'):
return type(name, (), {})

View File

@ -33,93 +33,77 @@ developers.
Mopidy-Beets
------------
https://github.com/mopidy/mopidy-beets
Provides a backend for playing music from your `Beets
<http://beets.radbox.org/>`_ music library through Beets' web extension.
Author:
Janez Troha
PyPI:
`Mopidy-Beets <https://pypi.python.org/pypi/Mopidy-Beets>`_
GitHub:
`dz0ny/mopidy-beets <https://github.com/dz0ny/mopidy-beets>`_
Issues:
https://github.com/dz0ny/mopidy-beets/issues
Mopidy-GMusic
-------------
https://github.com/hechtus/mopidy-gmusic
Provides a backend for playing music from `Google Play Music
<https://play.google.com/music/>`_.
Author:
Ronald Hecht
PyPI:
`Mopidy-GMusic <https://pypi.python.org/pypi/Mopidy-GMusic>`_
GitHub:
`hechtus/mopidy-gmusic <https://github.com/hechtus/mopidy-gmusic>`_
Issues:
https://github.com/hechtus/mopidy-gmusic/issues
Mopidy-MPRIS
------------
https://github.com/mopidy/mopidy-mpris
Extension for controlling Mopidy through the `MPRIS <http://www.mpris.org/>`_
D-Bus interface, for example using the Ubuntu Sound Menu.
Mopidy-NAD
----------
https://github.com/mopidy/mopidy-nad
Extension for controlling volume using an external NAD amplifier.
Author:
Stein Magnus Jodal
PyPI:
`Mopidy-NAD <https://pypi.python.org/pypi/Mopidy-NAD>`_
GitHub:
`mopidy/mopidy-nad <https://github.com/mopidy/mopidy-nad>`_
Issues:
https://github.com/mopidy/mopidy/issues
Mopidy-Scrobbler
----------------
https://github.com/mopidy/mopidy-scrobbler
Extension for scrobbling played tracks to Last.fm.
Mopidy-SomaFM
-------------
https://github.com/AlexandrePTJ/mopidy-somafm
Provides a backend for playing music from the `SomaFM <http://somafm.com/>`_
service.
Author:
Alexandre Petitjean
PyPI:
`Mopidy-SomaFM <https://pypi.python.org/pypi/Mopidy-SomaFM>`_
GitHub:
`AlexandrePTJ/mopidy-somafm <https://github.com/AlexandrePTJ/mopidy-somafm/>`_
Issues:
https://github.com/AlexandrePTJ/mopidy-somafm/issues
Mopidy-SoundCloud
-----------------
Provides a backend for playing music from the `SoundCloud
https://github.com/mopidy/mopidy-soundcloud
rovides a backend for playing music from the `SoundCloud
<http://www.soundcloud.com/>`_ service.
Author:
Janez Troha
PyPI:
`Mopidy-SoundCloud <https://pypi.python.org/pypi/Mopidy-SoundCloud>`_
GitHub:
`dz0ny/mopidy-soundcloud <https://github.com/dz0ny/mopidy-soundcloud>`_
Issues:
https://github.com/dz0ny/mopidy-soundcloud/issues
Mopidy-Spotify
--------------
https://github.com/mopidy/mopidy-spotify
Extension for playing music from the `Spotify <http://www.spotify.com/>`_ music
streaming service.
Mopidy-Subsonic
---------------
https://github.com/rattboi/mopidy-subsonic
Provides a backend for playing music from a `Subsonic Music Streamer
<http://www.subsonic.org/>`_ library.
Author:
Bradon Kanyid
PyPI:
`Mopidy-Subsonic <https://pypi.python.org/pypi/Mopidy-Subsonic>`_
GitHub:
`rattboi/mopidy-subsonic <https://github.com/rattboi/mopidy-subsonic>`_
Issues:
https://github.com/rattboi/mopidy-subsonic/issues

View File

@ -1,105 +0,0 @@
.. _ext-mpris:
************
Mopidy-MPRIS
************
This extension lets you control Mopidy through the Media Player Remote
Interfacing Specification (`MPRIS <http://www.mpris.org/>`_) D-Bus interface.
An example of an MPRIS client is the :ref:`ubuntu-sound-menu`.
Dependencies
============
- D-Bus Python bindings. The package is named ``python-dbus`` in
Ubuntu/Debian.
- ``libindicate`` Python bindings is needed to expose Mopidy in e.g. the
Ubuntu Sound Menu. The package is named ``python-indicate`` in
Ubuntu/Debian.
- An ``.desktop`` file for Mopidy installed at the path set in the
:confval:`mpris/desktop_file` config value. See usage section below for
details.
Default configuration
=====================
.. literalinclude:: ../../mopidy/frontends/mpris/ext.conf
:language: ini
Configuration values
====================
.. confval:: mpris/enabled
If the MPRIS extension should be enabled or not.
.. confval:: mpris/desktop_file
Location of the Mopidy ``.desktop`` file.
Usage
=====
The extension is enabled by default if all dependencies are available.
Controlling Mopidy through the Ubuntu Sound Menu
------------------------------------------------
If you are running Ubuntu and installed Mopidy using the Debian package from
APT you should be able to control Mopidy through the :ref:`ubuntu-sound-menu`
without any changes.
If you installed Mopidy in any other way and want to control Mopidy through the
Ubuntu Sound Menu, you must install the ``mopidy.desktop`` file which can be
found in the ``data/`` dir of the Mopidy source repo into the
``/usr/share/applications`` dir by hand::
cd /path/to/mopidy/source
sudo cp data/mopidy.desktop /usr/share/applications/
If the correct path to the installed ``mopidy.desktop`` file on your system
isn't ``/usr/share/applications/mopidy.conf``, you'll need to set the
:confval:`mpris/desktop_file` config value.
After you have installed the file, start Mopidy in any way, and Mopidy should
appear in the Ubuntu Sound Menu. When you quit Mopidy, it will still be listed
in the Ubuntu Sound Menu, and may be restarted by selecting it there.
The Ubuntu Sound Menu interacts with Mopidy's MPRIS frontend. The MPRIS
frontend supports the minimum requirements of the `MPRIS specification
<http://www.mpris.org/>`_. The ``TrackList`` interface of the spec is not
supported.
Testing the MPRIS API directly
------------------------------
To use the MPRIS API directly, start Mopidy, and then run the following in a
Python shell::
import dbus
bus = dbus.SessionBus()
player = bus.get_object('org.mpris.MediaPlayer2.mopidy',
'/org/mpris/MediaPlayer2')
Now you can control Mopidy through the player object. Examples:
- To get some properties from Mopidy, run::
props = player.GetAll('org.mpris.MediaPlayer2',
dbus_interface='org.freedesktop.DBus.Properties')
- To quit Mopidy through D-Bus, run::
player.Quit(dbus_interface='org.mpris.MediaPlayer2')
For details on the API, please refer to the `MPRIS specification
<http://www.mpris.org/>`_.

View File

@ -1,55 +0,0 @@
.. _ext-scrobbler:
****************
Mopidy-Scrobbler
****************
This extension scrobbles the music you play to your `Last.fm
<http://www.last.fm>`_ profile.
.. note::
This extension requires a free user account at Last.fm.
Dependencies
============
.. literalinclude:: ../../requirements/scrobbler.txt
Default configuration
=====================
.. literalinclude:: ../../mopidy/frontends/scrobbler/ext.conf
:language: ini
Configuration values
====================
.. confval:: scrobbler/enabled
If the scrobbler extension should be enabled or not.
.. confval:: scrobbler/username
Your Last.fm username.
.. confval:: scrobbler/password
Your Last.fm password.
Usage
=====
The extension is enabled by default if all dependencies are available. You just
need to add your Last.fm username and password to the
``~/.config/mopidy/mopidy.conf`` file:
.. code-block:: ini
[scrobbler]
username = myusername
password = mysecret

View File

@ -1,83 +0,0 @@
.. _ext-spotify:
**************
Mopidy-Spotify
**************
An extension for playing music from Spotify.
`Spotify <http://www.spotify.com/>`_ is a music streaming service. The backend
uses the official `libspotify
<http://developer.spotify.com/en/libspotify/overview/>`_ library and the
`pyspotify <http://github.com/mopidy/pyspotify/>`_ Python bindings for
libspotify. This backend handles URIs starting with ``spotify:``.
.. note::
This product uses SPOTIFY(R) CORE but is not endorsed, certified or
otherwise approved in any way by Spotify. Spotify is the registered
trade mark of the Spotify Group.
Known issues
============
https://github.com/mopidy/mopidy/issues?labels=Spotify+backend
Dependencies
============
.. literalinclude:: ../../requirements/spotify.txt
Default configuration
=====================
.. literalinclude:: ../../mopidy/backends/spotify/ext.conf
:language: ini
Configuration values
====================
.. confval:: spotify/enabled
If the Spotify extension should be enabled or not.
.. confval:: spotify/username
Your Spotify Premium username.
.. confval:: spotify/password
Your Spotify Premium password.
.. confval:: spotify/bitrate
The preferred audio bitrate. Valid values are 96, 160, 320.
.. confval:: spotify/timeout
Max number of seconds to wait for Spotify operations to complete.
.. confval:: spotify/cache_dir
Path to the Spotify data cache. Cannot be shared with other Spotify apps.
Usage
=====
If you are using the Spotify backend, which is the default, enter your Spotify
Premium account's username and password into ``~/.config/mopidy/mopidy.conf``,
like this:
.. code-block:: ini
[spotify]
username = myusername
password = mysecret
This will only work if you have the Spotify Premium subscription. Spotify
Unlimited will not work.

View File

@ -62,6 +62,20 @@ extension, Mopidy-Soundspot::
Example content for the most important files follows below.
cookiecutter project template
=============================
We've also made a `cookiecutter <http://cookiecutter.readthedocs.org/>`_
project template for `creating new Mopidy extensions
<https://github.com/mopidy/cookiecutter-mopidy-ext>`_. If you install
cookiecutter and run a single command, you're asked a few questions about the
name of your extension, etc. This is used to create a folder structure similar
to the above, with all the needed files and most of the details filled in for
you. This saves you a lot of tedious work and copy-pasting from this howto. See
the readme of `cookiecutter-mopidy-ext
<https://github.com/mopidy/cookiecutter-mopidy-ext>`_ for further details.
Example README.rst
==================
@ -73,24 +87,30 @@ installation using ``pip install Mopidy-Something==dev`` to work.
.. code-block:: rst
****************
Mopidy-Soundspot
================
****************
`Mopidy <http://www.mopidy.com/>`_ extension for playing music from
`Soundspot <http://soundspot.example.com/>`_.
Usage
-----
Requires a Soundspot Platina subscription and the pysoundspot library.
Installation
============
Install by running::
sudo pip install Mopidy-Soundspot
Or install the Debian/Ubuntu package from `apt.mopidy.com
Or, if available, install the Debian/Ubuntu package from `apt.mopidy.com
<http://apt.mopidy.com/>`_.
Configuration
=============
Before starting Mopidy, you must add your Soundspot username and password
to the Mopidy configuration file::
@ -98,34 +118,46 @@ installation using ``pip install Mopidy-Something==dev`` to work.
username = alice
password = secret
Project resources
-----------------
=================
- `Source code <https://github.com/mopidy/mopidy-soundspot>`_
- `Issue tracker <https://github.com/mopidy/mopidy-soundspot/issues>`_
- `Download development snapshot <https://github.com/mopidy/mopidy-soundspot/tarball/develop#egg=Mopidy-Soundspot-dev>`_
- `Download development snapshot <https://github.com/mopidy/mopidy-soundspot/tarball/master#egg=Mopidy-Soundspot-dev>`_
Changelog
=========
v0.1.0 (2013-09-17)
-------------------
- Initial release.
Example setup.py
================
The ``setup.py`` file must use setuptools/distribute, and not distutils. This
is because Mopidy extensions use setuptools' entry point functionality to
register themselves as available Mopidy extensions when they are installed on
your system.
The ``setup.py`` file must use setuptools, and not distutils. This is because
Mopidy extensions use setuptools' entry point functionality to register
themselves as available Mopidy extensions when they are installed on your
system.
The example below also includes a couple of convenient tricks for reading the
package version from the source code so that it is defined in a single place,
and to reuse the README file as the long description of the package for the
PyPI registration.
The package must have ``install_requires`` on ``setuptools`` and ``Mopidy``, in
addition to any other dependencies required by your extension. The
``entry_points`` part must be included. The ``mopidy.ext`` part cannot be
changed, but the innermost string should be changed. It's format is
``ext_name = package_name:Extension``. ``ext_name`` should be a short
name for your extension, typically the part after "Mopidy-" in lowercase. This
name is used e.g. to name the config section for your extension. The
The package must have ``install_requires`` on ``setuptools`` and ``Mopidy >=
0.14`` (or a newer version, if your extension requires it), in addition to any
other dependencies required by your extension. If you implement a Mopidy
frontend or backend, you'll need to include ``Pykka >= 1.1`` in the
requirements. The ``entry_points`` part must be included. The ``mopidy.ext``
part cannot be changed, but the innermost string should be changed. It's format
is ``ext_name = package_name:Extension``. ``ext_name`` should be a short name
for your extension, typically the part after "Mopidy-" in lowercase. This name
is used e.g. to name the config section for your extension. The
``package_name:Extension`` part is simply the Python path to the extension
class that will connect the rest of the dots.
@ -134,7 +166,7 @@ class that will connect the rest of the dots.
from __future__ import unicode_literals
import re
from setuptools import setup
from setuptools import setup, find_packages
def get_version(filename):
@ -146,20 +178,26 @@ class that will connect the rest of the dots.
setup(
name='Mopidy-Soundspot',
version=get_version('mopidy_soundspot/__init__.py'),
url='http://example.com/mopidy-soundspot/',
url='https://github.com/your-account/mopidy-soundspot',
license='Apache License, Version 2.0',
author='Your Name',
author_email='your-email@example.com',
description='Very short description',
long_description=open('README.rst').read(),
packages=['mopidy_soundspot'],
packages=find_packages(exclude=['tests', 'tests.*']),
zip_safe=False,
include_package_data=True,
install_requires=[
'setuptools',
'Mopidy',
'Mopidy >= 0.14',
'Pykka >= 1.1',
'pysoundspot',
],
test_suite='nose.collector',
tests_require=[
'nose',
'mock >= 1.0',
],
entry_points={
'mopidy.ext': [
'soundspot = mopidy_soundspot:Extension',

View File

@ -24,10 +24,9 @@ Glossary
frontend
A part of Mopidy *using* the :term:`core` API. Existing frontends
include the :ref:`MPD server <ext-mpd>`, the :ref:`MPRIS/D-Bus
integration <ext-mpris>`, the :ref:`Last.fm scrobbler <ext-scrobbler>`,
and the :ref:`HTTP server <ext-http>` with JavaScript API. See
:ref:`frontend-api` for details.
include the :ref:`MPD server <ext-mpd>`, the MPRIS/D-Bus integration,
the Last.fm scrobbler, and the :ref:`HTTP server <ext-http>` with
JavaScript API. See :ref:`frontend-api` for details.
mixer
A GStreamer element that controls audio volume.

View File

@ -4,9 +4,9 @@ Mopidy
Mopidy is a music server which can play music both from multiple sources, like
your :ref:`local hard drive <ext-local>`, :ref:`radio streams <ext-stream>`,
and from :ref:`Spotify <ext-spotify>` and SoundCloud. Searches combines results
from all music sources, and you can mix tracks from all sources in your play
queue. Your playlists from Spotify or SoundCloud are also available for use.
and from Spotify and SoundCloud. Searches combines results from all music
sources, and you can mix tracks from all sources in your play queue. Your
playlists from Spotify or SoundCloud are also available for use.
To control your Mopidy music server, you can use one of Mopidy's :ref:`web
clients <http-clients>`, the :ref:`Ubuntu Sound Menu <ubuntu-sound-menu>`, any

View File

@ -250,8 +250,8 @@ can install Mopidy from PyPI using Pip.
sudo pip-python install -U cherrypy ws4py
#. Optional: To use MPRIS, e.g. for controlling Mopidy from the Ubuntu Sound
Menu or from an UPnP client via Rygel, you need some additional
#. Optional: To use Mopidy-MPRIS, e.g. for controlling Mopidy from the Ubuntu
Sound Menu or from an UPnP client via Rygel, you need some additional
dependencies: the Python bindings for libindicate, and the Python bindings
for libdbus, the reference D-Bus library.

View File

@ -21,4 +21,4 @@ if (isinstance(pykka.__version__, basestring)
warnings.filterwarnings('ignore', 'could not open display')
__version__ = '0.15.0'
__version__ = '0.16.0a1'

View File

@ -11,7 +11,7 @@ import pykka
from mopidy.utils import process
from . import mixers, utils
from . import mixers, playlists, utils
from .constants import PlaybackState
from .listener import AudioListener
@ -19,6 +19,9 @@ logger = logging.getLogger('mopidy.audio')
mixers.register_mixers()
playlists.register_typefinders()
playlists.register_elements()
MB = 1 << 20
@ -573,6 +576,8 @@ class Audio(pykka.ThreadingActor):
"""Convert value between scales."""
new_min, new_max = new
old_min, old_max = old
if old_min == old_max:
return old_max
scaling = float(new_max - new_min) / (old_max - old_min)
return int(round(scaling * (value - old_min) + new_min))

412
mopidy/audio/playlists.py Normal file
View File

@ -0,0 +1,412 @@
from __future__ import unicode_literals
import pygst
pygst.require('0.10')
import gst
import gobject
import ConfigParser as configparser
import io
try:
import xml.etree.cElementTree as elementtree
except ImportError:
import xml.etree.ElementTree as elementtree
# TODO: make detect_FOO_header reusable in general mopidy code.
# i.e. give it just a "peek" like function.
def detect_m3u_header(typefind):
return typefind.peek(0, 8) == b'#EXTM3U\n'
def detect_pls_header(typefind):
return typefind.peek(0, 11).lower() == b'[playlist]\n'
def detect_xspf_header(typefind):
data = typefind.peek(0, 150)
if b'xspf' not in data:
return False
try:
data = io.BytesIO(data)
for event, element in elementtree.iterparse(data, events=(b'start',)):
return element.tag.lower() == '{http://xspf.org/ns/0/}playlist'
except elementtree.ParseError:
pass
return False
def detect_asx_header(typefind):
data = typefind.peek(0, 50)
if b'asx' not in data:
return False
try:
data = io.BytesIO(data)
for event, element in elementtree.iterparse(data, events=(b'start',)):
return element.tag.lower() == 'asx'
except elementtree.ParseError:
pass
return False
def parse_m3u(data):
# TODO: convert non URIs to file URIs.
found_header = False
for line in data.readlines():
if found_header or line.startswith('#EXTM3U'):
found_header = True
else:
continue
if not line.startswith('#') and line.strip():
yield line.strip()
def parse_pls(data):
# TODO: convert non URIs to file URIs.
try:
cp = configparser.RawConfigParser()
cp.readfp(data)
except configparser.Error:
return
for section in cp.sections():
if section.lower() != 'playlist':
continue
for i in xrange(cp.getint(section, 'numberofentries')):
yield cp.get(section, 'file%d' % (i+1))
def parse_xspf(data):
try:
for event, element in elementtree.iterparse(data):
element.tag = element.tag.lower() # normalize
except elementtree.ParseError:
return
ns = 'http://xspf.org/ns/0/'
for track in element.iterfind('{%s}tracklist/{%s}track' % (ns, ns)):
yield track.findtext('{%s}location' % ns)
def parse_asx(data):
try:
for event, element in elementtree.iterparse(data):
element.tag = element.tag.lower() # normalize
except elementtree.ParseError:
return
for ref in element.findall('entry/ref'):
yield ref.get('href', '').strip()
def parse_urilist(data):
for line in data.readlines():
if not line.startswith('#') and gst.uri_is_valid(line.strip()):
yield line
def playlist_typefinder(typefind, func, caps):
if func(typefind):
typefind.suggest(gst.TYPE_FIND_MAXIMUM, caps)
def register_typefind(mimetype, func, extensions):
caps = gst.caps_from_string(mimetype)
gst.type_find_register(mimetype, gst.RANK_PRIMARY, playlist_typefinder,
extensions, caps, func, caps)
def register_typefinders():
register_typefind('audio/x-mpegurl', detect_m3u_header, [b'm3u', b'm3u8'])
register_typefind('audio/x-scpls', detect_pls_header, [b'pls'])
register_typefind('application/xspf+xml', detect_xspf_header, [b'xspf'])
# NOTE: seems we can't use video/x-ms-asf which is the correct mime for asx
# as it is shared with asf for streaming videos :/
register_typefind('audio/x-ms-asx', detect_asx_header, [b'asx'])
class BasePlaylistElement(gst.Bin):
"""Base class for creating GStreamer elements for playlist support.
This element performs the following steps:
1. Initializes src and sink pads for the element.
2. Collects data from the sink until EOS is reached.
3. Passes the collected data to :meth:`convert` to get a list of URIs.
4. Passes the list of URIs to :meth:`handle`, default handling is to pass
the URIs to the src element as a uri-list.
5. If handle returned true, the EOS consumed and nothing more happens, if
it is not consumed it flows on to the next element downstream, which is
likely our uri-list consumer which needs the EOS to know we are done
sending URIs.
"""
sinkpad_template = None
"""GStreamer pad template to use for sink, must be overriden."""
srcpad_template = None
"""GStreamer pad template to use for src, must be overriden."""
ghost_srcpad = False
"""Indicates if src pad should be ghosted or not."""
def __init__(self):
"""Sets up src and sink pads plus behaviour."""
super(BasePlaylistElement, self).__init__()
self._data = io.BytesIO()
self._done = False
self.sinkpad = gst.Pad(self.sinkpad_template)
self.sinkpad.set_chain_function(self._chain)
self.sinkpad.set_event_function(self._event)
self.add_pad(self.sinkpad)
if self.ghost_srcpad:
self.srcpad = gst.ghost_pad_new_notarget('src', gst.PAD_SRC)
else:
self.srcpad = gst.Pad(self.srcpad_template)
self.add_pad(self.srcpad)
def convert(self, data):
"""Convert the data we have colleted to URIs.
:param data: collected data buffer
:type data: :class:`io.BytesIO`
:returns: iterable or generator of URIs
"""
raise NotImplementedError
def handle(self, uris):
"""Do something useful with the URIs.
:param uris: list of URIs
:type uris: :type:`list`
:returns: boolean indicating if EOS should be consumed
"""
# TODO: handle unicode uris which we can get out of elementtree
self.srcpad.push(gst.Buffer('\n'.join(uris)))
return False
def _chain(self, pad, buf):
if not self._done:
self._data.write(buf.data)
return gst.FLOW_OK
return gst.FLOW_EOS
def _event(self, pad, event):
if event.type == gst.EVENT_NEWSEGMENT:
return True
if event.type == gst.EVENT_EOS:
self._done = True
self._data.seek(0)
if self.handle(list(self.convert(self._data))):
return True
# Ensure we handle remaining events in a sane way.
return pad.event_default(event)
class M3uDecoder(BasePlaylistElement):
__gstdetails__ = ('M3U Decoder',
'Decoder',
'Convert .m3u to text/uri-list',
'Mopidy')
sinkpad_template = gst.PadTemplate(
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
gst.caps_from_string('audio/x-mpegurl'))
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_from_string('text/uri-list'))
__gsttemplates__ = (sinkpad_template, srcpad_template)
def convert(self, data):
return parse_m3u(data)
class PlsDecoder(BasePlaylistElement):
__gstdetails__ = ('PLS Decoder',
'Decoder',
'Convert .pls to text/uri-list',
'Mopidy')
sinkpad_template = gst.PadTemplate(
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
gst.caps_from_string('audio/x-scpls'))
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_from_string('text/uri-list'))
__gsttemplates__ = (sinkpad_template, srcpad_template)
def convert(self, data):
return parse_pls(data)
class XspfDecoder(BasePlaylistElement):
__gstdetails__ = ('XSPF Decoder',
'Decoder',
'Convert .pls to text/uri-list',
'Mopidy')
sinkpad_template = gst.PadTemplate(
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
gst.caps_from_string('application/xspf+xml'))
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_from_string('text/uri-list'))
__gsttemplates__ = (sinkpad_template, srcpad_template)
def convert(self, data):
return parse_xspf(data)
class AsxDecoder(BasePlaylistElement):
__gstdetails__ = ('ASX Decoder',
'Decoder',
'Convert .asx to text/uri-list',
'Mopidy')
sinkpad_template = gst.PadTemplate(
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
gst.caps_from_string('audio/x-ms-asx'))
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_from_string('text/uri-list'))
__gsttemplates__ = (sinkpad_template, srcpad_template)
def convert(self, data):
return parse_asx(data)
class UriListElement(BasePlaylistElement):
__gstdetails__ = ('URIListDemuxer',
'Demuxer',
'Convert a text/uri-list to a stream',
'Mopidy')
sinkpad_template = gst.PadTemplate(
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
gst.caps_from_string('text/uri-list'))
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_new_any())
ghost_srcpad = True # We need to hook this up to our internal decodebin
__gsttemplates__ = (sinkpad_template, srcpad_template)
def __init__(self):
super(UriListElement, self).__init__()
self.uridecodebin = gst.element_factory_make('uridecodebin')
self.uridecodebin.connect('pad-added', self.pad_added)
# Limit to anycaps so we get a single stream out, letting other
# elements downstream figure out actual muxing
self.uridecodebin.set_property('caps', gst.caps_new_any())
def pad_added(self, src, pad):
self.srcpad.set_target(pad)
pad.add_event_probe(self.pad_event)
def pad_event(self, pad, event):
if event.has_name('urilist-played'):
error = gst.GError(gst.RESOURCE_ERROR, gst.RESOURCE_ERROR_FAILED,
b'Nested playlists not supported.')
message = b'Playlists pointing to other playlists is not supported'
self.post_message(gst.message_new_error(self, error, message))
return 1 # GST_PAD_PROBE_OK
def handle(self, uris):
struct = gst.Structure('urilist-played')
event = gst.event_new_custom(gst.EVENT_CUSTOM_UPSTREAM, struct)
self.sinkpad.push_event(event)
# TODO: hookup about to finish and errors to rest of URIs so we
# round robin, only giving up once all have been tried.
# TODO: uris could be empty.
self.add(self.uridecodebin)
self.uridecodebin.set_state(gst.STATE_READY)
self.uridecodebin.set_property('uri', uris[0])
self.uridecodebin.sync_state_with_parent()
return True # Make sure we consume the EOS that triggered us.
def convert(self, data):
return parse_urilist(data)
class IcySrc(gst.Bin, gst.URIHandler):
__gstdetails__ = ('IcySrc',
'Src',
'HTTP src wrapper for icy:// support.',
'Mopidy')
srcpad_template = gst.PadTemplate(
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
gst.caps_new_any())
__gsttemplates__ = (srcpad_template,)
def __init__(self):
super(IcySrc, self).__init__()
self._httpsrc = gst.element_make_from_uri(gst.URI_SRC, 'http://')
try:
self._httpsrc.set_property('iradio-mode', True)
except TypeError:
pass
self.add(self._httpsrc)
self._srcpad = gst.GhostPad('src', self._httpsrc.get_pad('src'))
self.add_pad(self._srcpad)
@classmethod
def do_get_type_full(cls):
return gst.URI_SRC
@classmethod
def do_get_protocols_full(cls):
return [b'icy', b'icyx']
def do_set_uri(self, uri):
if uri.startswith('icy://'):
return self._httpsrc.set_uri(b'http://' + uri[len('icy://'):])
elif uri.startswith('icyx://'):
return self._httpsrc.set_uri(b'https://' + uri[len('icyx://'):])
else:
return False
def do_get_uri(self):
uri = self._httpsrc.get_uri()
if uri.startswith('http://'):
return b'icy://' + uri[len('http://'):]
else:
return b'icyx://' + uri[len('https://'):]
def register_element(element_class):
gobject.type_register(element_class)
gst.element_register(
element_class, element_class.__name__.lower(), gst.RANK_MARGINAL)
def register_elements():
register_element(M3uDecoder)
register_element(PlsDecoder)
register_element(XspfDecoder)
register_element(AsxDecoder)
register_element(UriListElement)
# Only register icy if gst install can't handle it on it's own.
if not gst.element_make_from_uri(gst.URI_SRC, 'icy://'):
register_element(IcySrc)

View File

@ -1,36 +0,0 @@
from __future__ import unicode_literals
import os
import mopidy
from mopidy import config, exceptions, ext
class Extension(ext.Extension):
dist_name = 'Mopidy-Spotify'
ext_name = 'spotify'
version = mopidy.__version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
return config.read(conf_file)
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['username'] = config.String()
schema['password'] = config.Secret()
schema['bitrate'] = config.Integer(choices=(96, 160, 320))
schema['timeout'] = config.Integer(minimum=0)
schema['cache_dir'] = config.Path()
return schema
def validate_environment(self):
try:
import spotify # noqa
except ImportError as e:
raise exceptions.ExtensionError('pyspotify library not found', e)
def get_backend_classes(self):
from .actor import SpotifyBackend
return [SpotifyBackend]

View File

@ -1,37 +0,0 @@
from __future__ import unicode_literals
import logging
import pykka
from mopidy.backends import base
from mopidy.backends.spotify.library import SpotifyLibraryProvider
from mopidy.backends.spotify.playback import SpotifyPlaybackProvider
from mopidy.backends.spotify.session_manager import SpotifySessionManager
from mopidy.backends.spotify.playlists import SpotifyPlaylistsProvider
logger = logging.getLogger('mopidy.backends.spotify')
class SpotifyBackend(pykka.ThreadingActor, base.Backend):
def __init__(self, config, audio):
super(SpotifyBackend, self).__init__()
self.config = config
self.library = SpotifyLibraryProvider(backend=self)
self.playback = SpotifyPlaybackProvider(audio=audio, backend=self)
self.playlists = SpotifyPlaylistsProvider(backend=self)
self.uri_schemes = ['spotify']
self.spotify = SpotifySessionManager(
config, audio=audio, backend_ref=self.actor_ref)
def on_start(self):
logger.info('Mopidy uses SPOTIFY(R) CORE')
logger.debug('Connecting to Spotify')
self.spotify.start()
def on_stop(self):
self.spotify.logout()

View File

@ -1,51 +0,0 @@
from __future__ import unicode_literals
import logging
from spotify.manager import SpotifyContainerManager as \
PyspotifyContainerManager
logger = logging.getLogger('mopidy.backends.spotify')
class SpotifyContainerManager(PyspotifyContainerManager):
def __init__(self, session_manager):
PyspotifyContainerManager.__init__(self)
self.session_manager = session_manager
def container_loaded(self, container, userdata):
"""Callback used by pyspotify"""
logger.debug('Callback called: playlist container loaded')
self.session_manager.refresh_playlists()
count = 0
for playlist in self.session_manager.session.playlist_container():
if playlist.type() == 'playlist':
self.session_manager.playlist_manager.watch(playlist)
count += 1
logger.debug('Watching %d playlist(s) for changes', count)
def playlist_added(self, container, playlist, position, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: playlist added at position %d', position)
# container_loaded() is called after this callback, so we do not need
# to handle this callback.
def playlist_moved(self, container, playlist, old_position, new_position,
userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: playlist "%s" moved from position %d to %d',
playlist.name(), old_position, new_position)
# container_loaded() is called after this callback, so we do not need
# to handle this callback.
def playlist_removed(self, container, playlist, position, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: playlist "%s" removed from position %d',
playlist.name(), position)
# container_loaded() is called after this callback, so we do not need
# to handle this callback.

View File

@ -1,7 +0,0 @@
[spotify]
enabled = true
username =
password =
bitrate = 160
timeout = 10
cache_dir = $XDG_CACHE_DIR/mopidy/spotify

View File

@ -1,211 +0,0 @@
from __future__ import unicode_literals
import logging
import time
import urllib
import pykka
from spotify import Link, SpotifyError
from mopidy.backends import base
from mopidy.models import Track, SearchResult
from . import translator
logger = logging.getLogger('mopidy.backends.spotify')
TRACK_AVAILABLE = 1
class SpotifyTrack(Track):
"""Proxy object for unloaded Spotify tracks."""
def __init__(self, uri=None, track=None):
super(SpotifyTrack, self).__init__()
if (uri and track) or (not uri and not track):
raise AttributeError('uri or track must be provided')
elif uri:
self._spotify_track = Link.from_string(uri).as_track()
elif track:
self._spotify_track = track
self._unloaded_track = Track(uri=uri, name='[loading...]')
self._track = None
@property
def _proxy(self):
if self._track:
return self._track
elif self._spotify_track.is_loaded():
self._track = translator.to_mopidy_track(self._spotify_track)
return self._track
else:
return self._unloaded_track
def __getattribute__(self, name):
if name.startswith('_'):
return super(SpotifyTrack, self).__getattribute__(name)
return self._proxy.__getattribute__(name)
def __repr__(self):
return self._proxy.__repr__()
def __hash__(self):
return hash(self._proxy.uri)
def __eq__(self, other):
if not isinstance(other, Track):
return False
return self._proxy.uri == other.uri
def copy(self, **values):
return self._proxy.copy(**values)
class SpotifyLibraryProvider(base.BaseLibraryProvider):
def __init__(self, *args, **kwargs):
super(SpotifyLibraryProvider, self).__init__(*args, **kwargs)
self._timeout = self.backend.config['spotify']['timeout']
def find_exact(self, query=None, uris=None):
return self.search(query=query, uris=uris)
def lookup(self, uri):
try:
link = Link.from_string(uri)
if link.type() == Link.LINK_TRACK:
return self._lookup_track(uri)
if link.type() == Link.LINK_ALBUM:
return self._lookup_album(uri)
elif link.type() == Link.LINK_ARTIST:
return self._lookup_artist(uri)
elif link.type() == Link.LINK_PLAYLIST:
return self._lookup_playlist(uri)
else:
return []
except SpotifyError as error:
logger.debug(u'Failed to lookup "%s": %s', uri, error)
return []
def _lookup_track(self, uri):
track = Link.from_string(uri).as_track()
self._wait_for_object_to_load(track)
if track.is_loaded():
if track.availability() == TRACK_AVAILABLE:
return [SpotifyTrack(track=track)]
else:
return []
else:
return [SpotifyTrack(uri=uri)]
def _lookup_album(self, uri):
album = Link.from_string(uri).as_album()
album_browser = self.backend.spotify.session.browse_album(album)
self._wait_for_object_to_load(album_browser)
return [
SpotifyTrack(track=t)
for t in album_browser if t.availability() == TRACK_AVAILABLE]
def _lookup_artist(self, uri):
artist = Link.from_string(uri).as_artist()
artist_browser = self.backend.spotify.session.browse_artist(artist)
self._wait_for_object_to_load(artist_browser)
return [
SpotifyTrack(track=t)
for t in artist_browser if t.availability() == TRACK_AVAILABLE]
def _lookup_playlist(self, uri):
playlist = Link.from_string(uri).as_playlist()
self._wait_for_object_to_load(playlist)
return [
SpotifyTrack(track=t)
for t in playlist if t.availability() == TRACK_AVAILABLE]
def _wait_for_object_to_load(self, spotify_obj, timeout=None):
# XXX Sleeping to wait for the Spotify object to load is an ugly hack,
# but it works. We should look into other solutions for this.
if timeout is None:
timeout = self._timeout
wait_until = time.time() + timeout
while not spotify_obj.is_loaded():
time.sleep(0.1)
if time.time() > wait_until:
logger.debug(
'Timeout: Spotify object did not load in %ds', timeout)
return
def refresh(self, uri=None):
pass # TODO
def search(self, query=None, uris=None):
# TODO Only return results within URI roots given by ``uris``
if not query:
return self._get_all_tracks()
uris = query.get('uri', [])
if uris:
tracks = []
for uri in uris:
tracks += self.lookup(uri)
if len(uris) == 1:
uri = uris[0]
else:
uri = 'spotify:search'
return SearchResult(uri=uri, tracks=tracks)
spotify_query = self._translate_search_query(query)
logger.debug('Spotify search query: %s' % spotify_query)
future = pykka.ThreadingFuture()
def callback(results, userdata=None):
search_result = SearchResult(
uri='spotify:search:%s' % (
urllib.quote(results.query().encode('utf-8'))),
albums=[
translator.to_mopidy_album(a) for a in results.albums()],
artists=[
translator.to_mopidy_artist(a) for a in results.artists()],
tracks=[
translator.to_mopidy_track(t) for t in results.tracks()])
future.set(search_result)
if not self.backend.spotify.connected.is_set():
logger.debug('Not connected: Spotify search cancelled')
return SearchResult(uri='spotify:search')
self.backend.spotify.session.search(
spotify_query, callback,
album_count=200, artist_count=200, track_count=200)
try:
return future.get(timeout=self._timeout)
except pykka.Timeout:
logger.debug(
'Timeout: Spotify search did not return in %ds', self._timeout)
return SearchResult(uri='spotify:search')
def _get_all_tracks(self):
# Since we can't search for the entire Spotify library, we return
# all tracks in the playlists when the query is empty.
tracks = []
for playlist in self.backend.playlists.playlists:
tracks += playlist.tracks
return SearchResult(uri='spotify:search', tracks=tracks)
def _translate_search_query(self, mopidy_query):
spotify_query = []
for (field, values) in mopidy_query.iteritems():
if field == 'date':
field = 'year'
if not hasattr(values, '__iter__'):
values = [values]
for value in values:
if field == 'any':
spotify_query.append(value)
elif field == 'year':
value = int(value.split('-')[0]) # Extract year
spotify_query.append('%s:%d' % (field, value))
else:
spotify_query.append('%s:"%s"' % (field, value))
spotify_query = ' '.join(spotify_query)
return spotify_query

View File

@ -1,94 +0,0 @@
from __future__ import unicode_literals
import logging
import functools
from spotify import Link, SpotifyError
from mopidy import audio
from mopidy.backends import base
logger = logging.getLogger('mopidy.backends.spotify')
def need_data_callback(spotify_backend, length_hint):
spotify_backend.playback.on_need_data(length_hint)
def enough_data_callback(spotify_backend):
spotify_backend.playback.on_enough_data()
def seek_data_callback(spotify_backend, time_position):
spotify_backend.playback.on_seek_data(time_position)
class SpotifyPlaybackProvider(base.BasePlaybackProvider):
# These GStreamer caps matches the audio data provided by libspotify
_caps = (
'audio/x-raw-int, endianness=(int)1234, channels=(int)2, '
'width=(int)16, depth=(int)16, signed=(boolean)true, '
'rate=(int)44100')
def __init__(self, *args, **kwargs):
super(SpotifyPlaybackProvider, self).__init__(*args, **kwargs)
self._first_seek = False
def play(self, track):
if track.uri is None:
return False
spotify_backend = self.backend.actor_ref.proxy()
need_data_callback_bound = functools.partial(
need_data_callback, spotify_backend)
enough_data_callback_bound = functools.partial(
enough_data_callback, spotify_backend)
seek_data_callback_bound = functools.partial(
seek_data_callback, spotify_backend)
self._first_seek = True
try:
self.backend.spotify.session.load(
Link.from_string(track.uri).as_track())
self.backend.spotify.session.play(1)
self.backend.spotify.buffer_timestamp = 0
self.audio.prepare_change()
self.audio.set_appsrc(
self._caps,
need_data=need_data_callback_bound,
enough_data=enough_data_callback_bound,
seek_data=seek_data_callback_bound)
self.audio.start_playback()
self.audio.set_metadata(track)
return True
except SpotifyError as e:
logger.info('Playback of %s failed: %s', track.uri, e)
return False
def stop(self):
self.backend.spotify.session.play(0)
return super(SpotifyPlaybackProvider, self).stop()
def on_need_data(self, length_hint):
logger.debug('playback.on_need_data(%d) called', length_hint)
self.backend.spotify.push_audio_data = True
def on_enough_data(self):
logger.debug('playback.on_enough_data() called')
self.backend.spotify.push_audio_data = False
def on_seek_data(self, time_position):
logger.debug('playback.on_seek_data(%d) called', time_position)
if time_position == 0 and self._first_seek:
self._first_seek = False
logger.debug('Skipping seek due to issue #300')
return
self.backend.spotify.buffer_timestamp = audio.millisecond_to_clocktime(
time_position)
self.backend.spotify.session.seek(time_position)

View File

@ -1,105 +0,0 @@
from __future__ import unicode_literals
import datetime
import logging
from spotify.manager import SpotifyPlaylistManager as PyspotifyPlaylistManager
logger = logging.getLogger('mopidy.backends.spotify')
class SpotifyPlaylistManager(PyspotifyPlaylistManager):
def __init__(self, session_manager):
PyspotifyPlaylistManager.__init__(self)
self.session_manager = session_manager
def tracks_added(self, playlist, tracks, position, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: '
'%d track(s) added to position %d in playlist "%s"',
len(tracks), position, playlist.name())
self.session_manager.refresh_playlists()
def tracks_moved(self, playlist, tracks, new_position, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: '
'%d track(s) moved to position %d in playlist "%s"',
len(tracks), new_position, playlist.name())
self.session_manager.refresh_playlists()
def tracks_removed(self, playlist, tracks, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: '
'%d track(s) removed from playlist "%s"',
len(tracks), playlist.name())
self.session_manager.refresh_playlists()
def playlist_renamed(self, playlist, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Playlist renamed to "%s"', playlist.name())
self.session_manager.refresh_playlists()
def playlist_state_changed(self, playlist, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: The state of playlist "%s" changed',
playlist.name())
def playlist_update_in_progress(self, playlist, done, userdata):
"""Callback used by pyspotify"""
if done:
logger.debug(
'Callback called: Update of playlist "%s" done',
playlist.name())
else:
logger.debug(
'Callback called: Update of playlist "%s" in progress',
playlist.name())
def playlist_metadata_updated(self, playlist, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Metadata updated for playlist "%s"',
playlist.name())
def track_created_changed(self, playlist, position, user, when, userdata):
"""Callback used by pyspotify"""
when = datetime.datetime.fromtimestamp(when)
logger.debug(
'Callback called: Created by/when for track %d in playlist '
'"%s" changed to user "N/A" and time "%s"',
position, playlist.name(), when)
def track_message_changed(self, playlist, position, message, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Message for track %d in playlist '
'"%s" changed to "%s"', position, playlist.name(), message)
def track_seen_changed(self, playlist, position, seen, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Seen attribute for track %d in playlist '
'"%s" changed to "%s"', position, playlist.name(), seen)
def description_changed(self, playlist, description, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Description changed for playlist "%s" to "%s"',
playlist.name(), description)
def subscribers_changed(self, playlist, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Subscribers changed for playlist "%s"',
playlist.name())
def image_changed(self, playlist, image, userdata):
"""Callback used by pyspotify"""
logger.debug(
'Callback called: Image changed for playlist "%s"',
playlist.name())

View File

@ -1,22 +0,0 @@
from __future__ import unicode_literals
from mopidy.backends import base
class SpotifyPlaylistsProvider(base.BasePlaylistsProvider):
def create(self, name):
pass # TODO
def delete(self, uri):
pass # TODO
def lookup(self, uri):
for playlist in self._playlists:
if playlist.uri == uri:
return playlist
def refresh(self):
pass # TODO
def save(self, playlist):
pass # TODO

View File

@ -1,201 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import threading
from spotify.manager import SpotifySessionManager as PyspotifySessionManager
from mopidy import audio
from mopidy.backends.listener import BackendListener
from mopidy.utils import process, versioning
from . import translator
from .container_manager import SpotifyContainerManager
from .playlist_manager import SpotifyPlaylistManager
logger = logging.getLogger('mopidy.backends.spotify')
BITRATES = {96: 2, 160: 0, 320: 1}
class SpotifySessionManager(process.BaseThread, PyspotifySessionManager):
cache_location = None
settings_location = None
appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key')
user_agent = 'Mopidy %s' % versioning.get_version()
def __init__(self, config, audio, backend_ref):
self.cache_location = config['spotify']['cache_dir']
self.settings_location = config['spotify']['cache_dir']
full_proxy = ''
if config['proxy']['hostname']:
full_proxy = config['proxy']['hostname']
if config['proxy']['port']:
full_proxy += ':' + str(config['proxy']['port'])
if config['proxy']['scheme']:
full_proxy = config['proxy']['scheme'] + "://" + full_proxy
PyspotifySessionManager.__init__(
self, config['spotify']['username'], config['spotify']['password'],
proxy=full_proxy,
proxy_username=config['proxy']['username'],
proxy_password=config['proxy']['password'])
process.BaseThread.__init__(self)
self.name = 'SpotifyThread'
self.audio = audio
self.backend = None
self.backend_ref = backend_ref
self.bitrate = config['spotify']['bitrate']
self.connected = threading.Event()
self.push_audio_data = True
self.buffer_timestamp = 0
self.container_manager = None
self.playlist_manager = None
self._initial_data_receive_completed = False
def run_inside_try(self):
self.backend = self.backend_ref.proxy()
self.connect()
def logged_in(self, session, error):
"""Callback used by pyspotify"""
if error:
logger.error('Spotify login error: %s', error)
return
logger.info('Connected to Spotify')
# To work with both pyspotify 1.9 and 1.10
if not hasattr(self, 'session'):
self.session = session
logger.debug('Preferred Spotify bitrate is %d kbps', self.bitrate)
session.set_preferred_bitrate(BITRATES[self.bitrate])
self.container_manager = SpotifyContainerManager(self)
self.playlist_manager = SpotifyPlaylistManager(self)
self.container_manager.watch(session.playlist_container())
self.connected.set()
def logged_out(self, session):
"""Callback used by pyspotify"""
logger.info('Disconnected from Spotify')
self.connected.clear()
def metadata_updated(self, session):
"""Callback used by pyspotify"""
logger.debug('Callback called: Metadata updated')
def connection_error(self, session, error):
"""Callback used by pyspotify"""
if error is None:
logger.info('Spotify connection OK')
else:
logger.error('Spotify connection error: %s', error)
if self.audio.state.get() == audio.PlaybackState.PLAYING:
self.backend.playback.pause()
def message_to_user(self, session, message):
"""Callback used by pyspotify"""
logger.debug('User message: %s', message.strip())
def music_delivery(self, session, frames, frame_size, num_frames,
sample_type, sample_rate, channels):
"""Callback used by pyspotify"""
if not self.push_audio_data:
return 0
assert sample_type == 0, 'Expects 16-bit signed integer samples'
capabilites = """
audio/x-raw-int,
endianness=(int)1234,
channels=(int)%(channels)d,
width=(int)16,
depth=(int)16,
signed=(boolean)true,
rate=(int)%(sample_rate)d
""" % {
'sample_rate': sample_rate,
'channels': channels,
}
duration = audio.calculate_duration(num_frames, sample_rate)
buffer_ = audio.create_buffer(bytes(frames),
capabilites=capabilites,
timestamp=self.buffer_timestamp,
duration=duration)
self.buffer_timestamp += duration
if self.audio.emit_data(buffer_).get():
return num_frames
else:
return 0
def play_token_lost(self, session):
"""Callback used by pyspotify"""
logger.debug('Play token lost')
self.backend.playback.pause()
def log_message(self, session, data):
"""Callback used by pyspotify"""
logger.debug('System message: %s' % data.strip())
if 'offline-mgr' in data and 'files unlocked' in data:
# XXX This is a very very fragile and ugly hack, but we get no
# proper event when libspotify is done with initial data loading.
# We delay the expensive refresh of Mopidy's playlists until this
# message arrives. This way, we avoid doing the refresh once for
# every playlist or other change. This reduces the time from
# startup until the Spotify backend is ready from 35s to 12s in one
# test with clean Spotify cache. In cases with an outdated cache
# the time improvements should be a lot greater.
if not self._initial_data_receive_completed:
self._initial_data_receive_completed = True
self.refresh_playlists()
def end_of_track(self, session):
"""Callback used by pyspotify"""
logger.debug('End of data stream reached')
self.audio.emit_end_of_stream()
def refresh_playlists(self):
"""Refresh the playlists in the backend with data from Spotify"""
if not self._initial_data_receive_completed:
logger.debug('Still getting data; skipped refresh of playlists')
return
playlists = []
folders = []
for spotify_playlist in self.session.playlist_container():
if spotify_playlist.type() == 'folder_start':
folders.append(spotify_playlist)
if spotify_playlist.type() == 'folder_end':
folders.pop()
playlists.append(translator.to_mopidy_playlist(
spotify_playlist, folders=folders,
bitrate=self.bitrate, username=self.username))
playlists.append(translator.to_mopidy_playlist(
self.session.starred(),
bitrate=self.bitrate, username=self.username))
playlists = filter(None, playlists)
self.backend.playlists.playlists = playlists
logger.info('Loaded %d Spotify playlists', len(playlists))
BackendListener.send('playlists_loaded')
def logout(self):
"""Log out from spotify"""
logger.debug('Logging out from Spotify')
# To work with both pyspotify 1.9 and 1.10
if getattr(self, 'session', None):
self.session.logout()

View File

@ -1,97 +0,0 @@
from __future__ import unicode_literals
import logging
import spotify
from mopidy.models import Artist, Album, Track, Playlist
logger = logging.getLogger('mopidy.backends.spotify')
artist_cache = {}
album_cache = {}
track_cache = {}
def to_mopidy_artist(spotify_artist):
if spotify_artist is None:
return
uri = str(spotify.Link.from_artist(spotify_artist))
if uri in artist_cache:
return artist_cache[uri]
if not spotify_artist.is_loaded():
return Artist(uri=uri, name='[loading...]')
artist_cache[uri] = Artist(uri=uri, name=spotify_artist.name())
return artist_cache[uri]
def to_mopidy_album(spotify_album):
if spotify_album is None:
return
uri = str(spotify.Link.from_album(spotify_album))
if uri in album_cache:
return album_cache[uri]
if not spotify_album.is_loaded():
return Album(uri=uri, name='[loading...]')
album_cache[uri] = Album(
uri=uri,
name=spotify_album.name(),
artists=[to_mopidy_artist(spotify_album.artist())],
date=spotify_album.year())
return album_cache[uri]
def to_mopidy_track(spotify_track, bitrate=None):
if spotify_track is None:
return
uri = str(spotify.Link.from_track(spotify_track, 0))
if uri in track_cache:
return track_cache[uri]
if not spotify_track.is_loaded():
return Track(uri=uri, name='[loading...]')
spotify_album = spotify_track.album()
if spotify_album is not None and spotify_album.is_loaded():
date = spotify_album.year()
else:
date = None
track_cache[uri] = Track(
uri=uri,
name=spotify_track.name(),
artists=[to_mopidy_artist(a) for a in spotify_track.artists()],
album=to_mopidy_album(spotify_track.album()),
track_no=spotify_track.index(),
date=date,
length=spotify_track.duration(),
bitrate=bitrate)
return track_cache[uri]
def to_mopidy_playlist(
spotify_playlist, folders=None, bitrate=None, username=None):
if spotify_playlist is None or spotify_playlist.type() != 'playlist':
return
try:
uri = str(spotify.Link.from_playlist(spotify_playlist))
except spotify.SpotifyError as e:
logger.debug('Spotify playlist translation error: %s', e)
return
if not spotify_playlist.is_loaded():
return Playlist(uri=uri, name='[loading...]')
name = spotify_playlist.name()
if folders:
folder_names = '/'.join(folder.name() for folder in folders)
name = folder_names + '/' + name
tracks = [
to_mopidy_track(spotify_track, bitrate=bitrate)
for spotify_track in spotify_playlist
if not spotify_track.is_local()
]
if not name:
name = 'Starred'
# Tracks in the Starred playlist are in reverse order from the official
# client.
tracks.reverse()
if spotify_playlist.owner().canonical_name() != username:
name += ' by ' + spotify_playlist.owner().canonical_name()
return Playlist(uri=uri, name=name, tracks=tracks)

View File

@ -83,8 +83,7 @@ class Extension(object):
"""List of library updater classes
:returns: list of
:class:`~mopidy.backends.base.BaseLibraryUpdateProvider`
subclasses
:class:`~mopidy.backends.base.BaseLibraryUpdateProvider` subclasses
"""
return []

View File

@ -1,36 +0,0 @@
from __future__ import unicode_literals
import os
import mopidy
from mopidy import config, exceptions, ext
class Extension(ext.Extension):
dist_name = 'Mopidy-MPRIS'
ext_name = 'mpris'
version = mopidy.__version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
return config.read(conf_file)
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['desktop_file'] = config.Path()
return schema
def validate_environment(self):
if 'DISPLAY' not in os.environ:
raise exceptions.ExtensionError(
'An X11 $DISPLAY is needed to use D-Bus')
try:
import dbus # noqa
except ImportError as e:
raise exceptions.ExtensionError('dbus library not found', e)
def get_frontend_classes(self):
from .actor import MprisFrontend
return [MprisFrontend]

View File

@ -1,110 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import pykka
from mopidy.core import CoreListener
from mopidy.frontends.mpris import objects
logger = logging.getLogger('mopidy.frontends.mpris')
try:
indicate = None
if 'DISPLAY' in os.environ:
import indicate
except ImportError:
pass
if indicate is None:
logger.debug('Startup notification will not be sent')
class MprisFrontend(pykka.ThreadingActor, CoreListener):
def __init__(self, config, core):
super(MprisFrontend, self).__init__()
self.config = config
self.core = core
self.indicate_server = None
self.mpris_object = None
def on_start(self):
try:
self.mpris_object = objects.MprisObject(self.config, self.core)
self._send_startup_notification()
except Exception as e:
logger.warning('MPRIS frontend setup failed (%s)', e)
self.stop()
def on_stop(self):
logger.debug('Removing MPRIS object from D-Bus connection...')
if self.mpris_object:
self.mpris_object.remove_from_connection()
self.mpris_object = None
logger.debug('Removed MPRIS object from D-Bus connection')
def _send_startup_notification(self):
"""
Send startup notification using libindicate to make Mopidy appear in
e.g. `Ubunt's sound menu <https://wiki.ubuntu.com/SoundMenu>`_.
A reference to the libindicate server is kept for as long as Mopidy is
running. When Mopidy exits, the server will be unreferenced and Mopidy
will automatically be unregistered from e.g. the sound menu.
"""
if not indicate:
return
logger.debug('Sending startup notification...')
self.indicate_server = indicate.Server()
self.indicate_server.set_type('music.mopidy')
self.indicate_server.set_desktop_file(
self.config['mpris']['desktop_file'])
self.indicate_server.show()
logger.debug('Startup notification sent')
def _emit_properties_changed(self, interface, changed_properties):
if self.mpris_object is None:
return
props_with_new_values = [
(p, self.mpris_object.Get(interface, p))
for p in changed_properties]
self.mpris_object.PropertiesChanged(
interface, dict(props_with_new_values), [])
def track_playback_paused(self, tl_track, time_position):
logger.debug('Received track_playback_paused event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus'])
def track_playback_resumed(self, tl_track, time_position):
logger.debug('Received track_playback_resumed event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['PlaybackStatus'])
def track_playback_started(self, tl_track):
logger.debug('Received track_playback_started event')
self._emit_properties_changed(
objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata'])
def track_playback_ended(self, tl_track, time_position):
logger.debug('Received track_playback_ended event')
self._emit_properties_changed(
objects.PLAYER_IFACE, ['PlaybackStatus', 'Metadata'])
def volume_changed(self, volume):
logger.debug('Received volume_changed event')
self._emit_properties_changed(objects.PLAYER_IFACE, ['Volume'])
def seeked(self, time_position_in_ms):
logger.debug('Received seeked event')
self.mpris_object.Seeked(time_position_in_ms * 1000)
def playlists_loaded(self):
logger.debug('Received playlists_loaded event')
self._emit_properties_changed(
objects.PLAYLISTS_IFACE, ['PlaylistCount'])
def playlist_changed(self, playlist):
logger.debug('Received playlist_changed event')
playlist_id = self.mpris_object.get_playlist_id(playlist.uri)
playlist = (playlist_id, playlist.name, '')
self.mpris_object.PlaylistChanged(playlist)

View File

@ -1,3 +0,0 @@
[mpris]
enabled = true
desktop_file = /usr/share/applications/mopidy.desktop

View File

@ -1,498 +0,0 @@
from __future__ import unicode_literals
import base64
import logging
import os
import dbus
import dbus.mainloop.glib
import dbus.service
import gobject
from mopidy.core import PlaybackState
from mopidy.utils.process import exit_process
logger = logging.getLogger('mopidy.frontends.mpris')
# Must be done before dbus.SessionBus() is called
gobject.threads_init()
dbus.mainloop.glib.threads_init()
BUS_NAME = 'org.mpris.MediaPlayer2.mopidy'
OBJECT_PATH = '/org/mpris/MediaPlayer2'
ROOT_IFACE = 'org.mpris.MediaPlayer2'
PLAYER_IFACE = 'org.mpris.MediaPlayer2.Player'
PLAYLISTS_IFACE = 'org.mpris.MediaPlayer2.Playlists'
class MprisObject(dbus.service.Object):
"""Implements http://www.mpris.org/2.2/spec/"""
properties = None
def __init__(self, config, core):
self.config = config
self.core = core
self.properties = {
ROOT_IFACE: self._get_root_iface_properties(),
PLAYER_IFACE: self._get_player_iface_properties(),
PLAYLISTS_IFACE: self._get_playlists_iface_properties(),
}
bus_name = self._connect_to_dbus()
dbus.service.Object.__init__(self, bus_name, OBJECT_PATH)
def _get_root_iface_properties(self):
return {
'CanQuit': (True, None),
'Fullscreen': (False, None),
'CanSetFullscreen': (False, None),
'CanRaise': (False, None),
# NOTE Change if adding optional track list support
'HasTrackList': (False, None),
'Identity': ('Mopidy', None),
'DesktopEntry': (self.get_DesktopEntry, None),
'SupportedUriSchemes': (self.get_SupportedUriSchemes, None),
# NOTE Return MIME types supported by local backend if support for
# reporting supported MIME types is added
'SupportedMimeTypes': (dbus.Array([], signature='s'), None),
}
def _get_player_iface_properties(self):
return {
'PlaybackStatus': (self.get_PlaybackStatus, None),
'LoopStatus': (self.get_LoopStatus, self.set_LoopStatus),
'Rate': (1.0, self.set_Rate),
'Shuffle': (self.get_Shuffle, self.set_Shuffle),
'Metadata': (self.get_Metadata, None),
'Volume': (self.get_Volume, self.set_Volume),
'Position': (self.get_Position, None),
'MinimumRate': (1.0, None),
'MaximumRate': (1.0, None),
'CanGoNext': (self.get_CanGoNext, None),
'CanGoPrevious': (self.get_CanGoPrevious, None),
'CanPlay': (self.get_CanPlay, None),
'CanPause': (self.get_CanPause, None),
'CanSeek': (self.get_CanSeek, None),
'CanControl': (self.get_CanControl, None),
}
def _get_playlists_iface_properties(self):
return {
'PlaylistCount': (self.get_PlaylistCount, None),
'Orderings': (self.get_Orderings, None),
'ActivePlaylist': (self.get_ActivePlaylist, None),
}
def _connect_to_dbus(self):
logger.debug('Connecting to D-Bus...')
mainloop = dbus.mainloop.glib.DBusGMainLoop()
bus_name = dbus.service.BusName(
BUS_NAME, dbus.SessionBus(mainloop=mainloop))
logger.info('MPRIS server connected to D-Bus')
return bus_name
def get_playlist_id(self, playlist_uri):
# Only A-Za-z0-9_ is allowed, which is 63 chars, so we can't use
# base64. Luckily, D-Bus does not limit the length of object paths.
# Since base32 pads trailing bytes with "=" chars, we need to replace
# them with an allowed character such as "_".
encoded_uri = base64.b32encode(playlist_uri).replace('=', '_')
return '/com/mopidy/playlist/%s' % encoded_uri
def get_playlist_uri(self, playlist_id):
encoded_uri = playlist_id.split('/')[-1].replace('_', '=')
return base64.b32decode(encoded_uri)
def get_track_id(self, tl_track):
return '/com/mopidy/track/%d' % tl_track.tlid
def get_track_tlid(self, track_id):
assert track_id.startswith('/com/mopidy/track/')
return track_id.split('/')[-1]
### Properties interface
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ss', out_signature='v')
def Get(self, interface, prop):
logger.debug(
'%s.Get(%s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop))
(getter, _) = self.properties[interface][prop]
if callable(getter):
return getter()
else:
return getter
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
logger.debug(
'%s.GetAll(%s) called', dbus.PROPERTIES_IFACE, repr(interface))
getters = {}
for key, (getter, _) in self.properties[interface].iteritems():
getters[key] = getter() if callable(getter) else getter
return getters
@dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE,
in_signature='ssv', out_signature='')
def Set(self, interface, prop, value):
logger.debug(
'%s.Set(%s, %s, %s) called',
dbus.PROPERTIES_IFACE, repr(interface), repr(prop), repr(value))
_, setter = self.properties[interface][prop]
if setter is not None:
setter(value)
self.PropertiesChanged(
interface, {prop: self.Get(interface, prop)}, [])
@dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE,
signature='sa{sv}as')
def PropertiesChanged(self, interface, changed_properties,
invalidated_properties):
logger.debug(
'%s.PropertiesChanged(%s, %s, %s) signaled',
dbus.PROPERTIES_IFACE, interface, changed_properties,
invalidated_properties)
### Root interface methods
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Raise(self):
logger.debug('%s.Raise called', ROOT_IFACE)
# Do nothing, as we do not have a GUI
@dbus.service.method(dbus_interface=ROOT_IFACE)
def Quit(self):
logger.debug('%s.Quit called', ROOT_IFACE)
exit_process()
### Root interface properties
def get_DesktopEntry(self):
return os.path.splitext(os.path.basename(
self.config['mpris']['desktop_file']))[0]
def get_SupportedUriSchemes(self):
return dbus.Array(self.core.uri_schemes.get(), signature='s')
### Player interface methods
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Next(self):
logger.debug('%s.Next called', PLAYER_IFACE)
if not self.get_CanGoNext():
logger.debug('%s.Next not allowed', PLAYER_IFACE)
return
self.core.playback.next().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Previous(self):
logger.debug('%s.Previous called', PLAYER_IFACE)
if not self.get_CanGoPrevious():
logger.debug('%s.Previous not allowed', PLAYER_IFACE)
return
self.core.playback.previous().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Pause(self):
logger.debug('%s.Pause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug('%s.Pause not allowed', PLAYER_IFACE)
return
self.core.playback.pause().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def PlayPause(self):
logger.debug('%s.PlayPause called', PLAYER_IFACE)
if not self.get_CanPause():
logger.debug('%s.PlayPause not allowed', PLAYER_IFACE)
return
state = self.core.playback.state.get()
if state == PlaybackState.PLAYING:
self.core.playback.pause().get()
elif state == PlaybackState.PAUSED:
self.core.playback.resume().get()
elif state == PlaybackState.STOPPED:
self.core.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Stop(self):
logger.debug('%s.Stop called', PLAYER_IFACE)
if not self.get_CanControl():
logger.debug('%s.Stop not allowed', PLAYER_IFACE)
return
self.core.playback.stop().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Play(self):
logger.debug('%s.Play called', PLAYER_IFACE)
if not self.get_CanPlay():
logger.debug('%s.Play not allowed', PLAYER_IFACE)
return
state = self.core.playback.state.get()
if state == PlaybackState.PAUSED:
self.core.playback.resume().get()
else:
self.core.playback.play().get()
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def Seek(self, offset):
logger.debug('%s.Seek called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug('%s.Seek not allowed', PLAYER_IFACE)
return
offset_in_milliseconds = offset // 1000
current_position = self.core.playback.time_position.get()
new_position = current_position + offset_in_milliseconds
self.core.playback.seek(new_position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def SetPosition(self, track_id, position):
logger.debug('%s.SetPosition called', PLAYER_IFACE)
if not self.get_CanSeek():
logger.debug('%s.SetPosition not allowed', PLAYER_IFACE)
return
position = position // 1000
current_tl_track = self.core.playback.current_tl_track.get()
if current_tl_track is None:
return
if track_id != self.get_track_id(current_tl_track):
return
if position < 0:
return
if current_tl_track.track.length < position:
return
self.core.playback.seek(position)
@dbus.service.method(dbus_interface=PLAYER_IFACE)
def OpenUri(self, uri):
logger.debug('%s.OpenUri called', PLAYER_IFACE)
if not self.get_CanPlay():
# NOTE The spec does not explictly require this check, but guarding
# the other methods doesn't help much if OpenUri is open for use.
logger.debug('%s.Play not allowed', PLAYER_IFACE)
return
# NOTE Check if URI has MIME type known to the backend, if MIME support
# is added to the backend.
tl_tracks = self.core.tracklist.add(uri=uri).get()
if tl_tracks:
self.core.playback.play(tl_tracks[0])
else:
logger.debug('Track with URI "%s" not found in library.', uri)
### Player interface signals
@dbus.service.signal(dbus_interface=PLAYER_IFACE, signature='x')
def Seeked(self, position):
logger.debug('%s.Seeked signaled', PLAYER_IFACE)
# Do nothing, as just calling the method is enough to emit the signal.
### Player interface properties
def get_PlaybackStatus(self):
state = self.core.playback.state.get()
if state == PlaybackState.PLAYING:
return 'Playing'
elif state == PlaybackState.PAUSED:
return 'Paused'
elif state == PlaybackState.STOPPED:
return 'Stopped'
def get_LoopStatus(self):
repeat = self.core.playback.repeat.get()
single = self.core.playback.single.get()
if not repeat:
return 'None'
else:
if single:
return 'Track'
else:
return 'Playlist'
def set_LoopStatus(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.LoopStatus not allowed', PLAYER_IFACE)
return
if value == 'None':
self.core.playback.repeat = False
self.core.playback.single = False
elif value == 'Track':
self.core.playback.repeat = True
self.core.playback.single = True
elif value == 'Playlist':
self.core.playback.repeat = True
self.core.playback.single = False
def set_Rate(self, value):
if not self.get_CanControl():
# NOTE The spec does not explictly require this check, but it was
# added to be consistent with all the other property setters.
logger.debug('Setting %s.Rate not allowed', PLAYER_IFACE)
return
if value == 0:
self.Pause()
def get_Shuffle(self):
return self.core.playback.random.get()
def set_Shuffle(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.Shuffle not allowed', PLAYER_IFACE)
return
if value:
self.core.playback.random = True
else:
self.core.playback.random = False
def get_Metadata(self):
current_tl_track = self.core.playback.current_tl_track.get()
if current_tl_track is None:
return {'mpris:trackid': ''}
else:
(_, track) = current_tl_track
metadata = {'mpris:trackid': self.get_track_id(current_tl_track)}
if track.length:
metadata['mpris:length'] = track.length * 1000
if track.uri:
metadata['xesam:url'] = track.uri
if track.name:
metadata['xesam:title'] = track.name
if track.artists:
artists = list(track.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:artist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.album and track.album.name:
metadata['xesam:album'] = track.album.name
if track.album and track.album.artists:
artists = list(track.album.artists)
artists.sort(key=lambda a: a.name)
metadata['xesam:albumArtist'] = dbus.Array(
[a.name for a in artists if a.name], signature='s')
if track.album and track.album.images:
url = list(track.album.images)[0]
if url:
metadata['mpris:artUrl'] = url
if track.disc_no:
metadata['xesam:discNumber'] = track.disc_no
if track.track_no:
metadata['xesam:trackNumber'] = track.track_no
return dbus.Dictionary(metadata, signature='sv')
def get_Volume(self):
volume = self.core.playback.volume.get()
if volume is None:
return 0
return volume / 100.0
def set_Volume(self, value):
if not self.get_CanControl():
logger.debug('Setting %s.Volume not allowed', PLAYER_IFACE)
return
if value is None:
return
elif value < 0:
self.core.playback.volume = 0
elif value > 1:
self.core.playback.volume = 100
elif 0 <= value <= 1:
self.core.playback.volume = int(value * 100)
def get_Position(self):
return self.core.playback.time_position.get() * 1000
def get_CanGoNext(self):
if not self.get_CanControl():
return False
return (
self.core.playback.tl_track_at_next.get() !=
self.core.playback.current_tl_track.get())
def get_CanGoPrevious(self):
if not self.get_CanControl():
return False
return (
self.core.playback.tl_track_at_previous.get() !=
self.core.playback.current_tl_track.get())
def get_CanPlay(self):
if not self.get_CanControl():
return False
return (
self.core.playback.current_tl_track.get() is not None or
self.core.playback.tl_track_at_next.get() is not None)
def get_CanPause(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanSeek(self):
if not self.get_CanControl():
return False
# NOTE Should be changed to vary based on capabilities of the current
# track if Mopidy starts supporting non-seekable media, like streams.
return True
def get_CanControl(self):
# NOTE This could be a setting for the end user to change.
return True
### Playlists interface methods
@dbus.service.method(dbus_interface=PLAYLISTS_IFACE)
def ActivatePlaylist(self, playlist_id):
logger.debug(
'%s.ActivatePlaylist(%r) called', PLAYLISTS_IFACE, playlist_id)
playlist_uri = self.get_playlist_uri(playlist_id)
playlist = self.core.playlists.lookup(playlist_uri).get()
if playlist and playlist.tracks:
tl_tracks = self.core.tracklist.add(playlist.tracks).get()
self.core.playback.play(tl_tracks[0])
@dbus.service.method(dbus_interface=PLAYLISTS_IFACE)
def GetPlaylists(self, index, max_count, order, reverse):
logger.debug(
'%s.GetPlaylists(%r, %r, %r, %r) called',
PLAYLISTS_IFACE, index, max_count, order, reverse)
playlists = self.core.playlists.playlists.get()
if order == 'Alphabetical':
playlists.sort(key=lambda p: p.name, reverse=reverse)
elif order == 'Modified':
playlists.sort(key=lambda p: p.last_modified, reverse=reverse)
elif order == 'User' and reverse:
playlists.reverse()
slice_end = index + max_count
playlists = playlists[index:slice_end]
results = [
(self.get_playlist_id(p.uri), p.name, '')
for p in playlists]
return dbus.Array(results, signature='(oss)')
### Playlists interface signals
@dbus.service.signal(dbus_interface=PLAYLISTS_IFACE, signature='(oss)')
def PlaylistChanged(self, playlist):
logger.debug('%s.PlaylistChanged signaled', PLAYLISTS_IFACE)
# Do nothing, as just calling the method is enough to emit the signal.
### Playlists interface properties
def get_PlaylistCount(self):
return len(self.core.playlists.playlists.get())
def get_Orderings(self):
return [
'Alphabetical', # Order by playlist.name
'Modified', # Order by playlist.last_modified
'User', # Don't change order
]
def get_ActivePlaylist(self):
playlist_is_valid = False
playlist = ('/', 'None', '')
return (playlist_is_valid, playlist)

View File

@ -1,33 +0,0 @@
from __future__ import unicode_literals
import os
import mopidy
from mopidy import config, exceptions, ext
class Extension(ext.Extension):
dist_name = 'Mopidy-Scrobbler'
ext_name = 'scrobbler'
version = mopidy.__version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
return config.read(conf_file)
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['username'] = config.String()
schema['password'] = config.Secret()
return schema
def validate_environment(self):
try:
import pylast # noqa
except ImportError as e:
raise exceptions.ExtensionError('pylast library not found', e)
def get_frontend_classes(self):
from .actor import ScrobblerFrontend
return [ScrobblerFrontend]

View File

@ -1,81 +0,0 @@
from __future__ import unicode_literals
import logging
import time
import pykka
import pylast
from mopidy.core import CoreListener
logger = logging.getLogger('mopidy.frontends.scrobbler')
API_KEY = '2236babefa8ebb3d93ea467560d00d04'
API_SECRET = '94d9a09c0cd5be955c4afaeaffcaefcd'
class ScrobblerFrontend(pykka.ThreadingActor, CoreListener):
def __init__(self, config, core):
super(ScrobblerFrontend, self).__init__()
self.config = config
self.lastfm = None
self.last_start_time = None
def on_start(self):
try:
self.lastfm = pylast.LastFMNetwork(
api_key=API_KEY, api_secret=API_SECRET,
username=self.config['scrobbler']['username'],
password_hash=pylast.md5(self.config['scrobbler']['password']))
logger.info('Scrobbler connected to Last.fm')
except (pylast.NetworkError, pylast.MalformedResponseError,
pylast.WSError) as e:
logger.error('Error during Last.fm setup: %s', e)
self.stop()
def track_playback_started(self, tl_track):
track = tl_track.track
artists = ', '.join([a.name for a in track.artists])
duration = track.length and track.length // 1000 or 0
self.last_start_time = int(time.time())
logger.debug('Now playing track: %s - %s', artists, track.name)
try:
self.lastfm.update_now_playing(
artists,
(track.name or ''),
album=(track.album and track.album.name or ''),
duration=str(duration),
track_number=str(track.track_no),
mbid=(track.musicbrainz_id or ''))
except (pylast.ScrobblingError, pylast.NetworkError,
pylast.MalformedResponseError, pylast.WSError) as e:
logger.warning('Error submitting playing track to Last.fm: %s', e)
def track_playback_ended(self, tl_track, time_position):
track = tl_track.track
artists = ', '.join([a.name for a in track.artists])
duration = track.length and track.length // 1000 or 0
time_position = time_position // 1000
if duration < 30:
logger.debug('Track too short to scrobble. (30s)')
return
if time_position < duration // 2 and time_position < 240:
logger.debug(
'Track not played long enough to scrobble. (50% or 240s)')
return
if self.last_start_time is None:
self.last_start_time = int(time.time()) - duration
logger.debug('Scrobbling track: %s - %s', artists, track.name)
try:
self.lastfm.scrobble(
artists,
(track.name or ''),
str(self.last_start_time),
album=(track.album and track.album.name or ''),
track_number=str(track.track_no),
duration=str(duration),
mbid=(track.musicbrainz_id or ''))
except (pylast.ScrobblingError, pylast.NetworkError,
pylast.MalformedResponseError, pylast.WSError) as e:
logger.warning('Error submitting played track to Last.fm: %s', e)

View File

@ -1,4 +0,0 @@
[scrobbler]
enabled = true
username =
password =

View File

@ -1,3 +0,0 @@
pylast >= 0.5.7
# Available as python-pylast in newer Debian/Ubuntu and from apt.mopidy.com for
# older releases of Debian/Ubuntu

View File

@ -1,8 +0,0 @@
pyspotify >= 1.9, < 2
# The libspotify Python wrapper
# Available as the python-spotify package from apt.mopidy.com
# libspotify >= 12, < 13
# The libspotify C library from
# https://developer.spotify.com/technologies/libspotify/
# Available as the libspotify12 package from apt.mopidy.com

View File

@ -28,8 +28,6 @@ setup(
'Pykka >= 1.1',
],
extras_require={
'spotify': ['pyspotify >= 1.9, < 2'],
'scrobbler': ['pylast >= 0.5.7'],
'http': ['cherrypy >= 3.2.2', 'ws4py >= 0.2.3'],
},
test_suite='nose.collector',
@ -45,11 +43,8 @@ setup(
],
'mopidy.ext': [
'http = mopidy.frontends.http:Extension [http]',
'scrobbler = mopidy.frontends.scrobbler:Extension [scrobbler]',
'local = mopidy.backends.local:Extension',
'mpd = mopidy.frontends.mpd:Extension',
'mpris = mopidy.frontends.mpris:Extension',
'spotify = mopidy.backends.spotify:Extension [spotify]',
'stream = mopidy.backends.stream:Extension',
],
},

View File

@ -6,6 +6,9 @@ import pygst
pygst.require('0.10')
import gst
import gobject
gobject.threads_init()
import pykka
from mopidy import audio
@ -80,6 +83,18 @@ class AudioTest(unittest.TestCase):
self.assertTrue(self.audio.set_volume(value).get())
self.assertEqual(value, self.audio.get_volume().get())
def test_set_volume_with_mixer_min_equal_max(self):
config = {
'audio': {
'mixer': 'fakemixer track_max_volume=0',
'mixer_track': None,
'output': 'fakesink',
'visualizer': None,
}
}
self.audio = audio.Audio.start(config=config).proxy()
self.assertEqual(0, self.audio.get_volume().get())
@unittest.SkipTest
def test_set_state_encapsulation(self):
pass # TODO

View File

@ -0,0 +1,128 @@
#encoding: utf-8
from __future__ import unicode_literals
import io
import unittest
from mopidy.audio import playlists
BAD = b'foobarbaz'
M3U = b"""#EXTM3U
#EXTINF:123, Sample artist - Sample title
file:///tmp/foo
#EXTINF:321,Example Artist - Example title
file:///tmp/bar
#EXTINF:213,Some Artist - Other title
file:///tmp/baz
"""
PLS = b"""[Playlist]
NumberOfEntries=3
File1=file:///tmp/foo
Title1=Sample Title
Length1=123
File2=file:///tmp/bar
Title2=Example title
Length2=321
File3=file:///tmp/baz
Title3=Other title
Length3=213
Version=2
"""
ASX = b"""<asx version="3.0">
<title>Example</title>
<entry>
<title>Sample Title</title>
<ref href="file:///tmp/foo" />
</entry>
<entry>
<title>Example title</title>
<ref href="file:///tmp/bar" />
</entry>
<entry>
<title>Other title</title>
<ref href="file:///tmp/baz" />
</entry>
</asx>
"""
XSPF = b"""<?xml version="1.0" encoding="UTF-8"?>
<playlist version="1" xmlns="http://xspf.org/ns/0/">
<trackList>
<track>
<title>Sample Title</title>
<location>file:///tmp/foo</location>
</track>
<track>
<title>Example title</title>
<location>file:///tmp/bar</location>
</track>
<track>
<title>Other title</title>
<location>file:///tmp/baz</location>
</track>
</trackList>
</playlist>
"""
class TypeFind(object):
def __init__(self, data):
self.data = data
def peek(self, start, end):
return self.data[start:end]
class BasePlaylistTest(object):
valid = None
invalid = None
detect = None
parse = None
def test_detect_valid_header(self):
self.assertTrue(self.detect(TypeFind(self.valid)))
def test_detect_invalid_header(self):
self.assertFalse(self.detect(TypeFind(self.invalid)))
def test_parse_valid_playlist(self):
uris = list(self.parse(io.BytesIO(self.valid)))
expected = [b'file:///tmp/foo', b'file:///tmp/bar', b'file:///tmp/baz']
self.assertEqual(uris, expected)
def test_parse_invalid_playlist(self):
uris = list(self.parse(io.BytesIO(self.invalid)))
self.assertEqual(uris, [])
class M3uPlaylistTest(BasePlaylistTest, unittest.TestCase):
valid = M3U
invalid = BAD
detect = staticmethod(playlists.detect_m3u_header)
parse = staticmethod(playlists.parse_m3u)
class PlsPlaylistTest(BasePlaylistTest, unittest.TestCase):
valid = PLS
invalid = BAD
detect = staticmethod(playlists.detect_pls_header)
parse = staticmethod(playlists.parse_pls)
class AsxPlsPlaylistTest(BasePlaylistTest, unittest.TestCase):
valid = ASX
invalid = BAD
detect = staticmethod(playlists.detect_asx_header)
parse = staticmethod(playlists.parse_asx)
class XspfPlaylistTest(BasePlaylistTest, unittest.TestCase):
valid = XSPF
invalid = BAD
detect = staticmethod(playlists.detect_xspf_header)
parse = staticmethod(playlists.parse_xspf)

View File

@ -1 +0,0 @@
from __future__ import unicode_literals

View File

@ -1,92 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
try:
import dbus
except ImportError:
dbus = False
from mopidy.models import Playlist, TlTrack
if dbus:
from mopidy.frontends.mpris import actor, objects
@unittest.skipUnless(dbus, 'dbus not found')
class BackendEventsTest(unittest.TestCase):
def setUp(self):
# As a plain class, not an actor:
self.mpris_frontend = actor.MprisFrontend(config=None, core=None)
self.mpris_object = mock.Mock(spec=objects.MprisObject)
self.mpris_frontend.mpris_object = self.mpris_object
def test_track_playback_paused_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Paused'
self.mpris_frontend.track_playback_paused(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Paused'}, [])
def test_track_playback_resumed_event_changes_playback_status(self):
self.mpris_object.Get.return_value = 'Playing'
self.mpris_frontend.track_playback_resumed(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'PlaybackStatus': 'Playing'}, [])
def test_track_playback_started_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_started(TlTrack())
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_track_playback_ended_changes_playback_status_and_metadata(self):
self.mpris_object.Get.return_value = '...'
self.mpris_frontend.track_playback_ended(TlTrack(), 0)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'PlaybackStatus'), {}),
((objects.PLAYER_IFACE, 'Metadata'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE,
{'Metadata': '...', 'PlaybackStatus': '...'}, [])
def test_volume_changed_event_changes_volume(self):
self.mpris_object.Get.return_value = 1.0
self.mpris_frontend.volume_changed(volume=100)
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYER_IFACE, 'Volume'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYER_IFACE, {'Volume': 1.0}, [])
def test_seeked_event_causes_mpris_seeked_event(self):
self.mpris_frontend.seeked(31000)
self.mpris_object.Seeked.assert_called_with(31000000)
def test_playlists_loaded_event_changes_playlist_count(self):
self.mpris_object.Get.return_value = 17
self.mpris_frontend.playlists_loaded()
self.assertListEqual(self.mpris_object.Get.call_args_list, [
((objects.PLAYLISTS_IFACE, 'PlaylistCount'), {}),
])
self.mpris_object.PropertiesChanged.assert_called_with(
objects.PLAYLISTS_IFACE, {'PlaylistCount': 17}, [])
def test_playlist_changed_event_causes_mpris_playlist_changed_event(self):
self.mpris_object.get_playlist_id.return_value = 'id-for-dummy:foo'
playlist = Playlist(uri='dummy:foo', name='foo')
self.mpris_frontend.playlist_changed(playlist)
self.mpris_object.PlaylistChanged.assert_called_with(
('id-for-dummy:foo', 'foo', ''))

View File

@ -1,869 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.backends import dummy
from mopidy.core import PlaybackState
from mopidy.models import Album, Artist, Track
if dbus:
from mopidy.frontends.mpris import objects
PLAYING = PlaybackState.PLAYING
PAUSED = PlaybackState.PAUSED
STOPPED = PlaybackState.STOPPED
@unittest.skipUnless(dbus, 'dbus not found')
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config={}, core=self.core)
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_get_playback_status_is_playing_when_playing(self):
self.core.playback.state = PLAYING
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Playing', result)
def test_get_playback_status_is_paused_when_paused(self):
self.core.playback.state = PAUSED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Paused', result)
def test_get_playback_status_is_stopped_when_stopped(self):
self.core.playback.state = STOPPED
result = self.mpris.Get(objects.PLAYER_IFACE, 'PlaybackStatus')
self.assertEqual('Stopped', result)
def test_get_loop_status_is_none_when_not_looping(self):
self.core.playback.repeat = False
self.core.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('None', result)
def test_get_loop_status_is_track_when_looping_a_single_track(self):
self.core.playback.repeat = True
self.core.playback.single = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Track', result)
def test_get_loop_status_is_playlist_when_looping_tracklist(self):
self.core.playback.repeat = True
self.core.playback.single = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'LoopStatus')
self.assertEqual('Playlist', result)
def test_set_loop_status_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.repeat = True
self.core.playback.single = True
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), True)
def test_set_loop_status_to_none_unsets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'None')
self.assertEqual(self.core.playback.repeat.get(), False)
self.assertEqual(self.core.playback.single.get(), False)
def test_set_loop_status_to_track_sets_repeat_and_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Track')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), True)
def test_set_loop_status_to_playlists_sets_repeat_and_not_single(self):
self.mpris.Set(objects.PLAYER_IFACE, 'LoopStatus', 'Playlist')
self.assertEqual(self.core.playback.repeat.get(), True)
self.assertEqual(self.core.playback.single.get(), False)
def test_get_rate_is_greater_or_equal_than_minimum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
minimum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assertGreaterEqual(rate, minimum_rate)
def test_get_rate_is_less_or_equal_than_maximum_rate(self):
rate = self.mpris.Get(objects.PLAYER_IFACE, 'Rate')
maximum_rate = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assertGreaterEqual(rate, maximum_rate)
def test_set_rate_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_set_rate_to_zero_pauses_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Set(objects.PLAYER_IFACE, 'Rate', 0)
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_get_shuffle_returns_true_if_random_is_active(self):
self.core.playback.random = True
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertTrue(result)
def test_get_shuffle_returns_false_if_random_is_inactive(self):
self.core.playback.random = False
result = self.mpris.Get(objects.PLAYER_IFACE, 'Shuffle')
self.assertFalse(result)
def test_set_shuffle_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.random = False
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertFalse(self.core.playback.random.get())
def test_set_shuffle_to_true_activates_random_mode(self):
self.core.playback.random = False
self.assertFalse(self.core.playback.random.get())
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', True)
self.assertTrue(self.core.playback.random.get())
def test_set_shuffle_to_false_deactivates_random_mode(self):
self.core.playback.random = True
self.assertTrue(self.core.playback.random.get())
self.mpris.Set(objects.PLAYER_IFACE, 'Shuffle', False)
self.assertFalse(self.core.playback.random.get())
def test_get_metadata_has_trackid_even_when_no_current_track(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEqual(result['mpris:trackid'], '')
def test_get_metadata_has_trackid_based_on_tlid(self):
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
(tlid, track) = self.core.playback.current_tl_track.get()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:trackid', result.keys())
self.assertEqual(
result['mpris:trackid'], '/com/mopidy/track/%d' % tlid)
def test_get_metadata_has_track_length(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:length', result.keys())
self.assertEqual(result['mpris:length'], 40000000)
def test_get_metadata_has_track_uri(self):
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:url', result.keys())
self.assertEqual(result['xesam:url'], 'dummy:a')
def test_get_metadata_has_track_title(self):
self.core.tracklist.add([Track(name='a')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:title', result.keys())
self.assertEqual(result['xesam:title'], 'a')
def test_get_metadata_has_track_artists(self):
self.core.tracklist.add([Track(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)])])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:artist', result.keys())
self.assertEqual(result['xesam:artist'], ['a', 'b'])
def test_get_metadata_has_track_album(self):
self.core.tracklist.add([Track(album=Album(name='a'))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:album', result.keys())
self.assertEqual(result['xesam:album'], 'a')
def test_get_metadata_has_track_album_artists(self):
self.core.tracklist.add([Track(album=Album(artists=[
Artist(name='a'), Artist(name='b'), Artist(name=None)]))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:albumArtist', result.keys())
self.assertEqual(result['xesam:albumArtist'], ['a', 'b'])
def test_get_metadata_use_first_album_image_as_art_url(self):
# XXX Currently, the album image order isn't preserved because they
# are stored as a frozenset(). We pick the first in the set, which is
# sorted alphabetically, thus we get 'bar.jpg', not 'foo.jpg', which
# would probably make more sense.
self.core.tracklist.add([Track(album=Album(images=[
'http://example.com/foo.jpg', 'http://example.com/bar.jpg']))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('mpris:artUrl', result.keys())
self.assertEqual(result['mpris:artUrl'], 'http://example.com/bar.jpg')
def test_get_metadata_has_no_art_url_if_no_album(self):
self.core.tracklist.add([Track()])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertNotIn('mpris:artUrl', result.keys())
def test_get_metadata_has_no_art_url_if_no_album_images(self):
self.core.tracklist.add([Track(Album(images=[]))])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertNotIn('mpris:artUrl', result.keys())
def test_get_metadata_has_disc_number_in_album(self):
self.core.tracklist.add([Track(disc_no=2)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:discNumber', result.keys())
self.assertEqual(result['xesam:discNumber'], 2)
def test_get_metadata_has_track_number_in_album(self):
self.core.tracklist.add([Track(track_no=7)])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'Metadata')
self.assertIn('xesam:trackNumber', result.keys())
self.assertEqual(result['xesam:trackNumber'], 7)
def test_get_volume_should_return_volume_between_zero_and_one(self):
self.core.playback.volume = None
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0)
self.core.playback.volume = 0
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0)
self.core.playback.volume = 50
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 0.5)
self.core.playback.volume = 100
result = self.mpris.Get(objects.PLAYER_IFACE, 'Volume')
self.assertEqual(result, 1)
def test_set_volume_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.playback.volume = 0
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEqual(self.core.playback.volume.get(), 0)
def test_set_volume_to_one_should_set_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 1.0)
self.assertEqual(self.core.playback.volume.get(), 100)
def test_set_volume_to_anything_above_one_sets_mixer_volume_to_100(self):
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', 2.0)
self.assertEqual(self.core.playback.volume.get(), 100)
def test_set_volume_to_anything_not_a_number_does_not_change_volume(self):
self.core.playback.volume = 10
self.mpris.Set(objects.PLAYER_IFACE, 'Volume', None)
self.assertEqual(self.core.playback.volume.get(), 10)
def test_get_position_returns_time_position_in_microseconds(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(10000)
result_in_microseconds = self.mpris.Get(
objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertGreaterEqual(result_in_milliseconds, 10000)
def test_get_position_when_no_current_track_should_be_zero(self):
result_in_microseconds = self.mpris.Get(
objects.PLAYER_IFACE, 'Position')
result_in_milliseconds = result_in_microseconds // 1000
self.assertEqual(result_in_milliseconds, 0)
def test_get_minimum_rate_is_one_or_less(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MinimumRate')
self.assertLessEqual(result, 1.0)
def test_get_maximum_rate_is_one_or_more(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'MaximumRate')
self.assertGreaterEqual(result, 1.0)
def test_can_go_next_is_true_if_can_control_and_other_next_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertTrue(result)
def test_can_go_next_is_false_if_next_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.repeat = True
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_next_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoNext')
self.assertFalse(result)
def test_can_go_previous_is_true_if_can_control_and_previous_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertTrue(result)
def test_can_go_previous_is_false_if_previous_track_is_the_same(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.repeat = True
self.core.playback.play()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_go_previous_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanGoPrevious')
self.assertFalse(result)
def test_can_play_is_true_if_can_control_and_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.core.tracklist.add([Track(uri='dummy:a')])
self.core.playback.play()
self.assertTrue(self.core.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertTrue(result)
def test_can_play_is_false_if_no_current_track(self):
self.mpris.get_CanControl = lambda *_: True
self.assertFalse(self.core.playback.current_track.get())
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_play_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPlay')
self.assertFalse(result)
def test_can_pause_is_true_if_can_control_and_track_can_be_paused(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertTrue(result)
def test_can_pause_if_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanPause')
self.assertFalse(result)
def test_can_seek_is_true_if_can_control_is_true(self):
self.mpris.get_CanControl = lambda *_: True
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertTrue(result)
def test_can_seek_is_false_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanSeek')
self.assertFalse(result)
def test_can_control_is_true(self):
result = self.mpris.Get(objects.PLAYER_IFACE, 'CanControl')
self.assertTrue(result)
def test_next_is_ignored_if_can_go_next_is_false(self):
self.mpris.get_CanGoNext = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_next_when_playing_skips_to_next_track_and_keep_playing(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_next_when_at_end_of_list_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Next()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_next_when_paused_should_skip_to_next_track_and_stay_paused(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_next_when_stopped_skips_to_next_track_and_stay_stopped(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.stop()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_previous_is_ignored_if_can_go_previous_is_false(self):
self.mpris.get_CanGoPrevious = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
def test_previous_when_playing_skips_to_prev_track_and_keep_playing(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_previous_when_at_start_of_list_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Previous()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_previous_when_paused_skips_to_previous_track_and_pause(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.core.playback.pause()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_previous_when_stopped_skips_to_previous_track_and_stops(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.next()
self.core.playback.stop()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Previous()
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_pause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_pause_when_playing_should_pause_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_pause_when_paused_has_no_effect(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_playpause_is_ignored_if_can_pause_is_false(self):
self.mpris.get_CanPause = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_playpause_when_playing_should_pause_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
def test_playpause_when_paused_should_resume_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
at_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(at_pause, 0)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(after_pause, at_pause)
def test_playpause_when_stopped_should_start_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.PlayPause()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_stop_is_ignored_if_can_control_is_false(self):
self.mpris.get_CanControl = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_stop_when_playing_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_stop_when_paused_should_stop_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.mpris.Stop()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_play_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_play_when_stopped_starts_playback(self):
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
def test_play_after_pause_resumes_from_same_position(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(before_pause, 0)
self.mpris.Pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
at_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(at_pause, before_pause)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_pause = self.core.playback.time_position.get()
self.assertGreaterEqual(after_pause, at_pause)
def test_play_when_there_is_no_track_has_no_effect(self):
self.core.tracklist.clear()
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.Play()
self.assertEqual(self.core.playback.state.get(), STOPPED)
def test_seek_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
after_seek = self.core.playback.time_position.get()
self.assertLessEqual(before_seek, after_seek)
self.assertLess(after_seek, before_seek + milliseconds_to_seek)
def test_seek_seeks_given_microseconds_forward_in_the_current_track(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 0)
milliseconds_to_seek = 10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
def test_seek_seeks_given_microseconds_backward_if_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
milliseconds_to_seek = -10000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
self.assertLess(after_seek, before_seek)
def test_seek_seeks_to_start_of_track_if_new_position_is_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
milliseconds_to_seek = -30000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, before_seek + milliseconds_to_seek)
self.assertLess(after_seek, before_seek)
self.assertGreaterEqual(after_seek, 0)
def test_seek_skips_to_next_track_if_new_position_gt_track_length(self):
self.core.tracklist.add([
Track(uri='dummy:a', length=40000),
Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.seek(20000)
before_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(before_seek, 20000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
milliseconds_to_seek = 50000
microseconds_to_seek = milliseconds_to_seek * 1000
self.mpris.Seek(microseconds_to_seek)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:b')
after_seek = self.core.playback.time_position.get()
self.assertGreaterEqual(after_seek, 0)
self.assertLess(after_seek, before_seek)
def test_set_position_is_ignored_if_can_seek_is_false(self):
self.mpris.get_CanSeek = lambda *_: False
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, 5000)
track_id = 'a'
position_to_set_in_millisec = 20000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, after_set_position)
self.assertLess(after_set_position, position_to_set_in_millisec)
def test_set_position_sets_the_current_track_position_in_microsecs(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
before_set_position = self.core.playback.time_position.get()
self.assertLessEqual(before_set_position, 5000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
track_id = '/com/mopidy/track/0'
position_to_set_in_millisec = 20000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
self.assertEqual(self.core.playback.state.get(), PLAYING)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(
after_set_position, position_to_set_in_millisec)
def test_set_position_does_nothing_if_the_position_is_negative(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = '/com/mopidy/track/0'
position_to_set_in_millisec = -1000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_set_position_does_nothing_if_position_is_gt_track_length(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = 'a'
position_to_set_in_millisec = 50000
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_set_position_is_noop_if_track_id_isnt_current_track(self):
self.core.tracklist.add([Track(uri='dummy:a', length=40000)])
self.core.playback.play()
self.core.playback.seek(20000)
before_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(before_set_position, 20000)
self.assertLessEqual(before_set_position, 25000)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
track_id = 'b'
position_to_set_in_millisec = 0
position_to_set_in_microsec = position_to_set_in_millisec * 1000
self.mpris.SetPosition(track_id, position_to_set_in_microsec)
after_set_position = self.core.playback.time_position.get()
self.assertGreaterEqual(after_set_position, before_set_position)
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
def test_open_uri_is_ignored_if_can_play_is_false(self):
self.mpris.get_CanPlay = lambda *_: False
self.backend.library.dummy_library = [
Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(len(self.core.tracklist.tracks.get()), 0)
def test_open_uri_ignores_uris_with_unknown_uri_scheme(self):
self.assertListEqual(self.core.uri_schemes.get(), ['dummy'])
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='notdummy:/test/uri')]
self.mpris.OpenUri('notdummy:/test/uri')
self.assertEqual(len(self.core.tracklist.tracks.get()), 0)
def test_open_uri_adds_uri_to_tracklist(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(
self.core.tracklist.tracks.get()[0].uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_stopped(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.assertEqual(self.core.playback.state.get(), STOPPED)
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_paused(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.core.playback.pause()
self.assertEqual(self.core.playback.state.get(), PAUSED)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')
def test_open_uri_starts_playback_of_new_track_if_playing(self):
self.mpris.get_CanPlay = lambda *_: True
self.backend.library.dummy_library = [Track(uri='dummy:/test/uri')]
self.core.tracklist.add([Track(uri='dummy:a'), Track(uri='dummy:b')])
self.core.playback.play()
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(self.core.playback.current_track.get().uri, 'dummy:a')
self.mpris.OpenUri('dummy:/test/uri')
self.assertEqual(self.core.playback.state.get(), PLAYING)
self.assertEqual(
self.core.playback.current_track.get().uri, 'dummy:/test/uri')

View File

@ -1,172 +0,0 @@
from __future__ import unicode_literals
import datetime
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.audio import PlaybackState
from mopidy.backends import dummy
from mopidy.models import Track
if dbus:
from mopidy.frontends.mpris import objects
@unittest.skipUnless(dbus, 'dbus not found')
class PlayerInterfaceTest(unittest.TestCase):
def setUp(self):
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config={}, core=self.core)
foo = self.core.playlists.create('foo').get()
foo = foo.copy(last_modified=datetime.datetime(2012, 3, 1, 6, 0, 0))
foo = self.core.playlists.save(foo).get()
bar = self.core.playlists.create('bar').get()
bar = bar.copy(last_modified=datetime.datetime(2012, 2, 1, 6, 0, 0))
bar = self.core.playlists.save(bar).get()
baz = self.core.playlists.create('baz').get()
baz = baz.copy(last_modified=datetime.datetime(2012, 1, 1, 6, 0, 0))
baz = self.core.playlists.save(baz).get()
self.playlist = baz
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_activate_playlist_appends_tracks_to_tracklist(self):
self.core.tracklist.add([
Track(uri='dummy:old-a'),
Track(uri='dummy:old-b'),
])
self.playlist = self.playlist.copy(tracks=[
Track(uri='dummy:baz-a'),
Track(uri='dummy:baz-b'),
Track(uri='dummy:baz-c'),
])
self.playlist = self.core.playlists.save(self.playlist).get()
self.assertEqual(2, self.core.tracklist.length.get())
playlists = self.mpris.GetPlaylists(0, 100, 'User', False)
playlist_id = playlists[2][0]
self.mpris.ActivatePlaylist(playlist_id)
self.assertEqual(5, self.core.tracklist.length.get())
self.assertEqual(
PlaybackState.PLAYING, self.core.playback.state.get())
self.assertEqual(
self.playlist.tracks[0], self.core.playback.current_track.get())
def test_activate_empty_playlist_is_harmless(self):
self.assertEqual(0, self.core.tracklist.length.get())
playlists = self.mpris.GetPlaylists(0, 100, 'User', False)
playlist_id = playlists[2][0]
self.mpris.ActivatePlaylist(playlist_id)
self.assertEqual(0, self.core.tracklist.length.get())
self.assertEqual(
PlaybackState.STOPPED, self.core.playback.state.get())
self.assertIsNone(self.core.playback.current_track.get())
def test_get_playlists_in_alphabetical_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', False)
self.assertEqual(3, len(result))
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC4Q_', result[0][0])
self.assertEqual('bar', result[0][1])
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJRGC6Q_', result[1][0])
self.assertEqual('baz', result[1][1])
self.assertEqual('/com/mopidy/playlist/MR2W23LZHJTG63Y_', result[2][0])
self.assertEqual('foo', result[2][1])
def test_get_playlists_in_reverse_alphabetical_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Alphabetical', True)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('baz', result[1][1])
self.assertEqual('bar', result[2][1])
def test_get_playlists_in_modified_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Modified', False)
self.assertEqual(3, len(result))
self.assertEqual('baz', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('foo', result[2][1])
def test_get_playlists_in_reverse_modified_order(self):
result = self.mpris.GetPlaylists(0, 100, 'Modified', True)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('baz', result[2][1])
def test_get_playlists_in_user_order(self):
result = self.mpris.GetPlaylists(0, 100, 'User', False)
self.assertEqual(3, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('baz', result[2][1])
def test_get_playlists_in_reverse_user_order(self):
result = self.mpris.GetPlaylists(0, 100, 'User', True)
self.assertEqual(3, len(result))
self.assertEqual('baz', result[0][1])
self.assertEqual('bar', result[1][1])
self.assertEqual('foo', result[2][1])
def test_get_playlists_slice_on_start_of_list(self):
result = self.mpris.GetPlaylists(0, 2, 'User', False)
self.assertEqual(2, len(result))
self.assertEqual('foo', result[0][1])
self.assertEqual('bar', result[1][1])
def test_get_playlists_slice_later_in_list(self):
result = self.mpris.GetPlaylists(2, 2, 'User', False)
self.assertEqual(1, len(result))
self.assertEqual('baz', result[0][1])
def test_get_playlist_count_returns_number_of_playlists(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'PlaylistCount')
self.assertEqual(3, result)
def test_get_orderings_includes_alpha_modified_and_user(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'Orderings')
self.assertIn('Alphabetical', result)
self.assertNotIn('Created', result)
self.assertIn('Modified', result)
self.assertNotIn('Played', result)
self.assertIn('User', result)
def test_get_active_playlist_does_not_return_a_playlist(self):
result = self.mpris.Get(objects.PLAYLISTS_IFACE, 'ActivePlaylist')
valid, playlist = result
playlist_id, playlist_name, playlist_icon_uri = playlist
self.assertEqual(False, valid)
self.assertEqual('/', playlist_id)
self.assertEqual('None', playlist_name)
self.assertEqual('', playlist_icon_uri)

View File

@ -1,87 +0,0 @@
from __future__ import unicode_literals
import mock
import unittest
import pykka
try:
import dbus
except ImportError:
dbus = False
from mopidy import core
from mopidy.backends import dummy
if dbus:
from mopidy.frontends.mpris import objects
@unittest.skipUnless(dbus, 'dbus not found')
class RootInterfaceTest(unittest.TestCase):
def setUp(self):
config = {
'mpris': {
'desktop_file': '/tmp/foo.desktop',
}
}
objects.exit_process = mock.Mock()
objects.MprisObject._connect_to_dbus = mock.Mock()
self.backend = dummy.create_dummy_backend_proxy()
self.core = core.Core.start(backends=[self.backend]).proxy()
self.mpris = objects.MprisObject(config=config, core=self.core)
def tearDown(self):
pykka.ActorRegistry.stop_all()
def test_constructor_connects_to_dbus(self):
self.assert_(self.mpris._connect_to_dbus.called)
def test_fullscreen_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'Fullscreen')
self.assertFalse(result)
def test_setting_fullscreen_fails_and_returns_none(self):
result = self.mpris.Set(objects.ROOT_IFACE, 'Fullscreen', 'True')
self.assertIsNone(result)
def test_can_set_fullscreen_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanSetFullscreen')
self.assertFalse(result)
def test_can_raise_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanRaise')
self.assertFalse(result)
def test_raise_does_nothing(self):
self.mpris.Raise()
def test_can_quit_returns_true(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'CanQuit')
self.assertTrue(result)
def test_quit_should_stop_all_actors(self):
self.mpris.Quit()
self.assert_(objects.exit_process.called)
def test_has_track_list_returns_false(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'HasTrackList')
self.assertFalse(result)
def test_identify_is_mopidy(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'Identity')
self.assertEquals(result, 'Mopidy')
def test_desktop_entry_is_based_on_DESKTOP_FILE_setting(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'DesktopEntry')
self.assertEquals(result, 'foo')
def test_supported_uri_schemes_includes_backend_uri_schemes(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedUriSchemes')
self.assertEquals(len(result), 1)
self.assertEquals(result[0], 'dummy')
def test_supported_mime_types_is_empty(self):
result = self.mpris.Get(objects.ROOT_IFACE, 'SupportedMimeTypes')
self.assertEquals(len(result), 0)

View File

@ -39,5 +39,6 @@ class VersionTest(unittest.TestCase):
self.assertLess(SV('0.13.0'), SV('0.14.0'))
self.assertLess(SV('0.14.0'), SV('0.14.1'))
self.assertLess(SV('0.14.1'), SV('0.14.2'))
self.assertLess(SV('0.14.2'), SV(__version__))
self.assertLess(SV(__version__), SV('0.15.1'))
self.assertLess(SV('0.14.2'), SV('0.15.0'))
self.assertLess(SV('0.15.0'), SV(__version__))
self.assertLess(SV(__version__), SV('0.16.1'))