Merge branch 'gstreamer' into gstreamer-local-backend

Conflicts:
	mopidy/backends/local/__init__.py
	tests/backends/base.py
This commit is contained in:
Thomas Adamcik 2010-08-18 00:55:24 +02:00
commit 8203d45559
20 changed files with 614 additions and 316 deletions

View File

@ -31,6 +31,10 @@ greatly improved MPD client support.
**Changes**
- Exit early if not Python >= 2.6, < 3.
- Validate settings at startup and print useful error messages if the settings
has not been updated or anything is misspelled.
- Add command line option :option:`--list-settings` to print the currently
active settings.
- Include Sphinx scripts for building docs, pylintrc, tests and test data in
the packages created by ``setup.py`` for i.e. PyPI.
- MPD frontend:
@ -205,8 +209,8 @@ the established pace of at least a release per month.
- Improvements to MPD protocol handling, making Mopidy work much better with a
group of clients, including ncmpc, MPoD, and Theremin.
- New command line flag ``--dump`` for dumping debug log to ``dump.log`` in the
current directory.
- New command line flag :option:`--dump` for dumping debug log to ``dump.log``
in the current directory.
- New setting :attr:`mopidy.settings.MIXER_ALSA_CONTROL` for forcing what ALSA
control :class:`mopidy.mixers.alsa.AlsaMixer` should use.

View File

@ -2,8 +2,6 @@ import sys
if not (2, 6) <= sys.version_info < (3,):
sys.exit(u'Mopidy requires Python >= 2.6, < 3')
from mopidy import settings as raw_settings
def get_version():
return u'0.1.0a4'
@ -27,13 +25,6 @@ class MopidyException(Exception):
class SettingsError(MopidyException):
pass
class Settings(object):
def __getattr__(self, attr):
if attr.isupper() and not hasattr(raw_settings, attr):
raise SettingsError(u'Setting "%s" is not set.' % attr)
value = getattr(raw_settings, attr)
if type(value) != bool and not value:
raise SettingsError(u'Setting "%s" is empty.' % attr)
return value
settings = Settings()
from mopidy import settings as default_settings_module
from mopidy.utils.settings import SettingsProxy
settings = SettingsProxy(default_settings_module)

View File

