local: Move tag cache translators and tests out.

This commit is contained in:
Thomas Adamcik 2013-11-26 17:47:52 +01:00
parent 03f5ff6f57
commit ff9f473c2f
7 changed files with 598 additions and 581 deletions

View File

@ -5,10 +5,10 @@ import os
import tempfile import tempfile
from mopidy.backends import base from mopidy.backends import base
from mopidy.frontends.mpd import translator as mpd_translator from mopidy.backends.local.translator import local_to_file_uri
from mopidy.models import Album, SearchResult from mopidy.models import Album, SearchResult
from ..translator import local_to_file_uri, parse_mpd_tag_cache from .translator import parse_mpd_tag_cache, tracks_to_tag_cache_format
logger = logging.getLogger('mopidy.backends.local.tagcache') logger = logging.getLogger('mopidy.backends.local.tagcache')
@ -251,7 +251,7 @@ class LocalLibraryUpdateProvider(base.BaseLibraryProvider):
prefix=basename + '.', dir=directory, delete=False) prefix=basename + '.', dir=directory, delete=False)
try: try:
for row in mpd_translator.tracks_to_tag_cache_format( for row in tracks_to_tag_cache_format(
self._tracks.values(), self._media_dir): self._tracks.values(), self._media_dir):
if len(row) == 1: if len(row) == 1:
tmp.write(('%s\n' % row).encode('utf-8')) tmp.write(('%s\n' % row).encode('utf-8'))

View File

