Merge branch 'feature/local-json' into develop

This commit is contained in:
Stein Magnus Jodal 2013-12-04 23:30:16 +01:00
commit d7fc04e354
57 changed files with 663 additions and 1405 deletions

View File

@ -4,10 +4,27 @@ Changelog
This changelog is used to track all major changes to Mopidy.
v0.18.0 (UNRELEASED)
====================
**Pluggable local libraries**
Fixes issues :issue:`44`, partially resolves :issue:`397`, and causes
a temporary regression of :issue:`527`.
- Finished the work on creating pluggable libraries. Users can now
reconfigure Mopidy to use alternate library providers of their choosing
for local files.
- Switched default local library provider from "tag cache" to JSON. This
greatly simplifies our library code and reuses our existing serialization
code.
- Killed our outdated and bug-ridden "tag cache" implementation.
- Added support for deprecated config values in order to allow for
graceful removal of :confval:`local/tag_cache_file`.
**Internal changes**
- Events from the audio actor, backends, and core actor are now emitted

View File

@ -66,11 +66,8 @@ Sample session::
-OK
+ACK [2@0] {listallinfo} incorrect arguments
To ensure that Mopidy and MPD have comparable state it is suggested you setup
both to use ``tests/data/advanced_tag_cache`` for their tag cache and
``tests/data/scanner/advanced/`` for the music folder and ``tests/data`` for
playlists.
To ensure that Mopidy and MPD have comparable state it is suggested you scan
the same media directory with both servers.
Documentation writing
=====================

View File

@ -43,10 +43,6 @@ Configuration values
Path to playlists directory with m3u files for local media.
.. confval:: local/tag_cache_file
Path to tag cache for local media.
.. confval:: local/scan_timeout
Number of milliseconds before giving up scanning a file and moving on to
@ -63,29 +59,65 @@ Usage
If you want use Mopidy to play music you have locally at your machine, you need
to review and maybe change some of the local extension config values. See above
for a complete list. Then you need to generate a tag cache for your local
for a complete list. Then you need to generate a local library for your local
music...
.. _generating-a-tag-cache:
.. _generating-a-local-library:
Generating a tag cache
----------------------
Generating a local library
--------------------------
The command :command:`mopidy local scan` will scan the path set in the
:confval:`local/media_dir` config value for any media files and build a MPD
compatible ``tag_cache``.
:confval:`local/media_dir` config value for any audio files and build a
library.
To make a ``tag_cache`` of your local music available for Mopidy:
To make a local library for your music available for Mopidy:
#. Ensure that the :confval:`local/media_dir` config value points to where your
music is located. Check the current setting by running::
mopidy config
#. Scan your media library. The command writes the ``tag_cache`` to
the :confval:`local/tag_cache_file`::
#. Scan your media library.::
mopidy local scan
#. Start Mopidy, find the music library in a client, and play some local music!
Pluggable library support
-------------------------
Local libraries are fully pluggable. What this means is that users may opt to
disable the current default library ``local-json``, replacing it with a third
party one. When running :command:`mopidy local scan` mopidy will populate
whatever the current active library is with data. Only one library may be
active at a time.
*****************
Mopidy-Local-JSON
*****************
Extension for storing local music library in a JSON file, default built in
library for local files.
Default configuration
=====================
.. literalinclude:: ../../mopidy/backends/local/json/ext.conf
:language: ini
Configuration values
====================
.. confval:: local-json/enabled
If the local-json extension should be enabled or not.
.. confval:: local-json/json_file
Path to a file to store the gzipped JSON data in.

View File

@ -309,6 +309,10 @@ This is ``mopidy_soundspot/__init__.py``::
from .commands import SoundspotCommand
return SoundspotCommand()
def get_library_updaters(self):
from .library import SoundspotLibraryUpdateProvider
return [SoundspotLibraryUpdateProvider]
def register_gstreamer_elements(self):
from .mixer import SoundspotMixer
gobject.type_register(SoundspotMixer)
@ -406,6 +410,27 @@ more details.
return 0
Example library provider
========================
Currently library providers are only really relevant for people who want to
replace the default local library. Providing this in addition to a backend that
exposes a library for the `local` uri scheme lets you plug in whatever storage
solution you happen to prefer.
::
from mopidy.backends import base
class SoundspotLibraryUpdateProvider(base.BaseLibraryProvider):
def __init__(self, config):
super(SoundspotLibraryUpdateProvider, self).__init__(config)
self.config = config
# Your library provider implementation here.
Example GStreamer element
=========================

View File

@ -1,9 +1,13 @@
from __future__ import unicode_literals
import logging
import os
import mopidy
from mopidy import config, ext
from mopidy.utils import encoding, path
logger = logging.getLogger('mopidy.backends.local')
class Extension(ext.Extension):
@ -20,23 +24,23 @@ class Extension(ext.Extension):
schema = super(Extension, self).get_config_schema()
schema['media_dir'] = config.Path()
schema['playlists_dir'] = config.Path()
schema['tag_cache_file'] = config.Path()
schema['tag_cache_file'] = config.Deprecated()
schema['scan_timeout'] = config.Integer(
minimum=1000, maximum=1000*60*60)
schema['excluded_file_extensions'] = config.List(optional=True)
return schema
def validate_environment(self):
pass
try:
path.get_or_create_dir(b'$XDG_DATA_DIR/mopidy/local')
except EnvironmentError as error:
error = encoding.locale_decode(error)
logger.warning('Could not create local data dir: %s', error)
def get_backend_classes(self):
from .actor import LocalBackend
return [LocalBackend]
def get_library_updaters(self):
from .library import LocalLibraryUpdateProvider
return [LocalLibraryUpdateProvider]
def get_command(self):
from .commands import LocalCommand
return LocalCommand()

View File

@ -8,7 +8,6 @@ import pykka
from mopidy.backends import base
from mopidy.utils import encoding, path
from .library import LocalLibraryProvider
from .playlists import LocalPlaylistsProvider
from .playback import LocalPlaybackProvider
@ -23,7 +22,6 @@ class LocalBackend(pykka.ThreadingActor, base.Backend):
self.check_dirs_and_files()
self.library = LocalLibraryProvider(backend=self)
self.playback = LocalPlaybackProvider(audio=audio, backend=self)
self.playlists = LocalPlaylistsProvider(backend=self)
@ -40,10 +38,3 @@ class LocalBackend(pykka.ThreadingActor, base.Backend):
logger.warning(
'Could not create local playlists dir: %s',
encoding.locale_decode(error))
try:
path.get_or_create_file(self.config['local']['tag_cache_file'])
except EnvironmentError as error:
logger.warning(
'Could not create empty tag cache file: %s',
encoding.locale_decode(error))

View File

@ -44,49 +44,51 @@ class ScanCommand(commands.Command):
local_updater = updaters.values()[0](config)
# TODO: cleanup to consistently use local urls, not a random mix of
# local and file uris depending on how the data was loaded.
uris_library = set()
uris_update = set()
uris_remove = set()
uri_path_mapping = {}
uris_in_library = set()
uris_to_update = set()
uris_to_remove = set()
tracks = local_updater.load()
logger.info('Checking %d tracks from library.', len(tracks))
for track in tracks:
uri_path_mapping[track.uri] = translator.local_track_uri_to_path(
track.uri, media_dir)
try:
uri = translator.local_to_file_uri(track.uri, media_dir)
stat = os.stat(path.uri_to_path(uri))
stat = os.stat(uri_path_mapping[track.uri])
if int(stat.st_mtime) > track.last_modified:
uris_update.add(uri)
uris_library.add(uri)
uris_to_update.add(track.uri)
uris_in_library.add(track.uri)
except OSError:
logger.debug('Missing file %s', track.uri)
uris_remove.add(track.uri)
uris_to_remove.add(track.uri)
logger.info('Removing %d missing tracks.', len(uris_remove))
for uri in uris_remove:
logger.info('Removing %d missing tracks.', len(uris_to_remove))
for uri in uris_to_remove:
local_updater.remove(uri)
logger.info('Checking %s for unknown tracks.', media_dir)
for uri in path.find_uris(media_dir):
file_extension = os.path.splitext(path.uri_to_path(uri))[1]
for relpath in path.find_files(media_dir):
file_extension = os.path.splitext(relpath)[1]
if file_extension.lower() in excluded_file_extensions:
logger.debug('Skipped %s: File extension excluded.', uri)
continue
if uri not in uris_library:
uris_update.add(uri)
uri = translator.path_to_local_track_uri(relpath)
if uri not in uris_in_library:
uris_to_update.add(uri)
uri_path_mapping[uri] = os.path.join(media_dir, relpath)
logger.info('Found %d unknown tracks.', len(uris_update))
logger.info('Found %d unknown tracks.', len(uris_to_update))
logger.info('Scanning...')
scanner = scan.Scanner(scan_timeout)
progress = Progress(len(uris_update))
progress = Progress(len(uris_to_update))
for uri in sorted(uris_update):
for uri in sorted(uris_to_update):
try:
data = scanner.scan(uri)
track = scan.audio_data_to_track(data)
data = scanner.scan(path.path_to_uri(uri_path_mapping[uri]))
track = scan.audio_data_to_track(data).copy(uri=uri)
local_updater.add(track)
logger.debug('Added %s', track.uri)
except exceptions.ScannerError as error:

