Release v0.1.0

This commit is contained in:
Stein Magnus Jodal 2010-08-23 13:19:58 +02:00
commit 5eeb964647
77 changed files with 1633 additions and 1247 deletions

View File

@ -9,9 +9,10 @@ in Spotify's vast archive, manage playlists, and play music, you can use most
platforms, including Windows, Mac OS X, Linux, and iPhone and Android phones.
To install Mopidy, check out
`the installation docs <http://www.mopidy.com/docs/installation/>`_.
`the installation docs <http://www.mopidy.com/docs/master/installation/>`_.
* `Documentation <http://www.mopidy.com/>`_
* `Documentation <http://www.mopidy.com/docs/master/>`_
* `Documentation (development version) <http://www.mopidy.com/docs/develop/>`_
* `Source code <http://github.com/jodal/mopidy>`_
* `Issue tracker <http://github.com/jodal/mopidy/issues>`_
* IRC: ``#mopidy`` at `irc.freenode.net <http://freenode.net/>`_

View File

@ -0,0 +1,18 @@
***********************
:mod:`mopidy.frontends`
***********************
A frontend is responsible for exposing Mopidy for a type of clients.
Frontend API
============
A stable frontend API is not available yet, as we've only implemented a single
frontend module.
Frontends
=========
* :mod:`mopidy.frontends.mpd`

View File

@ -4,6 +4,8 @@
.. automodule:: mopidy.frontends.mpd
:synopsis: MPD frontend
:members:
:undoc-members:
MPD server
@ -17,10 +19,21 @@ MPD server
.. inheritance-diagram:: mopidy.frontends.mpd.server
MPD frontend
============
MPD session
===========
.. automodule:: mopidy.frontends.mpd.frontend
.. automodule:: mopidy.frontends.mpd.session
:synopsis: MPD client session
:members:
:undoc-members:
.. inheritance-diagram:: mopidy.frontends.mpd.session
MPD dispatcher
==============
.. automodule:: mopidy.frontends.mpd.dispatcher
:synopsis: MPD request dispatcher
:members:
:undoc-members:

View File

@ -40,58 +40,58 @@ methods as described below.
:mod:`mopidy.mixers.alsa` -- ALSA mixer for Linux
=================================================
.. inheritance-diagram:: mopidy.mixers.alsa
.. automodule:: mopidy.mixers.alsa
:synopsis: ALSA mixer for Linux
:members:
.. inheritance-diagram:: mopidy.mixers.alsa.AlsaMixer
:mod:`mopidy.mixers.denon` -- Hardware mixer for Denon amplifiers
=================================================================
.. inheritance-diagram:: mopidy.mixers.denon
.. automodule:: mopidy.mixers.denon
:synopsis: Hardware mixer for Denon amplifiers
:members:
.. inheritance-diagram:: mopidy.mixers.denon
:mod:`mopidy.mixers.dummy` -- Dummy mixer for testing
=====================================================
.. inheritance-diagram:: mopidy.mixers.dummy
.. automodule:: mopidy.mixers.dummy
:synopsis: Dummy mixer for testing
:members:
.. inheritance-diagram:: mopidy.mixers.dummy
:mod:`mopidy.mixers.gstreamer_software` -- Software mixer for all platforms
===========================================================================
.. inheritance-diagram:: mopidy.mixers.gstreamer_software
.. automodule:: mopidy.mixers.gstreamer_software
:synopsis: Software mixer for all platforms
:members:
.. inheritance-diagram:: mopidy.mixers.gstreamer_software
:mod:`mopidy.mixers.osa` -- Osa mixer for OS X
==============================================
.. inheritance-diagram:: mopidy.mixers.osa
.. automodule:: mopidy.mixers.osa
:synopsis: Osa mixer for OS X
:members:
.. inheritance-diagram:: mopidy.mixers.osa
:mod:`mopidy.mixers.nad` -- Hardware mixer for NAD amplifiers
=============================================================
.. inheritance-diagram:: mopidy.mixers.nad
.. automodule:: mopidy.mixers.nad
:synopsis: Hardware mixer for NAD amplifiers
:members:
.. inheritance-diagram:: mopidy.mixers.nad

22
docs/api/outputs.rst Normal file
View File

@ -0,0 +1,22 @@
*********************
:mod:`mopidy.outputs`
*********************
Outputs are responsible for playing audio.
Output API
==========
A stable output API is not available yet, as we've only implemented a single
output module.
:mod:`mopidy.outputs.gstreamer` -- GStreamer output for all platforms
=====================================================================
.. inheritance-diagram:: mopidy.outputs.gstreamer
.. automodule:: mopidy.outputs.gstreamer
:synopsis: GStreamer output for all platforms
:members:

View File

@ -5,17 +5,26 @@ Changes
This change log is used to track all major changes to Mopidy.
0.1.0a4 (in development)
========================
0.1.0 (2010-08-23)
==================
The greatest release ever! We present to you important improvements in search
functionality, working track position seeking, no known stability issues, and
greatly improved MPD client support.
After three weeks of long nights and sprints we're finally pleased enough with
the state of Mopidy to remove the alpha label, and do a regular release.
Mopidy 0.1.0 got important improvements in search functionality, working track
position seeking, no known stability issues, and greatly improved MPD client
support. There are lots of changes since 0.1.0a3, and we urge you to at least
read the *important changes* below.
This release does not support OS X. We're sorry about that, and are working on
fixing the OS X issues for a future release. You can track the progress at
:issue:`14`.
**Important changes**
- License changed from GPLv2 to Apache License, version 2.0.
- GStreamer is now a required dependency.
- GStreamer is now a required dependency. See our :doc:`GStreamer installation
docs <installation/gstreamer>`.
- :mod:`mopidy.backends.libspotify` is now the default backend.
:mod:`mopidy.backends.despotify` is no longer available. This means that you
need to install the :doc:`dependencies for libspotify
@ -39,8 +48,6 @@ greatly improved MPD client support.
the packages created by ``setup.py`` for i.e. PyPI.
- MPD frontend:
- Relocate from :mod:`mopidy.mpd` to :mod:`mopidy.frontends.mpd`.
- Split gigantic protocol implementation into eleven modules.
- Search improvements, including support for multi-word search.
- Fixed ``play "-1"`` and ``playid "-1"`` behaviour when playlist is empty
or when a current track is set.
@ -56,8 +63,14 @@ greatly improved MPD client support.
- Fix ``load`` so that one can append a playlist to the current playlist, and
make it return the correct error message if the playlist is not found.
- Support for single track repeat added. (Fixes: :issue:`4`)
- Relocate from :mod:`mopidy.mpd` to :mod:`mopidy.frontends.mpd`.
- Split gigantic protocol implementation into eleven modules.
- Rename ``mopidy.frontends.mpd.{serializer => translator}`` to match naming
in backends.
- Remove setting :attr:`mopidy.settings.SERVER` and
:attr:`mopidy.settings.FRONTEND` in favour of the new
:attr:`mopidy.settings.FRONTENDS`.
- Run MPD server in its own process.
- Backends:
@ -68,6 +81,9 @@ greatly improved MPD client support.
- A Spotify application key is now bundled with the source.
:attr:`mopidy.settings.SPOTIFY_LIB_APPKEY` is thus removed.
- If failing to play a track, playback will skip to the next track.
- Both :mod:`mopidy.backends.libspotify` and :mod:`mopidy.backends.local`
have been rewritten to use the new common GStreamer audio output module,
:mod:`mopidy.outputs.gstreamer`.
- Mixers:

View File

@ -130,7 +130,7 @@ html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.

View File

@ -58,6 +58,8 @@ Code style
Commit guidelines
=================
- We follow the development process described at http://nvie.com/git-model.
- Keep commits small and on topic.
- If a commit looks too big you should be working in a feature branch not a
@ -107,8 +109,7 @@ Continuous integration server
We run a continuous integration server called Hudson at
http://hudson.mopidy.com/ that runs all test on multiple platforms (Ubuntu, OS
X, etc.) for every commit we push to GitHub. If the build is broken or fixed,
Hudson will issue notifications to our IRC channel.
X, etc.) for every commit we push to GitHub.
In addition to running tests, Hudson also does coverage statistics and uses
pylint to check for errors and possible improvements in our code. So, if you're
@ -135,9 +136,16 @@ Then, to generate docs::
.. note::
The documentation at http://www.mopidy.com/docs/ is automatically updated
within 10 minutes after a documentation update is pushed to
``jodal/mopidy/master`` at GitHub.
The documentation at http://www.mopidy.com/ is automatically updated when a
documentation update is pushed to ``jodal/mopidy`` at GitHub.
Documentation generated from the ``master`` branch is published at
http://www.mopidy.com/docs/master/, and will always be valid for the latest
release.
Documentation generated from the ``develop`` branch is published at
http://www.mopidy.com/docs/develop/, and will always be valid for the
latest development snapshot.
Creating releases

View File

@ -12,40 +12,94 @@ In addition to what you'll find here, don't forget the :doc:`/api/index`.
Class instantiation and usage
=============================
The following diagram shows how Mopidy with the despotify backend and ALSA
mixer is wired together. The gray nodes are part of external dependencies, and
not Mopidy. The red nodes lives in the ``main`` process (running an
:mod:`asyncore` loop), while the blue nodes lives in a secondary process named
``core`` (running a service loop in :class:`mopidy.core.CoreProcess`).
The following diagram shows how Mopidy is wired together with the MPD client,
the Spotify service, and the speakers.
**Legend**
- Filled red boxes are the key external systems.
- Gray boxes are external dependencies.
- Blue circles lives in the ``main`` process, also known as ``CoreProcess``.
It is processing messages put on the core queue.
- Purple circles lives in a process named ``MpdProcess``, running an
:mod:`asyncore` loop.
- Green circles lives in a process named ``GStreamerProcess``.
- Brown circle is a thread living in the ``CoreProcess``.
.. digraph:: class_instantiation_and_usage
"spytify" [ color="gray" ]
"despotify" [ color="gray" ]
"alsaaudio" [ color="gray" ]
"__main__" [ color="red" ]
"main" [ color="blue" ]
"CoreProcess" [ color="blue" ]
"DespotifyBackend" [ color="blue" ]
"AlsaMixer" [ color="blue" ]
# Frontend
"MPD client" [ color="red", style="filled", shape="box" ]
"MpdFrontend" [ color="blue" ]
"MpdServer" [ color="red" ]
"MpdSession" [ color="red" ]
"__main__" -> "CoreProcess" [ label="create" ]
"__main__" -> "MpdServer" [ label="create" ]
"CoreProcess" -> "DespotifyBackend" [ label="create" ]
"MpdProcess" [ color="purple" ]
"MpdServer" [ color="purple" ]
"MpdSession" [ color="purple" ]
"MpdDispatcher" [ color="blue" ]
# Backend
"Libspotify\nBackend" [ color="blue" ]
"Libspotify\nSessionManager" [ color="brown" ]
"pyspotify" [ color="gray", shape="box" ]
"libspotify" [ color="gray", shape="box" ]
"Spotify" [ color="red", style="filled", shape="box" ]
# Output/mixer
"GStreamer\nOutput" [ color="blue" ]
"GStreamer\nSoftwareMixer" [ color="blue" ]
"GStreamer\nProcess" [ color="green" ]
"GStreamer" [ color="gray", shape="box" ]
"Speakers" [ color="red", style="filled", shape="box" ]
"main" -> "CoreProcess" [ label="create" ]
# Frontend
"CoreProcess" -> "MpdFrontend" [ label="create" ]
"MpdServer" -> "MpdSession" [ label="create one per client" ]
"MpdSession" -> "MpdFrontend" [ label="pass MPD requests to" ]
"MpdFrontend" -> "DespotifyBackend" [ label="use backend API" ]
"DespotifyBackend" -> "AlsaMixer" [ label="create and use mixer API" ]
"DespotifyBackend" -> "spytify" [ label="use Python wrapper" ]
"spytify" -> "despotify" [ label="use C library" ]
"AlsaMixer" -> "alsaaudio" [ label="use Python library" ]
"MpdFrontend" -> "MpdProcess" [ label="create" ]
"MpdFrontend" -> "MpdDispatcher" [ label="create" ]
"MpdProcess" -> "MpdServer" [ label="create" ]
"MpdServer" -> "MpdSession" [ label="create one\nper client" ]
"MpdSession" -> "MpdDispatcher" [
label="pass requests\nvia core_queue" ]
"MpdDispatcher" -> "MpdSession" [
label="pass response\nvia reply_to pipe" ]
"MpdDispatcher" -> "Libspotify\nBackend" [ label="use backend API" ]
"MPD client" -> "MpdServer" [ label="connect" ]
"MPD client" -> "MpdSession" [ label="request" ]
"MpdSession" -> "MPD client" [ label="response" ]
# Backend
"CoreProcess" -> "Libspotify\nBackend" [ label="create" ]
"Libspotify\nBackend" -> "Libspotify\nSessionManager" [
label="creates and uses" ]
"Libspotify\nSessionManager" -> "Libspotify\nBackend" [
label="pass commands\nvia core_queue" ]
"Libspotify\nSessionManager" -> "pyspotify" [ label="use Python\nwrapper" ]
"pyspotify" -> "Libspotify\nSessionManager" [ label="use callbacks" ]
"pyspotify" -> "libspotify" [ label="use C library" ]
"libspotify" -> "Spotify" [ label="use service" ]
"Libspotify\nSessionManager" -> "GStreamer\nProcess" [
label="pass commands\nand audio data\nvia output_queue" ]
# Output/mixer
"Libspotify\nBackend" -> "GStreamer\nSoftwareMixer" [
label="create and\nuse mixer API" ]
"GStreamer\nSoftwareMixer" -> "GStreamer\nProcess" [
label="pass commands\nvia output_queue" ]
"CoreProcess" -> "GStreamer\nOutput" [ label="create" ]
"GStreamer\nOutput" -> "GStreamer\nProcess" [ label="create" ]
"GStreamer\nProcess" -> "GStreamer" [ label="use library" ]
"GStreamer" -> "Speakers" [ label="play audio" ]
Thread/process communication
============================
.. warning::
This section is currently outdated.
- Everything starts with ``Main``.
- ``Main`` creates a ``Core`` process which runs the frontend, backend, and
mixer.

View File

@ -2,10 +2,10 @@
Installation
************
To get a basic version of Mopidy running, you need Python and the GStreamer
library. To use Spotify with Mopidy, you also need :doc:`libspotify and
pyspotify <libspotify>`. Mopidy itself can either be installed from the Python
package index, PyPI, or from git.
To get a basic version of Mopidy running, you need Python and the
:doc:`GStreamer library <gstreamer>`. To use Spotify with Mopidy, you also need
:doc:`libspotify and pyspotify <libspotify>`. Mopidy itself can either be
installed from the Python package index, PyPI, or from git.
Install dependencies
@ -31,6 +31,11 @@ Make sure you got the required dependencies installed.
- pyserial (Debian/Ubuntu package: python-serial)
- *Default:* :mod:`mopidy.mixers.gstreamer_software` (Linux, OS X, and
Windows)
- No additional dependencies.
- :mod:`mopidy.mixers.nad` (Linux, OS X, and Windows)
- pyserial (Debian/Ubuntu package: python-serial)
@ -41,7 +46,7 @@ Make sure you got the required dependencies installed.
- Dependencies for at least one Mopidy backend:
- :mod:`mopidy.backends.libspotify` (Linux, OS X, and Windows)
- *Default:* :mod:`mopidy.backends.libspotify` (Linux, OS X, and Windows)
- :doc:`libspotify and pyspotify <libspotify>`
@ -91,20 +96,42 @@ For an introduction to ``git``, please visit `git-scm.com
Settings
========
Create a file named ``settings.py`` in the directory ``~/.mopidy/``.
Mopidy reads settings from the file ``~/.mopidy/settings.py``, where ``~``
means your *home directory*. If your username is ``alice`` and you are running
Linux, the settings file should probably be at
``/home/alice/.mopidy/settings.py``.
If you are using a Spotify backend, enter your Spotify Premium account's
username and password into the file, like this::
You can either create this file yourself, or run the ``mopidy`` command, and it
will create an empty settings file for you.
Music from Spotify
------------------
If you are using the Spotify backend, which is the default, enter your Spotify
Premium account's username and password into the file, like this::
SPOTIFY_USERNAME = u'myusername'
SPOTIFY_PASSWORD = u'mysecret'
Currently :mod:`mopidy.backends.libspotify` is the default backend. If you want
to use :mod:`mopidy.backends.local`, add the following setting::
Music from local storage
------------------------
If you want use Mopidy to play music you have locally at your machine instead
of using Spotify, you need to change the backend from the default to
:mod:`mopidy.backends.local` by adding the following line to your settings
file::
BACKENDS = (u'mopidy.backends.local.LocalBackend',)
For a full list of available settings, see :mod:`mopidy.settings`.
You may also want to change some of the ``LOCAL_*`` settings. See
:mod:`mopidy.settings`, for a full list of available settings.
Connecting from other machines on the network
---------------------------------------------
As a secure default, Mopidy only accepts connections from ``localhost``. If you
want to open it for connections from other machines on your network, see
the documentation for :attr:`mopidy.settings.MPD_SERVER_HOSTNAME`.
Running Mopidy
@ -114,10 +141,9 @@ To start Mopidy, simply open a terminal and run::
mopidy
When Mopidy says ``MPD server running at [localhost]:6600`` it's ready to
accept connections by any MPD client. You can find a list of tons of MPD
clients at http://mpd.wikia.com/wiki/Clients. We use GMPC and
ncmpcpp during development. The first is a GUI client, and the second is a
terminal client.
When Mopidy says ``MPD server running at [127.0.0.1]:6600`` it's ready to
accept connections by any MPD client. You can find tons of MPD clients at
http://mpd.wikia.com/wiki/Clients. We use GMPC and ncmpcpp during development.
The first is a GUI client, and the second is a terminal client.
To stop Mopidy, press ``CTRL+C``.