@ -0,0 +1,246 @@
from __future__ import unicode_literals
import logging
import os
import re
import urllib
from mopidy.frontends.mpd import translator as mpd, protocol
from mopidy.models import Track, Artist, Album
from mopidy.utils.encoding import locale_decode
from mopidy.utils.path import mtime as get_mtime, split_path, uri_to_path
logger = logging.getLogger('mopidy.backends.local.tagcache')
# TODO: remove music_dir from API
def parse_mpd_tag_cache(tag_cache, music_dir=''):
"""
Converts a MPD tag_cache into a lists of tracks, artists and albums.
"""
tracks = set()
try:
with open(tag_cache) as library:
contents = library.read()
except IOError as error:
logger.warning('Could not open tag cache: %s', locale_decode(error))
return tracks
current = {}
state = None
# TODO: uris as bytes
for line in contents.split(b'\n'):
if line == b'songList begin':
state = 'songs'
continue
elif line == b'songList end':
state = None
continue
elif not state:
continue
key, value = line.split(b': ', 1)
if key == b'key':
_convert_mpd_data(current, tracks)
current.clear()
current[key.lower()] = value.decode('utf-8')
_convert_mpd_data(current, tracks)
return tracks
def _convert_mpd_data(data, tracks):
if not data:
return
track_kwargs = {}
album_kwargs = {}
artist_kwargs = {}
albumartist_kwargs = {}
if 'track' in data:
if '/' in data['track']:
album_kwargs['num_tracks'] = int(data['track'].split('/')[1])
track_kwargs['track_no'] = int(data['track'].split('/')[0])
else:
track_kwargs['track_no'] = int(data['track'])
if 'mtime' in data:
track_kwargs['last_modified'] = int(data['mtime'])
if 'artist' in data:
artist_kwargs['name'] = data['artist']
if 'albumartist' in data:
albumartist_kwargs['name'] = data['albumartist']
if 'composer' in data:
track_kwargs['composers'] = [Artist(name=data['composer'])]
if 'performer' in data:
track_kwargs['performers'] = [Artist(name=data['performer'])]
if 'album' in data:
album_kwargs['name'] = data['album']
if 'title' in data:
track_kwargs['name'] = data['title']
if 'genre' in data:
track_kwargs['genre'] = data['genre']
if 'date' in data:
track_kwargs['date'] = data['date']
if 'comment' in data:
track_kwargs['comment'] = data['comment']
if 'musicbrainz_trackid' in data:
track_kwargs['musicbrainz_id'] = data['musicbrainz_trackid']
if 'musicbrainz_albumid' in data:
album_kwargs['musicbrainz_id'] = data['musicbrainz_albumid']
if 'musicbrainz_artistid' in data:
artist_kwargs['musicbrainz_id'] = data['musicbrainz_artistid']
if 'musicbrainz_albumartistid' in data:
albumartist_kwargs['musicbrainz_id'] = (
data['musicbrainz_albumartistid'])
if artist_kwargs:
artist = Artist(**artist_kwargs)
track_kwargs['artists'] = [artist]
if albumartist_kwargs:
albumartist = Artist(**albumartist_kwargs)
album_kwargs['artists'] = [albumartist]
if album_kwargs:
album = Album(**album_kwargs)
track_kwargs['album'] = album
if data['file'][0] == '/':
path = data['file'][1:]
else:
path = data['file']
track_kwargs['uri'] = 'local:track:%s' % path
track_kwargs['length'] = int(data.get('time', 0)) * 1000
track = Track(**track_kwargs)
tracks.add(track)
def tracks_to_tag_cache_format(tracks, media_dir):
"""
Format list of tracks for output to MPD tag cache
:param tracks: the tracks
:type tracks: list of :class:`mopidy.models.Track`
:param media_dir: the path to the music dir
:type media_dir: string
:rtype: list of lists of two-tuples
"""
result = [
('info_begin',),
('mpd_version', protocol.VERSION),
('fs_charset', protocol.ENCODING),
('info_end',)
]
tracks.sort(key=lambda t: t.uri)
dirs, files = tracks_to_directory_tree(tracks, media_dir)
_add_to_tag_cache(result, dirs, files, media_dir)
return result
# TODO: bytes only
def _add_to_tag_cache(result, dirs, files, media_dir):
base_path = media_dir.encode('utf-8')
for path, (entry_dirs, entry_files) in dirs.items():
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
name = os.path.split(text_path)[1]
result.append(('directory', text_path))
result.append(('mtime', get_mtime(os.path.join(base_path, path))))
result.append(('begin', name))
_add_to_tag_cache(result, entry_dirs, entry_files, media_dir)
result.append(('end', name))
result.append(('songList begin',))
for track in files:
track_result = dict(mpd.track_to_mpd_format(track))
# XXX Don't save comments to the tag cache as they may span multiple
# lines. We'll start saving track comments when we move from tag_cache
# to a JSON file. See #579 for details.
if 'Comment' in track_result:
del track_result['Comment']
path = uri_to_path(track_result['file'])
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
relative_path = os.path.relpath(path, base_path)
relative_uri = urllib.quote(relative_path)
# TODO: use track.last_modified
track_result['file'] = relative_uri
track_result['mtime'] = get_mtime(path)
track_result['key'] = os.path.basename(text_path)
track_result = order_mpd_track_info(track_result.items())
result.extend(track_result)
result.append(('songList end',))
def tracks_to_directory_tree(tracks, media_dir):
directories = ({}, [])
for track in tracks:
path = b''
current = directories
absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri))
relative_track_dir_path = re.sub(
'^' + re.escape(media_dir), b'', absolute_track_dir_path)
for part in split_path(relative_track_dir_path):
path = os.path.join(path, part)
if path not in current[0]:
current[0][path] = ({}, [])
current = current[0][path]
current[1].append(track)
return directories
MPD_KEY_ORDER = '''
key file Time Artist Album AlbumArtist Title Track Genre Date Composer
Performer Comment Disc MUSICBRAINZ_ALBUMID MUSICBRAINZ_ALBUMARTISTID
MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime
'''.split()
def order_mpd_track_info(result):
"""
Order results from
:func:`mopidy.frontends.mpd.translator.track_to_mpd_format` so that it
matches MPD's ordering. Simply a cosmetic fix for easier diffing of
tag_caches.
:param result: the track info
:type result: list of tuples
:rtype: list of tuples
"""
return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0]))