View File

@ -2,7 +2,6 @@
enabled = true
media_dir = $XDG_MUSIC_DIR
playlists_dir = $XDG_DATA_DIR/mopidy/local/playlists
tag_cache_file = $XDG_DATA_DIR/mopidy/local/tag_cache
scan_timeout = 1000
excluded_file_extensions =
.html

View File

@ -0,0 +1,30 @@
from __future__ import unicode_literals
import os
import mopidy
from mopidy import config, ext
class Extension(ext.Extension):
dist_name = 'Mopidy-Local-JSON'
ext_name = 'local-json'
version = mopidy.__version__
def get_default_config(self):
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
return config.read(conf_file)
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['json_file'] = config.Path()
return schema
def get_backend_classes(self):
from .actor import LocalJsonBackend
return [LocalJsonBackend]
def get_library_updaters(self):
from .library import LocalJsonLibraryUpdateProvider
return [LocalJsonLibraryUpdateProvider]

View File

@ -0,0 +1,30 @@
from __future__ import unicode_literals
import logging
import os
import pykka
from mopidy.backends import base
from mopidy.utils import encoding
from . import library
logger = logging.getLogger('mopidy.backends.local.json')
class LocalJsonBackend(pykka.ThreadingActor, base.Backend):
def __init__(self, config, audio):
super(LocalJsonBackend, self).__init__()
self.config = config
self.library = library.LocalJsonLibraryProvider(backend=self)
self.uri_schemes = ['local']
if not os.path.exists(config['local-json']['json_file']):
try:
library.write_library(config['local-json']['json_file'], {})
logger.info('Created empty local JSON library.')
except EnvironmentError as error:
error = encoding.locale_decode(error)
logger.warning('Could not create local library: %s', error)

View File

@ -0,0 +1,3 @@
[local-json]
enabled = true
json_file = $XDG_DATA_DIR/mopidy/local/library.json.gz

View File

@ -0,0 +1,108 @@
from __future__ import unicode_literals
import gzip
import json
import logging
import os
import tempfile
import mopidy
from mopidy import models
from mopidy.backends import base
from mopidy.backends.local import search
logger = logging.getLogger('mopidy.backends.local.json')
def load_library(json_file):
try:
with gzip.open(json_file, 'rb') as fp:
return json.load(fp, object_hook=models.model_json_decoder)
except (IOError, ValueError) as e:
logger.warning('Loading JSON local library failed: %s', e)
return {}
def write_library(json_file, data):
data['version'] = mopidy.__version__
directory, basename = os.path.split(json_file)
# TODO: cleanup directory/basename.* files.
tmp = tempfile.NamedTemporaryFile(
prefix=basename + '.', dir=directory, delete=False)
try:
with gzip.GzipFile(fileobj=tmp, mode='wb') as fp:
json.dump(data, fp, cls=models.ModelJSONEncoder,
indent=2, separators=(',', ': '))
os.rename(tmp.name, json_file)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
class LocalJsonLibraryProvider(base.BaseLibraryProvider):
def __init__(self, *args, **kwargs):
super(LocalJsonLibraryProvider, self).__init__(*args, **kwargs)
self._uri_mapping = {}
self._media_dir = self.backend.config['local']['media_dir']
self._json_file = self.backend.config['local-json']['json_file']
self.refresh()
def refresh(self, uri=None):
logger.debug(
'Loading local tracks from %s using %s',
self._media_dir, self._json_file)
tracks = load_library(self._json_file).get('tracks', [])
uris_to_remove = set(self._uri_mapping)
for track in tracks:
self._uri_mapping[track.uri] = track
uris_to_remove.discard(track.uri)
for uri in uris_to_remove:
del self._uri_mapping[uri]
logger.info(
'Loaded %d local tracks from %s using %s',
len(tracks), self._media_dir, self._json_file)
def lookup(self, uri):
try:
return [self._uri_mapping[uri]]
except KeyError:
logger.debug('Failed to lookup %r', uri)
return []
def find_exact(self, query=None, uris=None):
tracks = self._uri_mapping.values()
return search.find_exact(tracks, query=query, uris=uris)
def search(self, query=None, uris=None):
tracks = self._uri_mapping.values()
return search.search(tracks, query=query, uris=uris)
class LocalJsonLibraryUpdateProvider(base.BaseLibraryProvider):
uri_schemes = ['local']
def __init__(self, config):
self._tracks = {}
self._media_dir = config['local']['media_dir']
self._json_file = config['local-json']['json_file']
def load(self):
for track in load_library(self._json_file).get('tracks', []):
self._tracks[track.uri] = track
return self._tracks.values()
def add(self, track):
self._tracks[track.uri] = track
def remove(self, uri):
if uri in self._tracks:
del self._tracks[uri]
def commit(self):
write_library(self._json_file, {'tracks': self._tracks.values()})

View File