View File

@ -3,10 +3,7 @@ if not (2, 6) <= sys.version_info < (3,):
sys.exit(u'Mopidy requires Python >= 2.6, < 3')
def get_version():
return u'0.1.0a4'
def get_mpd_protocol_version():
return u'0.16.0'
return u'0.1.0'
class MopidyException(Exception):
def __init__(self, message, *args, **kwargs):

View File

@ -1,85 +1,17 @@
import asyncore
import logging
import logging.handlers
import multiprocessing
import optparse
import os
import sys
# Add ../ to the path so we can run Mopidy from a Git checkout without
# installing it on the system.
sys.path.insert(0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
from mopidy import get_version, settings, SettingsError
from mopidy.process import CoreProcess
from mopidy.utils import get_class
from mopidy.utils.path import get_or_create_folder
from mopidy.utils.settings import list_settings_optparse_callback
logger = logging.getLogger('mopidy.main')
from mopidy.core import CoreProcess
def main():
options = _parse_options()
_setup_logging(options.verbosity_level, options.dump)
settings.validate()
logger.info('-- Starting Mopidy --')
get_or_create_folder('~/.mopidy/')
core_queue = multiprocessing.Queue()
get_class(settings.SERVER)(core_queue).start()
output_class = get_class(settings.OUTPUT)
backend_class = get_class(settings.BACKENDS[0])
frontend_class = get_class(settings.FRONTEND)
core = CoreProcess(core_queue, output_class, backend_class, frontend_class)
core.start()
asyncore.loop()
def _parse_options():
parser = optparse.OptionParser(version='Mopidy %s' % get_version())
parser.add_option('-q', '--quiet',
action='store_const', const=0, dest='verbosity_level',
help='less output (warning level)')
parser.add_option('-v', '--verbose',
action='store_const', const=2, dest='verbosity_level',
help='more output (debug level)')
parser.add_option('--dump',
action='store_true', dest='dump',
help='dump debug log to file')
parser.add_option('--list-settings',
action='callback', callback=list_settings_optparse_callback,
help='list current settings')
return parser.parse_args()[0]
def _setup_logging(verbosity_level, dump):
_setup_console_logging(verbosity_level)
if dump:
_setup_dump_logging()
def _setup_console_logging(verbosity_level):
if verbosity_level == 0:
level = logging.WARNING
elif verbosity_level == 2:
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(format=settings.CONSOLE_LOG_FORMAT, level=level)
def _setup_dump_logging():
root = logging.getLogger('')
root.setLevel(logging.DEBUG)
formatter = logging.Formatter(settings.DUMP_LOG_FORMAT)
handler = logging.handlers.RotatingFileHandler(
settings.DUMP_LOG_FILENAME, maxBytes=102400, backupCount=3)
handler.setFormatter(formatter)
root.addHandler(handler)
# Explictly call run() instead of start(), since we don't need to start
# another process.
CoreProcess().run()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
logger.info(u'Interrupted by user')
sys.exit(0)
except SettingsError, e:
logger.error(e)
sys.exit(1)
except SystemExit, e:
logger.error(e)
sys.exit(1)
main()

View File

@ -440,7 +440,10 @@ class BasePlaybackController(object):
:param time_position: time position in milliseconds
:type time_position: int
:rtype: :class:`True` if successful, else :class:`False`
"""
# FIXME I think return value is only really useful for internal
# testing, as such it should probably not be exposed in API.
if self.state == self.STOPPED:
self.play()
elif self.state == self.PAUSED:
@ -455,7 +458,7 @@ class BasePlaybackController(object):
self._play_time_started = self._current_wall_time
self._play_time_accumulated = time_position
self._seek(time_position)
return self._seek(time_position)
def _seek(self, time_position):
"""

View File

@ -16,6 +16,12 @@ class LibspotifyBackend(BaseBackend):
**Issues:** http://github.com/jodal/mopidy/issues/labels/backend-libspotify
**Settings:**
- :attr:`mopidy.settings.SPOTIFY_LIB_CACHE`
- :attr:`mopidy.settings.SPOTIFY_USERNAME`
- :attr:`mopidy.settings.SPOTIFY_PASSWORD`
.. note::
This product uses SPOTIFY(R) CORE but is not endorsed, certified or

View File

@ -15,6 +15,8 @@ class LibspotifyLibraryController(BaseLibraryController):
def lookup(self, uri):
spotify_track = Link.from_string(uri).as_track()
# TODO Block until metadata_updated callback is called. Before that the
# track will be unloaded, unless it's already in the stored playlists.
return LibspotifyTranslator.to_mopidy_track(spotify_track)
def refresh(self, uri=None):

View File

@ -4,7 +4,7 @@ import multiprocessing
from spotify import Link, SpotifyError
from mopidy.backends.base import BasePlaybackController
from mopidy.process import pickle_connection
from mopidy.utils.process import pickle_connection
logger = logging.getLogger('mopidy.backends.libspotify.playback')

View File

@ -18,7 +18,10 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
def __init__(self, username, password, core_queue, output_queue):
SpotifySessionManager.__init__(self, username, password)
threading.Thread.__init__(self)
threading.Thread.__init__(self, name='LibspotifySessionManagerThread')
# Run as a daemon thread, so Mopidy won't wait for this thread to exit
# before Mopidy exits.
self.daemon = True
self.core_queue = core_queue
self.output_queue = output_queue
self.connected = threading.Event()

View File

@ -1,39 +1,29 @@
import gobject
gobject.threads_init()
# FIXME make sure we don't get hit by
# http://jameswestby.net/
# weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html
import pygst
pygst.require('0.10')
import gst
import logging
import os
import glob
import logging
import multiprocessing
import os
import shutil
import threading
from mopidy import settings
from mopidy.backends.base import *
from mopidy.models import Playlist, Track, Album
from mopidy.utils.process import pickle_connection
from .translator import parse_m3u, parse_mpd_tag_cache
logger = logging.getLogger(u'mopidy.backends.local')
class LocalMessages(threading.Thread):
def run(self):
gobject.MainLoop().run()
message_thread = LocalMessages()
message_thread.daemon = True
message_thread.start()
class LocalBackend(BaseBackend):
"""
A backend for playing music from a local music archive.
**Issues:** http://github.com/jodal/mopidy/issues/labels/backend-local
**Settings:**
- :attr:`mopidy.settings.LOCAL_MUSIC_FOLDER`
- :attr:`mopidy.settings.LOCAL_PLAYLIST_FOLDER`
- :attr:`mopidy.settings.LOCAL_TAG_CACHE`
"""
def __init__(self, *args, **kwargs):
@ -49,71 +39,40 @@ class LocalBackend(BaseBackend):
class LocalPlaybackController(BasePlaybackController):
def __init__(self, backend):
super(LocalPlaybackController, self).__init__(backend)
self._bin = gst.element_factory_make("playbin", "player")
self._bus = self._bin.get_bus()
sink = gst.element_factory_make("fakesink", "fakesink")
# FIXME cleanup fakesink?
self._bin.set_property("video-sink", sink)
self._bus.add_signal_watch()
self._bus_id = self._bus.connect('message', self._message)
self.stop()
def _set_state(self, state):
self._bin.set_state(state)
(_, new, _) = self._bin.get_state()
return new == state
def _send_recv(self, message):
(my_end, other_end) = multiprocessing.Pipe()
message.update({'reply_to': pickle_connection(other_end)})
self.backend.output_queue.put(message)
my_end.poll(None)
return my_end.recv()
def _message(self, bus, message):
if message.type == gst.MESSAGE_EOS:
self.on_end_of_track()
elif message.type == gst.MESSAGE_ERROR:
self._bin.set_state(gst.STATE_NULL)
error, debug = message.parse_error()
logger.error('%s %s', error, debug)
def _send(self, message):
self.backend.output_queue.put(message)
def _set_state(self, state):
return self._send_recv({'command': 'set_state', 'state': state})
def _play(self, track):
self._bin.set_state(gst.STATE_READY)
self._bin.set_property('uri', track.uri)
return self._set_state(gst.STATE_PLAYING)
return self._send_recv({'command': 'play_uri', 'uri': track.uri})
def _stop(self):
return self._set_state(gst.STATE_READY)
return self._set_state('READY')
def _pause(self):
return self._set_state(gst.STATE_PAUSED)
return self._set_state('PAUSED')
def _resume(self):
return self._set_state(gst.STATE_PLAYING)
return self._set_state('PLAYING')
def _seek(self, time_position):
self._bin.seek_simple(gst.Format(gst.FORMAT_TIME),
gst.SEEK_FLAG_FLUSH, time_position * gst.MSECOND)
self._set_state(gst.STATE_PLAYING)
return self._send_recv({'command': 'set_position',
'position': time_position})
@property
def time_position(self):
try:
return self._bin.query_position(gst.FORMAT_TIME)[0] // gst.MSECOND
except gst.QueryError, e:
logger.error('time_position failed: %s', e)
return 0
def destroy(self):
playbin, self._bin = self._bin, None
bus, self._bus = self._bus, None
bus.disconnect(self._bus_id)
bus.remove_signal_watch()
playbin.get_state()
playbin.set_state(gst.STATE_NULL)
bus.set_flushing(True)
del bus
del playbin
return self._send_recv({'command': 'get_position'})
class LocalStoredPlaylistsController(BaseStoredPlaylistsController):

90
mopidy/core.py Normal file
View File

@ -0,0 +1,90 @@
import logging
import multiprocessing
import optparse
from mopidy import get_version, settings
from mopidy.utils import get_class
from mopidy.utils.log import setup_logging
from mopidy.utils.path import get_or_create_folder, get_or_create_file
from mopidy.utils.process import BaseProcess, unpickle_connection
from mopidy.utils.settings import list_settings_optparse_callback
logger = logging.getLogger('mopidy.core')
class CoreProcess(BaseProcess):
def __init__(self):
super(CoreProcess, self).__init__(name='CoreProcess')
self.core_queue = multiprocessing.Queue()
self.options = self.parse_options()
self.output_queue = None
self.backend = None
self.frontend = None
def parse_options(self):
parser = optparse.OptionParser(version='Mopidy %s' % get_version())
parser.add_option('-q', '--quiet',
action='store_const', const=0, dest='verbosity_level',
help='less output (warning level)')
parser.add_option('-v', '--verbose',
action='store_const', const=2, dest='verbosity_level',
help='more output (debug level)')
parser.add_option('--dump',
action='store_true', dest='dump',
help='dump debug log to file')
parser.add_option('--list-settings',
action='callback', callback=list_settings_optparse_callback,
help='list current settings')
return parser.parse_args()[0]
def run_inside_try(self):
logger.info(u'-- Starting Mopidy --')
self.setup()
while True:
message = self.core_queue.get()
self.process_message(message)
def setup(self):
self.setup_logging()
self.setup_settings()
self.output_queue = self.setup_output(self.core_queue)
self.backend = self.setup_backend(self.core_queue, self.output_queue)
self.frontend = self.setup_frontend(self.core_queue, self.backend)
def setup_logging(self):
setup_logging(self.options.verbosity_level, self.options.dump)
def setup_settings(self):
get_or_create_folder('~/.mopidy/')
get_or_create_file('~/.mopidy/settings.py')
settings.validate()
def setup_output(self, core_queue):
output_queue = multiprocessing.Queue()
get_class(settings.OUTPUT)(core_queue, output_queue)
return output_queue
def setup_backend(self, core_queue, output_queue):
return get_class(settings.BACKENDS[0])(core_queue, output_queue)
def setup_frontend(self, core_queue, backend):
frontend = get_class(settings.FRONTENDS[0])()
frontend.start_server(core_queue)
frontend.create_dispatcher(backend)
return frontend
def process_message(self, message):
if message.get('to') == 'output':
self.output_queue.put(message)
elif message['command'] == 'mpd_request':
response = self.frontend.dispatcher.handle_request(
message['request'])
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'end_of_track':
self.backend.playback.on_end_of_track()
elif message['command'] == 'stop_playback':
self.backend.playback.stop()
elif message['command'] == 'set_stored_playlists':
self.backend.stored_playlists.playlists = message['playlists']
else:
logger.warning(u'Cannot handle message: %s', message)

View File

@ -1,93 +1,37 @@
import re
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.process import MpdProcess
from mopidy import MopidyException
class MpdAckError(MopidyException):
class MpdFrontend(object):
"""
Available MPD error codes::
The MPD frontend.
ACK_ERROR_NOT_LIST = 1
ACK_ERROR_ARG = 2
ACK_ERROR_PASSWORD = 3
ACK_ERROR_PERMISSION = 4
ACK_ERROR_UNKNOWN = 5
ACK_ERROR_NO_EXIST = 50
ACK_ERROR_PLAYLIST_MAX = 51
ACK_ERROR_SYSTEM = 52
ACK_ERROR_PLAYLIST_LOAD = 53
ACK_ERROR_UPDATE_ALREADY = 54
ACK_ERROR_PLAYER_SYNC = 55
ACK_ERROR_EXIST = 56
**Settings:**
- :attr:`mopidy.settings.MPD_SERVER_HOSTNAME`
- :attr:`mopidy.settings.MPD_SERVER_PORT`
"""
def __init__(self, message=u'', error_code=0, index=0, command=u''):
super(MpdAckError, self).__init__(message, error_code, index, command)
self.message = message
self.error_code = error_code
self.index = index
self.command = command
def __init__(self):
self.process = None
self.dispatcher = None
def get_mpd_ack(self):
def start_server(self, core_queue):
"""
MPD error code format::
Starts the MPD server.
ACK [%(error_code)i@%(index)i] {%(command)s} description
:param core_queue: the core queue
:type core_queue: :class:`multiprocessing.Queue`
"""
return u'ACK [%i@%i] {%s} %s' % (
self.error_code, self.index, self.command, self.message)
self.process = MpdProcess(core_queue)
self.process.start()
class MpdArgError(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdArgError, self).__init__(*args, **kwargs)
self.error_code = 2 # ACK_ERROR_ARG
def create_dispatcher(self, backend):
"""
Creates a dispatcher for MPD requests.
class MpdUnknownCommand(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdUnknownCommand, self).__init__(*args, **kwargs)
self.message = u'unknown command "%s"' % self.command
self.command = u''
self.error_code = 5 # ACK_ERROR_UNKNOWN
class MpdNoExistError(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdNoExistError, self).__init__(*args, **kwargs)
self.error_code = 50 # ACK_ERROR_NO_EXIST
class MpdNotImplemented(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdNotImplemented, self).__init__(*args, **kwargs)
self.message = u'Not implemented'
mpd_commands = set()
request_handlers = {}
def handle_pattern(pattern):
"""
Decorator for connecting command handlers to command patterns.
If you use named groups in the pattern, the decorated method will get the
groups as keyword arguments. If the group is optional, remember to give the
argument a default value.
For example, if the command is ``do that thing`` the ``what`` argument will
be ``this thing``::
@handle_pattern('^do (?P<what>.+)$')
def do(what):
...
:param pattern: regexp pattern for matching commands
:type pattern: string
"""
def decorator(func):
match = re.search('([a-z_]+)', pattern)
if match is not None:
mpd_commands.add(match.group())
if pattern in request_handlers:
raise ValueError(u'Tried to redefine handler for %s with %s' % (
pattern, func))
request_handlers[pattern] = func
func.__doc__ = ' - *Pattern:* ``%s``\n\n%s' % (
pattern, func.__doc__ or '')
return func
return decorator
:param backend: the backend
:type backend: :class:`mopidy.backends.base.BaseBackend`
:rtype: :class:`mopidy.frontends.mpd.dispatcher.MpdDispatcher`
"""
self.dispatcher = MpdDispatcher(backend)
return self.dispatcher

View File

@ -1,20 +1,18 @@
import logging
import re
from mopidy.frontends.mpd import (mpd_commands, request_handlers,
handle_pattern, MpdAckError, MpdArgError, MpdUnknownCommand)
from mopidy.frontends.mpd.exceptions import (MpdAckError, MpdArgError,
MpdUnknownCommand)
from mopidy.frontends.mpd.protocol import mpd_commands, request_handlers
# Do not remove the following import. The protocol modules must be imported to
# get them registered as request handlers.
from mopidy.frontends.mpd.protocol import (audio_output, command_list,
connection, current_playlist, music_db, playback, reflection, status,
stickers, stored_playlists)
connection, current_playlist, empty, music_db, playback, reflection,
status, stickers, stored_playlists)
from mopidy.utils import flatten
logger = logging.getLogger('mopidy.frontends.mpd.frontend')
class MpdFrontend(object):
class MpdDispatcher(object):
"""
The MPD frontend dispatches MPD requests to the correct handler.
Dispatches MPD requests to the correct handler.
"""
def __init__(self, backend=None):
@ -72,8 +70,3 @@ class MpdFrontend(object):
if add_ok and (not response or not response[-1].startswith(u'ACK')):
response.append(u'OK')
return response
@handle_pattern(r'^$')
def empty(frontend):
"""The original MPD server returns ``OK`` on an empty request."""
pass