View File

@ -4,7 +4,6 @@ import logging
import os import os
import urlparse import urlparse
from mopidy.models import Track, Artist, Album
from mopidy.utils.encoding import locale_decode from mopidy.utils.encoding import locale_decode
from mopidy.utils.path import path_to_uri, uri_to_path from mopidy.utils.path import path_to_uri, uri_to_path
@ -63,127 +62,3 @@ def parse_m3u(file_path, media_dir):
uris.append(path) uris.append(path)
return uris return uris
# TODO: remove music_dir from API
def parse_mpd_tag_cache(tag_cache, music_dir=''):
"""
Converts a MPD tag_cache into a lists of tracks, artists and albums.
"""
tracks = set()
try:
with open(tag_cache) as library:
contents = library.read()
except IOError as error:
logger.warning('Could not open tag cache: %s', locale_decode(error))
return tracks
current = {}
state = None
# TODO: uris as bytes
for line in contents.split(b'\n'):
if line == b'songList begin':
state = 'songs'
continue
elif line == b'songList end':
state = None
continue
elif not state:
continue
key, value = line.split(b': ', 1)
if key == b'key':
_convert_mpd_data(current, tracks)
current.clear()
current[key.lower()] = value.decode('utf-8')
_convert_mpd_data(current, tracks)
return tracks
def _convert_mpd_data(data, tracks):
if not data:
return
track_kwargs = {}
album_kwargs = {}
artist_kwargs = {}
albumartist_kwargs = {}
if 'track' in data:
if '/' in data['track']:
album_kwargs['num_tracks'] = int(data['track'].split('/')[1])
track_kwargs['track_no'] = int(data['track'].split('/')[0])
else:
track_kwargs['track_no'] = int(data['track'])
if 'mtime' in data:
track_kwargs['last_modified'] = int(data['mtime'])
if 'artist' in data:
artist_kwargs['name'] = data['artist']
if 'albumartist' in data:
albumartist_kwargs['name'] = data['albumartist']
if 'composer' in data:
track_kwargs['composers'] = [Artist(name=data['composer'])]
if 'performer' in data:
track_kwargs['performers'] = [Artist(name=data['performer'])]
if 'album' in data:
album_kwargs['name'] = data['album']
if 'title' in data:
track_kwargs['name'] = data['title']
if 'genre' in data:
track_kwargs['genre'] = data['genre']
if 'date' in data:
track_kwargs['date'] = data['date']
if 'comment' in data:
track_kwargs['comment'] = data['comment']
if 'musicbrainz_trackid' in data:
track_kwargs['musicbrainz_id'] = data['musicbrainz_trackid']
if 'musicbrainz_albumid' in data:
album_kwargs['musicbrainz_id'] = data['musicbrainz_albumid']
if 'musicbrainz_artistid' in data:
artist_kwargs['musicbrainz_id'] = data['musicbrainz_artistid']
if 'musicbrainz_albumartistid' in data:
albumartist_kwargs['musicbrainz_id'] = (
data['musicbrainz_albumartistid'])
if artist_kwargs:
artist = Artist(**artist_kwargs)
track_kwargs['artists'] = [artist]
if albumartist_kwargs:
albumartist = Artist(**albumartist_kwargs)
album_kwargs['artists'] = [albumartist]
if album_kwargs:
album = Album(**album_kwargs)
track_kwargs['album'] = album
if data['file'][0] == '/':
path = data['file'][1:]
else:
path = data['file']
track_kwargs['uri'] = 'local:track:%s' % path
track_kwargs['length'] = int(data.get('time', 0)) * 1000
track = Track(**track_kwargs)
tracks.add(track)

View File