@ -1,265 +0,0 @@
from __future__ import unicode_literals
import logging
import os
import tempfile
from mopidy.backends import base
from mopidy.frontends.mpd import translator as mpd_translator
from mopidy.models import Album, SearchResult
from .translator import local_to_file_uri, parse_mpd_tag_cache
logger = logging.getLogger('mopidy.backends.local')
class LocalLibraryProvider(base.BaseLibraryProvider):
def __init__(self, *args, **kwargs):
super(LocalLibraryProvider, self).__init__(*args, **kwargs)
self._uri_mapping = {}
self._media_dir = self.backend.config['local']['media_dir']
self._tag_cache_file = self.backend.config['local']['tag_cache_file']
self.refresh()
def _convert_to_int(self, string):
try:
return int(string)
except ValueError:
return object()
def refresh(self, uri=None):
logger.debug(
'Loading local tracks from %s using %s',
self._media_dir, self._tag_cache_file)
tracks = parse_mpd_tag_cache(self._tag_cache_file, self._media_dir)
uris_to_remove = set(self._uri_mapping)
for track in tracks:
self._uri_mapping[track.uri] = track
uris_to_remove.discard(track.uri)
for uri in uris_to_remove:
del self._uri_mapping[uri]
logger.info(
'Loaded %d local tracks from %s using %s',
len(tracks), self._media_dir, self._tag_cache_file)
def lookup(self, uri):
try:
return [self._uri_mapping[uri]]
except KeyError:
logger.debug('Failed to lookup %r', uri)
return []
def find_exact(self, query=None, uris=None):
# TODO Only return results within URI roots given by ``uris``
if query is None:
query = {}
self._validate_query(query)
result_tracks = self._uri_mapping.values()
for (field, values) in query.iteritems():
if not hasattr(values, '__iter__'):
values = [values]
# FIXME this is bound to be slow for large libraries
for value in values:
if field == 'track_no':
q = self._convert_to_int(value)
else:
q = value.strip()
uri_filter = lambda t: q == t.uri
track_name_filter = lambda t: q == t.name
album_filter = lambda t: q == getattr(t, 'album', Album()).name
artist_filter = lambda t: filter(
lambda a: q == a.name, t.artists)
albumartist_filter = lambda t: any([
q == a.name
for a in getattr(t.album, 'artists', [])])
composer_filter = lambda t: any([
q == a.name
for a in getattr(t, 'composers', [])])
performer_filter = lambda t: any([
q == a.name
for a in getattr(t, 'performers', [])])
track_no_filter = lambda t: q == t.track_no
genre_filter = lambda t: t.genre and q == t.genre
date_filter = lambda t: q == t.date
comment_filter = lambda t: q == t.comment
any_filter = lambda t: (
uri_filter(t) or
track_name_filter(t) or
album_filter(t) or
artist_filter(t) or
albumartist_filter(t) or
composer_filter(t) or
performer_filter(t) or
track_no_filter(t) or
genre_filter(t) or
date_filter(t) or
comment_filter(t))
if field == 'uri':
result_tracks = filter(uri_filter, result_tracks)
elif field == 'track_name':
result_tracks = filter(track_name_filter, result_tracks)
elif field == 'album':
result_tracks = filter(album_filter, result_tracks)
elif field == 'artist':
result_tracks = filter(artist_filter, result_tracks)
elif field == 'albumartist':
result_tracks = filter(albumartist_filter, result_tracks)
elif field == 'composer':
result_tracks = filter(composer_filter, result_tracks)
elif field == 'performer':
result_tracks = filter(performer_filter, result_tracks)
elif field == 'track_no':
result_tracks = filter(track_no_filter, result_tracks)
elif field == 'genre':
result_tracks = filter(genre_filter, result_tracks)
elif field == 'date':
result_tracks = filter(date_filter, result_tracks)
elif field == 'comment':
result_tracks = filter(comment_filter, result_tracks)
elif field == 'any':
result_tracks = filter(any_filter, result_tracks)
else:
raise LookupError('Invalid lookup field: %s' % field)
# TODO: add local:search:<query>
return SearchResult(uri='local:search', tracks=result_tracks)
def search(self, query=None, uris=None):
# TODO Only return results within URI roots given by ``uris``
if query is None:
query = {}
self._validate_query(query)
result_tracks = self._uri_mapping.values()
for (field, values) in query.iteritems():
if not hasattr(values, '__iter__'):
values = [values]
# FIXME this is bound to be slow for large libraries
for value in values:
if field == 'track_no':
q = self._convert_to_int(value)
else:
q = value.strip().lower()
uri_filter = lambda t: q in t.uri.lower()
track_name_filter = lambda t: q in t.name.lower()
album_filter = lambda t: q in getattr(
t, 'album', Album()).name.lower()
artist_filter = lambda t: filter(
lambda a: q in a.name.lower(), t.artists)
albumartist_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t.album, 'artists', [])])
composer_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t, 'composers', [])])
performer_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t, 'performers', [])])
track_no_filter = lambda t: q == t.track_no
genre_filter = lambda t: t.genre and q in t.genre.lower()
date_filter = lambda t: t.date and t.date.startswith(q)
comment_filter = lambda t: t.comment and q in t.comment.lower()
any_filter = lambda t: (
uri_filter(t) or
track_name_filter(t) or
album_filter(t) or
artist_filter(t) or
albumartist_filter(t) or
composer_filter(t) or
performer_filter(t) or
track_no_filter(t) or
genre_filter(t) or
date_filter(t) or
comment_filter(t))
if field == 'uri':
result_tracks = filter(uri_filter, result_tracks)
elif field == 'track_name':
result_tracks = filter(track_name_filter, result_tracks)
elif field == 'album':
result_tracks = filter(album_filter, result_tracks)
elif field == 'artist':
result_tracks = filter(artist_filter, result_tracks)
elif field == 'albumartist':
result_tracks = filter(albumartist_filter, result_tracks)
elif field == 'composer':
result_tracks = filter(composer_filter, result_tracks)
elif field == 'performer':
result_tracks = filter(performer_filter, result_tracks)
elif field == 'track_no':
result_tracks = filter(track_no_filter, result_tracks)
elif field == 'genre':
result_tracks = filter(genre_filter, result_tracks)
elif field == 'date':
result_tracks = filter(date_filter, result_tracks)
elif field == 'comment':
result_tracks = filter(comment_filter, result_tracks)
elif field == 'any':
result_tracks = filter(any_filter, result_tracks)
else:
raise LookupError('Invalid lookup field: %s' % field)
# TODO: add local:search:<query>
return SearchResult(uri='local:search', tracks=result_tracks)
def _validate_query(self, query):
for (_, values) in query.iteritems():
if not values:
raise LookupError('Missing query')
for value in values:
if not value:
raise LookupError('Missing query')
# TODO: rename and move to tagcache extension.
class LocalLibraryUpdateProvider(base.BaseLibraryProvider):
uri_schemes = ['local']
def __init__(self, config):
self._tracks = {}
self._media_dir = config['local']['media_dir']
self._tag_cache_file = config['local']['tag_cache_file']
def load(self):
tracks = parse_mpd_tag_cache(self._tag_cache_file, self._media_dir)
for track in tracks:
# TODO: this should use uris as is, i.e. hack that should go away
# with tag caches.
uri = local_to_file_uri(track.uri, self._media_dir)
self._tracks[uri] = track.copy(uri=uri)
return tracks
def add(self, track):
self._tracks[track.uri] = track
def remove(self, uri):
if uri in self._tracks:
del self._tracks[uri]
def commit(self):
directory, basename = os.path.split(self._tag_cache_file)
# TODO: cleanup directory/basename.* files.
tmp = tempfile.NamedTemporaryFile(
prefix=basename + '.', dir=directory, delete=False)
try:
for row in mpd_translator.tracks_to_tag_cache_format(
self._tracks.values(), self._media_dir):
if len(row) == 1:
tmp.write(('%s\n' % row).encode('utf-8'))
else:
tmp.write(('%s: %s\n' % row).encode('utf-8'))
os.rename(tmp.name, self._tag_cache_file)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)

View File

@ -11,7 +11,6 @@ logger = logging.getLogger('mopidy.backends.local')
class LocalPlaybackProvider(base.BasePlaybackProvider):
def change_track(self, track):
media_dir = self.backend.config['local']['media_dir']
uri = translator.local_to_file_uri(track.uri, media_dir)
track = track.copy(uri=uri)
track = track.copy(uri=translator.local_track_uri_to_file_uri(
track.uri, self.backend.config['local']['media_dir']))
return super(LocalPlaybackProvider, self).change_track(track)

View File

@ -51,11 +51,8 @@ class LocalPlaylistsProvider(base.BasePlaylistsProvider):
tracks = []
for track_uri in parse_m3u(m3u, self._media_dir):
result = self.backend.library.lookup(track_uri)
if result:
tracks += self.backend.library.lookup(track_uri)
else:
tracks.append(Track(uri=track_uri))
# TODO: switch to having playlists being a list of uris
tracks.append(Track(uri=track_uri))
playlist = Playlist(uri=uri, name=name, tracks=tracks)
playlists.append(playlist)

View File