View File

@ -0,0 +1,57 @@
from mopidy import MopidyException
class MpdAckError(MopidyException):
"""
Available MPD error codes::
ACK_ERROR_NOT_LIST = 1
ACK_ERROR_ARG = 2
ACK_ERROR_PASSWORD = 3
ACK_ERROR_PERMISSION = 4
ACK_ERROR_UNKNOWN = 5
ACK_ERROR_NO_EXIST = 50
ACK_ERROR_PLAYLIST_MAX = 51
ACK_ERROR_SYSTEM = 52
ACK_ERROR_PLAYLIST_LOAD = 53
ACK_ERROR_UPDATE_ALREADY = 54
ACK_ERROR_PLAYER_SYNC = 55
ACK_ERROR_EXIST = 56
"""
def __init__(self, message=u'', error_code=0, index=0, command=u''):
super(MpdAckError, self).__init__(message, error_code, index, command)
self.message = message
self.error_code = error_code
self.index = index
self.command = command
def get_mpd_ack(self):
"""
MPD error code format::
ACK [%(error_code)i@%(index)i] {%(command)s} description
"""
return u'ACK [%i@%i] {%s} %s' % (
self.error_code, self.index, self.command, self.message)
class MpdArgError(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdArgError, self).__init__(*args, **kwargs)
self.error_code = 2 # ACK_ERROR_ARG
class MpdUnknownCommand(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdUnknownCommand, self).__init__(*args, **kwargs)
self.message = u'unknown command "%s"' % self.command
self.command = u''
self.error_code = 5 # ACK_ERROR_UNKNOWN
class MpdNoExistError(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdNoExistError, self).__init__(*args, **kwargs)
self.error_code = 50 # ACK_ERROR_NO_EXIST
class MpdNotImplemented(MpdAckError):
def __init__(self, *args, **kwargs):
super(MpdNotImplemented, self).__init__(*args, **kwargs)
self.message = u'Not implemented'

View File

@ -0,0 +1,18 @@
import asyncore
import logging
from mopidy.frontends.mpd.server import MpdServer
from mopidy.utils.process import BaseProcess
logger = logging.getLogger('mopidy.frontends.mpd.process')
class MpdProcess(BaseProcess):
def __init__(self, core_queue):
super(MpdProcess, self).__init__(name='MpdProcess')
self.core_queue = core_queue
def run_inside_try(self):
logger.debug(u'Starting MPD server process')
server = MpdServer(self.core_queue)
server.start()
asyncore.loop()

View File

@ -10,8 +10,47 @@ implement our own MPD server which is compatible with the numerous existing
`MPD clients <http://mpd.wikia.com/wiki/Clients>`_.
"""
import re
#: The MPD protocol uses UTF-8 for encoding all data.
ENCODING = u'utf-8'
#: The MPD protocol uses ``\n`` as line terminator.
LINE_TERMINATOR = u'\n'
#: The MPD protocol version is 0.16.0.
VERSION = u'0.16.0'
mpd_commands = set()
request_handlers = {}
def handle_pattern(pattern):
"""
Decorator for connecting command handlers to command patterns.
If you use named groups in the pattern, the decorated method will get the
groups as keyword arguments. If the group is optional, remember to give the
argument a default value.
For example, if the command is ``do that thing`` the ``what`` argument will
be ``this thing``::
@handle_pattern('^do (?P<what>.+)$')
def do(what):
...
:param pattern: regexp pattern for matching commands
:type pattern: string
"""
def decorator(func):
match = re.search('([a-z_]+)', pattern)
if match is not None:
mpd_commands.add(match.group())
if pattern in request_handlers:
raise ValueError(u'Tried to redefine handler for %s with %s' % (
pattern, func))
request_handlers[pattern] = func
func.__doc__ = ' - *Pattern:* ``%s``\n\n%s' % (
pattern, func.__doc__ or '')
return func
return decorator

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import handle_pattern, MpdNotImplemented
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
@handle_pattern(r'^disableoutput "(?P<outputid>\d+)"$')
def disableoutput(frontend, outputid):

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import handle_pattern, MpdUnknownCommand
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdUnknownCommand
@handle_pattern(r'^command_list_begin$')
def command_list_begin(frontend):

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import handle_pattern, MpdNotImplemented
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
@handle_pattern(r'^close$')
def close(frontend):
@ -9,8 +10,7 @@ def close(frontend):
Closes the connection to MPD.
"""
# TODO Does not work after multiprocessing branch merge
#frontend.session.do_close()
pass # TODO
@handle_pattern(r'^kill$')
def kill(frontend):
@ -21,8 +21,7 @@ def kill(frontend):
Kills MPD.
"""
# TODO Does not work after multiprocessing branch merge
#frontend.session.do_kill()
pass # TODO
@handle_pattern(r'^password "(?P<password>[^"]+)"$')
def password_(frontend, password):

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import (handle_pattern, MpdArgError, MpdNoExistError,
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import (MpdArgError, MpdNoExistError,
MpdNotImplemented)
@handle_pattern(r'^add "(?P<uri>[^"]*)"$')

View File

@ -0,0 +1,6 @@
from mopidy.frontends.mpd.protocol import handle_pattern
@handle_pattern(r'^$')
def empty(frontend):
"""The original MPD server returns ``OK`` on an empty request."""
pass

View File

@ -1,7 +1,7 @@
import re
from mopidy.frontends.mpd import handle_pattern, MpdNotImplemented
from mopidy.frontends.mpd.protocol import stored_playlists
from mopidy.frontends.mpd.protocol import handle_pattern, stored_playlists
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
def _build_query(mpd_query):
"""

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import (handle_pattern, MpdArgError, MpdNoExistError,
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import (MpdArgError, MpdNoExistError,
MpdNotImplemented)
@handle_pattern(r'^consume (?P<state>[01])$')
@ -300,7 +301,7 @@ def seek(frontend, songpos, seconds):
"""
if frontend.backend.playback.current_playlist_position != songpos:
playpos(frontend, songpos)
return frontend.backend.playback.seek(int(seconds) * 1000)
frontend.backend.playback.seek(int(seconds) * 1000)
@handle_pattern(r'^seekid "(?P<cpid>\d+)" "(?P<seconds>\d+)"$')
def seekid(frontend, cpid, seconds):
@ -313,7 +314,7 @@ def seekid(frontend, cpid, seconds):
"""
if frontend.backend.playback.current_cpid != cpid:
playid(frontend, cpid)
return frontend.backend.playback.seek(int(seconds) * 1000)
frontend.backend.playback.seek(int(seconds) * 1000)
@handle_pattern(r'^setvol "(?P<volume>[-+]*\d+)"$')
def setvol(frontend, volume):

View File

@ -1,5 +1,5 @@
from mopidy.frontends.mpd import (handle_pattern, mpd_commands,
MpdNotImplemented)
from mopidy.frontends.mpd.protocol import handle_pattern, mpd_commands
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
@handle_pattern(r'^commands$')
def commands(frontend):

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import handle_pattern, MpdNotImplemented
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
@handle_pattern(r'^clearerror$')
def clearerror(frontend):

View File

@ -1,4 +1,5 @@
from mopidy.frontends.mpd import handle_pattern, MpdNotImplemented
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdNotImplemented
@handle_pattern(r'^sticker delete "(?P<field>[^"]+)" '
r'"(?P<uri>[^"]+)"( "(?P<name>[^"]+)")*$')

View File

@ -1,7 +1,7 @@
import datetime as dt
from mopidy.frontends.mpd import (handle_pattern, MpdNoExistError,
MpdNotImplemented)
from mopidy.frontends.mpd.protocol import handle_pattern
from mopidy.frontends.mpd.exceptions import MpdNoExistError, MpdNotImplemented
@handle_pattern(r'^listplaylist "(?P<name>[^"]+)"$')
def listplaylist(frontend, name):

View File

@ -1,21 +1,18 @@
import asynchat
import asyncore
import logging
import multiprocessing
import re
import socket
import sys
from mopidy import get_mpd_protocol_version, settings
from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR
from mopidy.process import pickle_connection
from mopidy.utils import indent
from mopidy import settings
from .session import MpdSession
logger = logging.getLogger('mopidy.frontends.mpd.server')
class MpdServer(asyncore.dispatcher):
"""
The MPD server. Creates a :class:`MpdSession` for each client connection.
The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession`
for each client connection.
"""
def __init__(self, core_queue):
@ -58,65 +55,3 @@ class MpdServer(asyncore.dispatcher):
and re.match('\d+.\d+.\d+.\d+', hostname) is not None):
hostname = '::ffff:%s' % hostname
return hostname
class MpdSession(asynchat.async_chat):
"""
The MPD client session. Keeps track of a single client and dispatches its
MPD requests to the frontend.
"""
def __init__(self, server, client_socket, client_socket_address,
core_queue):
asynchat.async_chat.__init__(self, sock=client_socket)
self.server = server
self.client_address = client_socket_address[0]
self.client_port = client_socket_address[1]
self.core_queue = core_queue
self.input_buffer = []
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
def start(self):
"""Start a new client session."""
self.send_response(u'OK MPD %s' % get_mpd_protocol_version())
def collect_incoming_data(self, data):
"""Collect incoming data into buffer until a terminator is found."""
self.input_buffer.append(data)
def found_terminator(self):
"""Handle request when a terminator is found."""
data = ''.join(self.input_buffer).strip()
self.input_buffer = []
try:
request = data.decode(ENCODING)
logger.debug(u'Input from [%s]:%s: %s', self.client_address,
self.client_port, indent(request))
self.handle_request(request)
except UnicodeDecodeError as e:
logger.warning(u'Received invalid data: %s', e)
def handle_request(self, request):
"""Handle request by sending it to the MPD frontend."""
my_end, other_end = multiprocessing.Pipe()
self.core_queue.put({
'command': 'mpd_request',
'request': request,
'reply_to': pickle_connection(other_end),
})
my_end.poll(None)
response = my_end.recv()
if response is not None:
self.handle_response(response)
def handle_response(self, response):
"""Handle response from the MPD frontend."""
self.send_response(LINE_TERMINATOR.join(response))
def send_response(self, output):
"""Send a response to the client."""
logger.debug(u'Output to [%s]:%s: %s', self.client_address,
self.client_port, indent(output))
output = u'%s%s' % (output, LINE_TERMINATOR)
data = output.encode(ENCODING)
self.push(data)