@ -1,14 +1,9 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import os
import re
import shlex import shlex
import urllib
from mopidy.frontends.mpd import protocol
from mopidy.frontends.mpd.exceptions import MpdArgError from mopidy.frontends.mpd.exceptions import MpdArgError
from mopidy.models import TlTrack from mopidy.models import TlTrack
from mopidy.utils.path import mtime as get_mtime, uri_to_path, split_path
# TODO: special handling of local:// uri scheme # TODO: special handling of local:// uri scheme
@ -87,27 +82,6 @@ def track_to_mpd_format(track, position=None):
return result return result
MPD_KEY_ORDER = '''
key file Time Artist Album AlbumArtist Title Track Genre Date Composer
Performer Comment Disc MUSICBRAINZ_ALBUMID MUSICBRAINZ_ALBUMARTISTID
MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime
'''.split()
def order_mpd_track_info(result):
"""
Order results from
:func:`mopidy.frontends.mpd.translator.track_to_mpd_format` so that it
matches MPD's ordering. Simply a cosmetic fix for easier diffing of
tag_caches.
:param result: the track info
:type result: list of tuples
:rtype: list of tuples
"""
return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0]))
def artists_to_mpd_format(artists): def artists_to_mpd_format(artists):
""" """
Format track artists for output to MPD client. Format track artists for output to MPD client.
@ -197,92 +171,3 @@ def query_from_mpd_list_format(field, mpd_query):
return query return query
else: else:
raise MpdArgError('not able to parse args', command='list') raise MpdArgError('not able to parse args', command='list')
# TODO: move to tagcache backend.
def tracks_to_tag_cache_format(tracks, media_dir):
"""
Format list of tracks for output to MPD tag cache
:param tracks: the tracks
:type tracks: list of :class:`mopidy.models.Track`
:param media_dir: the path to the music dir
:type media_dir: string
:rtype: list of lists of two-tuples
"""
result = [
('info_begin',),
('mpd_version', protocol.VERSION),
('fs_charset', protocol.ENCODING),
('info_end',)
]
tracks.sort(key=lambda t: t.uri)
dirs, files = tracks_to_directory_tree(tracks, media_dir)
_add_to_tag_cache(result, dirs, files, media_dir)
return result
# TODO: bytes only
def _add_to_tag_cache(result, dirs, files, media_dir):
base_path = media_dir.encode('utf-8')
for path, (entry_dirs, entry_files) in dirs.items():
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
name = os.path.split(text_path)[1]
result.append(('directory', text_path))
result.append(('mtime', get_mtime(os.path.join(base_path, path))))
result.append(('begin', name))
_add_to_tag_cache(result, entry_dirs, entry_files, media_dir)
result.append(('end', name))
result.append(('songList begin',))
for track in files:
track_result = dict(track_to_mpd_format(track))
# XXX Don't save comments to the tag cache as they may span multiple
# lines. We'll start saving track comments when we move from tag_cache
# to a JSON file. See #579 for details.
if 'Comment' in track_result:
del track_result['Comment']
path = uri_to_path(track_result['file'])
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
relative_path = os.path.relpath(path, base_path)
relative_uri = urllib.quote(relative_path)
# TODO: use track.last_modified
track_result['file'] = relative_uri
track_result['mtime'] = get_mtime(path)
track_result['key'] = os.path.basename(text_path)
track_result = order_mpd_track_info(track_result.items())
result.extend(track_result)
result.append(('songList end',))
def tracks_to_directory_tree(tracks, media_dir):
directories = ({}, [])
for track in tracks:
path = b''
current = directories
absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri))
relative_track_dir_path = re.sub(
'^' + re.escape(media_dir), b'', absolute_track_dir_path)
for part in split_path(relative_track_dir_path):
path = os.path.join(path, part)
if path not in current[0]:
current[0][path] = ({}, [])
current = current[0][path]
current[1].append(track)
return directories

View File