@ -11,13 +11,16 @@ sys.path.insert(0,
from mopidy import get_version, settings, SettingsError
from mopidy.process import CoreProcess
from mopidy.utils import get_class, get_or_create_folder
from mopidy.utils import get_class
from mopidy.utils.path import get_or_create_folder
from mopidy.utils.settings import list_settings_optparse_callback
logger = logging.getLogger('mopidy.main')
def main():
options = _parse_options()
_setup_logging(options.verbosity_level, options.dump)
settings.validate()
logger.info('-- Starting Mopidy --')
get_or_create_folder('~/.mopidy/')
core_queue = multiprocessing.Queue()
@ -40,6 +43,9 @@ def _parse_options():
parser.add_option('--dump',
action='store_true', dest='dump',
help='dump debug log to file')
parser.add_option('--list-settings',
action='callback', callback=list_settings_optparse_callback,
help='list current settings')
return parser.parse_args()[0]
def _setup_logging(verbosity_level, dump):

View File

@ -4,12 +4,14 @@ import glob
import shutil
import multiprocessing
from mopidy import settings
from mopidy.backends.base import *
from mopidy.models import Playlist, Track, Album
from mopidy import settings
from mopidy.utils import parse_m3u, parse_mpd_tag_cache
from mopidy.process import pickle_connection
from .translator import parse_m3u, parse_mpd_tag_cache
logger = logging.getLogger(u'mopidy.backends.local')
class LocalBackend(BaseBackend):

View File

@ -1,59 +1,10 @@
import logging
import os
import sys
import urllib
logger = logging.getLogger('mopidy.utils')
logger = logging.getLogger('mopidy.backends.local.translator')
from mopidy.models import Track, Artist, Album
def flatten(the_list):
result = []
for element in the_list:
if isinstance(element, list):
result.extend(flatten(element))
else:
result.append(element)
return result
def import_module(name):
__import__(name)
return sys.modules[name]
def get_class(name):
module_name = name[:name.rindex('.')]
class_name = name[name.rindex('.') + 1:]
logger.debug('Loading: %s', name)
try:
module = import_module(module_name)
class_object = getattr(module, class_name)
except (ImportError, AttributeError):
raise ImportError("Couldn't load: %s" % name)
return class_object
def get_or_create_folder(folder):
folder = os.path.expanduser(folder)
if not os.path.isdir(folder):
logger.info(u'Creating %s', folder)
os.mkdir(folder, 0755)
return folder
def path_to_uri(*paths):
path = os.path.join(*paths)
#path = os.path.expanduser(path) # FIXME
path = path.encode('utf-8')
if sys.platform == 'win32':
return 'file:' + urllib.pathname2url(path)
return 'file://' + urllib.pathname2url(path)
def indent(string, places=4, linebreak='\n'):
lines = string.split(linebreak)
if len(lines) == 1:
return string
result = u''
for line in lines:
result += linebreak + ' ' * places + line
return result
from mopidy.utils.path import path_to_uri
def parse_m3u(file_path):
"""

View File

@ -94,7 +94,7 @@ def load(frontend, name):
try:
playlist = frontend.backend.stored_playlists.get(name=name)
frontend.backend.current_playlist.append(playlist.tracks)
except LookupError as e:
except LookupError:
raise MpdNoExistError(u'No such playlist', command=u'load')
@handle_pattern(r'^playlistadd "(?P<name>[^"]+)" "(?P<uri>[^"]+)"$')

View File

@ -7,11 +7,6 @@ Available settings and their default values.
file called ``~/.mopidy/settings.py`` and redefine settings there.
"""
# Absolute import needed to import ~/.mopidy/settings.py and not ourselves
from __future__ import absolute_import
import os
import sys
#: List of playback backends to use. See :mod:`mopidy.backends` for all
#: available backends.
#:
@ -172,10 +167,3 @@ SPOTIFY_USERNAME = u''
#:
#: Used by :mod:`mopidy.backends.libspotify`.
SPOTIFY_PASSWORD = u''
# Import user specific settings
dotdir = os.path.expanduser(u'~/.mopidy/')
settings_file = os.path.join(dotdir, u'settings.py')
if os.path.isfile(settings_file):
sys.path.insert(0, dotdir)
from settings import *

38
mopidy/utils/__init__.py Normal file
View File

@ -0,0 +1,38 @@
import logging
import os
import sys
logger = logging.getLogger('mopidy.utils')
def flatten(the_list):
result = []
for element in the_list:
if isinstance(element, list):
result.extend(flatten(element))
else:
result.append(element)
return result
def import_module(name):
__import__(name)
return sys.modules[name]
def get_class(name):
module_name = name[:name.rindex('.')]
class_name = name[name.rindex('.') + 1:]
logger.debug('Loading: %s', name)
try:
module = import_module(module_name)
class_object = getattr(module, class_name)
except (ImportError, AttributeError):
raise ImportError("Couldn't load: %s" % name)
return class_object
def indent(string, places=4, linebreak='\n'):
lines = string.split(linebreak)
if len(lines) == 1:
return string
result = u''
for line in lines:
result += linebreak + ' ' * places + line
return result

21
mopidy/utils/path.py Normal file
View File

@ -0,0 +1,21 @@
import logging
import os
import sys
import urllib
logger = logging.getLogger('mopidy.utils.path')
def get_or_create_folder(folder):
folder = os.path.expanduser(folder)
if not os.path.isdir(folder):
logger.info(u'Creating %s', folder)
os.mkdir(folder, 0755)
return folder
def path_to_uri(*paths):
path = os.path.join(*paths)
#path = os.path.expanduser(path) # FIXME Waiting for test case?
path = path.encode('utf-8')
if sys.platform == 'win32':
return 'file:' + urllib.pathname2url(path)
return 'file://' + urllib.pathname2url(path)

128
mopidy/utils/settings.py Normal file
View File

@ -0,0 +1,128 @@
# Absolute import needed to import ~/.mopidy/settings.py and not ourselves
from __future__ import absolute_import
from copy import copy
import logging
import os
import sys
from mopidy import SettingsError
from mopidy.utils import indent
logger = logging.getLogger('mopidy.utils.settings')
class SettingsProxy(object):
def __init__(self, default_settings_module):
self.default_settings = self._get_settings_dict_from_module(
default_settings_module)
self.local_settings = self._get_local_settings()
self.raw_settings = copy(self.default_settings)
self.raw_settings.update(self.local_settings)
def _get_local_settings(self):
dotdir = os.path.expanduser(u'~/.mopidy/')
settings_file = os.path.join(dotdir, u'settings.py')
if not os.path.isfile(settings_file):
return {}
sys.path.insert(0, dotdir)
import settings as local_settings_module
return self._get_settings_dict_from_module(local_settings_module)
def _get_settings_dict_from_module(self, module):
settings = filter(lambda (key, value): self._is_setting(key),
module.__dict__.iteritems())
return dict(settings)
def _is_setting(self, name):
return name.isupper()
def __getattr__(self, attr):
if not self._is_setting(attr):
return
if attr not in self.raw_settings:
raise SettingsError(u'Setting "%s" is not set.' % attr)
value = self.raw_settings[attr]
if type(value) != bool and not value:
raise SettingsError(u'Setting "%s" is empty.' % attr)
return value
def validate(self):
if self.get_errors():
logger.error(u'Settings validation errors: %s',
indent(self.get_errors_as_string()))
raise SettingsError(u'Settings validation failed.')
def get_errors(self):
return validate_settings(self.default_settings, self.local_settings)
def get_errors_as_string(self):
lines = []
for (setting, error) in self.get_errors().iteritems():
lines.append(u'%s: %s' % (setting, error))
return '\n'.join(lines)
def validate_settings(defaults, settings):
"""
Checks the settings for both errors like misspellings and against a set of
rules for renamed settings, etc.
Returns of setting names with associated errors.
:param defaults: Mopidy's default settings
:type defaults: dict
:param settings: the user's local settings
:type settings: dict
:rtype: dict
"""
errors = {}
changed = {
'SERVER_HOSTNAME': 'MPD_SERVER_HOSTNAME',
'SERVER_PORT': 'MPD_SERVER_PORT',
'SPOTIFY_LIB_APPKEY': None,
}
for setting, value in settings.iteritems():
if setting in changed:
if changed[setting] is None:
errors[setting] = u'Deprecated setting. It may be removed.'
else:
errors[setting] = u'Deprecated setting. Use %s.' % (
changed[setting],)
continue
if setting == 'BACKENDS':
if 'mopidy.backends.despotify.DespotifyBackend' in value:
errors[setting] = (u'Deprecated setting value. ' +
'"mopidy.backends.despotify.DespotifyBackend" is no ' +
'longer available.')
continue
if setting not in defaults:
errors[setting] = u'Unknown setting. Is it misspelled?'
continue
return errors
def list_settings_optparse_callback(*args):
"""
Prints a list of all settings.
Called by optparse when Mopidy is run with the :option:`--list-settings`
option.
"""
from mopidy import settings
errors = settings.get_errors()
lines = []
for (key, value) in sorted(settings.raw_settings.iteritems()):
default_value = settings.default_settings.get(key)
if key.endswith('PASSWORD'):
value = u'********'
lines.append(u'%s:' % key)
lines.append(u' Value: %s' % repr(value))
if value != default_value and default_value is not None:
lines.append(u' Default: %s' % repr(default_value))
if errors.get(key) is not None:
lines.append(u' Error: %s' % errors[key])
print u'Settings: %s' % indent('\n'.join(lines), places=2)
sys.exit(0)

View File

@ -98,12 +98,12 @@ class BaseCurrentPlaylistControllerTest(object):
def test_get_by_uri_returns_unique_match(self):
track = Track(uri='a')
self.controller.load([Track(uri='z'), track, Track(uri='y')])
self.controller.append([Track(uri='z'), track, Track(uri='y')])
self.assertEqual(track, self.controller.get(uri='a')[1])
def test_get_by_uri_raises_error_if_multiple_matches(self):
track = Track(uri='a')
self.controller.load([Track(uri='z'), track, track])
self.controller.append([Track(uri='z'), track, track])
try:
self.controller.get(uri='a')
self.fail(u'Should raise LookupError if multiple matches')
@ -123,7 +123,7 @@ class BaseCurrentPlaylistControllerTest(object):
track1 = Track(uri='a', name='x')
track2 = Track(uri='b', name='x')
track3 = Track(uri='b', name='y')
self.controller.load([track1, track2, track3])
self.controller.append([track1, track2, track3])
self.assertEqual(track1, self.controller.get(uri='a', name='x')[1])
self.assertEqual(track2, self.controller.get(uri='b', name='x')[1])
self.assertEqual(track3, self.controller.get(uri='b', name='y')[1])
@ -132,35 +132,35 @@ class BaseCurrentPlaylistControllerTest(object):
track1 = Track()
track2 = Track(uri='b')
track3 = Track()
self.controller.load([track1, track2, track3])
self.controller.append([track1, track2, track3])
self.assertEqual(track2, self.controller.get(uri='b')[1])
def test_load_appends_to_the_current_playlist(self):
self.controller.load([Track(uri='a'), Track(uri='b')])
def test_append_appends_to_the_current_playlist(self):
self.controller.append([Track(uri='a'), Track(uri='b')])
self.assertEqual(len(self.controller.tracks), 2)
self.controller.load([Track(uri='c'), Track(uri='d')])
self.controller.append([Track(uri='c'), Track(uri='d')])
self.assertEqual(len(self.controller.tracks), 4)
self.assertEqual(self.controller.tracks[0].uri, 'a')
self.assertEqual(self.controller.tracks[1].uri, 'b')
self.assertEqual(self.controller.tracks[2].uri, 'c')
self.assertEqual(self.controller.tracks[3].uri, 'd')
def test_load_does_not_reset_version(self):
def test_append_does_not_reset_version(self):
version = self.controller.version
self.controller.load([])
self.controller.append([])
self.assertEqual(self.controller.version, version + 1)
@populate_playlist
def test_load_preserves_playing_state(self):
def test_append_preserves_playing_state(self):
self.playback.play()
track = self.playback.current_track
self.controller.load(self.controller.tracks[1:2])
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, track)
@populate_playlist
def test_load_preserves_stopped_state(self):
self.controller.load(self.controller.tracks[1:2])
def test_append_preserves_stopped_state(self):
self.controller.append(self.controller.tracks[1:2])
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@ -271,7 +271,7 @@ class BaseCurrentPlaylistControllerTest(object):
def test_version(self):
version = self.controller.version
self.controller.load([])
self.controller.append([])
self.assert_(version < self.controller.version)
@ -376,7 +376,7 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_current_track_after_completed_playlist(self):
self.playback.play(self.current_playlist.cp_tracks[-1])
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@ -385,6 +385,56 @@ class BasePlaybackControllerTest(object):
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@populate_playlist
def test_previous(self):
self.playback.play()
self.playback.next()
self.playback.previous()
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_previous_more(self):
self.playback.play() # At track 0
self.playback.next() # At track 1
self.playback.next() # At track 2
self.playback.previous() # At track 1
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_previous_return_value(self):
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.previous(), None)
@populate_playlist
def test_previous_does_not_trigger_playback(self):
self.playback.play()
self.playback.next()
self.playback.stop()
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
@populate_playlist
def test_previous_at_start_of_playlist(self):
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
def test_previous_for_empty_playlist(self):
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@populate_playlist
def test_previous_skips_to_previous_track_on_failure(self):
# If _play() returns False, it is a failure.
self.playback._play = lambda track: track != self.tracks[1]
self.playback.play(self.current_playlist.cp_tracks[2])
self.assertEqual(self.playback.current_track, self.tracks[2])
self.playback.previous()
self.assertNotEqual(self.playback.current_track, self.tracks[1])
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_next(self):
self.playback.play()
@ -449,81 +499,31 @@ class BasePlaybackControllerTest(object):
self.assertNotEqual(self.playback.current_track, self.tracks[1])
self.assertEqual(self.playback.current_track, self.tracks[2])
@populate_playlist
def test_previous(self):
self.playback.play()
self.playback.next()
self.playback.previous()
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_previous_more(self):
self.playback.play() # At track 0
self.playback.next() # At track 1
self.playback.next() # At track 2
self.playback.previous() # At track 1
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_previous_return_value(self):
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.previous(), None)
@populate_playlist
def test_previous_does_not_trigger_playback(self):
self.playback.play()
self.playback.next()
self.playback.stop()
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
@populate_playlist
def test_previous_at_start_of_playlist(self):
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
def test_previous_for_empty_playlist(self):
self.playback.previous()
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@populate_playlist
def test_previous_skips_to_previous_track_on_failure(self):
# If _play() returns False, it is a failure.
self.playback._play = lambda track: track != self.tracks[1]
self.playback.play(self.current_playlist.cp_tracks[2])
self.assertEqual(self.playback.current_track, self.tracks[2])
self.playback.previous()
self.assertNotEqual(self.playback.current_track, self.tracks[1])
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_next_track_before_play(self):
self.assertEqual(self.playback.next_track, self.tracks[0])
self.assertEqual(self.playback.track_at_next, self.tracks[0])
@populate_playlist
def test_next_track_during_play(self):
self.playback.play()
self.assertEqual(self.playback.next_track, self.tracks[1])
self.assertEqual(self.playback.track_at_next, self.tracks[1])
@populate_playlist
def test_next_track_after_previous(self):
self.playback.play()
self.playback.next()
self.playback.previous()
self.assertEqual(self.playback.next_track, self.tracks[1])
self.assertEqual(self.playback.track_at_next, self.tracks[1])
def test_next_track_empty_playlist(self):
self.assertEqual(self.playback.next_track, None)
self.assertEqual(self.playback.track_at_next, None)
@populate_playlist
def test_next_track_at_end_of_playlist(self):
self.playback.play()
for track in self.current_playlist.cp_tracks[1:]:
self.playback.next()
self.assertEqual(self.playback.next_track, None)
self.assertEqual(self.playback.track_at_next, None)
@populate_playlist
def test_next_track_at_end_of_playlist_with_repeat(self):
@ -531,28 +531,197 @@ class BasePlaybackControllerTest(object):
self.playback.play()
for track in self.tracks[1:]:
self.playback.next()
self.assertEqual(self.playback.next_track, self.tracks[0])
self.assertEqual(self.playback.track_at_next, self.tracks[0])
@populate_playlist
def test_next_track_with_random(self):
random.seed(1)
self.playback.random = True
self.assertEqual(self.playback.next_track, self.tracks[2])
self.assertEqual(self.playback.track_at_next, self.tracks[2])
@populate_playlist
def test_next_with_consume(self):
self.playback.consume = True
self.playback.play()
self.playback.next()
self.assert_(self.tracks[0] in self.backend.current_playlist.tracks)
@populate_playlist
def test_next_with_single_and_repeat(self):
self.playback.single = True
self.playback.repeat = True
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_next_with_random(self):
# FIXME feels very fragile
random.seed(1)
self.playback.random = True
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_next_track_with_random_after_append_playlist(self):
random.seed(1)
self.playback.random = True
self.assertEqual(self.playback.track_at_next, self.tracks[2])
self.backend.current_playlist.append(self.tracks[:1])
self.assertEqual(self.playback.track_at_next, self.tracks[1])
@populate_playlist
def test_end_of_track(self):
self.playback.play()
old_position = self.playback.current_playlist_position
old_uri = self.playback.current_track.uri
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_playlist_position,
old_position+1)
self.assertNotEqual(self.playback.current_track.uri, old_uri)
@populate_playlist
def test_end_of_track_return_value(self):
self.playback.play()
self.assertEqual(self.playback.on_end_of_track(), None)
@populate_playlist
def test_end_of_track_does_not_trigger_playback(self):
self.playback.on_end_of_track()
self.assertEqual(self.playback.state, self.playback.STOPPED)
@populate_playlist
def test_end_of_track_at_end_of_playlist(self):
self.playback.play()
for i, track in enumerate(self.tracks):
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, track)
self.assertEqual(self.playback.current_playlist_position, i)
self.playback.on_end_of_track()
self.assertEqual(self.playback.state, self.playback.STOPPED)
@populate_playlist
def test_end_of_track_until_end_of_playlist_and_play_from_start(self):
self.playback.play()
for track in self.tracks:
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_track, None)
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, self.tracks[0])
def test_end_of_track_for_empty_playlist(self):
self.playback.on_end_of_track()
self.assertEqual(self.playback.state, self.playback.STOPPED)
@populate_playlist
def test_end_of_track_skips_to_next_track_on_failure(self):
# If _play() returns False, it is a failure.
self.playback._play = lambda track: track != self.tracks[1]
self.playback.play()
self.assertEqual(self.playback.current_track, self.tracks[0])
self.playback.on_end_of_track()
self.assertNotEqual(self.playback.current_track, self.tracks[1])
self.assertEqual(self.playback.current_track, self.tracks[2])
@populate_playlist
def test_end_of_track_track_before_play(self):
self.assertEqual(self.playback.track_at_next, self.tracks[0])
@populate_playlist
def test_end_of_track_track_during_play(self):
self.playback.play()
self.assertEqual(self.playback.track_at_next, self.tracks[1])
@populate_playlist
def test_end_of_track_track_after_previous(self):
self.playback.play()
self.playback.on_end_of_track()
self.playback.previous()
self.assertEqual(self.playback.track_at_next, self.tracks[1])
def test_end_of_track_track_empty_playlist(self):
self.assertEqual(self.playback.track_at_next, None)
@populate_playlist
def test_end_of_track_track_at_end_of_playlist(self):
self.playback.play()
for track in self.current_playlist.cp_tracks[1:]:
self.playback.on_end_of_track()
self.assertEqual(self.playback.track_at_next, None)
@populate_playlist
def test_end_of_track_track_at_end_of_playlist_with_repeat(self):
self.playback.repeat = True
self.playback.play()
for track in self.tracks[1:]:
self.playback.on_end_of_track()
self.assertEqual(self.playback.track_at_next, self.tracks[0])
@populate_playlist
def test_end_of_track_track_with_random(self):
random.seed(1)
self.playback.random = True
self.assertEqual(self.playback.track_at_next, self.tracks[2])
@populate_playlist
def test_end_of_track_with_consume(self):
self.playback.consume = True
self.playback.play()
self.playback.on_end_of_track()
self.assert_(self.tracks[0] not in self.backend.current_playlist.tracks)
@populate_playlist
def test_end_of_track_with_single_and_repeat(self):
self.playback.single = True
self.playback.repeat = True
self.playback.play()
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_end_of_track_with_random(self):
# FIXME feels very fragile
random.seed(1)
self.playback.random = True
self.playback.play()
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_end_of_track_track_with_random_after_append_playlist(self):
random.seed(1)
self.playback.random = True
self.assertEqual(self.playback.track_at_next, self.tracks[2])
self.backend.current_playlist.append(self.tracks[:1])
self.assertEqual(self.playback.track_at_next, self.tracks[1])
@populate_playlist
def test_previous_track_before_play(self):
self.assertEqual(self.playback.previous_track, None)
self.assertEqual(self.playback.track_at_previous, None)
@populate_playlist
def test_previous_track_after_play(self):
self.playback.play()
self.assertEqual(self.playback.previous_track, None)
self.assertEqual(self.playback.track_at_previous, None)
@populate_playlist
def test_previous_track_after_next(self):
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.previous_track, self.tracks[0])
self.assertEqual(self.playback.track_at_previous, self.tracks[0])
@populate_playlist
def test_previous_track_after_previous(self):
@ -560,17 +729,17 @@ class BasePlaybackControllerTest(object):
self.playback.next() # At track 1
self.playback.next() # At track 2
self.playback.previous() # At track 1
self.assertEqual(self.playback.previous_track, self.tracks[0])
self.assertEqual(self.playback.track_at_previous, self.tracks[0])
def test_previous_track_empty_playlist(self):
self.assertEqual(self.playback.previous_track, None)
self.assertEqual(self.playback.track_at_previous, None)
@populate_playlist
def test_previous_track_with_consume(self):
self.playback.consume = True
for track in self.tracks:
self.playback.next()
self.assertEqual(self.playback.previous_track,
self.assertEqual(self.playback.track_at_previous,
self.playback.current_track)
@populate_playlist
@ -578,7 +747,7 @@ class BasePlaybackControllerTest(object):
self.playback.random = True
for track in self.tracks:
self.playback.next()
self.assertEqual(self.playback.previous_track,
self.assertEqual(self.playback.track_at_previous,
self.playback.current_track)
@populate_playlist
@ -614,7 +783,7 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_current_playlist_position_at_end_of_playlist(self):
self.playback.play(self.current_playlist.cp_tracks[-1])
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_playlist_position, None)
def test_on_current_playlist_change_gets_called(self):
@ -626,7 +795,7 @@ class BasePlaybackControllerTest(object):
wrapper.called = False
self.playback.on_current_playlist_change = wrapper
self.backend.current_playlist.load([])
self.backend.current_playlist.append([])
self.assert_(wrapper.called)
@ -641,14 +810,14 @@ class BasePlaybackControllerTest(object):
def test_on_current_playlist_change_when_playing(self):
self.playback.play()
current_track = self.playback.current_track
self.backend.current_playlist.load([self.tracks[2]])
self.backend.current_playlist.append([self.tracks[2]])
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(self.playback.current_track, current_track)
@populate_playlist
def test_on_current_playlist_change_when_stopped(self):
current_track = self.playback.current_track
self.backend.current_playlist.load([self.tracks[2]])
self.backend.current_playlist.append([self.tracks[2]])
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.assertEqual(self.playback.current_track, None)
@ -657,7 +826,7 @@ class BasePlaybackControllerTest(object):
self.playback.play()
self.playback.pause()
current_track = self.playback.current_track
self.backend.current_playlist.load([self.tracks[2]])
self.backend.current_playlist.append([self.tracks[2]])
self.assertEqual(self.playback.state, self.backend.playback.PAUSED)
self.assertEqual(self.playback.current_track, current_track)
@ -835,34 +1004,12 @@ class BasePlaybackControllerTest(object):
self.playback.play()
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_next_with_consume(self):
self.playback.consume = True
self.playback.play()
self.playback.next()
self.assert_(self.tracks[0] in self.backend.current_playlist.tracks)
@populate_playlist
def test_end_of_track_with_consume(self):
self.playback.consume = True
self.playback.play()
self.playback.end_of_track_callback()
self.assert_(self.tracks[0] not in self.backend.current_playlist.tracks)
@populate_playlist
def test_next_with_single_and_repeat(self):
self.playback.single = True
self.playback.repeat = True
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_playlist_is_empty_after_all_tracks_are_played_with_consume(self):
self.playback.consume = True
self.playback.play()
for i in range(len(self.backend.current_playlist.tracks)):
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(len(self.backend.current_playlist.tracks), 0)
@populate_playlist
@ -872,15 +1019,6 @@ class BasePlaybackControllerTest(object):
self.playback.play()
self.assertEqual(self.playback.current_track, self.tracks[2])
@populate_playlist
def test_next_with_random(self):
# FIXME feels very fragile
random.seed(1)
self.playback.random = True
self.playback.play()
self.playback.next()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
def test_previous_with_random(self):
random.seed(1)
@ -894,7 +1032,7 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_end_of_song_starts_next_track(self):
self.playback.play()
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_track, self.tracks[1])
@populate_playlist
@ -902,13 +1040,13 @@ class BasePlaybackControllerTest(object):
self.playback.single = True
self.playback.repeat = True
self.playback.play()
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(self.playback.current_track, self.tracks[0])
@populate_playlist
def test_end_of_playlist_stops(self):
self.playback.play(self.current_playlist.cp_tracks[-1])
self.playback.end_of_track_callback()
self.playback.on_end_of_track()
self.assertEqual(self.playback.state, self.playback.STOPPED)
def test_repeat_off_by_default(self):
@ -926,14 +1064,14 @@ class BasePlaybackControllerTest(object):
self.playback.play()
for track in self.tracks[1:]:
self.playback.next()
self.assertEqual(self.playback.next_track, None)
self.assertEqual(self.playback.track_at_next, None)
@populate_playlist
def test_random_until_end_of_playlist_and_play_from_start(self):
self.playback.repeat = True
for track in self.tracks:
self.playback.next()
self.assertNotEqual(self.playback.next_track, None)
self.assertNotEqual(self.playback.track_at_next, None)
self.assertEqual(self.playback.state, self.playback.STOPPED)
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
@ -945,15 +1083,7 @@ class BasePlaybackControllerTest(object):
self.playback.play()
for track in self.tracks:
self.playback.next()
self.assertNotEqual(self.playback.next_track, None)
@populate_playlist
def test_next_track_with_random_after_load_playlist(self):
random.seed(1)
self.playback.random = True
self.assertEqual(self.playback.next_track, self.tracks[2])
self.backend.current_playlist.load(self.tracks[:1])
self.assertEqual(self.playback.next_track, self.tracks[1])
self.assertNotEqual(self.playback.track_at_next, None)
@populate_playlist
def test_played_track_during_random_not_played_again(self):
@ -1052,21 +1182,6 @@ class BaseStoredPlaylistsControllerTest(object):
except LookupError as e:
self.assertEqual(u'"name=c" match no playlists', e[0])
def test_search_returns_empty_list(self):
self.assertEqual([], self.stored.search('test'))
def test_search_returns_playlist(self):
playlist = self.stored.create('test')
playlists = self.stored.search('test')
self.assert_(playlist in playlists)
def test_search_returns_mulitple_playlists(self):
playlist1 = self.stored.create('test')
playlist2 = self.stored.create('test2')
playlists = self.stored.search('test')
self.assert_(playlist1 in playlists)
self.assert_(playlist2 in playlists)
def test_lookup(self):
raise SkipTest