View File

@ -0,0 +1,70 @@
import asynchat
import logging
import multiprocessing
from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION
from mopidy.utils.log import indent
from mopidy.utils.process import pickle_connection
logger = logging.getLogger('mopidy.frontends.mpd.session')
class MpdSession(asynchat.async_chat):
"""
The MPD client session. Keeps track of a single client and passes its
MPD requests to the dispatcher.
"""
def __init__(self, server, client_socket, client_socket_address,
core_queue):
asynchat.async_chat.__init__(self, sock=client_socket)
self.server = server
self.client_address = client_socket_address[0]
self.client_port = client_socket_address[1]
self.core_queue = core_queue
self.input_buffer = []
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
def start(self):
"""Start a new client session."""
self.send_response(u'OK MPD %s' % VERSION)
def collect_incoming_data(self, data):
"""Collect incoming data into buffer until a terminator is found."""
self.input_buffer.append(data)
def found_terminator(self):
"""Handle request when a terminator is found."""
data = ''.join(self.input_buffer).strip()
self.input_buffer = []
try:
request = data.decode(ENCODING)
logger.debug(u'Input from [%s]:%s: %s', self.client_address,
self.client_port, indent(request))
self.handle_request(request)
except UnicodeDecodeError as e:
logger.warning(u'Received invalid data: %s', e)
def handle_request(self, request):
"""Handle request by sending it to the MPD frontend."""
my_end, other_end = multiprocessing.Pipe()
self.core_queue.put({
'command': 'mpd_request',
'request': request,
'reply_to': pickle_connection(other_end),
})
my_end.poll(None)
response = my_end.recv()
if response is not None:
self.handle_response(response)
def handle_response(self, response):
"""Handle response from the MPD frontend."""
self.send_response(LINE_TERMINATOR.join(response))
def send_response(self, output):
"""Send a response to the client."""
logger.debug(u'Output to [%s]:%s: %s', self.client_address,
self.client_port, indent(output))
output = u'%s%s' % (output, LINE_TERMINATOR)
data = output.encode(ENCODING)
self.push(data)

View File

@ -4,6 +4,10 @@ class BaseMixer(object):
"""
:param backend: a backend instance
:type mixer: :class:`mopidy.backends.base.BaseBackend`
**Settings:**
- :attr:`mopidy.settings.MIXER_MAX_VOLUME`
"""
def __init__(self, backend, *args, **kwargs):

View File

@ -10,6 +10,10 @@ class AlsaMixer(BaseMixer):
"""
Mixer which uses the Advanced Linux Sound Architecture (ALSA) to control
volume.
**Settings:**
- :attr:`mopidy.settings.MIXER_ALSA_CONTROL`
"""
def __init__(self, *args, **kwargs):

View File

@ -3,8 +3,8 @@ from threading import Lock
from serial import Serial
from mopidy import settings
from mopidy.mixers import BaseMixer
from mopidy.settings import MIXER_EXT_PORT
logger = logging.getLogger(u'mopidy.mixers.denon')
@ -33,7 +33,8 @@ class DenonMixer(BaseMixer):
"""
super(DenonMixer, self).__init__(*args, **kwargs)
device = kwargs.get('device', None)
self._device = device or Serial(port=MIXER_EXT_PORT, timeout=0.2)
self._device = device or Serial(port=settings.MIXER_EXT_PORT,
timeout=0.2)
self._levels = ['99'] + ["%(#)02d" % {'#': v} for v in range(0, 99)]
self._volume = 0
self._lock = Lock()

View File

@ -1,7 +1,7 @@
import multiprocessing
from mopidy.mixers import BaseMixer
from mopidy.process import pickle_connection
from mopidy.utils.process import pickle_connection
class GStreamerSoftwareMixer(BaseMixer):
"""Mixer which uses GStreamer to control volume in software."""

View File

@ -2,10 +2,9 @@ import logging
from serial import Serial
from multiprocessing import Pipe
from mopidy import settings
from mopidy.mixers import BaseMixer
from mopidy.process import BaseProcess
from mopidy.settings import (MIXER_EXT_PORT, MIXER_EXT_SOURCE,
MIXER_EXT_SPEAKERS_A, MIXER_EXT_SPEAKERS_B)
from mopidy.utils.process import BaseProcess
logger = logging.getLogger('mopidy.mixers.nad')
@ -74,7 +73,7 @@ class NadTalker(BaseProcess):
_nad_volume = None
def __init__(self, pipe=None):
super(NadTalker, self).__init__()
super(NadTalker, self).__init__(name='NadTalker')
self.pipe = pipe
self._device = None
@ -91,8 +90,9 @@ class NadTalker(BaseProcess):
def _open_connection(self):
# Opens serial connection to the device.
# Communication settings: 115200 bps 8N1
logger.info(u'Connecting to serial device "%s"', MIXER_EXT_PORT)
self._device = Serial(port=MIXER_EXT_PORT, baudrate=115200,
logger.info(u'Connecting to serial device "%s"',
settings.MIXER_EXT_PORT)
self._device = Serial(port=settings.MIXER_EXT_PORT, baudrate=115200,
timeout=self.TIMEOUT)
self._get_device_model()
@ -114,20 +114,27 @@ class NadTalker(BaseProcess):
self._command_device('Main.Power', 'On')
def _select_speakers(self):
if MIXER_EXT_SPEAKERS_A is not None:
while self._ask_device('Main.SpeakerA') != MIXER_EXT_SPEAKERS_A:
logger.info(u'Setting speakers A "%s"', MIXER_EXT_SPEAKERS_A)
self._command_device('Main.SpeakerA', MIXER_EXT_SPEAKERS_A)
if MIXER_EXT_SPEAKERS_B is not None:
while self._ask_device('Main.SpeakerB') != MIXER_EXT_SPEAKERS_B:
logger.info(u'Setting speakers B "%s"', MIXER_EXT_SPEAKERS_B)
self._command_device('Main.SpeakerB', MIXER_EXT_SPEAKERS_B)
if settings.MIXER_EXT_SPEAKERS_A is not None:
while (self._ask_device('Main.SpeakerA')
!= settings.MIXER_EXT_SPEAKERS_A):
logger.info(u'Setting speakers A "%s"',
settings.MIXER_EXT_SPEAKERS_A)
self._command_device('Main.SpeakerA',
settings.MIXER_EXT_SPEAKERS_A)
if settings.MIXER_EXT_SPEAKERS_B is not None:
while (self._ask_device('Main.SpeakerB') !=
settings.MIXER_EXT_SPEAKERS_B):
logger.info(u'Setting speakers B "%s"',
settings.MIXER_EXT_SPEAKERS_B)
self._command_device('Main.SpeakerB',
settings.MIXER_EXT_SPEAKERS_B)
def _select_input_source(self):
if MIXER_EXT_SOURCE is not None:
while self._ask_device('Main.Source') != MIXER_EXT_SOURCE:
logger.info(u'Selecting input source "%s"', MIXER_EXT_SOURCE)
self._command_device('Main.Source', MIXER_EXT_SOURCE)
if settings.MIXER_EXT_SOURCE is not None:
while self._ask_device('Main.Source') != settings.MIXER_EXT_SOURCE:
logger.info(u'Selecting input source "%s"',
settings.MIXER_EXT_SOURCE)
self._command_device('Main.Source', settings.MIXER_EXT_SOURCE)
def _unmute(self):
while self._ask_device('Main.Mute') != 'Off':

View File

@ -8,7 +8,8 @@ import gst
import logging
import threading
from mopidy.process import BaseProcess, unpickle_connection
from mopidy import settings
from mopidy.utils.process import BaseProcess, unpickle_connection
logger = logging.getLogger('mopidy.outputs.gstreamer')
@ -17,6 +18,10 @@ class GStreamerOutput(object):
Audio output through GStreamer.
Starts the :class:`GStreamerProcess`.
**Settings:**
- :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK`
"""
def __init__(self, core_queue, output_queue):
@ -42,22 +47,11 @@ class GStreamerProcess(BaseProcess):
http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html.
"""
pipeline_description = ' ! '.join([
'appsrc name=src',
'volume name=volume',
'autoaudiosink name=sink',
])
def __init__(self, core_queue, output_queue):
super(GStreamerProcess, self).__init__()
super(GStreamerProcess, self).__init__(name='GStreamerProcess')
self.core_queue = core_queue
self.output_queue = output_queue
self.gst_pipeline = None
self.gst_bus = None
self.gst_bus_id = None
self.gst_uri_bin = None
self.gst_data_src = None
self.gst_volume = None
def run_inside_try(self):
self.setup()
@ -73,16 +67,30 @@ class GStreamerProcess(BaseProcess):
messages_thread.daemon = True
messages_thread.start()
self.gst_pipeline = gst.parse_launch(self.pipeline_description)
self.gst_data_src = self.gst_pipeline.get_by_name('src')
#self.gst_uri_bin = self.gst_pipeline.get_by_name('uri')
self.gst_volume = self.gst_pipeline.get_by_name('volume')
self.gst_pipeline = gst.parse_launch(' ! '.join([
'audioconvert name=convert',
'volume name=volume',
settings.GSTREAMER_AUDIO_SINK,
]))
pad = self.gst_pipeline.get_by_name('convert').get_pad('sink')
if settings.BACKENDS[0] == 'mopidy.backends.local.LocalBackend':
uri_bin = gst.element_factory_make('uridecodebin', 'uri')
uri_bin.connect('pad-added', self.process_new_pad, pad)
self.gst_pipeline.add(uri_bin)
else:
app_src = gst.element_factory_make('appsrc', 'src')
self.gst_pipeline.add(app_src)
app_src.get_pad('src').link(pad)
# Setup bus and message processor
self.gst_bus = self.gst_pipeline.get_bus()
self.gst_bus.add_signal_watch()
self.gst_bus_id = self.gst_bus.connect('message',
self.process_gst_message)
gst_bus = self.gst_pipeline.get_bus()
gst_bus.add_signal_watch()
gst_bus.connect('message', self.process_gst_message)
def process_new_pad(self, source, pad, target_pad):
pad.link(target_pad)
def process_mopidy_message(self, message):
"""Process messages from the rest of Mopidy."""
@ -104,6 +112,14 @@ class GStreamerProcess(BaseProcess):
connection.send(volume)
elif message['command'] == 'set_volume':
self.set_volume(message['volume'])
elif message['command'] == 'set_position':
response = self.set_position(message['position'])
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'get_position':
response = self.get_position()
connection = unpickle_connection(message['reply_to'])
connection.send(response)
else:
logger.warning(u'Cannot handle message: %s', message)
@ -123,16 +139,17 @@ class GStreamerProcess(BaseProcess):
def play_uri(self, uri):
"""Play audio at URI"""
self.set_state('READY')
self.gst_uri_bin.set_property('uri', uri)
self.gst_pipeline.get_by_name('uri').set_property('uri', uri)
return self.set_state('PLAYING')
def deliver_data(self, caps_string, data):
"""Deliver audio data to be played"""
data_src = self.gst_pipeline.get_by_name('src')
caps = gst.caps_from_string(caps_string)
buffer_ = gst.Buffer(buffer(data))
buffer_.set_caps(caps)
self.gst_data_src.set_property('caps', caps)
self.gst_data_src.emit('push-buffer', buffer_)
data_src.set_property('caps', caps)
data_src.emit('push-buffer', buffer_)
def end_of_data_stream(self):
"""
@ -141,7 +158,7 @@ class GStreamerProcess(BaseProcess):
We will get a GStreamer message when the stream playback reaches the
token, and can then do any end-of-stream related tasks.
"""
self.gst_data_src.emit('end-of-stream')
self.gst_pipeline.get_by_name('src').emit('end-of-stream')
def set_state(self, state_name):
"""
@ -171,10 +188,25 @@ class GStreamerProcess(BaseProcess):
def get_volume(self):
"""Get volume in range [0..100]"""
gst_volume = self.gst_volume.get_property('volume')
return int(gst_volume * 100)
gst_volume = self.gst_pipeline.get_by_name('volume')
return int(gst_volume.get_property('volume') * 100)
def set_volume(self, volume):
"""Set volume in range [0..100]"""
gst_volume = volume / 100.0
self.gst_volume.set_property('volume', gst_volume)
gst_volume = self.gst_pipeline.get_by_name('volume')
gst_volume.set_property('volume', volume / 100.0)
def set_position(self, position):
self.gst_pipeline.get_state() # block until state changes are done
handeled = self.gst_pipeline.seek_simple(gst.Format(gst.FORMAT_TIME),
gst.SEEK_FLAG_FLUSH, position * gst.MSECOND)
self.gst_pipeline.get_state() # block until seek is done
return handeled
def get_position(self):
try:
position = self.gst_pipeline.query_position(gst.FORMAT_TIME)[0]
return position // gst.MSECOND
except gst.QueryError, e:
logger.error('time_position failed: %s', e)
return 0