@ -0,0 +1,179 @@
from __future__ import unicode_literals
from mopidy.models import Album, SearchResult
def find_exact(tracks, query=None, uris=None):
# TODO Only return results within URI roots given by ``uris``
if query is None:
query = {}
_validate_query(query)
for (field, values) in query.iteritems():
if not hasattr(values, '__iter__'):
values = [values]
# FIXME this is bound to be slow for large libraries
for value in values:
if field == 'track_no':
q = _convert_to_int(value)
else:
q = value.strip()
uri_filter = lambda t: q == t.uri
track_name_filter = lambda t: q == t.name
album_filter = lambda t: q == getattr(t, 'album', Album()).name
artist_filter = lambda t: filter(
lambda a: q == a.name, t.artists)
albumartist_filter = lambda t: any([
q == a.name
for a in getattr(t.album, 'artists', [])])
composer_filter = lambda t: any([
q == a.name
for a in getattr(t, 'composers', [])])
performer_filter = lambda t: any([
q == a.name
for a in getattr(t, 'performers', [])])
track_no_filter = lambda t: q == t.track_no
genre_filter = lambda t: t.genre and q == t.genre
date_filter = lambda t: q == t.date
comment_filter = lambda t: q == t.comment
any_filter = lambda t: (
uri_filter(t) or
track_name_filter(t) or
album_filter(t) or
artist_filter(t) or
albumartist_filter(t) or
composer_filter(t) or
performer_filter(t) or
track_no_filter(t) or
genre_filter(t) or
date_filter(t) or
comment_filter(t))
if field == 'uri':
tracks = filter(uri_filter, tracks)
elif field == 'track_name':
tracks = filter(track_name_filter, tracks)
elif field == 'album':
tracks = filter(album_filter, tracks)
elif field == 'artist':
tracks = filter(artist_filter, tracks)
elif field == 'albumartist':
tracks = filter(albumartist_filter, tracks)
elif field == 'composer':
tracks = filter(composer_filter, tracks)
elif field == 'performer':
tracks = filter(performer_filter, tracks)
elif field == 'track_no':
tracks = filter(track_no_filter, tracks)
elif field == 'genre':
tracks = filter(genre_filter, tracks)
elif field == 'date':
tracks = filter(date_filter, tracks)
elif field == 'comment':
tracks = filter(comment_filter, tracks)
elif field == 'any':
tracks = filter(any_filter, tracks)
else:
raise LookupError('Invalid lookup field: %s' % field)
# TODO: add local:search:<query>
return SearchResult(uri='local:search', tracks=tracks)
def search(tracks, query=None, uris=None):
# TODO Only return results within URI roots given by ``uris``
if query is None:
query = {}
_validate_query(query)
for (field, values) in query.iteritems():
if not hasattr(values, '__iter__'):
values = [values]
# FIXME this is bound to be slow for large libraries
for value in values:
if field == 'track_no':
q = _convert_to_int(value)
else:
q = value.strip().lower()
uri_filter = lambda t: q in t.uri.lower()
track_name_filter = lambda t: q in t.name.lower()
album_filter = lambda t: q in getattr(
t, 'album', Album()).name.lower()
artist_filter = lambda t: filter(
lambda a: q in a.name.lower(), t.artists)
albumartist_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t.album, 'artists', [])])
composer_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t, 'composers', [])])
performer_filter = lambda t: any([
q in a.name.lower()
for a in getattr(t, 'performers', [])])
track_no_filter = lambda t: q == t.track_no
genre_filter = lambda t: t.genre and q in t.genre.lower()
date_filter = lambda t: t.date and t.date.startswith(q)
comment_filter = lambda t: t.comment and q in t.comment.lower()
any_filter = lambda t: (
uri_filter(t) or
track_name_filter(t) or
album_filter(t) or
artist_filter(t) or
albumartist_filter(t) or
composer_filter(t) or
performer_filter(t) or
track_no_filter(t) or
genre_filter(t) or
date_filter(t) or
comment_filter(t))
if field == 'uri':
tracks = filter(uri_filter, tracks)
elif field == 'track_name':
tracks = filter(track_name_filter, tracks)
elif field == 'album':
tracks = filter(album_filter, tracks)
elif field == 'artist':
tracks = filter(artist_filter, tracks)
elif field == 'albumartist':
tracks = filter(albumartist_filter, tracks)
elif field == 'composer':
tracks = filter(composer_filter, tracks)
elif field == 'performer':
tracks = filter(performer_filter, tracks)
elif field == 'track_no':
tracks = filter(track_no_filter, tracks)
elif field == 'genre':
tracks = filter(genre_filter, tracks)
elif field == 'date':
tracks = filter(date_filter, tracks)
elif field == 'comment':
tracks = filter(comment_filter, tracks)
elif field == 'any':
tracks = filter(any_filter, tracks)
else:
raise LookupError('Invalid lookup field: %s' % field)
# TODO: add local:search:<query>
return SearchResult(uri='local:search', tracks=tracks)
def _validate_query(query):
for (_, values) in query.iteritems():
if not values:
raise LookupError('Missing query')
for value in values:
if not value:
raise LookupError('Missing query')
def _convert_to_int(string):
try:
return int(string)
except ValueError:
return object()

View File

@ -3,19 +3,30 @@ from __future__ import unicode_literals
import logging
import os
import urlparse
import urllib
from mopidy.models import Track, Artist, Album
from mopidy.utils.encoding import locale_decode
from mopidy.utils.path import path_to_uri, uri_to_path
logger = logging.getLogger('mopidy.backends.local')
def local_to_file_uri(uri, media_dir):
# TODO: check that type is correct.
def local_track_uri_to_file_uri(uri, media_dir):
return path_to_uri(local_track_uri_to_path(uri, media_dir))
def local_track_uri_to_path(uri, media_dir):
if not uri.startswith('local:track:'):
raise ValueError('Invalid URI.')
file_path = uri_to_path(uri).split(b':', 1)[1]
file_path = os.path.join(media_dir, file_path)
return path_to_uri(file_path)
return os.path.join(media_dir, file_path)
def path_to_local_track_uri(relpath):
"""Convert path releative to media_dir to local track URI."""
if isinstance(relpath, unicode):
relpath = relpath.encode('utf-8')
return b'local:track:%s' % urllib.quote(relpath)
def parse_m3u(file_path, media_dir):
@ -63,127 +74,3 @@ def parse_m3u(file_path, media_dir):
uris.append(path)
return uris
# TODO: remove music_dir from API
def parse_mpd_tag_cache(tag_cache, music_dir=''):
"""
Converts a MPD tag_cache into a lists of tracks, artists and albums.
"""
tracks = set()
try:
with open(tag_cache) as library:
contents = library.read()
except IOError as error:
logger.warning('Could not open tag cache: %s', locale_decode(error))
return tracks
current = {}
state = None
# TODO: uris as bytes
for line in contents.split(b'\n'):
if line == b'songList begin':
state = 'songs'
continue
elif line == b'songList end':
state = None
continue
elif not state:
continue
key, value = line.split(b': ', 1)
if key == b'key':
_convert_mpd_data(current, tracks)
current.clear()
current[key.lower()] = value.decode('utf-8')
_convert_mpd_data(current, tracks)
return tracks
def _convert_mpd_data(data, tracks):
if not data:
return
track_kwargs = {}
album_kwargs = {}
artist_kwargs = {}
albumartist_kwargs = {}
if 'track' in data:
if '/' in data['track']:
album_kwargs['num_tracks'] = int(data['track'].split('/')[1])
track_kwargs['track_no'] = int(data['track'].split('/')[0])
else:
track_kwargs['track_no'] = int(data['track'])
if 'mtime' in data:
track_kwargs['last_modified'] = int(data['mtime'])
if 'artist' in data:
artist_kwargs['name'] = data['artist']
if 'albumartist' in data:
albumartist_kwargs['name'] = data['albumartist']
if 'composer' in data:
track_kwargs['composers'] = [Artist(name=data['composer'])]
if 'performer' in data:
track_kwargs['performers'] = [Artist(name=data['performer'])]
if 'album' in data:
album_kwargs['name'] = data['album']
if 'title' in data:
track_kwargs['name'] = data['title']
if 'genre' in data:
track_kwargs['genre'] = data['genre']
if 'date' in data:
track_kwargs['date'] = data['date']
if 'comment' in data:
track_kwargs['comment'] = data['comment']
if 'musicbrainz_trackid' in data:
track_kwargs['musicbrainz_id'] = data['musicbrainz_trackid']
if 'musicbrainz_albumid' in data:
album_kwargs['musicbrainz_id'] = data['musicbrainz_albumid']
if 'musicbrainz_artistid' in data:
artist_kwargs['musicbrainz_id'] = data['musicbrainz_artistid']
if 'musicbrainz_albumartistid' in data:
albumartist_kwargs['musicbrainz_id'] = (
data['musicbrainz_albumartistid'])
if artist_kwargs:
artist = Artist(**artist_kwargs)
track_kwargs['artists'] = [artist]
if albumartist_kwargs:
albumartist = Artist(**albumartist_kwargs)
album_kwargs['artists'] = [albumartist]
if album_kwargs:
album = Album(**album_kwargs)
track_kwargs['album'] = album
if data['file'][0] == '/':
path = data['file'][1:]
else:
path = data['file']
track_kwargs['uri'] = 'local:track:%s' % path
track_kwargs['length'] = int(data.get('time', 0)) * 1000
track = Track(**track_kwargs)
tracks.add(track)

View File

@ -167,6 +167,8 @@ def _format(config, comments, schemas, display, disable):
continue
output.append(b'[%s]' % bytes(schema.name))
for key, value in serialized.items():
if isinstance(value, types.DeprecatedValue):
continue
comment = bytes(comments.get(schema.name, {}).get(key, ''))
output.append(b'%s =' % bytes(key))
if value is not None:

View File

@ -45,7 +45,6 @@ def convert(settings):
helper('local/media_dir', 'LOCAL_MUSIC_PATH')
helper('local/playlists_dir', 'LOCAL_PLAYLIST_PATH')
helper('local/tag_cache_file', 'LOCAL_TAG_CACHE_FILE')
helper('spotify/username', 'SPOTIFY_USERNAME')
helper('spotify/password', 'SPOTIFY_PASSWORD')

View File

@ -72,7 +72,9 @@ class ConfigSchema(collections.OrderedDict):
errors[key] = str(e)
for key in self.keys():
if key not in result and key not in errors:
if isinstance(self[key], types.Deprecated):
result.pop(key, None)
elif key not in result and key not in errors:
result[key] = None
errors[key] = 'config key not found.'

View File