View File

View File

@ -11,7 +11,7 @@ from mopidy import settings
from mopidy.backends.local import LocalBackend
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track
from mopidy.utils import path_to_uri
from mopidy.utils.path import path_to_uri
from tests.backends.base import *
from tests import SkipTest, data_folder

View File

@ -1,96 +1,15 @@
#encoding: utf-8
# encoding: utf-8
import os
import sys
import shutil
import tempfile
import unittest
from mopidy.utils import *
from mopidy.utils.path import path_to_uri
from mopidy.backends.local.translator import parse_m3u, parse_mpd_tag_cache
from mopidy.models import Track, Artist, Album
from tests import SkipTest, data_folder
class GetClassTest(unittest.TestCase):
def test_loading_module_that_does_not_exist(self):
test = lambda: get_class('foo.bar.Baz')
self.assertRaises(ImportError, test)
def test_loading_class_that_does_not_exist(self):
test = lambda: get_class('unittest.FooBarBaz')
self.assertRaises(ImportError, test)
def test_import_error_message_contains_complete_class_path(self):
try:
get_class('foo.bar.Baz')
except ImportError as e:
self.assert_('foo.bar.Baz' in str(e))
def test_loading_existing_class(self):
cls = get_class('unittest.TestCase')
self.assertEqual(cls.__name__, 'TestCase')
class GetOrCreateFolderTest(unittest.TestCase):
def setUp(self):
self.parent = tempfile.mkdtemp()
def tearDown(self):
if os.path.isdir(self.parent):
shutil.rmtree(self.parent)
def test_creating_folder(self):
folder = os.path.join(self.parent, 'test')
self.assert_(not os.path.exists(folder))
self.assert_(not os.path.isdir(folder))
created = get_or_create_folder(folder)
self.assert_(os.path.exists(folder))
self.assert_(os.path.isdir(folder))
self.assertEqual(created, folder)
def test_creating_existing_folder(self):
created = get_or_create_folder(self.parent)
self.assert_(os.path.exists(self.parent))
self.assert_(os.path.isdir(self.parent))
self.assertEqual(created, self.parent)
def test_that_userfolder_is_expanded(self):
raise SkipTest # Not sure how to safely test this
class PathToFileURITest(unittest.TestCase):
def test_simple_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/WINDOWS/clock.avi')
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
else:
result = path_to_uri(u'/etc/fstab')
self.assertEqual(result, 'file:///etc/fstab')
def test_folder_and_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/WINDOWS/', u'clock.avi')
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
else:
result = path_to_uri(u'/etc', u'fstab')
self.assertEqual(result, u'file:///etc/fstab')
def test_space_in_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/test this')
self.assertEqual(result, 'file:///C://test%20this')
else:
result = path_to_uri(u'/tmp/test this')
self.assertEqual(result, u'file:///tmp/test%20this')
def test_unicode_in_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/æøå')
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
else:
result = path_to_uri(u'/tmp/æøå')
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')
song1_path = data_folder('song1.mp3')
song2_path = data_folder('song2.mp3')
encoded_path = data_folder(u'æøå.mp3')
@ -98,7 +17,6 @@ song1_uri = path_to_uri(song1_path)
song2_uri = path_to_uri(song2_path)
encoded_uri = path_to_uri(encoded_path)
class M3UToUriTest(unittest.TestCase):
def test_empty_file(self):
uris = parse_m3u(data_folder('empty.m3u'))