View File

@ -1,77 +0,0 @@
import logging
import multiprocessing
from multiprocessing.reduction import reduce_connection
import pickle
import sys
from mopidy import SettingsError
logger = logging.getLogger('mopidy.process')
def pickle_connection(connection):
return pickle.dumps(reduce_connection(connection))
def unpickle_connection(pickled_connection):
# From http://stackoverflow.com/questions/1446004
(func, args) = pickle.loads(pickled_connection)
return func(*args)
class BaseProcess(multiprocessing.Process):
def run(self):
try:
self.run_inside_try()
except KeyboardInterrupt:
logger.info(u'Interrupted by user')
sys.exit(0)
except SettingsError as e:
logger.error(e.message)
sys.exit(1)
except ImportError as e:
logger.error(e)
sys.exit(1)
def run_inside_try(self):
raise NotImplementedError
class CoreProcess(BaseProcess):
def __init__(self, core_queue, output_class, backend_class,
frontend_class):
super(CoreProcess, self).__init__()
self.core_queue = core_queue
self.output_queue = None
self.output_class = output_class
self.backend_class = backend_class
self.frontend_class = frontend_class
self.output = None
self.backend = None
self.frontend = None
def run_inside_try(self):
self.setup()
while True:
message = self.core_queue.get()
self.process_message(message)
def setup(self):
self.output_queue = multiprocessing.Queue()
self.output = self.output_class(self.core_queue, self.output_queue)
self.backend = self.backend_class(self.core_queue, self.output_queue)
self.frontend = self.frontend_class(self.backend)
def process_message(self, message):
if message.get('to') == 'output':
self.output_queue.put(message)
elif message['command'] == 'mpd_request':
response = self.frontend.handle_request(message['request'])
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'end_of_track':
self.backend.playback.on_end_of_track()
elif message['command'] == 'stop_playback':
self.backend.playback.stop()
elif message['command'] == 'set_stored_playlists':
self.backend.stored_playlists.playlists = message['playlists']
else:
logger.warning(u'Cannot handle message: %s', message)

View File

@ -41,12 +41,22 @@ DUMP_LOG_FORMAT = CONSOLE_LOG_FORMAT
#: DUMP_LOG_FILENAME = u'dump.log'
DUMP_LOG_FILENAME = u'dump.log'
#: Protocol frontend to use.
#: List of server frontends to use.
#:
#: Default::
#:
#: FRONTEND = u'mopidy.frontends.mpd.frontend.MpdFrontend'
FRONTEND = u'mopidy.frontends.mpd.frontend.MpdFrontend'
#: FRONTENDS = (u'mopidy.frontends.mpd.MpdFrontend',)
#:
#: .. note::
#: Currently only the first frontend in the list is used.
FRONTENDS = (u'mopidy.frontends.mpd.MpdFrontend',)
#: Which GStreamer audio sink to use in :mod:`mopidy.outputs.gstreamer`.
#:
#: Default::
#:
#: GSTREAMER_AUDIO_SINK = u'autoaudiosink'
GSTREAMER_AUDIO_SINK = u'autoaudiosink'
#: Path to folder with local music.
#:
@ -127,13 +137,6 @@ MIXER_MAX_VOLUME = 100
#: OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput'
OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput'
#: Server to use.
#:
#: Default::
#:
#: SERVER = u'mopidy.frontends.mpd.server.MpdServer'
SERVER = u'mopidy.frontends.mpd.server.MpdServer'
#: Which address Mopidy's MPD server should bind to.
#:
#:Examples:

View File

@ -27,12 +27,3 @@ def get_class(name):
except (ImportError, AttributeError):
raise ImportError("Couldn't load: %s" % name)
return class_object
def indent(string, places=4, linebreak='\n'):
lines = string.split(linebreak)
if len(lines) == 1:
return string
result = u''
for line in lines:
result += linebreak + ' ' * places + line
return result

36
mopidy/utils/log.py Normal file
View File

@ -0,0 +1,36 @@
import logging
import logging.handlers
from mopidy import settings
def setup_logging(verbosity_level, dump):
setup_console_logging(verbosity_level)
if dump:
setup_dump_logging()
def setup_console_logging(verbosity_level):
if verbosity_level == 0:
level = logging.WARNING
elif verbosity_level == 2:
level = logging.DEBUG
else:
level = logging.INFO
logging.basicConfig(format=settings.CONSOLE_LOG_FORMAT, level=level)
def setup_dump_logging():
root = logging.getLogger('')
root.setLevel(logging.DEBUG)
formatter = logging.Formatter(settings.DUMP_LOG_FORMAT)
handler = logging.handlers.RotatingFileHandler(
settings.DUMP_LOG_FILENAME, maxBytes=102400, backupCount=3)
handler.setFormatter(formatter)
root.addHandler(handler)
def indent(string, places=4, linebreak='\n'):
lines = string.split(linebreak)
if len(lines) == 1:
return string
result = u''
for line in lines:
result += linebreak + ' ' * places + line
return result

View File

@ -8,10 +8,17 @@ logger = logging.getLogger('mopidy.utils.path')
def get_or_create_folder(folder):
folder = os.path.expanduser(folder)
if not os.path.isdir(folder):
logger.info(u'Creating %s', folder)
logger.info(u'Creating dir %s', folder)
os.mkdir(folder, 0755)
return folder
def get_or_create_file(filename):
filename = os.path.expanduser(filename)
if not os.path.isfile(filename):
logger.info(u'Creating file %s', filename)
open(filename, 'w')
return filename
def path_to_uri(*paths):
path = os.path.join(*paths)
#path = os.path.expanduser(path) # FIXME Waiting for test case?

39
mopidy/utils/process.py Normal file
View File

@ -0,0 +1,39 @@
import logging
import multiprocessing
from multiprocessing.reduction import reduce_connection
import pickle
import sys
from mopidy import SettingsError
logger = logging.getLogger('mopidy.utils.process')
def pickle_connection(connection):
return pickle.dumps(reduce_connection(connection))
def unpickle_connection(pickled_connection):
# From http://stackoverflow.com/questions/1446004
(func, args) = pickle.loads(pickled_connection)
return func(*args)
class BaseProcess(multiprocessing.Process):
def run(self):
logger.debug(u'%s: Starting process', self.name)
try:
self.run_inside_try()
except KeyboardInterrupt:
logger.info(u'%s: Interrupted by user', self.name)
sys.exit(0)
except SettingsError as e:
logger.error(e.message)
sys.exit(1)
except ImportError as e:
logger.error(e)
sys.exit(1)
except Exception as e:
logger.exception(e)
raise e
def run_inside_try(self):
raise NotImplementedError

View File