@ -31,6 +31,10 @@ class ExpandedPath(bytes):
self.original = original
class DeprecatedValue(object):
pass
class ConfigValue(object):
"""Represents a config key's value and how to handle it.
@ -59,6 +63,20 @@ class ConfigValue(object):
return bytes(value)
class Deprecated(ConfigValue):
"""Deprecated value
Used for ignoring old config values that are no longer in use, but should
not cause the config parser to crash.
"""
def deserialize(self, value):
return DeprecatedValue()
def serialize(self, value, display=False):
return DeprecatedValue()
class String(ConfigValue):
"""String value.

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import collections
import itertools
import pykka
@ -79,34 +80,29 @@ class Backends(list):
def __init__(self, backends):
super(Backends, self).__init__(backends)
# These lists keeps the backends in the original order, but only
# includes those which implements the required backend provider. Since
# it is important to keep the order, we can't simply use .values() on
# the X_by_uri_scheme dicts below.
self.with_library = [b for b in backends if b.has_library().get()]
self.with_playback = [b for b in backends if b.has_playback().get()]
self.with_playlists = [
b for b in backends if b.has_playlists().get()]
self.with_library = collections.OrderedDict()
self.with_playback = collections.OrderedDict()
self.with_playlists = collections.OrderedDict()
self.by_uri_scheme = {}
for backend in backends:
for uri_scheme in backend.uri_schemes.get():
assert uri_scheme not in self.by_uri_scheme, (
'Cannot add URI scheme %s for %s, '
'it is already handled by %s'
) % (
uri_scheme, backend.__class__.__name__,
self.by_uri_scheme[uri_scheme].__class__.__name__)
self.by_uri_scheme[uri_scheme] = backend
has_library = backend.has_library().get()
has_playback = backend.has_playback().get()
has_playlists = backend.has_playlists().get()
self.with_library_by_uri_scheme = {}
self.with_playback_by_uri_scheme = {}
self.with_playlists_by_uri_scheme = {}
for scheme in backend.uri_schemes.get():
self.add(self.with_library, has_library, scheme, backend)
self.add(self.with_playback, has_playback, scheme, backend)
self.add(self.with_playlists, has_playlists, scheme, backend)
for uri_scheme, backend in self.by_uri_scheme.items():
if backend.has_library().get():
self.with_library_by_uri_scheme[uri_scheme] = backend
if backend.has_playback().get():
self.with_playback_by_uri_scheme[uri_scheme] = backend
if backend.has_playlists().get():
self.with_playlists_by_uri_scheme[uri_scheme] = backend
def add(self, registry, supported, uri_scheme, backend):
if not supported:
return
if uri_scheme not in registry:
registry[uri_scheme] = backend
return
get_name = lambda actor: actor.actor_ref.actor_class.__name__
raise AssertionError(
'Cannot add URI scheme %s for %s, it is already handled by %s' %
(uri_scheme, get_name(backend), get_name(registry[uri_scheme])))

View File

@ -1,6 +1,6 @@
from __future__ import unicode_literals
from collections import defaultdict
import collections
import urlparse
import pykka
@ -15,18 +15,18 @@ class LibraryController(object):
def _get_backend(self, uri):
uri_scheme = urlparse.urlparse(uri).scheme
return self.backends.with_library_by_uri_scheme.get(uri_scheme, None)
return self.backends.with_library.get(uri_scheme, None)
def _get_backends_to_uris(self, uris):
if uris:
backends_to_uris = defaultdict(list)
backends_to_uris = collections.defaultdict(list)
for uri in uris:
backend = self._get_backend(uri)
if backend is not None:
backends_to_uris[backend].append(uri)
else:
backends_to_uris = dict([
(b, None) for b in self.backends.with_library])
(b, None) for b in self.backends.with_library.values()])
return backends_to_uris
def find_exact(self, query=None, uris=None, **kwargs):
@ -103,8 +103,8 @@ class LibraryController(object):
if backend:
backend.library.refresh(uri).get()
else:
futures = [
b.library.refresh(uri) for b in self.backends.with_library]
futures = [b.library.refresh(uri)
for b in self.backends.with_library.values()]
pykka.get_all(futures)
def search(self, query=None, uris=None, **kwargs):

View File

@ -28,7 +28,7 @@ class PlaybackController(object):
return None
uri = self.current_tl_track.track.uri
uri_scheme = urlparse.urlparse(uri).scheme
return self.backends.with_playback_by_uri_scheme.get(uri_scheme, None)
return self.backends.with_playback.get(uri_scheme, None)
### Properties

View File

@ -16,8 +16,8 @@ class PlaylistsController(object):
self.core = core
def get_playlists(self, include_tracks=True):
futures = [
b.playlists.playlists for b in self.backends.with_playlists]
futures = [b.playlists.playlists
for b in self.backends.with_playlists.values()]
results = pykka.get_all(futures)
playlists = list(itertools.chain(*results))
if not include_tracks:
@ -49,10 +49,11 @@ class PlaylistsController(object):
:type uri_scheme: string
:rtype: :class:`mopidy.models.Playlist`
"""
if uri_scheme in self.backends.with_playlists_by_uri_scheme:
backend = self.backends.by_uri_scheme[uri_scheme]
if uri_scheme in self.backends.with_playlists:
backend = self.backends.with_playlists[uri_scheme]
else:
backend = self.backends.with_playlists[0]
# TODO: this fallback looks suspicious
backend = self.backends.with_playlists.values()[0]
playlist = backend.playlists.create(name).get()
listener.CoreListener.send('playlist_changed', playlist=playlist)
return playlist
@ -68,8 +69,7 @@ class PlaylistsController(object):
:type uri: string
"""
uri_scheme = urlparse.urlparse(uri).scheme
backend = self.backends.with_playlists_by_uri_scheme.get(
uri_scheme, None)
backend = self.backends.with_playlists.get(uri_scheme, None)
if backend:
backend.playlists.delete(uri).get()
@ -111,8 +111,7 @@ class PlaylistsController(object):
:rtype: :class:`mopidy.models.Playlist` or :class:`None`
"""
uri_scheme = urlparse.urlparse(uri).scheme
backend = self.backends.with_playlists_by_uri_scheme.get(
uri_scheme, None)
backend = self.backends.with_playlists.get(uri_scheme, None)
if backend:
return backend.playlists.lookup(uri).get()
else:
@ -131,13 +130,12 @@ class PlaylistsController(object):
:type uri_scheme: string
"""
if uri_scheme is None:
futures = [
b.playlists.refresh() for b in self.backends.with_playlists]
futures = [b.playlists.refresh()
for b in self.backends.with_playlists.values()]
pykka.get_all(futures)
listener.CoreListener.send('playlists_loaded')
else:
backend = self.backends.with_playlists_by_uri_scheme.get(
uri_scheme, None)
backend = self.backends.with_playlists.get(uri_scheme, None)
if backend:
backend.playlists.refresh().get()
listener.CoreListener.send('playlists_loaded')
@ -167,8 +165,7 @@ class PlaylistsController(object):
if playlist.uri is None:
return
uri_scheme = urlparse.urlparse(playlist.uri).scheme
backend = self.backends.with_playlists_by_uri_scheme.get(
uri_scheme, None)
backend = self.backends.with_playlists.get(uri_scheme, None)
if backend:
playlist = backend.playlists.save(playlist).get()
listener.CoreListener.send('playlist_changed', playlist=playlist)

View File

