Merge pull request #221 from jodal/feature/stored-playlists-cleanup
Stored playlists cleanup and multibackend support
This commit is contained in:
commit
825136edef
@ -56,6 +56,24 @@ backends:
|
||||
dummy/mocked lower layers easier than with the old variant, where
|
||||
dependencies where looked up in Pykka's actor registry.
|
||||
|
||||
- The stored playlists part of the core API has been revised to be more focused
|
||||
around the playlist URI, and some redundant functionality has been removed:
|
||||
|
||||
- :attr:`mopidy.core.StoredPlaylistsController.playlists` no longer supports
|
||||
assignment to it. The `playlists` property on the backend layer still does,
|
||||
and all functionality is maintained by assigning to the playlists
|
||||
collections at the backend level.
|
||||
|
||||
- :meth:`mopidy.core.StoredPlaylistsController.delete` now accepts an URI,
|
||||
and not a playlist object.
|
||||
|
||||
- :meth:`mopidy.core.StoredPlaylistsController.save` now returns the saved
|
||||
playlist. The returned playlist may differ from the saved playlist, and
|
||||
should thus be used instead of the playlist passed to ``save()``.
|
||||
|
||||
- :meth:`mopidy.core.StoredPlaylistsController.rename` has been removed,
|
||||
since renaming can be done with ``save()``.
|
||||
|
||||
**Changes**
|
||||
|
||||
- Made the :mod:`NAD mixer <mopidy.audio.mixers.nad>` responsive to interrupts
|
||||
|
||||
@ -182,7 +182,7 @@ class BaseStoredPlaylistsProvider(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def delete(self, playlist):
|
||||
def delete(self, uri):
|
||||
"""
|
||||
See :meth:`mopidy.core.StoredPlaylistsController.delete`.
|
||||
|
||||
@ -206,14 +206,6 @@ class BaseStoredPlaylistsProvider(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def rename(self, playlist, new_name):
|
||||
"""
|
||||
See :meth:`mopidy.core.StoredPlaylistsController.rename`.
|
||||
|
||||
*MUST be implemented by subclass.*
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def save(self, playlist):
|
||||
"""
|
||||
See :meth:`mopidy.core.StoredPlaylistsController.save`.
|
||||
|
||||
@ -6,80 +6,111 @@ import shutil
|
||||
from mopidy import settings
|
||||
from mopidy.backends import base
|
||||
from mopidy.models import Playlist
|
||||
from mopidy.utils import formatting, path
|
||||
|
||||
from .translator import parse_m3u
|
||||
|
||||
|
||||
logger = logging.getLogger(u'mopidy.backends.local')
|
||||
|
||||
|
||||
class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(LocalStoredPlaylistsProvider, self).__init__(*args, **kwargs)
|
||||
self._folder = settings.LOCAL_PLAYLIST_PATH
|
||||
self._path = settings.LOCAL_PLAYLIST_PATH
|
||||
self.refresh()
|
||||
|
||||
def create(self, name):
|
||||
name = formatting.slugify(name)
|
||||
uri = path.path_to_uri(self._get_m3u_path(name))
|
||||
playlist = Playlist(uri=uri, name=name)
|
||||
return self.save(playlist)
|
||||
|
||||
def delete(self, uri):
|
||||
playlist = self.lookup(uri)
|
||||
if not playlist:
|
||||
return
|
||||
|
||||
self._playlists.remove(playlist)
|
||||
self._delete_m3u(playlist.uri)
|
||||
|
||||
def lookup(self, uri):
|
||||
pass # TODO
|
||||
for playlist in self._playlists:
|
||||
if playlist.uri == uri:
|
||||
return playlist
|
||||
|
||||
def refresh(self):
|
||||
logger.info('Loading playlists from %s', self._path)
|
||||
|
||||
playlists = []
|
||||
|
||||
logger.info('Loading playlists from %s', self._folder)
|
||||
|
||||
for m3u in glob.glob(os.path.join(self._folder, '*.m3u')):
|
||||
for m3u in glob.glob(os.path.join(self._path, '*.m3u')):
|
||||
uri = path.path_to_uri(m3u)
|
||||
name = os.path.splitext(os.path.basename(m3u))[0]
|
||||
|
||||
tracks = []
|
||||
for uri in parse_m3u(m3u, settings.LOCAL_MUSIC_PATH):
|
||||
for track_uri in parse_m3u(m3u, settings.LOCAL_MUSIC_PATH):
|
||||
try:
|
||||
tracks.append(self.backend.library.lookup(uri))
|
||||
# TODO We must use core.library.lookup() to support tracks
|
||||
# from other backends
|
||||
tracks.append(self.backend.library.lookup(track_uri))
|
||||
except LookupError as ex:
|
||||
logger.error('Playlist item could not be added: %s', ex)
|
||||
playlist = Playlist(tracks=tracks, name=name)
|
||||
|
||||
# FIXME playlist name needs better handling
|
||||
# FIXME tracks should come from lib. lookup
|
||||
|
||||
playlist = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
playlists.append(playlist)
|
||||
|
||||
self.playlists = playlists
|
||||
|
||||
def create(self, name):
|
||||
playlist = Playlist(name=name)
|
||||
self.save(playlist)
|
||||
def save(self, playlist):
|
||||
assert playlist.uri, 'Cannot save playlist without URI'
|
||||
|
||||
old_playlist = self.lookup(playlist.uri)
|
||||
|
||||
if old_playlist and playlist.name != old_playlist.name:
|
||||
playlist = playlist.copy(name=formatting.slugify(playlist.name))
|
||||
playlist = self._rename_m3u(playlist)
|
||||
|
||||
self._save_m3u(playlist)
|
||||
|
||||
if old_playlist is not None:
|
||||
index = self._playlists.index(old_playlist)
|
||||
self._playlists[index] = playlist
|
||||
else:
|
||||
self._playlists.append(playlist)
|
||||
|
||||
return playlist
|
||||
|
||||
def delete(self, playlist):
|
||||
if playlist not in self._playlists:
|
||||
return
|
||||
def _get_m3u_path(self, name):
|
||||
name = formatting.slugify(name)
|
||||
file_path = os.path.join(self._path, name + '.m3u')
|
||||
path.check_file_path_is_inside_base_dir(file_path, self._path)
|
||||
return file_path
|
||||
|
||||
self._playlists.remove(playlist)
|
||||
filename = os.path.join(self._folder, playlist.name + '.m3u')
|
||||
|
||||
if os.path.exists(filename):
|
||||
os.remove(filename)
|
||||
|
||||
def rename(self, playlist, name):
|
||||
if playlist not in self._playlists:
|
||||
return
|
||||
|
||||
src = os.path.join(self._folder, playlist.name + '.m3u')
|
||||
dst = os.path.join(self._folder, name + '.m3u')
|
||||
|
||||
renamed = playlist.copy(name=name)
|
||||
index = self._playlists.index(playlist)
|
||||
self._playlists[index] = renamed
|
||||
|
||||
shutil.move(src, dst)
|
||||
|
||||
def save(self, playlist):
|
||||
file_path = os.path.join(self._folder, playlist.name + '.m3u')
|
||||
|
||||
# FIXME this should be a save_m3u function, not inside save
|
||||
def _save_m3u(self, playlist):
|
||||
file_path = path.uri_to_path(playlist.uri)
|
||||
path.check_file_path_is_inside_base_dir(file_path, self._path)
|
||||
with open(file_path, 'w') as file_handle:
|
||||
for track in playlist.tracks:
|
||||
if track.uri.startswith('file://'):
|
||||
file_handle.write(track.uri[len('file://'):] + '\n')
|
||||
uri = path.uri_to_path(track.uri)
|
||||
else:
|
||||
file_handle.write(track.uri + '\n')
|
||||
uri = track.uri
|
||||
file_handle.write(uri + '\n')
|
||||
|
||||
self._playlists.append(playlist)
|
||||
def _delete_m3u(self, uri):
|
||||
file_path = path.uri_to_path(uri)
|
||||
path.check_file_path_is_inside_base_dir(file_path, self._path)
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
|
||||
def _rename_m3u(self, playlist):
|
||||
src_file_path = path.uri_to_path(playlist.uri)
|
||||
path.check_file_path_is_inside_base_dir(src_file_path, self._path)
|
||||
|
||||
dst_file_path = self._get_m3u_path(playlist.name)
|
||||
path.check_file_path_is_inside_base_dir(dst_file_path, self._path)
|
||||
|
||||
shutil.move(src_file_path, dst_file_path)
|
||||
|
||||
return playlist.copy(uri=path.path_to_uri(dst_file_path))
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import itertools
|
||||
import urlparse
|
||||
|
||||
import pykka
|
||||
|
||||
@ -15,37 +16,50 @@ class StoredPlaylistsController(object):
|
||||
"""
|
||||
Currently stored playlists.
|
||||
|
||||
Read/write. List of :class:`mopidy.models.Playlist`.
|
||||
Read-only. List of :class:`mopidy.models.Playlist`.
|
||||
"""
|
||||
futures = [b.stored_playlists.playlists for b in self.backends]
|
||||
results = pykka.get_all(futures)
|
||||
return list(itertools.chain(*results))
|
||||
|
||||
@playlists.setter # noqa
|
||||
def playlists(self, playlists):
|
||||
# TODO Support multiple backends
|
||||
self.backends[0].stored_playlists.playlists = playlists
|
||||
|
||||
def create(self, name):
|
||||
def create(self, name, uri_scheme=None):
|
||||
"""
|
||||
Create a new playlist.
|
||||
|
||||
If ``uri_scheme`` matches an URI scheme handled by a current backend,
|
||||
that backend is asked to create the playlist. If ``uri_scheme`` is
|
||||
:class:`None` or doesn't match a current backend, the first backend is
|
||||
asked to create the playlist.
|
||||
|
||||
All new playlists should be created by calling this method, and **not**
|
||||
by creating new instances of :class:`mopidy.models.Playlist`.
|
||||
|
||||
:param name: name of the new playlist
|
||||
:type name: string
|
||||
:param uri_scheme: use the backend matching the URI scheme
|
||||
:type uri_scheme: string
|
||||
:rtype: :class:`mopidy.models.Playlist`
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.create(name).get()
|
||||
if uri_scheme in self.backends.by_uri_scheme:
|
||||
backend = self.backends.by_uri_scheme[uri_scheme]
|
||||
else:
|
||||
backend = self.backends[0]
|
||||
return backend.stored_playlists.create(name).get()
|
||||
|
||||
def delete(self, playlist):
|
||||
def delete(self, uri):
|
||||
"""
|
||||
Delete playlist.
|
||||
Delete playlist identified by the URI.
|
||||
|
||||
:param playlist: the playlist to delete
|
||||
:type playlist: :class:`mopidy.models.Playlist`
|
||||
If the URI doesn't match the URI schemes handled by the current
|
||||
backends, nothing happens.
|
||||
|
||||
:param uri: URI of the playlist to delete
|
||||
:type uri: string
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.delete(playlist).get()
|
||||
uri_scheme = urlparse.urlparse(uri).scheme
|
||||
if uri_scheme in self.backends.by_uri_scheme:
|
||||
backend = self.backends.by_uri_scheme[uri_scheme]
|
||||
backend.stored_playlists.delete(uri).get()
|
||||
|
||||
def get(self, **criteria):
|
||||
"""
|
||||
@ -80,41 +94,65 @@ class StoredPlaylistsController(object):
|
||||
def lookup(self, uri):
|
||||
"""
|
||||
Lookup playlist with given URI in both the set of stored playlists and
|
||||
in any other playlist sources.
|
||||
in any other playlist sources. Returns :class:`None` if not found.
|
||||
|
||||
:param uri: playlist URI
|
||||
:type uri: string
|
||||
:rtype: :class:`mopidy.models.Playlist`
|
||||
:rtype: :class:`mopidy.models.Playlist` or :class:`None`
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.lookup(uri).get()
|
||||
uri_scheme = urlparse.urlparse(uri).scheme
|
||||
backend = self.backends.by_uri_scheme.get(uri_scheme, None)
|
||||
if backend:
|
||||
return backend.stored_playlists.lookup(uri).get()
|
||||
else:
|
||||
return None
|
||||
|
||||
def refresh(self):
|
||||
def refresh(self, uri_scheme=None):
|
||||
"""
|
||||
Refresh the stored playlists in :attr:`playlists`.
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.refresh().get()
|
||||
|
||||
def rename(self, playlist, new_name):
|
||||
"""
|
||||
Rename playlist.
|
||||
If ``uri_scheme`` is :class:`None`, all backends are asked to refresh.
|
||||
If ``uri_scheme`` is an URI scheme handled by a backend, only that
|
||||
backend is asked to refresh. If ``uri_scheme`` doesn't match any
|
||||
current backend, nothing happens.
|
||||
|
||||
:param playlist: the playlist
|
||||
:type playlist: :class:`mopidy.models.Playlist`
|
||||
:param new_name: the new name
|
||||
:type new_name: string
|
||||
:param uri_scheme: limit to the backend matching the URI scheme
|
||||
:type uri_scheme: string
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.rename(
|
||||
playlist, new_name).get()
|
||||
if uri_scheme is None:
|
||||
futures = [b.stored_playlists.refresh() for b in self.backends]
|
||||
pykka.get_all(futures)
|
||||
else:
|
||||
if uri_scheme in self.backends.by_uri_scheme:
|
||||
backend = self.backends.by_uri_scheme[uri_scheme]
|
||||
backend.stored_playlists.refresh().get()
|
||||
|
||||
def save(self, playlist):
|
||||
"""
|
||||
Save the playlist to the set of stored playlists.
|
||||
|
||||
For a playlist to be saveable, it must have the ``uri`` attribute set.
|
||||
You should not set the ``uri`` atribute yourself, but use playlist
|
||||
objects returned by :meth:`create` or retrieved from :attr:`playlists`,
|
||||
which will always give you saveable playlists.
|
||||
|
||||
The method returns the saved playlist. The return playlist may differ
|
||||
from the saved playlist. E.g. if the playlist name was changed, the
|
||||
returned playlist may have a different URI. The caller of this method
|
||||
should throw away the playlist sent to this method, and use the
|
||||
returned playlist instead.
|
||||
|
||||
If the playlist's URI isn't set or doesn't match the URI scheme of a
|
||||
current backend, nothing is done and :class:`None` is returned.
|
||||
|
||||
:param playlist: the playlist
|
||||
:type playlist: :class:`mopidy.models.Playlist`
|
||||
:rtype: :class:`mopidy.models.Playlist` or :class:`None`
|
||||
"""
|
||||
# TODO Support multiple backends
|
||||
return self.backends[0].stored_playlists.save(playlist).get()
|
||||
if playlist.uri is None:
|
||||
return
|
||||
uri_scheme = urlparse.urlparse(playlist.uri).scheme
|
||||
if uri_scheme not in self.backends.by_uri_scheme:
|
||||
return
|
||||
backend = self.backends.by_uri_scheme[uri_scheme]
|
||||
return backend.stored_playlists.save(playlist).get()
|
||||
|
||||
@ -1,3 +1,7 @@
|
||||
import re
|
||||
import unicodedata
|
||||
|
||||
|
||||
def indent(string, places=4, linebreak='\n'):
|
||||
lines = string.split(linebreak)
|
||||
if len(lines) == 1:
|
||||
@ -6,3 +10,17 @@ def indent(string, places=4, linebreak='\n'):
|
||||
for line in lines:
|
||||
result += linebreak + ' ' * places + line
|
||||
return result
|
||||
|
||||
|
||||
def slugify(value):
|
||||
"""
|
||||
Converts to lowercase, removes non-word characters (alphanumerics and
|
||||
underscores) and converts spaces to hyphens. Also strips leading and
|
||||
trailing whitespace.
|
||||
|
||||
This function is based on Django's slugify implementation.
|
||||
"""
|
||||
value = unicodedata.normalize('NFKD', value)
|
||||
value = value.encode('ascii', 'ignore').decode('ascii')
|
||||
value = re.sub(r'[^\w\s-]', '', value).strip().lower()
|
||||
return re.sub(r'[-\s]+', '-', value)
|
||||
|
||||
@ -102,6 +102,25 @@ def find_files(path):
|
||||
yield filename
|
||||
|
||||
|
||||
def check_file_path_is_inside_base_dir(file_path, base_path):
|
||||
assert not file_path.endswith(os.sep), (
|
||||
'File path %s cannot end with a path separator' % file_path)
|
||||
|
||||
# Expand symlinks
|
||||
real_base_path = os.path.realpath(base_path)
|
||||
real_file_path = os.path.realpath(file_path)
|
||||
|
||||
# Use dir of file for prefix comparision, so we don't accept
|
||||
# /tmp/foo.m3u as being inside /tmp/foo, simply because they have a
|
||||
# common prefix, /tmp/foo, which matches the base path, /tmp/foo.
|
||||
real_dir_path = os.path.dirname(real_file_path)
|
||||
|
||||
# Check if dir of file is the base path or a subdir
|
||||
common_prefix = os.path.commonprefix([real_base_path, real_dir_path])
|
||||
assert common_prefix == real_base_path, (
|
||||
'File path %s must be in %s' % (real_file_path, real_base_path))
|
||||
|
||||
|
||||
# FIXME replace with mock usage in tests.
|
||||
class Mtime(object):
|
||||
def __init__(self):
|
||||
|
||||
@ -30,12 +30,16 @@ class StoredPlaylistsControllerTest(object):
|
||||
|
||||
settings.runtime.clear()
|
||||
|
||||
def test_create(self):
|
||||
playlist = self.stored.create('test')
|
||||
def test_create_returns_playlist_with_name_set(self):
|
||||
playlist = self.stored.create(u'test')
|
||||
self.assertEqual(playlist.name, 'test')
|
||||
|
||||
def test_create_in_playlists(self):
|
||||
playlist = self.stored.create('test')
|
||||
def test_create_returns_playlist_with_uri_set(self):
|
||||
playlist = self.stored.create(u'test')
|
||||
self.assert_(playlist.uri)
|
||||
|
||||
def test_create_adds_playlist_to_playlists_collection(self):
|
||||
playlist = self.stored.create(u'test')
|
||||
self.assert_(self.stored.playlists)
|
||||
self.assertIn(playlist, self.stored.playlists)
|
||||
|
||||
@ -43,12 +47,15 @@ class StoredPlaylistsControllerTest(object):
|
||||
self.assert_(not self.stored.playlists)
|
||||
|
||||
def test_delete_non_existant_playlist(self):
|
||||
self.stored.delete(Playlist())
|
||||
self.stored.delete('file:///unknown/playlist')
|
||||
|
||||
def test_delete_playlist(self):
|
||||
playlist = self.stored.create('test')
|
||||
self.stored.delete(playlist)
|
||||
self.assert_(not self.stored.playlists)
|
||||
def test_delete_playlist_removes_it_from_the_collection(self):
|
||||
playlist = self.stored.create(u'test')
|
||||
self.assertIn(playlist, self.stored.playlists)
|
||||
|
||||
self.stored.delete(playlist.uri)
|
||||
|
||||
self.assertNotIn(playlist, self.stored.playlists)
|
||||
|
||||
def test_get_without_criteria(self):
|
||||
test = self.stored.get
|
||||
@ -59,18 +66,19 @@ class StoredPlaylistsControllerTest(object):
|
||||
self.assertRaises(LookupError, test)
|
||||
|
||||
def test_get_with_right_criteria(self):
|
||||
playlist1 = self.stored.create('test')
|
||||
playlist1 = self.stored.create(u'test')
|
||||
playlist2 = self.stored.get(name='test')
|
||||
self.assertEqual(playlist1, playlist2)
|
||||
|
||||
def test_get_by_name_returns_unique_match(self):
|
||||
playlist = Playlist(name='b')
|
||||
self.stored.playlists = [Playlist(name='a'), playlist]
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='a'), playlist]
|
||||
self.assertEqual(playlist, self.stored.get(name='b'))
|
||||
|
||||
def test_get_by_name_returns_first_of_multiple_matches(self):
|
||||
playlist = Playlist(name='b')
|
||||
self.stored.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
playlist, Playlist(name='a'), Playlist(name='b')]
|
||||
try:
|
||||
self.stored.get(name='b')
|
||||
@ -79,36 +87,33 @@ class StoredPlaylistsControllerTest(object):
|
||||
self.assertEqual(u'"name=b" match multiple playlists', e[0])
|
||||
|
||||
def test_get_by_name_raises_keyerror_if_no_match(self):
|
||||
self.stored.playlists = [Playlist(name='a'), Playlist(name='b')]
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='a'), Playlist(name='b')]
|
||||
try:
|
||||
self.stored.get(name='c')
|
||||
self.fail(u'Should raise LookupError if no match')
|
||||
except LookupError as e:
|
||||
self.assertEqual(u'"name=c" match no playlists', e[0])
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_lookup(self):
|
||||
pass
|
||||
def test_lookup_finds_playlist_by_uri(self):
|
||||
original_playlist = self.stored.create(u'test')
|
||||
|
||||
looked_up_playlist = self.stored.lookup(original_playlist.uri)
|
||||
|
||||
self.assertEqual(original_playlist, looked_up_playlist)
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_refresh(self):
|
||||
pass
|
||||
|
||||
def test_rename(self):
|
||||
playlist = self.stored.create('test')
|
||||
self.stored.rename(playlist, 'test2')
|
||||
self.stored.get(name='test2')
|
||||
def test_save_replaces_stored_playlist_with_updated_playlist(self):
|
||||
playlist1 = self.stored.create(u'test1')
|
||||
self.assertIn(playlist1, self.stored.playlists)
|
||||
|
||||
def test_rename_unknown_playlist(self):
|
||||
self.stored.rename(Playlist(), 'test2')
|
||||
test = lambda: self.stored.get(name='test2')
|
||||
self.assertRaises(LookupError, test)
|
||||
|
||||
def test_save(self):
|
||||
# FIXME should we handle playlists without names?
|
||||
playlist = Playlist(name='test')
|
||||
self.stored.save(playlist)
|
||||
self.assertIn(playlist, self.stored.playlists)
|
||||
playlist2 = playlist1.copy(name=u'test2')
|
||||
playlist2 = self.stored.save(playlist2)
|
||||
self.assertNotIn(playlist1, self.stored.playlists)
|
||||
self.assertIn(playlist2, self.stored.playlists)
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_playlist_with_unknown_track(self):
|
||||
|
||||
@ -2,7 +2,7 @@ import os
|
||||
|
||||
from mopidy import settings
|
||||
from mopidy.backends.local import LocalBackend
|
||||
from mopidy.models import Playlist, Track
|
||||
from mopidy.models import Track
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
from tests import unittest, path_to_data_dir
|
||||
@ -18,53 +18,80 @@ class LocalStoredPlaylistsControllerTest(
|
||||
|
||||
def test_created_playlist_is_persisted(self):
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test.m3u')
|
||||
self.assert_(not os.path.exists(path))
|
||||
self.stored.create('test')
|
||||
self.assert_(os.path.exists(path))
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
self.stored.create(u'test')
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_create_slugifies_playlist_name(self):
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test-foo-bar.m3u')
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
playlist = self.stored.create(u'test FOO baR')
|
||||
self.assertEqual(u'test-foo-bar', playlist.name)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_create_slugifies_names_which_tries_to_change_directory(self):
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test-foo-bar.m3u')
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
playlist = self.stored.create(u'../../test FOO baR')
|
||||
self.assertEqual(u'test-foo-bar', playlist.name)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_saved_playlist_is_persisted(self):
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test2.m3u')
|
||||
self.assert_(not os.path.exists(path))
|
||||
self.stored.save(Playlist(name='test2'))
|
||||
self.assert_(os.path.exists(path))
|
||||
path1 = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test1.m3u')
|
||||
path2 = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test2-foo-bar.m3u')
|
||||
|
||||
playlist = self.stored.create(u'test1')
|
||||
|
||||
self.assertTrue(os.path.exists(path1))
|
||||
self.assertFalse(os.path.exists(path2))
|
||||
|
||||
playlist = playlist.copy(name=u'test2 FOO baR')
|
||||
playlist = self.stored.save(playlist)
|
||||
|
||||
self.assertEqual(u'test2-foo-bar', playlist.name)
|
||||
self.assertFalse(os.path.exists(path1))
|
||||
self.assertTrue(os.path.exists(path2))
|
||||
|
||||
def test_deleted_playlist_is_removed(self):
|
||||
playlist = self.stored.create('test')
|
||||
self.stored.delete(playlist)
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test.m3u')
|
||||
self.assert_(not os.path.exists(path))
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
def test_renamed_playlist_is_moved(self):
|
||||
playlist = self.stored.create('test')
|
||||
file1 = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test.m3u')
|
||||
file2 = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test2.m3u')
|
||||
self.assert_(not os.path.exists(file2))
|
||||
self.stored.rename(playlist, 'test2')
|
||||
self.assert_(not os.path.exists(file1))
|
||||
self.assert_(os.path.exists(file2))
|
||||
playlist = self.stored.create(u'test')
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
self.stored.delete(playlist.uri)
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
def test_playlist_contents_is_written_to_disk(self):
|
||||
track = Track(uri=generate_song(1))
|
||||
uri = track.uri[len('file://'):]
|
||||
playlist = Playlist(tracks=[track], name='test')
|
||||
path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test.m3u')
|
||||
track_path = track.uri[len('file://'):]
|
||||
playlist = self.stored.create(u'test')
|
||||
playlist_path = playlist.uri[len('file://'):]
|
||||
playlist = playlist.copy(tracks=[track])
|
||||
playlist = self.stored.save(playlist)
|
||||
|
||||
self.stored.save(playlist)
|
||||
|
||||
with open(path) as playlist_file:
|
||||
with open(playlist_path) as playlist_file:
|
||||
contents = playlist_file.read()
|
||||
|
||||
self.assertEqual(uri, contents.strip())
|
||||
self.assertEqual(track_path, contents.strip())
|
||||
|
||||
def test_playlists_are_loaded_at_startup(self):
|
||||
playlist_path = os.path.join(settings.LOCAL_PLAYLIST_PATH, 'test.m3u')
|
||||
|
||||
track = Track(uri=path_to_uri(path_to_data_dir('uri2')))
|
||||
playlist = self.stored.create('test')
|
||||
playlist = self.stored.create(u'test')
|
||||
playlist = playlist.copy(tracks=[track])
|
||||
self.stored.save(playlist)
|
||||
playlist = self.stored.save(playlist)
|
||||
|
||||
backend = self.backend_class(audio=self.audio)
|
||||
|
||||
self.assert_(backend.stored_playlists.playlists)
|
||||
self.assertEqual(
|
||||
path_to_uri(playlist_path),
|
||||
backend.stored_playlists.playlists[0].uri)
|
||||
self.assertEqual(
|
||||
playlist.name, backend.stored_playlists.playlists[0].name)
|
||||
self.assertEqual(
|
||||
@ -77,11 +104,3 @@ class LocalStoredPlaylistsControllerTest(
|
||||
@unittest.SkipTest
|
||||
def test_playlist_folder_is_createad(self):
|
||||
pass
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_create_sets_playlist_uri(self):
|
||||
pass
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_save_sets_playlist_uri(self):
|
||||
pass
|
||||
|
||||
@ -37,5 +37,108 @@ class StoredPlaylistsTest(unittest.TestCase):
|
||||
self.assertIn(self.pl2a, result)
|
||||
self.assertIn(self.pl2b, result)
|
||||
|
||||
# TODO The rest of the stored playlists API is pending redesign before
|
||||
# we'll update it to support multiple backends.
|
||||
def test_create_without_uri_scheme_uses_first_backend(self):
|
||||
playlist = Playlist()
|
||||
self.sp1.create().get.return_value = playlist
|
||||
self.sp1.reset_mock()
|
||||
|
||||
result = self.core.stored_playlists.create('foo')
|
||||
|
||||
self.assertEqual(playlist, result)
|
||||
self.sp1.create.assert_called_once_with('foo')
|
||||
self.assertFalse(self.sp2.create.called)
|
||||
|
||||
def test_create_with_uri_scheme_selects_the_matching_backend(self):
|
||||
playlist = Playlist()
|
||||
self.sp2.create().get.return_value = playlist
|
||||
self.sp2.reset_mock()
|
||||
|
||||
result = self.core.stored_playlists.create('foo', uri_scheme='dummy2')
|
||||
|
||||
self.assertEqual(playlist, result)
|
||||
self.assertFalse(self.sp1.create.called)
|
||||
self.sp2.create.assert_called_once_with('foo')
|
||||
|
||||
def test_delete_selects_the_dummy1_backend(self):
|
||||
self.core.stored_playlists.delete('dummy1:a')
|
||||
|
||||
self.sp1.delete.assert_called_once_with('dummy1:a')
|
||||
self.assertFalse(self.sp2.delete.called)
|
||||
|
||||
def test_delete_selects_the_dummy2_backend(self):
|
||||
self.core.stored_playlists.delete('dummy2:a')
|
||||
|
||||
self.assertFalse(self.sp1.delete.called)
|
||||
self.sp2.delete.assert_called_once_with('dummy2:a')
|
||||
|
||||
def test_delete_with_unknown_uri_scheme_does_nothing(self):
|
||||
self.core.stored_playlists.delete('unknown:a')
|
||||
|
||||
self.assertFalse(self.sp1.delete.called)
|
||||
self.assertFalse(self.sp2.delete.called)
|
||||
|
||||
def test_lookup_selects_the_dummy1_backend(self):
|
||||
self.core.stored_playlists.lookup('dummy1:a')
|
||||
|
||||
self.sp1.lookup.assert_called_once_with('dummy1:a')
|
||||
self.assertFalse(self.sp2.lookup.called)
|
||||
|
||||
def test_lookup_selects_the_dummy2_backend(self):
|
||||
self.core.stored_playlists.lookup('dummy2:a')
|
||||
|
||||
self.assertFalse(self.sp1.lookup.called)
|
||||
self.sp2.lookup.assert_called_once_with('dummy2:a')
|
||||
|
||||
def test_refresh_without_uri_scheme_refreshes_all_backends(self):
|
||||
self.core.stored_playlists.refresh()
|
||||
|
||||
self.sp1.refresh.assert_called_once_with()
|
||||
self.sp2.refresh.assert_called_once_with()
|
||||
|
||||
def test_refresh_with_uri_scheme_refreshes_matching_backend(self):
|
||||
self.core.stored_playlists.refresh(uri_scheme='dummy2')
|
||||
|
||||
self.assertFalse(self.sp1.refresh.called)
|
||||
self.sp2.refresh.assert_called_once_with()
|
||||
|
||||
def test_refresh_with_unknown_uri_scheme_refreshes_nothing(self):
|
||||
self.core.stored_playlists.refresh(uri_scheme='foobar')
|
||||
|
||||
self.assertFalse(self.sp1.refresh.called)
|
||||
self.assertFalse(self.sp2.refresh.called)
|
||||
|
||||
def test_save_selects_the_dummy1_backend(self):
|
||||
playlist = Playlist(uri='dummy1:a')
|
||||
self.sp1.save().get.return_value = playlist
|
||||
self.sp1.reset_mock()
|
||||
|
||||
result = self.core.stored_playlists.save(playlist)
|
||||
|
||||
self.assertEqual(playlist, result)
|
||||
self.sp1.save.assert_called_once_with(playlist)
|
||||
self.assertFalse(self.sp2.save.called)
|
||||
|
||||
def test_save_selects_the_dummy2_backend(self):
|
||||
playlist = Playlist(uri='dummy2:a')
|
||||
self.sp2.save().get.return_value = playlist
|
||||
self.sp2.reset_mock()
|
||||
|
||||
result = self.core.stored_playlists.save(playlist)
|
||||
|
||||
self.assertEqual(playlist, result)
|
||||
self.assertFalse(self.sp1.save.called)
|
||||
self.sp2.save.assert_called_once_with(playlist)
|
||||
|
||||
def test_save_does_nothing_if_playlist_uri_is_unset(self):
|
||||
result = self.core.stored_playlists.save(Playlist())
|
||||
|
||||
self.assertIsNone(result)
|
||||
self.assertFalse(self.sp1.save.called)
|
||||
self.assertFalse(self.sp2.save.called)
|
||||
|
||||
def test_save_does_nothing_if_playlist_uri_has_unknown_scheme(self):
|
||||
result = self.core.stored_playlists.save(Playlist(uri='foobar:a'))
|
||||
|
||||
self.assertIsNone(result)
|
||||
self.assertFalse(self.sp1.save.called)
|
||||
self.assertFalse(self.sp2.save.called)
|
||||
|
||||
@ -7,7 +7,7 @@ from tests.frontends.mpd import protocol
|
||||
|
||||
class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
def test_listplaylist(self):
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='name', tracks=[Track(uri='file:///dev/urandom')])]
|
||||
|
||||
self.sendRequest(u'listplaylist "name"')
|
||||
@ -15,7 +15,7 @@ class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
self.assertInResponse(u'OK')
|
||||
|
||||
def test_listplaylist_without_quotes(self):
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='name', tracks=[Track(uri='file:///dev/urandom')])]
|
||||
|
||||
self.sendRequest(u'listplaylist name')
|
||||
@ -27,7 +27,7 @@ class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
self.assertEqualResponse(u'ACK [50@0] {listplaylist} No such playlist')
|
||||
|
||||
def test_listplaylistinfo(self):
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='name', tracks=[Track(uri='file:///dev/urandom')])]
|
||||
|
||||
self.sendRequest(u'listplaylistinfo "name"')
|
||||
@ -37,7 +37,7 @@ class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
self.assertInResponse(u'OK')
|
||||
|
||||
def test_listplaylistinfo_without_quotes(self):
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='name', tracks=[Track(uri='file:///dev/urandom')])]
|
||||
|
||||
self.sendRequest(u'listplaylistinfo name')
|
||||
@ -53,7 +53,7 @@ class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
|
||||
def test_listplaylists(self):
|
||||
last_modified = datetime.datetime(2001, 3, 17, 13, 41, 17, 12345)
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='a', last_modified=last_modified)]
|
||||
|
||||
self.sendRequest(u'listplaylists')
|
||||
@ -65,7 +65,7 @@ class StoredPlaylistsHandlerTest(protocol.BaseTestCase):
|
||||
def test_load_known_playlist_appends_to_current_playlist(self):
|
||||
self.core.current_playlist.append([Track(uri='a'), Track(uri='b')])
|
||||
self.assertEqual(len(self.core.current_playlist.tracks.get()), 2)
|
||||
self.core.stored_playlists.playlists = [
|
||||
self.backend.stored_playlists.playlists = [
|
||||
Playlist(name='A-list', tracks=[
|
||||
Track(uri='c'), Track(uri='d'), Track(uri='e')])]
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user