@ -6,7 +6,7 @@ import os
import sys
from mopidy import SettingsError
from mopidy.utils import indent
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.utils.settings')
@ -37,7 +37,7 @@ class SettingsProxy(object):
def current(self):
current = copy(self.default)
current.update(self.local)
return current
return current
def __getattr__(self, attr):
if not self._is_setting(attr):
@ -81,6 +81,8 @@ def validate_settings(defaults, settings):
errors = {}
changed = {
'FRONTEND': 'FRONTENDS',
'SERVER': None,
'SERVER_HOSTNAME': 'MPD_SERVER_HOSTNAME',
'SERVER_PORT': 'MPD_SERVER_PORT',
'SPOTIFY_LIB_APPKEY': None,

View File

@ -9,6 +9,11 @@ except ImportError:
class SkipTest(Exception):
pass
from mopidy import settings
# Nuke any local settings to ensure same test env all over
settings.local.clear()
def data_folder(name):
folder = os.path.dirname(__file__)
folder = os.path.join(folder, 'data')

View File

@ -0,0 +1,9 @@
def populate_playlist(func):
def wrapper(self):
for track in self.tracks:
self.backend.current_playlist.add(track)
return func(self)
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper

View File

@ -0,0 +1,256 @@
import multiprocessing
import random
from mopidy import settings
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track
from mopidy.utils import get_class
from tests.backends.base import populate_playlist
class BaseCurrentPlaylistControllerTest(object):
tracks = []
def setUp(self):
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
self.output = get_class(settings.OUTPUT)(
self.core_queue, self.output_queue)
self.backend = self.backend_class(
self.core_queue, self.output_queue, DummyMixer)
self.controller = self.backend.current_playlist
self.playback = self.backend.playback
assert len(self.tracks) == 3, 'Need three tracks to run tests.'
def tearDown(self):
self.backend.destroy()
self.output.destroy()
def test_add(self):
for track in self.tracks:
cp_track = self.controller.add(track)
self.assertEqual(track, self.controller.tracks[-1])
self.assertEqual(cp_track, self.controller.cp_tracks[-1])
self.assertEqual(track, cp_track[1])
def test_add_at_position(self):
for track in self.tracks[:-1]:
cp_track = self.controller.add(track, 0)
self.assertEqual(track, self.controller.tracks[0])
self.assertEqual(cp_track, self.controller.cp_tracks[0])
self.assertEqual(track, cp_track[1])
@populate_playlist
def test_add_at_position_outside_of_playlist(self):
test = lambda: self.controller.add(self.tracks[0], len(self.tracks)+2)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_get_by_cpid(self):
cp_track = self.controller.cp_tracks[1]
self.assertEqual(cp_track, self.controller.get(cpid=cp_track[0]))
@populate_playlist
def test_get_by_uri(self):
cp_track = self.controller.cp_tracks[1]
self.assertEqual(cp_track, self.controller.get(uri=cp_track[1].uri))
@populate_playlist
def test_get_by_uri_raises_error_for_invalid_uri(self):
test = lambda: self.controller.get(uri='foobar')
self.assertRaises(LookupError, test)
@populate_playlist
def test_clear(self):
self.controller.clear()
self.assertEqual(len(self.controller.tracks), 0)
def test_clear_empty_playlist(self):
self.controller.clear()
self.assertEqual(len(self.controller.tracks), 0)
@populate_playlist
def test_clear_when_playing(self):
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.controller.clear()
self.assertEqual(self.playback.state, self.playback.STOPPED)
def test_get_by_uri_returns_unique_match(self):
track = Track(uri='a')
self.controller.append([Track(uri='z'), track, Track(uri='y')])
self.assertEqual(track, self.controller.get(uri='a')[1])
def test_get_by_uri_raises_error_if_multiple_matches(self):
track = Track(uri='a')
self.controller.append([Track(uri='z'), track, track])
try:
self.controller.get(uri='a')
self.fail(u'Should raise LookupError if multiple matches')
except LookupError as e:
self.assertEqual(u'"uri=a" match multiple tracks', e[0])
def test_get_by_uri_raises_error_if_no_match(self):
self.controller.playlist = Playlist(
tracks=[Track(uri='z'), Track(uri='y')])
try:
self.controller.get(uri='a')
self.fail(u'Should raise LookupError if no match')
except LookupError as e:
self.assertEqual(u'"uri=a" match no tracks', e[0])
def test_get_by_multiple_criteria_returns_elements_matching_all(self):
track1 = Track(uri='a', name='x')
track2 = Track(uri='b', name='x')
track3 = Track(uri='b', name='y')
self.controller.append([track1, track2, track3])
self.assertEqual(track1, self.controller.get(uri='a', name='x')[1])
self.assertEqual(track2, self.controller.get(uri='b', name='x')[1])
self.assertEqual(track3, self.controller.get(uri='b', name='y')[1])
def test_get_by_criteria_that_is_not_present_in_all_elements(self):
track1 = Track()
track2 = Track(uri='b')
track3 = Track()
self.controller.append([track1, track2, track3])
self.assertEqual(track2, self.controller.get(uri='b')[1])
def test_append_appends_to_the_current_playlist(self):
self.controller.append([Track(uri='a'), Track(uri='b')])
self.assertEqual(len(self.controller.tracks), 2)
self.controller.append([Track(uri='c'), Track(uri='d')])
self.assertEqual(len(self.controller.tracks), 4)
self.assertEqual(self.controller.tracks[0].uri, 'a')
self.assertEqual(self.controller.tracks[1].uri, 'b')
self.assertEqual(self.controller.tracks[2].uri, 'c')
self.assertEqual(self.controller.tracks[3].uri, 'd')
def test_append_does_not_reset_version(self):
version = self.controller.version
self.controller.append([])
self.assertEqual(self.controller.version, version + 1)
@populate_playlist
def test_append_preserves_playing_state(self):
self.playback.play()
track = self.playback.current_track
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, track)
@populate_playlist
def test_append_preserves_stopped_state(self):
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@populate_playlist
def test_move_single(self):
self.controller.move(0, 0, 2)
tracks = self.controller.tracks
self.assertEqual(tracks[2], self.tracks[0])
@populate_playlist
def test_move_group(self):
self.controller.move(0, 2, 1)
tracks = self.controller.tracks
self.assertEqual(tracks[1], self.tracks[0])
self.assertEqual(tracks[2], self.tracks[1])
@populate_playlist
def test_moving_track_outside_of_playlist(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(0, 0, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_outside_of_playlist(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(0, 2, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_out_of_range(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(tracks+2, tracks+3, 0)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_invalid_group(self):
test = lambda: self.controller.move(2, 1, 0)
self.assertRaises(AssertionError, test)
def test_tracks_attribute_is_immutable(self):
tracks1 = self.controller.tracks
tracks2 = self.controller.tracks
self.assertNotEqual(id(tracks1), id(tracks2))
@populate_playlist
def test_remove(self):
track1 = self.controller.tracks[1]
track2 = self.controller.tracks[2]
version = self.controller.version
self.controller.remove(uri=track1.uri)
self.assert_(version < self.controller.version)
self.assert_(track1 not in self.controller.tracks)
self.assertEqual(track2, self.controller.tracks[1])
@populate_playlist
def test_removing_track_that_does_not_exist(self):
test = lambda: self.controller.remove(uri='/nonexistant')
self.assertRaises(LookupError, test)
def test_removing_from_empty_playlist(self):
test = lambda: self.controller.remove(uri='/nonexistant')
self.assertRaises(LookupError, test)
@populate_playlist
def test_shuffle(self):
random.seed(1)
self.controller.shuffle()
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(set(self.tracks), set(shuffled_tracks))
@populate_playlist
def test_shuffle_subset(self):
random.seed(1)
self.controller.shuffle(1, 3)
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(self.tracks[0], shuffled_tracks[0])
self.assertEqual(set(self.tracks), set(shuffled_tracks))
@populate_playlist
def test_shuffle_invalid_subset(self):
test = lambda: self.controller.shuffle(3, 1)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_shuffle_superset(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.shuffle(1, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_shuffle_open_subset(self):
random.seed(1)
self.controller.shuffle(1)
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(self.tracks[0], shuffled_tracks[0])
self.assertEqual(set(self.tracks), set(shuffled_tracks))
def test_version(self):
version = self.controller.version
self.controller.append([])
self.assert_(version < self.controller.version)

View File

@ -0,0 +1,158 @@
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track, Album, Artist
from tests import SkipTest, data_folder
class BaseLibraryControllerTest(object):
artists = [Artist(name='artist1'), Artist(name='artist2'), Artist()]
albums = [Album(name='album1', artists=artists[:1]),
Album(name='album2', artists=artists[1:2]),
Album()]
tracks = [Track(name='track1', length=4000, artists=artists[:1],
album=albums[0], uri='file://' + data_folder('uri1')),
Track(name='track2', length=4000, artists=artists[1:2],
album=albums[1], uri='file://' + data_folder('uri2')),
Track()]
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.library = self.backend.library
def tearDown(self):
self.backend.destroy()
def test_refresh(self):
self.library.refresh()
def test_refresh_uri(self):
raise SkipTest
def test_refresh_missing_uri(self):
raise SkipTest
def test_lookup(self):
track = self.library.lookup(self.tracks[0].uri)
self.assertEqual(track, self.tracks[0])
def test_lookup_unknown_track(self):
test = lambda: self.library.lookup('fake uri')
self.assertRaises(LookupError, test)
def test_find_exact_no_hits(self):
result = self.library.find_exact(track=['unknown track'])
self.assertEqual(result, Playlist())
result = self.library.find_exact(artist=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.find_exact(album=['unknown artist'])
self.assertEqual(result, Playlist())
def test_find_exact_artist(self):
result = self.library.find_exact(artist=['artist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(artist=['artist2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_track(self):
result = self.library.find_exact(track=['track1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(track=['track2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_album(self):
result = self.library.find_exact(album=['album1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(album=['album2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_wrong_type(self):
test = lambda: self.library.find_exact(wrong=['test'])
self.assertRaises(LookupError, test)
def test_find_exact_with_empty_query(self):
test = lambda: self.library.find_exact(artist=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.find_exact(track=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.find_exact(album=[''])
self.assertRaises(LookupError, test)
def test_search_no_hits(self):
result = self.library.search(track=['unknown track'])
self.assertEqual(result, Playlist())
result = self.library.search(artist=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.search(album=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.search(uri=['unknown'])
self.assertEqual(result, Playlist())
result = self.library.search(any=['unknown'])
self.assertEqual(result, Playlist())
def test_search_artist(self):
result = self.library.search(artist=['Tist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(artist=['Tist2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_track(self):
result = self.library.search(track=['Rack1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(track=['Rack2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_album(self):
result = self.library.search(album=['Bum1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(album=['Bum2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_uri(self):
result = self.library.search(uri=['RI1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(uri=['RI2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_any(self):
result = self.library.search(any=['Tist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['Rack1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['Bum1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['RI1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
def test_search_wrong_type(self):
test = lambda: self.library.search(wrong=['test'])
self.assertRaises(LookupError, test)
def test_search_with_empty_query(self):
test = lambda: self.library.search(artist=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(track=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(album=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(uri=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(any=[''])
self.assertRaises(LookupError, test)

View File

@ -1,281 +1,25 @@
import os
import multiprocessing
import random
import shutil
import tempfile
import threading
import time
from mopidy import settings
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track, Album, Artist
from tests import SkipTest, data_folder
__all__ = ['BaseCurrentPlaylistControllerTest',
'BasePlaybackControllerTest',
'BaseStoredPlaylistsControllerTest',
'BaseLibraryControllerTest']
def populate_playlist(func):
def wrapper(self):
for track in self.tracks:
self.backend.current_playlist.add(track)
return func(self)
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
class BaseCurrentPlaylistControllerTest(object):
tracks = []
backend_class = None
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.controller = self.backend.current_playlist
self.playback = self.backend.playback
assert len(self.tracks) == 3, 'Need three tracks to run tests.'
def tearDown(self):
self.backend.destroy()
def test_add(self):
for track in self.tracks:
cp_track = self.controller.add(track)
self.assertEqual(track, self.controller.tracks[-1])
self.assertEqual(cp_track, self.controller.cp_tracks[-1])
self.assertEqual(track, cp_track[1])
def test_add_at_position(self):
for track in self.tracks[:-1]:
cp_track = self.controller.add(track, 0)
self.assertEqual(track, self.controller.tracks[0])
self.assertEqual(cp_track, self.controller.cp_tracks[0])
self.assertEqual(track, cp_track[1])
@populate_playlist
def test_add_at_position_outside_of_playlist(self):
test = lambda: self.controller.add(self.tracks[0], len(self.tracks)+2)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_get_by_cpid(self):
cp_track = self.controller.cp_tracks[1]
self.assertEqual(cp_track, self.controller.get(cpid=cp_track[0]))
@populate_playlist
def test_get_by_uri(self):
cp_track = self.controller.cp_tracks[1]
self.assertEqual(cp_track, self.controller.get(uri=cp_track[1].uri))
@populate_playlist
def test_get_by_uri_raises_error_for_invalid_uri(self):
test = lambda: self.controller.get(uri='foobar')
self.assertRaises(LookupError, test)
@populate_playlist
def test_clear(self):
self.controller.clear()
self.assertEqual(len(self.controller.tracks), 0)
def test_clear_empty_playlist(self):
self.controller.clear()
self.assertEqual(len(self.controller.tracks), 0)
@populate_playlist
def test_clear_when_playing(self):
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.controller.clear()
self.assertEqual(self.playback.state, self.playback.STOPPED)
def test_get_by_uri_returns_unique_match(self):
track = Track(uri='a')
self.controller.append([Track(uri='z'), track, Track(uri='y')])
self.assertEqual(track, self.controller.get(uri='a')[1])
def test_get_by_uri_raises_error_if_multiple_matches(self):
track = Track(uri='a')
self.controller.append([Track(uri='z'), track, track])
try:
self.controller.get(uri='a')
self.fail(u'Should raise LookupError if multiple matches')
except LookupError as e:
self.assertEqual(u'"uri=a" match multiple tracks', e[0])
def test_get_by_uri_raises_error_if_no_match(self):
self.controller.playlist = Playlist(
tracks=[Track(uri='z'), Track(uri='y')])
try:
self.controller.get(uri='a')
self.fail(u'Should raise LookupError if no match')
except LookupError as e:
self.assertEqual(u'"uri=a" match no tracks', e[0])
def test_get_by_multiple_criteria_returns_elements_matching_all(self):
track1 = Track(uri='a', name='x')
track2 = Track(uri='b', name='x')
track3 = Track(uri='b', name='y')
self.controller.append([track1, track2, track3])
self.assertEqual(track1, self.controller.get(uri='a', name='x')[1])
self.assertEqual(track2, self.controller.get(uri='b', name='x')[1])
self.assertEqual(track3, self.controller.get(uri='b', name='y')[1])
def test_get_by_criteria_that_is_not_present_in_all_elements(self):
track1 = Track()
track2 = Track(uri='b')
track3 = Track()
self.controller.append([track1, track2, track3])
self.assertEqual(track2, self.controller.get(uri='b')[1])
def test_append_appends_to_the_current_playlist(self):
self.controller.append([Track(uri='a'), Track(uri='b')])
self.assertEqual(len(self.controller.tracks), 2)
self.controller.append([Track(uri='c'), Track(uri='d')])
self.assertEqual(len(self.controller.tracks), 4)
self.assertEqual(self.controller.tracks[0].uri, 'a')
self.assertEqual(self.controller.tracks[1].uri, 'b')
self.assertEqual(self.controller.tracks[2].uri, 'c')
self.assertEqual(self.controller.tracks[3].uri, 'd')
def test_append_does_not_reset_version(self):
version = self.controller.version
self.controller.append([])
self.assertEqual(self.controller.version, version + 1)
@populate_playlist
def test_append_preserves_playing_state(self):
self.playback.play()
track = self.playback.current_track
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, track)
@populate_playlist
def test_append_preserves_stopped_state(self):
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@populate_playlist
def test_move_single(self):
self.controller.move(0, 0, 2)
tracks = self.controller.tracks
self.assertEqual(tracks[2], self.tracks[0])
@populate_playlist
def test_move_group(self):
self.controller.move(0, 2, 1)
tracks = self.controller.tracks
self.assertEqual(tracks[1], self.tracks[0])
self.assertEqual(tracks[2], self.tracks[1])
@populate_playlist
def test_moving_track_outside_of_playlist(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(0, 0, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_outside_of_playlist(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(0, 2, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_out_of_range(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.move(tracks+2, tracks+3, 0)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_move_group_invalid_group(self):
test = lambda: self.controller.move(2, 1, 0)
self.assertRaises(AssertionError, test)
def test_tracks_attribute_is_immutable(self):
tracks1 = self.controller.tracks
tracks2 = self.controller.tracks
self.assertNotEqual(id(tracks1), id(tracks2))
@populate_playlist
def test_remove(self):
track1 = self.controller.tracks[1]
track2 = self.controller.tracks[2]
version = self.controller.version
self.controller.remove(uri=track1.uri)
self.assert_(version < self.controller.version)
self.assert_(track1 not in self.controller.tracks)
self.assertEqual(track2, self.controller.tracks[1])
@populate_playlist
def test_removing_track_that_does_not_exist(self):
test = lambda: self.controller.remove(uri='/nonexistant')
self.assertRaises(LookupError, test)
def test_removing_from_empty_playlist(self):
test = lambda: self.controller.remove(uri='/nonexistant')
self.assertRaises(LookupError, test)
@populate_playlist
def test_shuffle(self):
random.seed(1)
self.controller.shuffle()
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(set(self.tracks), set(shuffled_tracks))
@populate_playlist
def test_shuffle_subset(self):
random.seed(1)
self.controller.shuffle(1, 3)
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(self.tracks[0], shuffled_tracks[0])
self.assertEqual(set(self.tracks), set(shuffled_tracks))
@populate_playlist
def test_shuffle_invalid_subset(self):
test = lambda: self.controller.shuffle(3, 1)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_shuffle_superset(self):
tracks = len(self.controller.tracks)
test = lambda: self.controller.shuffle(1, tracks+5)
self.assertRaises(AssertionError, test)
@populate_playlist
def test_shuffle_open_subset(self):
random.seed(1)
self.controller.shuffle(1)
shuffled_tracks = self.controller.tracks
self.assertNotEqual(self.tracks, shuffled_tracks)
self.assertEqual(self.tracks[0], shuffled_tracks[0])
self.assertEqual(set(self.tracks), set(shuffled_tracks))
def test_version(self):
version = self.controller.version
self.controller.append([])
self.assert_(version < self.controller.version)
from mopidy.models import Track
from mopidy.utils import get_class
from tests import SkipTest
from tests.backends.base import populate_playlist
class BasePlaybackControllerTest(object):
tracks = []
backend_class = None
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
self.output = get_class(settings.OUTPUT)(
self.core_queue, self.output_queue)
self.backend = self.backend_class(
self.core_queue, self.output_queue, DummyMixer)
self.playback = self.backend.playback
self.current_playlist = self.backend.current_playlist
@ -286,6 +30,7 @@ class BasePlaybackControllerTest(object):
def tearDown(self):
self.backend.destroy()
self.output.destroy()
def test_initial_state_is_stopped(self):
self.assertEqual(self.playback.state, self.playback.STOPPED)
@ -335,6 +80,17 @@ class BasePlaybackControllerTest(object):
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(track, self.playback.current_track)
@populate_playlist
def test_play_when_pause_after_next(self):
self.playback.play()
self.playback.next()
self.playback.next()
track = self.playback.current_track
self.playback.pause()
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(track, self.playback.current_track)
@populate_playlist
def test_play_sets_current_track(self):
self.playback.play()
@ -772,23 +528,12 @@ class BasePlaybackControllerTest(object):
self.assert_(wrapper.called)
@populate_playlist
def test_on_end_of_track_gets_called(self):
on_end_of_track = self.playback.on_end_of_track
event = threading.Event()
def wrapper():
result = on_end_of_track()
event.set()
return result
self.playback.on_end_of_track = wrapper
def test_end_of_track_callback_gets_called(self):
self.playback.play()
self.playback.seek(self.tracks[0].length - 10)
event.wait(5)
self.assert_(event.is_set())
result = self.playback.seek(self.tracks[0].length - 10)
self.assert_(result, 'Seek failed')
message = self.core_queue.get()
self.assertEqual('end_of_track', message['command'])
@populate_playlist
def test_on_current_playlist_change_when_playing(self):
@ -871,11 +616,20 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_stopped(self):
result = self.playback.seek(1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_stopped_updates_position(self):
self.playback.seek(1000)
position = self.playback.time_position
self.assert_(position >= 990, position)
def test_seek_on_empty_playlist(self):
result = self.playback.seek(0)
self.assert_(not result, 'Seek return value was %s' % result)
def test_seek_on_empty_playlist_updates_position(self):
self.playback.seek(0)
self.assertEqual(self.playback.state, self.playback.STOPPED)
@ -886,6 +640,12 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_playing(self):
self.playback.play()
result = self.playback.seek(self.tracks[0].length - 1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_playing_updates_position(self):
length = self.backend.current_playlist.tracks[0].length
self.playback.play()
self.playback.seek(length - 1000)
@ -894,6 +654,13 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_paused(self):
self.playback.play()
self.playback.pause()
result = self.playback.seek(self.tracks[0].length - 1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_paused_updates_position(self):
length = self.backend.current_playlist.tracks[0].length
self.playback.play()
self.playback.pause()
@ -910,6 +677,13 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_beyond_end_of_song(self):
raise SkipTest # FIXME need to decide return value
self.playback.play()
result = self.playback.seek(self.tracks[0].length*100)
self.assert_(not result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_beyond_end_of_song_jumps_to_next_song(self):
self.playback.play()
self.playback.seek(self.tracks[0].length*100)
self.assertEqual(self.playback.current_track, self.tracks[1])
@ -922,17 +696,19 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_beyond_start_of_song(self):
raise SkipTest # FIXME need to decide return value
self.playback.play()
result = self.playback.seek(-1000)
self.assert_(not result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_beyond_start_of_song_update_postion(self):
self.playback.play()
self.playback.seek(-1000)
position = self.playback.time_position
self.assert_(position >= 0, position)
self.assertEqual(self.playback.state, self.playback.PLAYING)
@populate_playlist
def test_seek_return_value(self):
self.playback.play()
self.assertEqual(self.playback.seek(0), None)
@populate_playlist
def test_stop_when_stopped(self):
self.playback.stop()
@ -1083,265 +859,3 @@ class BasePlaybackControllerTest(object):
def test_playing_track_that_isnt_in_playlist(self):
test = lambda: self.playback.play((17, Track()))
self.assertRaises(AssertionError, test)
class BaseStoredPlaylistsControllerTest(object):
backend_class = None
def setUp(self):
self.original_playlist_folder = settings.LOCAL_PLAYLIST_FOLDER
self.original_tag_cache = settings.LOCAL_TAG_CACHE
self.original_music_folder = settings.LOCAL_MUSIC_FOLDER
settings.LOCAL_PLAYLIST_FOLDER = tempfile.mkdtemp()
settings.LOCAL_TAG_CACHE = data_folder('library_tag_cache')
settings.LOCAL_MUSIC_FOLDER = data_folder('')
self.backend = self.backend_class(mixer_class=DummyMixer)
self.stored = self.backend.stored_playlists
def tearDown(self):
self.backend.destroy()
if os.path.exists(settings.LOCAL_PLAYLIST_FOLDER):
shutil.rmtree(settings.LOCAL_PLAYLIST_FOLDER)
settings.LOCAL_PLAYLIST_FOLDER = self.original_playlist_folder
settings.LOCAL_TAG_CACHE = self.original_tag_cache
settings.LOCAL_MUSIC_FOLDER = self.original_music_folder
def test_create(self):
playlist = self.stored.create('test')
self.assertEqual(playlist.name, 'test')
def test_create_in_playlists(self):
playlist = self.stored.create('test')
self.assert_(self.stored.playlists)
self.assert_(playlist in self.stored.playlists)
def test_playlists_empty_to_start_with(self):
self.assert_(not self.stored.playlists)
def test_delete_non_existant_playlist(self):
self.stored.delete(Playlist())
def test_delete_playlist(self):
playlist = self.stored.create('test')
self.stored.delete(playlist)
self.assert_(not self.stored.playlists)
def test_get_without_criteria(self):
test = self.stored.get
self.assertRaises(LookupError, test)
def test_get_with_wrong_cirteria(self):
test = lambda: self.stored.get(name='foo')
self.assertRaises(LookupError, test)
def test_get_with_right_criteria(self):
playlist1 = self.stored.create('test')
playlist2 = self.stored.get(name='test')
self.assertEqual(playlist1, playlist2)
def test_get_by_name_returns_unique_match(self):
playlist = Playlist(name='b')
self.stored.playlists = [Playlist(name='a'), playlist]
self.assertEqual(playlist, self.stored.get(name='b'))
def test_get_by_name_returns_first_of_multiple_matches(self):
playlist = Playlist(name='b')
self.stored.playlists = [
playlist, Playlist(name='a'), Playlist(name='b')]
try:
self.stored.get(name='b')
self.fail(u'Should raise LookupError if multiple matches')
except LookupError as e:
self.assertEqual(u'"name=b" match multiple playlists', e[0])
def test_get_by_name_raises_keyerror_if_no_match(self):
self.stored.playlists = [Playlist(name='a'), Playlist(name='b')]
try:
self.stored.get(name='c')
self.fail(u'Should raise LookupError if no match')
except LookupError as e:
self.assertEqual(u'"name=c" match no playlists', e[0])
def test_lookup(self):
raise SkipTest
def test_refresh(self):
raise SkipTest
def test_rename(self):
playlist = self.stored.create('test')
self.stored.rename(playlist, 'test2')
self.stored.get(name='test2')
def test_rename_unknown_playlist(self):
self.stored.rename(Playlist(), 'test2')
test = lambda: self.stored.get(name='test2')
self.assertRaises(LookupError, test)
def test_save(self):
# FIXME should we handle playlists without names?
playlist = Playlist(name='test')
self.stored.save(playlist)
self.assert_(playlist in self.stored.playlists)
def test_playlist_with_unknown_track(self):
raise SkipTest
class BaseLibraryControllerTest(object):
artists = [Artist(name='artist1'), Artist(name='artist2'), Artist()]
albums = [Album(name='album1', artists=artists[:1]),
Album(name='album2', artists=artists[1:2]),
Album()]
tracks = [Track(name='track1', length=4000, artists=artists[:1],
album=albums[0], uri='file://' + data_folder('uri1')),
Track(name='track2', length=4000, artists=artists[1:2],
album=albums[1], uri='file://' + data_folder('uri2')),
Track()]
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.library = self.backend.library
def tearDown(self):
self.backend.destroy()
def test_refresh(self):
self.library.refresh()
def test_refresh_uri(self):
raise SkipTest
def test_refresh_missing_uri(self):
raise SkipTest
def test_lookup(self):
track = self.library.lookup(self.tracks[0].uri)
self.assertEqual(track, self.tracks[0])
def test_lookup_unknown_track(self):
test = lambda: self.library.lookup('fake uri')
self.assertRaises(LookupError, test)
def test_find_exact_no_hits(self):
result = self.library.find_exact(track=['unknown track'])
self.assertEqual(result, Playlist())
result = self.library.find_exact(artist=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.find_exact(album=['unknown artist'])
self.assertEqual(result, Playlist())
def test_find_exact_artist(self):
result = self.library.find_exact(artist=['artist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(artist=['artist2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_track(self):
result = self.library.find_exact(track=['track1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(track=['track2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_album(self):
result = self.library.find_exact(album=['album1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.find_exact(album=['album2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_find_exact_wrong_type(self):
test = lambda: self.library.find_exact(wrong=['test'])
self.assertRaises(LookupError, test)
def test_find_exact_with_empty_query(self):
test = lambda: self.library.find_exact(artist=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.find_exact(track=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.find_exact(album=[''])
self.assertRaises(LookupError, test)
def test_search_no_hits(self):
result = self.library.search(track=['unknown track'])
self.assertEqual(result, Playlist())
result = self.library.search(artist=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.search(album=['unknown artist'])
self.assertEqual(result, Playlist())
result = self.library.search(uri=['unknown'])
self.assertEqual(result, Playlist())
result = self.library.search(any=['unknown'])
self.assertEqual(result, Playlist())
def test_search_artist(self):
result = self.library.search(artist=['Tist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(artist=['Tist2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_track(self):
result = self.library.search(track=['Rack1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(track=['Rack2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_album(self):
result = self.library.search(album=['Bum1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(album=['Bum2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_uri(self):
result = self.library.search(uri=['RI1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(uri=['RI2'])
self.assertEqual(result, Playlist(tracks=self.tracks[1:2]))
def test_search_any(self):
result = self.library.search(any=['Tist1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['Rack1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['Bum1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
result = self.library.search(any=['RI1'])
self.assertEqual(result, Playlist(tracks=self.tracks[:1]))
def test_search_wrong_type(self):
test = lambda: self.library.search(wrong=['test'])
self.assertRaises(LookupError, test)
def test_search_with_empty_query(self):
test = lambda: self.library.search(artist=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(track=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(album=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(uri=[''])
self.assertRaises(LookupError, test)
test = lambda: self.library.search(any=[''])
self.assertRaises(LookupError, test)

View File

@ -0,0 +1,113 @@
import os
import shutil
import tempfile
from mopidy import settings
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist
from tests import SkipTest, data_folder
class BaseStoredPlaylistsControllerTest(object):
def setUp(self):
self.original_playlist_folder = settings.LOCAL_PLAYLIST_FOLDER
self.original_tag_cache = settings.LOCAL_TAG_CACHE
self.original_music_folder = settings.LOCAL_MUSIC_FOLDER
settings.LOCAL_PLAYLIST_FOLDER = tempfile.mkdtemp()
settings.LOCAL_TAG_CACHE = data_folder('library_tag_cache')
settings.LOCAL_MUSIC_FOLDER = data_folder('')
self.backend = self.backend_class(mixer_class=DummyMixer)
self.stored = self.backend.stored_playlists
def tearDown(self):
self.backend.destroy()
if os.path.exists(settings.LOCAL_PLAYLIST_FOLDER):
shutil.rmtree(settings.LOCAL_PLAYLIST_FOLDER)
settings.LOCAL_PLAYLIST_FOLDER = self.original_playlist_folder
settings.LOCAL_TAG_CACHE = self.original_tag_cache
settings.LOCAL_MUSIC_FOLDER = self.original_music_folder
def test_create(self):
playlist = self.stored.create('test')
self.assertEqual(playlist.name, 'test')
def test_create_in_playlists(self):
playlist = self.stored.create('test')
self.assert_(self.stored.playlists)
self.assert_(playlist in self.stored.playlists)
def test_playlists_empty_to_start_with(self):
self.assert_(not self.stored.playlists)
def test_delete_non_existant_playlist(self):
self.stored.delete(Playlist())
def test_delete_playlist(self):
playlist = self.stored.create('test')
self.stored.delete(playlist)
self.assert_(not self.stored.playlists)
def test_get_without_criteria(self):
test = self.stored.get
self.assertRaises(LookupError, test)
def test_get_with_wrong_cirteria(self):
test = lambda: self.stored.get(name='foo')
self.assertRaises(LookupError, test)
def test_get_with_right_criteria(self):
playlist1 = self.stored.create('test')
playlist2 = self.stored.get(name='test')
self.assertEqual(playlist1, playlist2)
def test_get_by_name_returns_unique_match(self):
playlist = Playlist(name='b')
self.stored.playlists = [Playlist(name='a'), playlist]
self.assertEqual(playlist, self.stored.get(name='b'))
def test_get_by_name_returns_first_of_multiple_matches(self):
playlist = Playlist(name='b')
self.stored.playlists = [
playlist, Playlist(name='a'), Playlist(name='b')]
try:
self.stored.get(name='b')
self.fail(u'Should raise LookupError if multiple matches')
except LookupError as e:
self.assertEqual(u'"name=b" match multiple playlists', e[0])
def test_get_by_name_raises_keyerror_if_no_match(self):
self.stored.playlists = [Playlist(name='a'), Playlist(name='b')]
try:
self.stored.get(name='c')
self.fail(u'Should raise LookupError if no match')
except LookupError as e:
self.assertEqual(u'"name=c" match no playlists', e[0])
def test_lookup(self):
raise SkipTest
def test_refresh(self):
raise SkipTest
def test_rename(self):
playlist = self.stored.create('test')
self.stored.rename(playlist, 'test2')
self.stored.get(name='test2')
def test_rename_unknown_playlist(self):
self.stored.rename(Playlist(), 'test2')
test = lambda: self.stored.get(name='test2')
self.assertRaises(LookupError, test)
def test_save(self):
# FIXME should we handle playlists without names?
playlist = Playlist(name='test')
self.stored.save(playlist)
self.assert_(playlist in self.stored.playlists)
def test_playlist_with_unknown_track(self):
raise SkipTest

View File

View File

@ -5,7 +5,12 @@ import unittest
from mopidy.backends.libspotify import LibspotifyBackend
from mopidy.models import Track
from tests.backends.base import *
from tests.backends.base.current_playlist import \
BaseCurrentPlaylistControllerTest
from tests.backends.base.library import BaseLibraryControllerTest
from tests.backends.base.playback import BasePlaybackControllerTest
from tests.backends.base.stored_playlists import \
BaseStoredPlaylistsControllerTest
uris = [
'spotify:track:6vqcpVcbI3Zu6sH3ieLDNt',
@ -15,28 +20,25 @@ uris = [
class LibspotifyCurrentPlaylistControllerTest(
BaseCurrentPlaylistControllerTest, unittest.TestCase):
tracks = [Track(uri=uri, length=4464) for i, uri in enumerate(uris)]
backend_class = LibspotifyBackend
tracks = [Track(uri=uri, length=4464) for i, uri in enumerate(uris)]
class LibspotifyPlaybackControllerTest(
BasePlaybackControllerTest, unittest.TestCase):
tracks = [Track(uri=uri, length=4464) for i, uri in enumerate(uris)]
backend_class = LibspotifyBackend
tracks = [Track(uri=uri, length=4464) for i, uri in enumerate(uris)]
class LibspotifyStoredPlaylistsControllerTest(
BaseStoredPlaylistsControllerTest, unittest.TestCase):
backend_class = LibspotifyBackend
class LibspotifyLibraryControllerTest(
BaseLibraryControllerTest, unittest.TestCase):
backend_class = LibspotifyBackend
# TODO Plug this into the backend under test to avoid music output during
# testing.
class DummyAudioController(object):
def music_delivery(self, *args, **kwargs):
pass

View File

@ -0,0 +1,6 @@
from mopidy.utils.path import path_to_uri
from tests import data_folder
song = data_folder('song%s.wav')
generate_song = lambda i: path_to_uri(song % i)

View File

@ -0,0 +1,31 @@
import unittest
# FIXME Our Windows build server does not support GStreamer yet
import sys
if sys.platform == 'win32':
from tests import SkipTest
raise SkipTest
from mopidy import settings
from mopidy.backends.local import LocalBackend
from mopidy.models import Track
from tests.backends.base.current_playlist import \
BaseCurrentPlaylistControllerTest
from tests.backends.local import generate_song
class LocalCurrentPlaylistControllerTest(BaseCurrentPlaylistControllerTest,
unittest.TestCase):
backend_class = LocalBackend
tracks = [Track(uri=generate_song(i), length=4464)
for i in range(1, 4)]
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
super(LocalCurrentPlaylistControllerTest, self).setUp()
def tearDown(self):
super(LocalCurrentPlaylistControllerTest, self).tearDown()
settings.BACKENDS = settings.original_backends

View File

@ -0,0 +1,32 @@
import unittest
# FIXME Our Windows build server does not support GStreamer yet
import sys
if sys.platform == 'win32':
from tests import SkipTest
raise SkipTest
from mopidy import settings
from mopidy.backends.local import LocalBackend
from tests import data_folder
from tests.backends.base.library import BaseLibraryControllerTest
class LocalLibraryControllerTest(BaseLibraryControllerTest, unittest.TestCase):
backend_class = LocalBackend
def setUp(self):
self.original_tag_cache = settings.LOCAL_TAG_CACHE
self.original_music_folder = settings.LOCAL_MUSIC_FOLDER
settings.LOCAL_TAG_CACHE = data_folder('library_tag_cache')
settings.LOCAL_MUSIC_FOLDER = data_folder('')
super(LocalLibraryControllerTest, self).setUp()
def tearDown(self):
settings.LOCAL_TAG_CACHE = self.original_tag_cache
settings.LOCAL_MUSIC_FOLDER = self.original_music_folder
super(LocalLibraryControllerTest, self).tearDown()

View File

@ -0,0 +1,58 @@
import unittest
# FIXME Our Windows build server does not support GStreamer yet
import sys
if sys.platform == 'win32':
from tests import SkipTest
raise SkipTest
from mopidy import settings
from mopidy.backends.local import LocalBackend
from mopidy.models import Track
from mopidy.utils.path import path_to_uri
from tests import data_folder
from tests.backends.base.playback import BasePlaybackControllerTest
from tests.backends.local import generate_song
class LocalPlaybackControllerTest(BasePlaybackControllerTest,
unittest.TestCase):
backend_class = LocalBackend
tracks = [Track(uri=generate_song(i), length=4464)
for i in range(1, 4)]
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
super(LocalPlaybackControllerTest, self).setUp()
# Two tests does not work at all when using the fake sink
#self.backend.playback.use_fake_sink()
def tearDown(self):
super(LocalPlaybackControllerTest, self).tearDown()
settings.BACKENDS = settings.original_backends
def add_track(self, path):
uri = path_to_uri(data_folder(path))
track = Track(uri=uri, length=4464)
self.backend.current_playlist.add(track)
def test_uri_handler(self):
self.assert_('file://' in self.backend.uri_handlers)
def test_play_mp3(self):
self.add_track('blank.mp3')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
def test_play_ogg(self):
self.add_track('blank.ogg')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
def test_play_flac(self):
self.add_track('blank.flac')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)

View File

@ -1,10 +1,11 @@
import unittest
import os
from tests import SkipTest
# FIXME Our Windows build server does not support GStreamer yet
import sys
if sys.platform == 'win32':
from tests import SkipTest
raise SkipTest
from mopidy import settings
@ -13,55 +14,10 @@ from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track
from mopidy.utils.path import path_to_uri
from tests.backends.base import *
from tests import SkipTest, data_folder
song = data_folder('song%s.wav')
generate_song = lambda i: path_to_uri(song % i)
# FIXME can be switched to generic test
class LocalCurrentPlaylistControllerTest(BaseCurrentPlaylistControllerTest,
unittest.TestCase):
tracks = [Track(uri=generate_song(i), length=4464)
for i in range(1, 4)]
backend_class = LocalBackend
class LocalPlaybackControllerTest(BasePlaybackControllerTest,
unittest.TestCase):
tracks = [Track(uri=generate_song(i), length=4464)
for i in range(1, 4)]
backend_class = LocalBackend
def setUp(self):
super(LocalPlaybackControllerTest, self).setUp()
# Two tests does not work at all when using the fake sink
#self.backend.playback.use_fake_sink()
def add_track(self, path):
uri = path_to_uri(data_folder(path))
track = Track(uri=uri, length=4464)
self.backend.current_playlist.add(track)
def test_uri_handler(self):
self.assert_('file://' in self.backend.uri_handlers)
def test_play_mp3(self):
self.add_track('blank.mp3')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
def test_play_ogg(self):
self.add_track('blank.ogg')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
def test_play_flac(self):
self.add_track('blank.flac')
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
from tests import data_folder
from tests.backends.base.stored_playlists import \
BaseStoredPlaylistsControllerTest
from tests.backends.local import generate_song
class LocalStoredPlaylistsControllerTest(BaseStoredPlaylistsControllerTest,
unittest.TestCase):
@ -133,27 +89,3 @@ class LocalStoredPlaylistsControllerTest(BaseStoredPlaylistsControllerTest,
def test_save_sets_playlist_uri(self):
raise SkipTest
class LocalLibraryControllerTest(BaseLibraryControllerTest,
unittest.TestCase):
backend_class = LocalBackend
def setUp(self):
self.original_tag_cache = settings.LOCAL_TAG_CACHE
self.original_music_folder = settings.LOCAL_MUSIC_FOLDER
settings.LOCAL_TAG_CACHE = data_folder('library_tag_cache')
settings.LOCAL_MUSIC_FOLDER = data_folder('')
super(LocalLibraryControllerTest, self).setUp()
def tearDown(self):
settings.LOCAL_TAG_CACHE = self.original_tag_cache
settings.LOCAL_MUSIC_FOLDER = self.original_music_folder
super(LocalLibraryControllerTest, self).tearDown()
if __name__ == '__main__':
unittest.main()

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class AudioOutputHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_enableoutput(self):
result = self.h.handle_request(u'enableoutput "0"')

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class CommandListsTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_command_list_begin(self):
result = self.h.handle_request(u'command_list_begin')

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class ConnectionHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_close(self):
result = self.h.handle_request(u'close')

View File

@ -1,14 +1,14 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Track
class CurrentPlaylistHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_add(self):
needle = Track(uri='dummy://foo')

View File

@ -1,19 +1,21 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend, MpdAckError
from mopidy.frontends.mpd import dispatcher
from mopidy.frontends.mpd.exceptions import MpdAckError
from mopidy.frontends.mpd.protocol import request_handlers, handle_pattern
from mopidy.mixers.dummy import DummyMixer
class RequestHandlerTest(unittest.TestCase):
class MpdDispatcherTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_register_same_pattern_twice_fails(self):
func = lambda: None
try:
frontend.handle_pattern('a pattern')(func)
frontend.handle_pattern('a pattern')(func)
handle_pattern('a pattern')(func)
handle_pattern('a pattern')(func)
self.fail('Registering a pattern twice shoulde raise ValueError')
except ValueError:
pass
@ -28,7 +30,7 @@ class RequestHandlerTest(unittest.TestCase):
def test_finding_handler_for_known_command_returns_handler_and_kwargs(self):
expected_handler = lambda x: None
frontend.request_handlers['known_command (?P<arg1>.+)'] = \
request_handlers['known_command (?P<arg1>.+)'] = \
expected_handler
(handler, kwargs) = self.h.find_handler('known_command an_arg')
self.assertEqual(handler, expected_handler)
@ -41,7 +43,7 @@ class RequestHandlerTest(unittest.TestCase):
def test_handling_known_request(self):
expected = 'magic'
frontend.request_handlers['known request'] = lambda x: expected
request_handlers['known request'] = lambda x: expected
result = self.h.handle_request('known request')
self.assert_(u'OK' in result)
self.assert_(expected in result)

View File

@ -1,6 +1,6 @@
import unittest
from mopidy.frontends.mpd import (MpdAckError, MpdUnknownCommand,
from mopidy.frontends.mpd.exceptions import (MpdAckError, MpdUnknownCommand,
MpdNotImplemented)
class MpdExceptionsTest(unittest.TestCase):

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class MusicDatabaseHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_count(self):
result = self.h.handle_request(u'count "tag" "needle"')

View File

@ -1,14 +1,14 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Track
class PlaybackOptionsHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_consume_off(self):
result = self.h.handle_request(u'consume "0"')
@ -167,7 +167,7 @@ class PlaybackOptionsHandlerTest(unittest.TestCase):
class PlaybackControlHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_next(self):
result = self.h.handle_request(u'next')

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class ReflectionHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_commands_returns_list_of_all_commands(self):
result = self.h.handle_request(u'commands')

View File

@ -1,14 +1,14 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Track
class StatusHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_clearerror(self):
result = self.h.handle_request(u'clearerror')
@ -51,7 +51,7 @@ class StatusHandlerTest(unittest.TestCase):
self.assert_(u'OK' in result)
def test_stats_method(self):
result = frontend.status.stats(self.h)
result = dispatcher.status.stats(self.h)
self.assert_('artists' in result)
self.assert_(int(result['artists']) >= 0)
self.assert_('albums' in result)
@ -72,106 +72,106 @@ class StatusHandlerTest(unittest.TestCase):
self.assert_(u'OK' in result)
def test_status_method_contains_volume_which_defaults_to_0(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('volume' in result)
self.assertEqual(int(result['volume']), 0)
def test_status_method_contains_volume(self):
self.b.mixer.volume = 17
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('volume' in result)
self.assertEqual(int(result['volume']), 17)
def test_status_method_contains_repeat_is_0(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('repeat' in result)
self.assertEqual(int(result['repeat']), 0)
def test_status_method_contains_repeat_is_1(self):
self.b.playback.repeat = 1
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('repeat' in result)
self.assertEqual(int(result['repeat']), 1)
def test_status_method_contains_random_is_0(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('random' in result)
self.assertEqual(int(result['random']), 0)
def test_status_method_contains_random_is_1(self):
self.b.playback.random = 1
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('random' in result)
self.assertEqual(int(result['random']), 1)
def test_status_method_contains_single(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('single' in result)
self.assert_(int(result['single']) in (0, 1))
def test_status_method_contains_consume_is_0(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('consume' in result)
self.assertEqual(int(result['consume']), 0)
def test_status_method_contains_consume_is_1(self):
self.b.playback.consume = 1
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('consume' in result)
self.assertEqual(int(result['consume']), 1)
def test_status_method_contains_playlist(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('playlist' in result)
self.assert_(int(result['playlist']) in xrange(0, 2**31 - 1))
def test_status_method_contains_playlistlength(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('playlistlength' in result)
self.assert_(int(result['playlistlength']) >= 0)
def test_status_method_contains_xfade(self):
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('xfade' in result)
self.assert_(int(result['xfade']) >= 0)
def test_status_method_contains_state_is_play(self):
self.b.playback.state = self.b.playback.PLAYING
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('state' in result)
self.assertEqual(result['state'], 'play')
def test_status_method_contains_state_is_stop(self):
self.b.playback.state = self.b.playback.STOPPED
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('state' in result)
self.assertEqual(result['state'], 'stop')
def test_status_method_contains_state_is_pause(self):
self.b.playback.state = self.b.playback.PLAYING
self.b.playback.state = self.b.playback.PAUSED
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('state' in result)
self.assertEqual(result['state'], 'pause')
def test_status_method_when_playlist_loaded_contains_song(self):
self.b.current_playlist.append([Track()])
self.b.playback.play()
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('song' in result)
self.assert_(int(result['song']) >= 0)
def test_status_method_when_playlist_loaded_contains_cpid_as_songid(self):
self.b.current_playlist.append([Track()])
self.b.playback.play()
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('songid' in result)
self.assertEqual(int(result['songid']), 1)
def test_status_method_when_playing_contains_time_with_no_length(self):
self.b.current_playlist.append([Track(length=None)])
self.b.playback.play()
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('time' in result)
(position, total) = result['time'].split(':')
position = int(position)
@ -181,7 +181,7 @@ class StatusHandlerTest(unittest.TestCase):
def test_status_method_when_playing_contains_time_with_length(self):
self.b.current_playlist.append([Track(length=10000)])
self.b.playback.play()
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('time' in result)
(position, total) = result['time'].split(':')
position = int(position)
@ -191,13 +191,13 @@ class StatusHandlerTest(unittest.TestCase):
def test_status_method_when_playing_contains_elapsed(self):
self.b.playback.state = self.b.playback.PAUSED
self.b.playback._play_time_accumulated = 59123
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('elapsed' in result)
self.assertEqual(int(result['elapsed']), 59123)
def test_status_method_when_playing_contains_bitrate(self):
self.b.current_playlist.append([Track(bitrate=320)])
self.b.playback.play()
result = dict(frontend.status.status(self.h))
result = dict(dispatcher.status.status(self.h))
self.assert_('bitrate' in result)
self.assertEqual(int(result['bitrate']), 320)

View File

@ -1,13 +1,13 @@
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
class StickersHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_sticker_get(self):
result = self.h.handle_request(

View File

@ -2,14 +2,14 @@ import datetime as dt
import unittest
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import frontend
from mopidy.frontends.mpd import dispatcher
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Track, Playlist
class StoredPlaylistsHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)
self.h = frontend.MpdFrontend(backend=self.b)
self.h = dispatcher.MpdDispatcher(backend=self.b)
def test_listplaylist(self):
self.b.stored_playlists.playlists = [

View File

@ -1,14 +1,17 @@
import multiprocessing
import unittest
from mopidy import settings
from mopidy.outputs.gstreamer import GStreamerOutput
from mopidy.process import pickle_connection
from mopidy.utils.path import path_to_uri
from mopidy.utils.process import pickle_connection
from tests import data_folder, SkipTest
class GStreamerOutputTest(unittest.TestCase):
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
self.song_uri = path_to_uri(data_folder('song1.wav'))
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
@ -16,6 +19,7 @@ class GStreamerOutputTest(unittest.TestCase):
def tearDown(self):
self.output.destroy()
settings.BACKENDS = settings.original_backends
def send_recv(self, message):
(my_end, other_end) = multiprocessing.Pipe()
@ -24,15 +28,14 @@ class GStreamerOutputTest(unittest.TestCase):
my_end.poll(None)
return my_end.recv()
def send(self, message):
self.output_queue.put(message)
@SkipTest
def test_play_uri_existing_file(self):
message = {'command': 'play_uri', 'uri': self.song_uri}
self.assertEqual(True, self.send_recv(message))
@SkipTest
def test_play_uri_non_existing_file(self):
message = {'command': 'play_uri', 'uri': self.song_uri + 'bogus'}
self.assertEqual(False, self.send_recv(message))

View File

@ -9,10 +9,9 @@ class VersionTest(unittest.TestCase):
def test_versions_can_be_strictly_ordered(self):
self.assert_(SV('0.1.0a0') < SV('0.1.0a1'))
self.assert_(SV('0.1.0a2') < SV(get_version()))
self.assert_(SV('0.1.0a1') < SV('0.1.0a2'))
self.assert_(SV('0.1.0a2') < SV('0.1.0a3'))
self.assert_(SV('0.1.0a3') < SV(get_version()))
self.assert_(SV(get_version()) < SV('0.1.0a5'))
self.assert_(SV('0.1.0a0') < SV('0.1.0'))
self.assert_(SV('0.1.0') < SV('0.1.1'))
self.assert_(SV(get_version()) < SV('0.1.1'))
self.assert_(SV('0.1.1') < SV('0.2.0'))
self.assert_(SV('0.2.0') < SV('1.0.0'))