@ -0,0 +1,346 @@
# encoding: utf-8
from __future__ import unicode_literals
import os
import unittest
from mopidy.utils.path import mtime, uri_to_path
from mopidy.frontends.mpd import translator as mpd, protocol
from mopidy.backends.local.tagcache import translator
from mopidy.models import Album, Artist, Track
from tests import path_to_data_dir
class TracksToTagCacheFormatTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/dir/subdir'
mtime.set_fake_time(1234567)
def tearDown(self):
mtime.undo_fake()
def translate(self, track):
base_path = self.media_dir.encode('utf-8')
result = dict(mpd.track_to_mpd_format(track))
result['file'] = uri_to_path(result['file'])[len(base_path) + 1:]
result['key'] = os.path.basename(result['file'])
result['mtime'] = mtime('')
return translator.order_mpd_track_info(result.items())
def consume_headers(self, result):
self.assertEqual(('info_begin',), result[0])
self.assertEqual(('mpd_version', protocol.VERSION), result[1])
self.assertEqual(('fs_charset', protocol.ENCODING), result[2])
self.assertEqual(('info_end',), result[3])
return result[4:]
def consume_song_list(self, result):
self.assertEqual(('songList begin',), result[0])
for i, row in enumerate(result):
if row == ('songList end',):
return result[1:i], result[i + 1:]
self.fail("Couldn't find songList end in result")
def consume_directory(self, result):
self.assertEqual('directory', result[0][0])
self.assertEqual(('mtime', mtime('.')), result[1])
self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2])
directory = result[2][1]
for i, row in enumerate(result):
if row == ('end', directory):
return result[3:i], result[i + 1:]
self.fail("Couldn't find end %s in result" % directory)
def test_empty_tag_cache_has_header(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
def test_empty_tag_cache_has_song_list(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
def test_tag_cache_has_header(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
def test_tag_cache_has_song_list(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assert_(song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track_with_key_and_mtime(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_directories(self):
track = Track(uri='file:///dir/subdir/folder/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_diretory_header_is_right(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
self.assertEqual(('directory', 'folder/sub'), dir_data[0])
self.assertEqual(('mtime', mtime('.')), dir_data[1])
self.assertEqual(('begin', 'sub'), dir_data[2])
def test_tag_cache_suports_sub_directories(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
dir_data, result = self.consume_directory(dir_data)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(len(song_list), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_supports_multiple_tracks(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/song2.mp3'),
]
formated = []
formated.extend(self.translate(tracks[0]))
formated.extend(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_multiple_tracks_in_dirs(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/folder/song2.mp3'),
]
formated = []
formated.append(self.translate(tracks[0]))
formated.append(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, song_result = self.consume_song_list(dir_data)
self.assertEqual(formated[1], song_list)
self.assertEqual(len(song_result), 0)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(formated[0], song_list)
class TracksToDirectoryTreeTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/root'
def test_no_tracks_gives_emtpy_tree(self):
tree = translator.tracks_to_directory_tree([], self.media_dir)
self.assertEqual(tree, ({}, []))
def test_top_level_files(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/file2.mp3'),
Track(uri='file:///root/file3.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
self.assertEqual(tree, ({}, tracks))
def test_single_file_in_subdir(self):
tracks = [Track(uri='file:///root/dir/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir': ({}, tracks)}, [])
self.assertEqual(tree, expected)
def test_single_file_in_sub_subdir(self):
tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, [])
self.assertEqual(tree, expected)
def test_complex_file_structure(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/dir1/file2.mp3'),
Track(uri='file:///root/dir1/file3.mp3'),
Track(uri='file:///root/dir2/file4.mp3'),
Track(uri='file:///root/dir2/sub/file5.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = (
{
'dir1': ({}, [tracks[1], tracks[2]]),
'dir2': (
{
'dir2/sub': ({}, [tracks[4]])
},
[tracks[3]]
),
},
[tracks[0]]
)
self.assertEqual(tree, expected)
expected_artists = [Artist(name='name')]
expected_albums = [
Album(name='albumname', artists=expected_artists, num_tracks=2),
Album(name='albumname', num_tracks=2),
]
expected_tracks = []
def generate_track(path, ident, album_id):
uri = 'local:track:%s' % path
track = Track(
uri=uri, name='trackname', artists=expected_artists,
album=expected_albums[album_id], track_no=1, date='2006', length=4000,
last_modified=1272319626)
expected_tracks.append(track)
generate_track('song1.mp3', 6, 0)
generate_track('song2.mp3', 7, 0)
generate_track('song3.mp3', 8, 1)
generate_track('subdir1/song4.mp3', 2, 0)
generate_track('subdir1/song5.mp3', 3, 0)
generate_track('subdir2/song6.mp3', 4, 1)
generate_track('subdir2/song7.mp3', 5, 1)
generate_track('subdir1/subsubdir/song8.mp3', 0, 0)
generate_track('subdir1/subsubdir/song9.mp3', 1, 1)
class MPDTagCacheToTracksTest(unittest.TestCase):
def test_emtpy_cache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('empty_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(), tracks)
def test_simple_cache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('simple_tag_cache'), path_to_data_dir(''))
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=expected_albums[0],
date='2006', length=4000, last_modified=1272319626)
self.assertEqual(set([track]), tracks)
def test_advanced_cache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('advanced_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(expected_tracks), tracks)
def test_unicode_cache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('utf8_tag_cache'), path_to_data_dir(''))
artists = [Artist(name='æøå')]
album = Album(name='æøå', artists=artists)
track = Track(
uri='local:track:song1.mp3', name='æøå', artists=artists,
composers=artists, performers=artists, genre='æøå',
album=album, length=4000, last_modified=1272319626,
comment='æøå&^`ൂ㔶')
self.assertEqual(track, list(tracks)[0])
@unittest.SkipTest
def test_misencoded_cache(self):
# FIXME not sure if this can happen
pass
def test_cache_with_blank_track_info(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('blank_tag_cache'), path_to_data_dir(''))
expected = Track(
uri='local:track:song1.mp3', length=4000, last_modified=1272319626)
self.assertEqual(set([expected]), tracks)
def test_musicbrainz_tagcache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('musicbrainz_tag_cache'), path_to_data_dir(''))
artist = list(expected_tracks[0].artists)[0].copy(
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
albumartist = list(expected_tracks[0].artists)[0].copy(
name='albumartistname',
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
album = expected_tracks[0].album.copy(
artists=[albumartist],
musicbrainz_id='cb5f1603-d314-4c9c-91e5-e295cfb125d2')
track = expected_tracks[0].copy(
artists=[artist], album=album,
musicbrainz_id='90488461-8c1f-4a4e-826b-4c6dc70801f0')
self.assertEqual(track, list(tracks)[0])
def test_albumartist_tag_cache(self):
tracks = translator.parse_mpd_tag_cache(
path_to_data_dir('albumartist_tag_cache'), path_to_data_dir(''))
artist = Artist(name='albumartistname')
album = expected_albums[0].copy(artists=[artist])
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=album, date='2006',
length=4000, last_modified=1272319626)
self.assertEqual(track, list(tracks)[0])

View File

@ -6,8 +6,7 @@ import os
import tempfile import tempfile
import unittest import unittest
from mopidy.backends.local.translator import parse_m3u, parse_mpd_tag_cache from mopidy.backends.local.translator import parse_m3u
from mopidy.models import Track, Artist, Album
from mopidy.utils.path import path_to_uri from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir from tests import path_to_data_dir
@ -89,106 +88,3 @@ class M3UToUriTest(unittest.TestCase):
class URItoM3UTest(unittest.TestCase): class URItoM3UTest(unittest.TestCase):
pass pass
expected_artists = [Artist(name='name')]
expected_albums = [
Album(name='albumname', artists=expected_artists, num_tracks=2),
Album(name='albumname', num_tracks=2),
]
expected_tracks = []
def generate_track(path, ident, album_id):
uri = 'local:track:%s' % path
track = Track(
uri=uri, name='trackname', artists=expected_artists,
album=expected_albums[album_id], track_no=1, date='2006', length=4000,
last_modified=1272319626)
expected_tracks.append(track)
generate_track('song1.mp3', 6, 0)
generate_track('song2.mp3', 7, 0)
generate_track('song3.mp3', 8, 1)
generate_track('subdir1/song4.mp3', 2, 0)
generate_track('subdir1/song5.mp3', 3, 0)
generate_track('subdir2/song6.mp3', 4, 1)
generate_track('subdir2/song7.mp3', 5, 1)
generate_track('subdir1/subsubdir/song8.mp3', 0, 0)
generate_track('subdir1/subsubdir/song9.mp3', 1, 1)
class MPDTagCacheToTracksTest(unittest.TestCase):
def test_emtpy_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('empty_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(), tracks)
def test_simple_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('simple_tag_cache'), path_to_data_dir(''))
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=expected_albums[0],
date='2006', length=4000, last_modified=1272319626)
self.assertEqual(set([track]), tracks)
def test_advanced_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('advanced_tag_cache'), path_to_data_dir(''))
self.assertEqual(set(expected_tracks), tracks)
def test_unicode_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('utf8_tag_cache'), path_to_data_dir(''))
artists = [Artist(name='æøå')]
album = Album(name='æøå', artists=artists)
track = Track(
uri='local:track:song1.mp3', name='æøå', artists=artists,
composers=artists, performers=artists, genre='æøå',
album=album, length=4000, last_modified=1272319626,
comment='æøå&^`ൂ㔶')
self.assertEqual(track, list(tracks)[0])
@unittest.SkipTest
def test_misencoded_cache(self):
# FIXME not sure if this can happen
pass
def test_cache_with_blank_track_info(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('blank_tag_cache'), path_to_data_dir(''))
expected = Track(
uri='local:track:song1.mp3', length=4000, last_modified=1272319626)
self.assertEqual(set([expected]), tracks)
def test_musicbrainz_tagcache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('musicbrainz_tag_cache'), path_to_data_dir(''))
artist = list(expected_tracks[0].artists)[0].copy(
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
albumartist = list(expected_tracks[0].artists)[0].copy(
name='albumartistname',
musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897')
album = expected_tracks[0].album.copy(
artists=[albumartist],
musicbrainz_id='cb5f1603-d314-4c9c-91e5-e295cfb125d2')
track = expected_tracks[0].copy(
artists=[artist], album=album,
musicbrainz_id='90488461-8c1f-4a4e-826b-4c6dc70801f0')
self.assertEqual(track, list(tracks)[0])
def test_albumartist_tag_cache(self):
tracks = parse_mpd_tag_cache(
path_to_data_dir('albumartist_tag_cache'), path_to_data_dir(''))
artist = Artist(name='albumartistname')
album = expected_albums[0].copy(artists=[artist])
track = Track(
uri='local:track:song1.mp3', name='trackname',
artists=expected_artists, track_no=1, album=album, date='2006',
length=4000, last_modified=1272319626)
self.assertEqual(track, list(tracks)[0])

View File

@ -1,11 +1,10 @@
from __future__ import unicode_literals from __future__ import unicode_literals
import datetime import datetime
import os
import unittest import unittest
from mopidy.utils.path import mtime, uri_to_path from mopidy.utils.path import mtime
from mopidy.frontends.mpd import translator, protocol from mopidy.frontends.mpd import translator
from mopidy.models import Album, Artist, TlTrack, Playlist, Track from mopidy.models import Album, Artist, TlTrack, Playlist, Track
@ -126,233 +125,3 @@ class PlaylistMpdFormatTest(unittest.TestCase):
result = translator.playlist_to_mpd_format(playlist, 1, 2) result = translator.playlist_to_mpd_format(playlist, 1, 2)
self.assertEqual(len(result), 1) self.assertEqual(len(result), 1)
self.assertEqual(dict(result[0])['Track'], 2) self.assertEqual(dict(result[0])['Track'], 2)
class TracksToTagCacheFormatTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/dir/subdir'
mtime.set_fake_time(1234567)
def tearDown(self):
mtime.undo_fake()
def translate(self, track):
base_path = self.media_dir.encode('utf-8')
result = dict(translator.track_to_mpd_format(track))
result['file'] = uri_to_path(result['file'])[len(base_path) + 1:]
result['key'] = os.path.basename(result['file'])
result['mtime'] = mtime('')
return translator.order_mpd_track_info(result.items())
def consume_headers(self, result):
self.assertEqual(('info_begin',), result[0])
self.assertEqual(('mpd_version', protocol.VERSION), result[1])
self.assertEqual(('fs_charset', protocol.ENCODING), result[2])
self.assertEqual(('info_end',), result[3])
return result[4:]
def consume_song_list(self, result):
self.assertEqual(('songList begin',), result[0])
for i, row in enumerate(result):
if row == ('songList end',):
return result[1:i], result[i + 1:]
self.fail("Couldn't find songList end in result")
def consume_directory(self, result):
self.assertEqual('directory', result[0][0])
self.assertEqual(('mtime', mtime('.')), result[1])
self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2])
directory = result[2][1]
for i, row in enumerate(result):
if row == ('end', directory):
return result[3:i], result[i + 1:]
self.fail("Couldn't find end %s in result" % directory)
def test_empty_tag_cache_has_header(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
def test_empty_tag_cache_has_song_list(self):
result = translator.tracks_to_tag_cache_format([], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
def test_tag_cache_has_header(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
def test_tag_cache_has_song_list(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assert_(song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track_with_key_and_mtime(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_directories(self):
track = Track(uri='file:///dir/subdir/folder/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_diretory_header_is_right(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
self.assertEqual(('directory', 'folder/sub'), dir_data[0])
self.assertEqual(('mtime', mtime('.')), dir_data[1])
self.assertEqual(('begin', 'sub'), dir_data[2])
def test_tag_cache_suports_sub_directories(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track], self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
dir_data, result = self.consume_directory(dir_data)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(len(song_list), 0)
song_list, result = self.consume_song_list(dir_data)
self.assertEqual(len(result), 0)
self.assertEqual(formated, song_list)
def test_tag_cache_supports_multiple_tracks(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/song2.mp3'),
]
formated = []
formated.extend(self.translate(tracks[0]))
formated.extend(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(formated, song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_multiple_tracks_in_dirs(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/folder/song2.mp3'),
]
formated = []
formated.append(self.translate(tracks[0]))
formated.append(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks, self.media_dir)
result = self.consume_headers(result)
dir_data, result = self.consume_directory(result)
song_list, song_result = self.consume_song_list(dir_data)
self.assertEqual(formated[1], song_list)
self.assertEqual(len(song_result), 0)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(formated[0], song_list)
class TracksToDirectoryTreeTest(unittest.TestCase):
def setUp(self):
self.media_dir = '/root'
def test_no_tracks_gives_emtpy_tree(self):
tree = translator.tracks_to_directory_tree([], self.media_dir)
self.assertEqual(tree, ({}, []))
def test_top_level_files(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/file2.mp3'),
Track(uri='file:///root/file3.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
self.assertEqual(tree, ({}, tracks))
def test_single_file_in_subdir(self):
tracks = [Track(uri='file:///root/dir/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir': ({}, tracks)}, [])
self.assertEqual(tree, expected)
def test_single_file_in_sub_subdir(self):
tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, [])
self.assertEqual(tree, expected)
def test_complex_file_structure(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/dir1/file2.mp3'),
Track(uri='file:///root/dir1/file3.mp3'),
Track(uri='file:///root/dir2/file4.mp3'),
Track(uri='file:///root/dir2/sub/file5.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks, self.media_dir)
expected = (
{
'dir1': ({}, [tracks[1], tracks[2]]),
'dir2': (
{
'dir2/sub': ({}, [tracks[4]])
},
[tracks[3]]
),
},
[tracks[0]]
)
self.assertEqual(tree, expected)