View File

@ -6,8 +6,6 @@ from mopidy.frontends.mpd import frontend
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Track, Playlist
from tests import SkipTest
class StoredPlaylistsHandlerTest(unittest.TestCase):
def setUp(self):
self.b = DummyBackend(mixer_class=DummyMixer)

View File

@ -1,15 +1,15 @@
import multiprocessing
import unittest
from mopidy.utils import path_to_uri
from mopidy.process import pickle_connection
from mopidy.outputs.gstreamer import GStreamerOutput
from mopidy.process import pickle_connection
from mopidy.utils.path import path_to_uri
from tests import data_folder, SkipTest
class GStreamerOutputTest(unittest.TestCase):
def setUp(self):
self.song_uri = path_to_uri(data_folder('song1.wav'))
self.song_uri = path_to_uri(data_folder('song1.wav'))
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
self.output = GStreamerOutput(self.core_queue, self.output_queue)

0
tests/utils/__init__.py Normal file
View File

22
tests/utils/init_test.py Normal file
View File

@ -0,0 +1,22 @@
import unittest
from mopidy.utils import get_class
class GetClassTest(unittest.TestCase):
def test_loading_module_that_does_not_exist(self):
test = lambda: get_class('foo.bar.Baz')
self.assertRaises(ImportError, test)
def test_loading_class_that_does_not_exist(self):
test = lambda: get_class('unittest.FooBarBaz')
self.assertRaises(ImportError, test)
def test_import_error_message_contains_complete_class_path(self):
try:
get_class('foo.bar.Baz')
except ImportError as e:
self.assert_('foo.bar.Baz' in str(e))
def test_loading_existing_class(self):
cls = get_class('unittest.TestCase')
self.assertEqual(cls.__name__, 'TestCase')