@ -1,14 +1,9 @@
from __future__ import unicode_literals
import os
import re
import shlex
import urllib
from mopidy.frontends.mpd import protocol
from mopidy.frontends.mpd.exceptions import MpdArgError
from mopidy.models import TlTrack
from mopidy.utils.path import mtime as get_mtime, uri_to_path, split_path
# TODO: special handling of local:// uri scheme
@ -87,27 +82,6 @@ def track_to_mpd_format(track, position=None):
return result
MPD_KEY_ORDER = '''
key file Time Artist Album AlbumArtist Title Track Genre Date Composer
Performer Comment Disc MUSICBRAINZ_ALBUMID MUSICBRAINZ_ALBUMARTISTID
MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime
'''.split()
def order_mpd_track_info(result):
"""
Order results from
:func:`mopidy.frontends.mpd.translator.track_to_mpd_format` so that it
matches MPD's ordering. Simply a cosmetic fix for easier diffing of
tag_caches.
:param result: the track info
:type result: list of tuples
:rtype: list of tuples
"""
return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0]))
def artists_to_mpd_format(artists):
"""
Format track artists for output to MPD client.
@ -197,92 +171,3 @@ def query_from_mpd_list_format(field, mpd_query):
return query
else:
raise MpdArgError('not able to parse args', command='list')
# TODO: move to tagcache backend.
def tracks_to_tag_cache_format(tracks, media_dir):
"""
Format list of tracks for output to MPD tag cache
:param tracks: the tracks
:type tracks: list of :class:`mopidy.models.Track`
:param media_dir: the path to the music dir
:type media_dir: string
:rtype: list of lists of two-tuples
"""
result = [
('info_begin',),
('mpd_version', protocol.VERSION),
('fs_charset', protocol.ENCODING),
('info_end',)
]
tracks.sort(key=lambda t: t.uri)
dirs, files = tracks_to_directory_tree(tracks, media_dir)
_add_to_tag_cache(result, dirs, files, media_dir)
return result
# TODO: bytes only
def _add_to_tag_cache(result, dirs, files, media_dir):
base_path = media_dir.encode('utf-8')
for path, (entry_dirs, entry_files) in dirs.items():
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
name = os.path.split(text_path)[1]
result.append(('directory', text_path))
result.append(('mtime', get_mtime(os.path.join(base_path, path))))
result.append(('begin', name))
_add_to_tag_cache(result, entry_dirs, entry_files, media_dir)
result.append(('end', name))
result.append(('songList begin',))
for track in files:
track_result = dict(track_to_mpd_format(track))
# XXX Don't save comments to the tag cache as they may span multiple
# lines. We'll start saving track comments when we move from tag_cache
# to a JSON file. See #579 for details.
if 'Comment' in track_result:
del track_result['Comment']
path = uri_to_path(track_result['file'])
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
relative_path = os.path.relpath(path, base_path)
relative_uri = urllib.quote(relative_path)
# TODO: use track.last_modified
track_result['file'] = relative_uri
track_result['mtime'] = get_mtime(path)
track_result['key'] = os.path.basename(text_path)
track_result = order_mpd_track_info(track_result.items())
result.extend(track_result)
result.append(('songList end',))
def tracks_to_directory_tree(tracks, media_dir):
directories = ({}, [])
for track in tracks:
path = b''
current = directories
absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri))
relative_track_dir_path = re.sub(
'^' + re.escape(media_dir), b'', absolute_track_dir_path)
for part in split_path(relative_track_dir_path):
path = os.path.join(path, part)
if path not in current[0]:
current[0][path] = ({}, [])
current = current[0][path]
current[1].append(track)
return directories

View File

@ -119,26 +119,20 @@ def find_files(path):
path = path.encode('utf-8')
if os.path.isfile(path):
if not os.path.basename(path).startswith(b'.'):
yield path
else:
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith(b'.'):
# Skip hidden dirs by modifying dirnames inplace
dirnames.remove(dirname)
return
for filename in filenames:
if filename.startswith(b'.'):
# Skip hidden files
continue
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith(b'.'):
# Skip hidden dirs by modifying dirnames inplace
dirnames.remove(dirname)
yield os.path.join(dirpath, filename)
for filename in filenames:
if filename.startswith(b'.'):
# Skip hidden files
continue
def find_uris(path):
for p in find_files(path):
yield path_to_uri(p)
yield os.path.relpath(os.path.join(dirpath, filename), path)
def check_file_path_is_inside_base_dir(file_path, base_path):

View File

@ -43,6 +43,7 @@ setup(
'mopidy.ext': [
'http = mopidy.frontends.http:Extension [http]',
'local = mopidy.backends.local:Extension',
'local-json = mopidy.backends.local.json:Extension',
'mpd = mopidy.frontends.mpd:Extension',
'stream = mopidy.backends.stream:Extension',
],

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import os
import unittest
from mopidy import exceptions
@ -240,11 +241,15 @@ class ScannerTest(unittest.TestCase):
self.errors = {}
self.data = {}
def scan(self, path):
paths = path_lib.find_files(path_to_data_dir(path))
uris = (path_lib.path_to_uri(p) for p in paths)
def find(self, path):
media_dir = path_to_data_dir(path)
for path in path_lib.find_files(media_dir):
yield os.path.join(media_dir, path)
def scan(self, paths):
scanner = scan.Scanner()
for uri in uris:
for path in paths:
uri = path_lib.path_to_uri(path)
key = uri[len('file://'):]
try:
self.data[key] = scanner.scan(uri)
@ -256,15 +261,15 @@ class ScannerTest(unittest.TestCase):
self.assertEqual(self.data[name][key], value)
def test_data_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.assert_(self.data)
def test_errors_is_not_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.assert_(not self.errors)
def test_uri_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check(
'scanner/simple/song1.mp3', 'uri',
'file://%s' % path_to_data_dir('scanner/simple/song1.mp3'))
@ -273,39 +278,39 @@ class ScannerTest(unittest.TestCase):
'file://%s' % path_to_data_dir('scanner/simple/song1.ogg'))
def test_duration_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'duration', 4680000000)
self.check('scanner/simple/song1.ogg', 'duration', 4680000000)
def test_artist_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'artist', 'name')
self.check('scanner/simple/song1.ogg', 'artist', 'name')
def test_album_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'album', 'albumname')
self.check('scanner/simple/song1.ogg', 'album', 'albumname')
def test_track_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'title', 'trackname')
self.check('scanner/simple/song1.ogg', 'title', 'trackname')
def test_nonexistant_dir_does_not_fail(self):
self.scan('scanner/does-not-exist')
self.scan(self.find('scanner/does-not-exist'))
self.assert_(not self.errors)
def test_other_media_is_ignored(self):
self.scan('scanner/image')
self.scan(self.find('scanner/image'))
self.assert_(self.errors)
def test_log_file_that_gst_thinks_is_mpeg_1_is_ignored(self):
self.scan('scanner/example.log')
self.scan([path_to_data_dir('scanner/example.log')])
self.assert_(self.errors)
def test_empty_wav_file_is_ignored(self):
self.scan('scanner/empty.wav')
self.scan([path_to_data_dir('scanner/empty.wav')])
self.assert_(self.errors)
@unittest.SkipTest

View File

@ -18,7 +18,6 @@ class LocalBackendEventsTest(unittest.TestCase):
'local': {
'media_dir': path_to_data_dir(''),
'playlists_dir': b'',
'tag_cache_file': path_to_data_dir('empty_tag_cache'),
}
}

View File

@ -1,12 +1,13 @@
from __future__ import unicode_literals
import copy
import tempfile
import unittest
import pykka
from mopidy import core
from mopidy.backends.local import actor
from mopidy.backends.local.json import actor
from mopidy.models import Track, Album, Artist
from tests import path_to_data_dir
@ -61,12 +62,14 @@ class LocalLibraryProviderTest(unittest.TestCase):
'local': {
'media_dir': path_to_data_dir(''),
'playlists_dir': b'',
'tag_cache_file': path_to_data_dir('library_tag_cache'),
}
},
'local-json': {
'json_file': path_to_data_dir('library.json.gz'),
},
}
def setUp(self):
self.backend = actor.LocalBackend.start(
self.backend = actor.LocalJsonBackend.start(
config=self.config, audio=None).proxy()
self.core = core.Core(backends=[self.backend])
self.library = self.core.library
@ -85,27 +88,27 @@ class LocalLibraryProviderTest(unittest.TestCase):
# Verifies that https://github.com/mopidy/mopidy/issues/500
# has been fixed.
tag_cache = tempfile.NamedTemporaryFile()
with open(self.config['local']['tag_cache_file']) as fh:
tag_cache.write(fh.read())
tag_cache.flush()
with tempfile.NamedTemporaryFile() as library:
with open(self.config['local-json']['json_file']) as fh:
library.write(fh.read())
library.flush()
config = {'local': self.config['local'].copy()}
config['local']['tag_cache_file'] = tag_cache.name
backend = actor.LocalBackend(config=config, audio=None)
config = copy.deepcopy(self.config)
config['local-json']['json_file'] = library.name
backend = actor.LocalJsonBackend(config=config, audio=None)
# Sanity check that value is in tag cache
result = backend.library.lookup(self.tracks[0].uri)
self.assertEqual(result, self.tracks[0:1])
# Sanity check that value is in the library
result = backend.library.lookup(self.tracks[0].uri)
self.assertEqual(result, self.tracks[0:1])
# Clear tag cache and refresh
tag_cache.seek(0)
tag_cache.truncate()
backend.library.refresh()
# Clear library and refresh
library.seek(0)
library.truncate()
backend.library.refresh()
# Now it should be gone.
result = backend.library.lookup(self.tracks[0].uri)
self.assertEqual(result, [])
# Now it should be gone.
result = backend.library.lookup(self.tracks[0].uri)
self.assertEqual(result, [])
def test_lookup(self):
tracks = self.library.lookup(self.tracks[0].uri)
@ -115,6 +118,7 @@ class LocalLibraryProviderTest(unittest.TestCase):
tracks = self.library.lookup('fake uri')
self.assertEqual(tracks, [])
# TODO: move to search_test module
def test_find_exact_no_hits(self):
result = self.library.find_exact(track_name=['unknown track'])
self.assertEqual(list(result[0].tracks), [])

View File

@ -23,7 +23,6 @@ class LocalPlaybackProviderTest(unittest.TestCase):
'local': {
'media_dir': path_to_data_dir(''),
'playlists_dir': b'',
'tag_cache_file': path_to_data_dir('empty_tag_cache'),
}
}

View File

@ -20,7 +20,6 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
config = {
'local': {
'media_dir': path_to_data_dir(''),
'tag_cache_file': path_to_data_dir('library_tag_cache'),
}
}

View File

@ -19,7 +19,6 @@ class LocalTracklistProviderTest(unittest.TestCase):
'local': {
'media_dir': path_to_data_dir(''),
'playlists_dir': b'',
'tag_cache_file': path_to_data_dir('empty_tag_cache'),
}
}
tracks = [

View File

@ -6,8 +6,7 @@ import os
import tempfile
import unittest
from mopidy.backends.local.translator import parse_m3u, parse_mpd_tag_cache
from mopidy.models import Track, Artist, Album
from mopidy.backends.local.translator import parse_m3u
from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir
@ -89,106 +88,3 @@ class M3UToUriTest(unittest.TestCase):
class URItoM3UTest(unittest.TestCase):
pass
expected_artists = [Artist(name='name')]
expected_albums = [
Album(name='albumname', artists=expected_artists, num_tracks=2),
Album(name='albumname', num_tracks=2),
]
expected_tracks = []
def generate_track(path, ident, album_id):
uri = 'local:track:%s' % path
track = Track(
uri=uri, name='trackname', artists=expected_artists,
album=expected_albums[album_id], track_no=1, date='2006', length=4000,
last_modified=1272319626)
expected_tracks.append(track)
generate_track('song1.mp3', 6, 0)
generate_track('song2.mp3', 7, 0)
generate_track('song3.mp3', 8, 1)
generate_track('subdir1/song4.mp3', 2, 0)
generate_track('subdir1/song5.mp3', 3, 0)
generate_track('subdir2/song6.mp3', 4, 1)
generate_track('subdir2/song7.mp3', 5, 1)
generate_track('subdir1/subsubdir/song8.mp3', 0, 0)
generate_track('subdir1/subsubdir/song9.mp3', 1, 1)
class MPDTagCacheToTracksTest(unittest.TestCase):
def test_emtpy_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('empty_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(), tracks)
def test_simple_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('simple_tag_cache'), path_to_data_dir(''))
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=expected_albums[0],
date='2006', length=4000, last_modified=1272319626)
self.assertEqual(set([track]), tracks)
def test_advanced_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('advanced_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(expected_tracks), tracks)
def test_unicode_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('utf8_tag_cache'), path_to_data_dir(''))
artists = [Artist(name='æøå')]
album = Album(name='æøå', artists=artists)
track = Track(
uri='local:track:song1.mp3', name='æøå', artists=artists,
composers=artists, performers=artists, genre='æøå',
album=album, length=4000, last_modified=1272319626,
comment='æøå&^`ൂ㔶')
self.assertEqual(track, list(tracks)[0])
@unittest.SkipTest
def test_misencoded_cache(self):
# FIXME not sure if this can happen
pass
def test_cache_with_blank_track_info(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('blank_tag_cache'), path_to_data_dir(''))
expected = Track(
uri='local:track:song1.mp3', length=4000, last_modified=1272319626)
self.assertEqual(set([expected]), tracks)
def test_musicbrainz_tagcache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('musicbrainz_tag_cache'), path_to_data_dir(''))
artist = list(expected_tracks[0].artists)[0].copy(
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
albumartist = list(expected_tracks[0].artists)[0].copy(
name='albumartistname',
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
album = expected_tracks[0].album.copy(
artists=[albumartist],
musicbrainz_id='cb5f1603-d314-4c9c-91e5-e295cfb125d2')
track = expected_tracks[0].copy(
artists=[artist], album=album,
musicbrainz_id='90488461-8c1f-4a4e-826b-4c6dc70801f0')
self.assertEqual(track, list(tracks)[0])
def test_albumartist_tag_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('albumartist_tag_cache'), path_to_data_dir(''))
artist = Artist(name='albumartistname')
album = expected_albums[0].copy(artists=[artist])
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=album, date='2006',
length=4000, last_modified=1272319626)
self.assertEqual(track, list(tracks)[0])

View File

@ -4,7 +4,7 @@ import logging
import mock
import unittest
from mopidy.config import schemas
from mopidy.config import schemas, types
from tests import any_unicode
@ -77,6 +77,13 @@ class ConfigSchemaTest(unittest.TestCase):
self.assertIsNone(result['bar'])
self.assertIsNone(result['baz'])
def test_deserialize_deprecated_value(self):
self.schema['foo'] = types.Deprecated()
result, errors = self.schema.deserialize(self.values)
self.assertItemsEqual(['bar', 'baz'], result.keys())
self.assertNotIn('foo', errors)
class LogLevelConfigSchemaTest(unittest.TestCase):
def test_conversion(self):

View File

@ -33,6 +33,16 @@ class ConfigValueTest(unittest.TestCase):
self.assertIsInstance(value.serialize(object(), display=True), bytes)
class DeprecatedTest(unittest.TestCase):
def test_deserialize_returns_deprecated_value(self):
self.assertIsInstance(types.Deprecated().deserialize(b'foobar'),
types.DeprecatedValue)
def test_serialize_returns_deprecated_value(self):
self.assertIsInstance(types.Deprecated().serialize('foobar'),
types.DeprecatedValue)
class StringTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
value = types.String()

View File

@ -28,10 +28,29 @@ class CoreActorTest(unittest.TestCase):
self.assertIn('dummy2', result)
def test_backends_with_colliding_uri_schemes_fails(self):
self.backend1.__class__.__name__ = b'B1'
self.backend2.__class__.__name__ = b'B2'
self.backend1.actor_ref.actor_class.__name__ = b'B1'
self.backend2.actor_ref.actor_class.__name__ = b'B2'
self.backend2.uri_schemes.get.return_value = ['dummy1', 'dummy2']
self.assertRaisesRegexp(
AssertionError,
'Cannot add URI scheme dummy1 for B2, it is already handled by B1',
Core, audio=None, backends=[self.backend1, self.backend2])
def test_backends_with_colliding_uri_schemes_passes(self):
"""
Checks that backends with overlapping schemes, but distinct sub parts
provided can co-exist.
"""
self.backend1.has_library().get.return_value = False
self.backend1.has_playlists().get.return_value = False
self.backend2.uri_schemes.get.return_value = ['dummy1']
self.backend2.has_playback().get.return_value = False
self.backend2.has_playlists().get.return_value = False
core = Core(audio=None, backends=[self.backend1, self.backend2])
self.assertEqual(core.backends.with_playback,
{'dummy1': self.backend1})
self.assertEqual(core.backends.with_library,
{'dummy1': self.backend2})

View File

@ -1,107 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
directory: subdir1
begin: subdir1
directory: subsubdir
begin: subdir1/subsubdir
songList begin
key: song8.mp3
file: subdir1/subsubdir/song8.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
key: song9.mp3
file: subdir1/subsubdir/song9.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end
end: subdir1/subsubdir
songList begin
key: song4.mp3
file: subdir1/song4.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
key: song5.mp3
file: subdir1/song5.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end
end: subdir1
directory: subdir2
begin: subdir2
songList begin
key: song6.mp3
file: subdir2/song6.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
key: song7.mp3
file: subdir2/song7.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end
end: subdir2
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
key: song2.mp3
file: /song2.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
key: song3.mp3
file: /song3.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end

View File

@ -1,16 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
AlbumArtist: albumartistname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end

View File

@ -1,10 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
mtime: 1272319626
songList end

View File

@ -1,6 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
songList end

0
tests/data/find/baz/file Normal file
View File

View File

0
tests/data/find/foo/file Normal file
View File

BIN
tests/data/library.json.gz Normal file

Binary file not shown.

View File

@ -1,56 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
key: key1
file: /path1
Artist: artist1
AlbumArtist: artist1
Title: track1
Album: album1
Date: 2001-02-03
Track: 1
Time: 4
key: key2
file: /path2
Artist: artist2
AlbumArtist: artist2
Title: track2
Album: album2
Date: 2002
Track: 2
Time: 4
key: key3
file: /path3
Artist: artist4
AlbumArtist: artist3
Title: track3
Album: album3
Date: 2003
Track: 3
Time: 4
key: key4
file: /path4
Artist: artist3
Title: track4
Album: album4
Date: 2004
Track: 4
Comment: This is a fantastic track
Time: 60
key: key5
file: /path5
Composer: artist5
Title: track5
Album: album4
Genre: genre1
Time: 4
key: key6
file: /path6
Performer: artist6
Title: track6
Album: album4
Genre: genre2
Time: 4
songList end

View File

@ -1,20 +0,0 @@
info_begin
mpd_version: 0.16.0
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
Artist: name
Title: trackname
Album: albumname
AlbumArtist: albumartistname
Track: 1/2
Date: 2006
MUSICBRAINZ_ALBUMID: cb5f1603-d314-4c9c-91e5-e295cfb125d2
MUSICBRAINZ_ALBUMARTISTID: 7364dea6-ca9a-48e3-be01-b44ad0d19897
MUSICBRAINZ_ARTISTID: 7364dea6-ca9a-48e3-be01-b44ad0d19897
MUSICBRAINZ_TRACKID: 90488461-8c1f-4a4e-826b-4c6dc70801f0
mtime: 1272319626
songList end

View File

@ -1,81 +0,0 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
directory: subdir1
mtime: 1288121499
begin: subdir1
songList begin
key: song4.mp3
file: subdir1/song4.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song5.mp3
file: subdir1/song5.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end
end: subdir1
directory: subdir2
mtime: 1288121499
begin: subdir2
songList begin
key: song6.mp3
file: subdir2/song6.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song7.mp3
file: subdir2/song7.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end
end: subdir2
songList begin
key: song1.mp3
file: /song1.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song2.mp3
file: /song2.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song3.mp3
file: /song3.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end

View File

@ -1,6 +0,0 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
songList begin
songList end

View File

@ -1,15 +0,0 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end

View File

@ -1,16 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
Artist: name
AlbumArtist: name
Title: trackname
Album: albumname
Track: 1/2
Date: 2006
mtime: 1272319626
songList end

View File

@ -1,18 +0,0 @@
info_begin
mpd_version: 0.14.2
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 4
Artist: æøå
AlbumArtist: æøå
Composer: æøå
Performer: æøå
Title: æøå
Album: æøå
Genre: æøå
Comment: æøå&^`ൂ㔶
mtime: 1272319626
songList end

View File

@ -1,11 +1,10 @@
from __future__ import unicode_literals
import datetime
import os
import unittest
from mopidy.utils.path import mtime, uri_to_path
from mopidy.frontends.mpd import translator, protocol
from mopidy.utils.path import mtime
from mopidy.frontends.mpd import translator
from mopidy.models import Album, Artist, TlTrack, Playlist, Track
@ -126,233 +125,3 @@ class PlaylistMpdFormatTest(unittest.TestCase):
result = translator.playlist_to_mpd_format(playlist, 1, 2)
self.assertEqual(len(result), 1)
self.assertEqual(dict(result[0])['Track'], 2)
class TracksToTagCacheFormatTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/dir/subdir'
mtime.set_fake_time(1234567)
def tearDown(self):
mtime.undo_fake()
def translate(self, track):
base_path = self.media_dir.encode('utf-8')
result = dict(translator.track_to_mpd_format(track))
result['file'] = uri_to_path(result['file'])[len(base_path) + 1:]
result['key'] = os.path.basename(result['file'])
result['mtime'] = mtime('')
return translator.order_mpd_track_info(result.items())
def consume_headers(self, result):
self.assertEqual(('info_begin',), result[0])
self.assertEqual(('mpd_version', protocol.VERSION), result[1])
self.assertEqual(('fs_charset', protocol.ENCODING), result[2])
self.assertEqual(('info_end',), result[3])
return result[4:]
def consume_song_list(self, result):
self.assertEqual(('songList begin',), result[0])
for i, row in enumerate(result):
if row == ('songList end',):
return result[1:i], result[i + 1:]
self.fail("Couldn't find songList end in result")
def consume_directory(self, result):
self.assertEqual('directory', result[0][0])
self.assertEqual(('mtime', mtime('.')), result[1])
self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2])
directory = result[2][1]
for i, row in enumerate(result):
if row == ('end', directory):
return result[3:i], result[i + 1:]
self.fail("Couldn't find end %s in result" % directory)
def test_empty_tag_cache_has_header(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
def test_empty_tag_cache_has_song_list(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
def test_tag_cache_has_header(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
def test_tag_cache_has_song_list(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assert_(song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track_with_key_and_mtime(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_directories(self):
track = Track(uri='file:///dir/subdir/folder/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_diretory_header_is_right(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
self.assertEqual(('directory', 'folder/sub'), dir_data[0])
self.assertEqual(('mtime', mtime('.')), dir_data[1])
self.assertEqual(('begin', 'sub'), dir_data[2])
def test_tag_cache_suports_sub_directories(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
dir_data, result = self.consume_directory(dir_data)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(len(song_list), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_supports_multiple_tracks(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/song2.mp3'),
]
formated = []
formated.extend(self.translate(tracks[0]))
formated.extend(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_multiple_tracks_in_dirs(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/folder/song2.mp3'),
]
formated = []
formated.append(self.translate(tracks[0]))
formated.append(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, song_result = self.consume_song_list(dir_data)
self.assertEqual(formated[1], song_list)
self.assertEqual(len(song_result), 0)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(formated[0], song_list)
class TracksToDirectoryTreeTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/root'
def test_no_tracks_gives_emtpy_tree(self):
tree = translator.tracks_to_directory_tree([], self.media_dir)
self.assertEqual(tree, ({}, []))
def test_top_level_files(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/file2.mp3'),
Track(uri='file:///root/file3.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
self.assertEqual(tree, ({}, tracks))
def test_single_file_in_subdir(self):
tracks = [Track(uri='file:///root/dir/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir': ({}, tracks)}, [])
self.assertEqual(tree, expected)
def test_single_file_in_sub_subdir(self):
tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, [])
self.assertEqual(tree, expected)
def test_complex_file_structure(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/dir1/file2.mp3'),
Track(uri='file:///root/dir1/file3.mp3'),
Track(uri='file:///root/dir2/file4.mp3'),
Track(uri='file:///root/dir2/sub/file5.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = (
{
'dir1': ({}, [tracks[1], tracks[2]]),
'dir2': (
{
'dir2/sub': ({}, [tracks[4]])
},
[tracks[3]]
),
},
[tracks[0]]
)
self.assertEqual(tree, expected)

View File

@ -221,9 +221,12 @@ class FindFilesTest(unittest.TestCase):
self.assertEqual(self.find('does-not-exist'), [])
def test_file(self):
files = self.find('blank.mp3')
self.assertEqual(len(files), 1)
self.assertEqual(files[0], path_to_data_dir('blank.mp3'))
self.assertEqual([], self.find('blank.mp3'))
def test_files(self):
files = self.find('find')
expected = [b'foo/bar/file', b'foo/file', b'baz/file']
self.assertItemsEqual(expected, files)
def test_names_are_bytestrings(self):
is_bytes = lambda f: isinstance(f, bytes)
@ -231,35 +234,6 @@ class FindFilesTest(unittest.TestCase):
self.assert_(
is_bytes(name), '%s is not bytes object' % repr(name))
def test_ignores_hidden_dirs(self):
self.assertEqual(self.find('.hidden'), [])
def test_ignores_hidden_files(self):
self.assertEqual(self.find('.blank.mp3'), [])
class FindUrisTest(unittest.TestCase):
def find(self, value):
return list(path.find_uris(path_to_data_dir(value)))
def test_basic_dir(self):
self.assert_(self.find(''))
def test_nonexistant_dir(self):
self.assertEqual(self.find('does-not-exist'), [])
def test_file(self):
uris = self.find('blank.mp3')
expected = path.path_to_uri(path_to_data_dir('blank.mp3'))
self.assertEqual(len(uris), 1)
self.assertEqual(uris[0], expected)
def test_ignores_hidden_dirs(self):
self.assertEqual(self.find('.hidden'), [])
def test_ignores_hidden_files(self):
self.assertEqual(self.find('.blank.mp3'), [])
# TODO: kill this in favour of just os.path.getmtime + mocks
class MtimeTest(unittest.TestCase):