71
tests/utils/path_test.py Normal file
View File

@ -0,0 +1,71 @@
# encoding: utf-8
import os
import shutil
import sys
import tempfile
import unittest
from mopidy.utils.path import get_or_create_folder, path_to_uri
from tests import SkipTest
class GetOrCreateFolderTest(unittest.TestCase):
def setUp(self):
self.parent = tempfile.mkdtemp()
def tearDown(self):
if os.path.isdir(self.parent):
shutil.rmtree(self.parent)
def test_creating_folder(self):
folder = os.path.join(self.parent, 'test')
self.assert_(not os.path.exists(folder))
self.assert_(not os.path.isdir(folder))
created = get_or_create_folder(folder)
self.assert_(os.path.exists(folder))
self.assert_(os.path.isdir(folder))
self.assertEqual(created, folder)
def test_creating_existing_folder(self):
created = get_or_create_folder(self.parent)
self.assert_(os.path.exists(self.parent))
self.assert_(os.path.isdir(self.parent))
self.assertEqual(created, self.parent)
def test_that_userfolder_is_expanded(self):
raise SkipTest # Not sure how to safely test this
class PathToFileURITest(unittest.TestCase):
def test_simple_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/WINDOWS/clock.avi')
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
else:
result = path_to_uri(u'/etc/fstab')
self.assertEqual(result, 'file:///etc/fstab')
def test_folder_and_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/WINDOWS/', u'clock.avi')
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
else:
result = path_to_uri(u'/etc', u'fstab')
self.assertEqual(result, u'file:///etc/fstab')
def test_space_in_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/test this')
self.assertEqual(result, 'file:///C://test%20this')
else:
result = path_to_uri(u'/tmp/test this')
self.assertEqual(result, u'file:///tmp/test%20this')
def test_unicode_in_path(self):
if sys.platform == 'win32':
result = path_to_uri(u'C:/æøå')
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
else:
result = path_to_uri(u'/tmp/æøå')
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')

View File

@ -0,0 +1,45 @@
import unittest
from mopidy.utils.settings import validate_settings
class ValidateSettingsTest(unittest.TestCase):
def setUp(self):
self.defaults = {
'MPD_SERVER_HOSTNAME': '::',
'MPD_SERVER_PORT': 6600,
}
def test_no_errors_yields_empty_dict(self):
result = validate_settings(self.defaults, {})
self.assertEqual(result, {})
def test_unknown_setting_returns_error(self):
result = validate_settings(self.defaults,
{'MPD_SERVER_HOSTNMAE': '127.0.0.1'})
self.assertEqual(result['MPD_SERVER_HOSTNMAE'],
u'Unknown setting. Is it misspelled?')
def test_not_renamed_setting_returns_error(self):
result = validate_settings(self.defaults,
{'SERVER_HOSTNAME': '127.0.0.1'})
self.assertEqual(result['SERVER_HOSTNAME'],
u'Deprecated setting. Use MPD_SERVER_HOSTNAME.')
def test_unneeded_settings_returns_error(self):
result = validate_settings(self.defaults,
{'SPOTIFY_LIB_APPKEY': '/tmp/foo'})
self.assertEqual(result['SPOTIFY_LIB_APPKEY'],
u'Deprecated setting. It may be removed.')
def test_deprecated_setting_value_returns_error(self):
result = validate_settings(self.defaults,
{'BACKENDS': ('mopidy.backends.despotify.DespotifyBackend',)})
self.assertEqual(result['BACKENDS'],
u'Deprecated setting value. ' +
'"mopidy.backends.despotify.DespotifyBackend" is no longer ' +
'available.')
def test_two_errors_are_both_reported(self):
result = validate_settings(self.defaults,
{'FOO': '', 'BAR': ''})
self.assertEquals(len(result), 2)