From ff9f473c2f87bc2f1dfbaafc1d13dc085a1ac589 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Tue, 26 Nov 2013 17:47:52 +0100 Subject: [PATCH] local: Move tag cache translators and tests out. --- mopidy/backends/local/tagcache/library.py | 6 +- mopidy/backends/local/tagcache/translator.py | 246 +++++++++++++ mopidy/backends/local/translator.py | 125 ------- mopidy/frontends/mpd/translator.py | 115 ------ tests/backends/local/tagcache_test.py | 346 +++++++++++++++++++ tests/backends/local/translator_test.py | 106 +----- tests/frontends/mpd/translator_test.py | 235 +------------ 7 files changed, 598 insertions(+), 581 deletions(-) create mode 100644 mopidy/backends/local/tagcache/translator.py create mode 100644 tests/backends/local/tagcache_test.py diff --git a/mopidy/backends/local/tagcache/library.py b/mopidy/backends/local/tagcache/library.py index b6ec05ff..6efe6bf5 100644 --- a/mopidy/backends/local/tagcache/library.py +++ b/mopidy/backends/local/tagcache/library.py @@ -5,10 +5,10 @@ import os import tempfile from mopidy.backends import base -from mopidy.frontends.mpd import translator as mpd_translator +from mopidy.backends.local.translator import local_to_file_uri from mopidy.models import Album, SearchResult -from ..translator import local_to_file_uri, parse_mpd_tag_cache +from .translator import parse_mpd_tag_cache, tracks_to_tag_cache_format logger = logging.getLogger('mopidy.backends.local.tagcache') @@ -251,7 +251,7 @@ class LocalLibraryUpdateProvider(base.BaseLibraryProvider): prefix=basename + '.', dir=directory, delete=False) try: - for row in mpd_translator.tracks_to_tag_cache_format( + for row in tracks_to_tag_cache_format( self._tracks.values(), self._media_dir): if len(row) == 1: tmp.write(('%s\n' % row).encode('utf-8')) diff --git a/mopidy/backends/local/tagcache/translator.py b/mopidy/backends/local/tagcache/translator.py new file mode 100644 index 00000000..be54cd1d --- /dev/null +++ b/mopidy/backends/local/tagcache/translator.py @@ -0,0 +1,246 @@ +from __future__ import unicode_literals + +import logging +import os +import re +import urllib + +from mopidy.frontends.mpd import translator as mpd, protocol +from mopidy.models import Track, Artist, Album +from mopidy.utils.encoding import locale_decode +from mopidy.utils.path import mtime as get_mtime, split_path, uri_to_path + +logger = logging.getLogger('mopidy.backends.local.tagcache') + + +# TODO: remove music_dir from API +def parse_mpd_tag_cache(tag_cache, music_dir=''): + """ + Converts a MPD tag_cache into a lists of tracks, artists and albums. + """ + tracks = set() + + try: + with open(tag_cache) as library: + contents = library.read() + except IOError as error: + logger.warning('Could not open tag cache: %s', locale_decode(error)) + return tracks + + current = {} + state = None + + # TODO: uris as bytes + for line in contents.split(b'\n'): + if line == b'songList begin': + state = 'songs' + continue + elif line == b'songList end': + state = None + continue + elif not state: + continue + + key, value = line.split(b': ', 1) + + if key == b'key': + _convert_mpd_data(current, tracks) + current.clear() + + current[key.lower()] = value.decode('utf-8') + + _convert_mpd_data(current, tracks) + + return tracks + + +def _convert_mpd_data(data, tracks): + if not data: + return + + track_kwargs = {} + album_kwargs = {} + artist_kwargs = {} + albumartist_kwargs = {} + + if 'track' in data: + if '/' in data['track']: + album_kwargs['num_tracks'] = int(data['track'].split('/')[1]) + track_kwargs['track_no'] = int(data['track'].split('/')[0]) + else: + track_kwargs['track_no'] = int(data['track']) + + if 'mtime' in data: + track_kwargs['last_modified'] = int(data['mtime']) + + if 'artist' in data: + artist_kwargs['name'] = data['artist'] + + if 'albumartist' in data: + albumartist_kwargs['name'] = data['albumartist'] + + if 'composer' in data: + track_kwargs['composers'] = [Artist(name=data['composer'])] + + if 'performer' in data: + track_kwargs['performers'] = [Artist(name=data['performer'])] + + if 'album' in data: + album_kwargs['name'] = data['album'] + + if 'title' in data: + track_kwargs['name'] = data['title'] + + if 'genre' in data: + track_kwargs['genre'] = data['genre'] + + if 'date' in data: + track_kwargs['date'] = data['date'] + + if 'comment' in data: + track_kwargs['comment'] = data['comment'] + + if 'musicbrainz_trackid' in data: + track_kwargs['musicbrainz_id'] = data['musicbrainz_trackid'] + + if 'musicbrainz_albumid' in data: + album_kwargs['musicbrainz_id'] = data['musicbrainz_albumid'] + + if 'musicbrainz_artistid' in data: + artist_kwargs['musicbrainz_id'] = data['musicbrainz_artistid'] + + if 'musicbrainz_albumartistid' in data: + albumartist_kwargs['musicbrainz_id'] = ( + data['musicbrainz_albumartistid']) + + if artist_kwargs: + artist = Artist(**artist_kwargs) + track_kwargs['artists'] = [artist] + + if albumartist_kwargs: + albumartist = Artist(**albumartist_kwargs) + album_kwargs['artists'] = [albumartist] + + if album_kwargs: + album = Album(**album_kwargs) + track_kwargs['album'] = album + + if data['file'][0] == '/': + path = data['file'][1:] + else: + path = data['file'] + + track_kwargs['uri'] = 'local:track:%s' % path + track_kwargs['length'] = int(data.get('time', 0)) * 1000 + + track = Track(**track_kwargs) + tracks.add(track) + + +def tracks_to_tag_cache_format(tracks, media_dir): + """ + Format list of tracks for output to MPD tag cache + + :param tracks: the tracks + :type tracks: list of :class:`mopidy.models.Track` + :param media_dir: the path to the music dir + :type media_dir: string + :rtype: list of lists of two-tuples + """ + result = [ + ('info_begin',), + ('mpd_version', protocol.VERSION), + ('fs_charset', protocol.ENCODING), + ('info_end',) + ] + tracks.sort(key=lambda t: t.uri) + dirs, files = tracks_to_directory_tree(tracks, media_dir) + _add_to_tag_cache(result, dirs, files, media_dir) + return result + + +# TODO: bytes only +def _add_to_tag_cache(result, dirs, files, media_dir): + base_path = media_dir.encode('utf-8') + + for path, (entry_dirs, entry_files) in dirs.items(): + try: + text_path = path.decode('utf-8') + except UnicodeDecodeError: + text_path = urllib.quote(path).decode('utf-8') + name = os.path.split(text_path)[1] + result.append(('directory', text_path)) + result.append(('mtime', get_mtime(os.path.join(base_path, path)))) + result.append(('begin', name)) + _add_to_tag_cache(result, entry_dirs, entry_files, media_dir) + result.append(('end', name)) + + result.append(('songList begin',)) + + for track in files: + track_result = dict(mpd.track_to_mpd_format(track)) + + # XXX Don't save comments to the tag cache as they may span multiple + # lines. We'll start saving track comments when we move from tag_cache + # to a JSON file. See #579 for details. + if 'Comment' in track_result: + del track_result['Comment'] + + path = uri_to_path(track_result['file']) + try: + text_path = path.decode('utf-8') + except UnicodeDecodeError: + text_path = urllib.quote(path).decode('utf-8') + relative_path = os.path.relpath(path, base_path) + relative_uri = urllib.quote(relative_path) + + # TODO: use track.last_modified + track_result['file'] = relative_uri + track_result['mtime'] = get_mtime(path) + track_result['key'] = os.path.basename(text_path) + track_result = order_mpd_track_info(track_result.items()) + + result.extend(track_result) + + result.append(('songList end',)) + + +def tracks_to_directory_tree(tracks, media_dir): + directories = ({}, []) + + for track in tracks: + path = b'' + current = directories + + absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri)) + relative_track_dir_path = re.sub( + '^' + re.escape(media_dir), b'', absolute_track_dir_path) + + for part in split_path(relative_track_dir_path): + path = os.path.join(path, part) + if path not in current[0]: + current[0][path] = ({}, []) + current = current[0][path] + current[1].append(track) + return directories + + +MPD_KEY_ORDER = ''' + key file Time Artist Album AlbumArtist Title Track Genre Date Composer + Performer Comment Disc MUSICBRAINZ_ALBUMID MUSICBRAINZ_ALBUMARTISTID + MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime +'''.split() + + +def order_mpd_track_info(result): + """ + Order results from + :func:`mopidy.frontends.mpd.translator.track_to_mpd_format` so that it + matches MPD's ordering. Simply a cosmetic fix for easier diffing of + tag_caches. + + :param result: the track info + :type result: list of tuples + :rtype: list of tuples + """ + return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0])) diff --git a/mopidy/backends/local/translator.py b/mopidy/backends/local/translator.py index b9aad3e0..dc266d1c 100644 --- a/mopidy/backends/local/translator.py +++ b/mopidy/backends/local/translator.py @@ -4,7 +4,6 @@ import logging import os import urlparse -from mopidy.models import Track, Artist, Album from mopidy.utils.encoding import locale_decode from mopidy.utils.path import path_to_uri, uri_to_path @@ -63,127 +62,3 @@ def parse_m3u(file_path, media_dir): uris.append(path) return uris - - -# TODO: remove music_dir from API -def parse_mpd_tag_cache(tag_cache, music_dir=''): - """ - Converts a MPD tag_cache into a lists of tracks, artists and albums. - """ - tracks = set() - - try: - with open(tag_cache) as library: - contents = library.read() - except IOError as error: - logger.warning('Could not open tag cache: %s', locale_decode(error)) - return tracks - - current = {} - state = None - - # TODO: uris as bytes - for line in contents.split(b'\n'): - if line == b'songList begin': - state = 'songs' - continue - elif line == b'songList end': - state = None - continue - elif not state: - continue - - key, value = line.split(b': ', 1) - - if key == b'key': - _convert_mpd_data(current, tracks) - current.clear() - - current[key.lower()] = value.decode('utf-8') - - _convert_mpd_data(current, tracks) - - return tracks - - -def _convert_mpd_data(data, tracks): - if not data: - return - - track_kwargs = {} - album_kwargs = {} - artist_kwargs = {} - albumartist_kwargs = {} - - if 'track' in data: - if '/' in data['track']: - album_kwargs['num_tracks'] = int(data['track'].split('/')[1]) - track_kwargs['track_no'] = int(data['track'].split('/')[0]) - else: - track_kwargs['track_no'] = int(data['track']) - - if 'mtime' in data: - track_kwargs['last_modified'] = int(data['mtime']) - - if 'artist' in data: - artist_kwargs['name'] = data['artist'] - - if 'albumartist' in data: - albumartist_kwargs['name'] = data['albumartist'] - - if 'composer' in data: - track_kwargs['composers'] = [Artist(name=data['composer'])] - - if 'performer' in data: - track_kwargs['performers'] = [Artist(name=data['performer'])] - - if 'album' in data: - album_kwargs['name'] = data['album'] - - if 'title' in data: - track_kwargs['name'] = data['title'] - - if 'genre' in data: - track_kwargs['genre'] = data['genre'] - - if 'date' in data: - track_kwargs['date'] = data['date'] - - if 'comment' in data: - track_kwargs['comment'] = data['comment'] - - if 'musicbrainz_trackid' in data: - track_kwargs['musicbrainz_id'] = data['musicbrainz_trackid'] - - if 'musicbrainz_albumid' in data: - album_kwargs['musicbrainz_id'] = data['musicbrainz_albumid'] - - if 'musicbrainz_artistid' in data: - artist_kwargs['musicbrainz_id'] = data['musicbrainz_artistid'] - - if 'musicbrainz_albumartistid' in data: - albumartist_kwargs['musicbrainz_id'] = ( - data['musicbrainz_albumartistid']) - - if artist_kwargs: - artist = Artist(**artist_kwargs) - track_kwargs['artists'] = [artist] - - if albumartist_kwargs: - albumartist = Artist(**albumartist_kwargs) - album_kwargs['artists'] = [albumartist] - - if album_kwargs: - album = Album(**album_kwargs) - track_kwargs['album'] = album - - if data['file'][0] == '/': - path = data['file'][1:] - else: - path = data['file'] - - track_kwargs['uri'] = 'local:track:%s' % path - track_kwargs['length'] = int(data.get('time', 0)) * 1000 - - track = Track(**track_kwargs) - tracks.add(track) diff --git a/mopidy/frontends/mpd/translator.py b/mopidy/frontends/mpd/translator.py index 4f38effa..671bfae7 100644 --- a/mopidy/frontends/mpd/translator.py +++ b/mopidy/frontends/mpd/translator.py @@ -1,14 +1,9 @@ from __future__ import unicode_literals -import os -import re import shlex -import urllib -from mopidy.frontends.mpd import protocol from mopidy.frontends.mpd.exceptions import MpdArgError from mopidy.models import TlTrack -from mopidy.utils.path import mtime as get_mtime, uri_to_path, split_path # TODO: special handling of local:// uri scheme @@ -87,27 +82,6 @@ def track_to_mpd_format(track, position=None): return result -MPD_KEY_ORDER = ''' - key file Time Artist Album AlbumArtist Title Track Genre Date Composer - Performer Comment Disc MUSICBRAINZ_ALBUMID MUSICBRAINZ_ALBUMARTISTID - MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime -'''.split() - - -def order_mpd_track_info(result): - """ - Order results from - :func:`mopidy.frontends.mpd.translator.track_to_mpd_format` so that it - matches MPD's ordering. Simply a cosmetic fix for easier diffing of - tag_caches. - - :param result: the track info - :type result: list of tuples - :rtype: list of tuples - """ - return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0])) - - def artists_to_mpd_format(artists): """ Format track artists for output to MPD client. @@ -197,92 +171,3 @@ def query_from_mpd_list_format(field, mpd_query): return query else: raise MpdArgError('not able to parse args', command='list') - - -# TODO: move to tagcache backend. -def tracks_to_tag_cache_format(tracks, media_dir): - """ - Format list of tracks for output to MPD tag cache - - :param tracks: the tracks - :type tracks: list of :class:`mopidy.models.Track` - :param media_dir: the path to the music dir - :type media_dir: string - :rtype: list of lists of two-tuples - """ - result = [ - ('info_begin',), - ('mpd_version', protocol.VERSION), - ('fs_charset', protocol.ENCODING), - ('info_end',) - ] - tracks.sort(key=lambda t: t.uri) - dirs, files = tracks_to_directory_tree(tracks, media_dir) - _add_to_tag_cache(result, dirs, files, media_dir) - return result - - -# TODO: bytes only -def _add_to_tag_cache(result, dirs, files, media_dir): - base_path = media_dir.encode('utf-8') - - for path, (entry_dirs, entry_files) in dirs.items(): - try: - text_path = path.decode('utf-8') - except UnicodeDecodeError: - text_path = urllib.quote(path).decode('utf-8') - name = os.path.split(text_path)[1] - result.append(('directory', text_path)) - result.append(('mtime', get_mtime(os.path.join(base_path, path)))) - result.append(('begin', name)) - _add_to_tag_cache(result, entry_dirs, entry_files, media_dir) - result.append(('end', name)) - - result.append(('songList begin',)) - - for track in files: - track_result = dict(track_to_mpd_format(track)) - - # XXX Don't save comments to the tag cache as they may span multiple - # lines. We'll start saving track comments when we move from tag_cache - # to a JSON file. See #579 for details. - if 'Comment' in track_result: - del track_result['Comment'] - - path = uri_to_path(track_result['file']) - try: - text_path = path.decode('utf-8') - except UnicodeDecodeError: - text_path = urllib.quote(path).decode('utf-8') - relative_path = os.path.relpath(path, base_path) - relative_uri = urllib.quote(relative_path) - - # TODO: use track.last_modified - track_result['file'] = relative_uri - track_result['mtime'] = get_mtime(path) - track_result['key'] = os.path.basename(text_path) - track_result = order_mpd_track_info(track_result.items()) - - result.extend(track_result) - - result.append(('songList end',)) - - -def tracks_to_directory_tree(tracks, media_dir): - directories = ({}, []) - - for track in tracks: - path = b'' - current = directories - - absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri)) - relative_track_dir_path = re.sub( - '^' + re.escape(media_dir), b'', absolute_track_dir_path) - - for part in split_path(relative_track_dir_path): - path = os.path.join(path, part) - if path not in current[0]: - current[0][path] = ({}, []) - current = current[0][path] - current[1].append(track) - return directories diff --git a/tests/backends/local/tagcache_test.py b/tests/backends/local/tagcache_test.py new file mode 100644 index 00000000..6d0b7469 --- /dev/null +++ b/tests/backends/local/tagcache_test.py @@ -0,0 +1,346 @@ +# encoding: utf-8 + +from __future__ import unicode_literals + +import os +import unittest + +from mopidy.utils.path import mtime, uri_to_path +from mopidy.frontends.mpd import translator as mpd, protocol +from mopidy.backends.local.tagcache import translator +from mopidy.models import Album, Artist, Track + +from tests import path_to_data_dir + + +class TracksToTagCacheFormatTest(unittest.TestCase): + def setUp(self): + self.media_dir = '/dir/subdir' + mtime.set_fake_time(1234567) + + def tearDown(self): + mtime.undo_fake() + + def translate(self, track): + base_path = self.media_dir.encode('utf-8') + result = dict(mpd.track_to_mpd_format(track)) + result['file'] = uri_to_path(result['file'])[len(base_path) + 1:] + result['key'] = os.path.basename(result['file']) + result['mtime'] = mtime('') + return translator.order_mpd_track_info(result.items()) + + def consume_headers(self, result): + self.assertEqual(('info_begin',), result[0]) + self.assertEqual(('mpd_version', protocol.VERSION), result[1]) + self.assertEqual(('fs_charset', protocol.ENCODING), result[2]) + self.assertEqual(('info_end',), result[3]) + return result[4:] + + def consume_song_list(self, result): + self.assertEqual(('songList begin',), result[0]) + for i, row in enumerate(result): + if row == ('songList end',): + return result[1:i], result[i + 1:] + self.fail("Couldn't find songList end in result") + + def consume_directory(self, result): + self.assertEqual('directory', result[0][0]) + self.assertEqual(('mtime', mtime('.')), result[1]) + self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2]) + directory = result[2][1] + for i, row in enumerate(result): + if row == ('end', directory): + return result[3:i], result[i + 1:] + self.fail("Couldn't find end %s in result" % directory) + + def test_empty_tag_cache_has_header(self): + result = translator.tracks_to_tag_cache_format([], self.media_dir) + result = self.consume_headers(result) + + def test_empty_tag_cache_has_song_list(self): + result = translator.tracks_to_tag_cache_format([], self.media_dir) + result = self.consume_headers(result) + song_list, result = self.consume_song_list(result) + + self.assertEqual(len(song_list), 0) + self.assertEqual(len(result), 0) + + def test_tag_cache_has_header(self): + track = Track(uri='file:///dir/subdir/song.mp3') + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + result = self.consume_headers(result) + + def test_tag_cache_has_song_list(self): + track = Track(uri='file:///dir/subdir/song.mp3') + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + result = self.consume_headers(result) + song_list, result = self.consume_song_list(result) + + self.assert_(song_list) + self.assertEqual(len(result), 0) + + def test_tag_cache_has_formated_track(self): + track = Track(uri='file:///dir/subdir/song.mp3') + formated = self.translate(track) + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + + result = self.consume_headers(result) + song_list, result = self.consume_song_list(result) + + self.assertEqual(formated, song_list) + self.assertEqual(len(result), 0) + + def test_tag_cache_has_formated_track_with_key_and_mtime(self): + track = Track(uri='file:///dir/subdir/song.mp3') + formated = self.translate(track) + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + + result = self.consume_headers(result) + song_list, result = self.consume_song_list(result) + + self.assertEqual(formated, song_list) + self.assertEqual(len(result), 0) + + def test_tag_cache_supports_directories(self): + track = Track(uri='file:///dir/subdir/folder/song.mp3') + formated = self.translate(track) + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + + result = self.consume_headers(result) + dir_data, result = self.consume_directory(result) + song_list, result = self.consume_song_list(result) + self.assertEqual(len(song_list), 0) + self.assertEqual(len(result), 0) + + song_list, result = self.consume_song_list(dir_data) + self.assertEqual(len(result), 0) + self.assertEqual(formated, song_list) + + def test_tag_cache_diretory_header_is_right(self): + track = Track(uri='file:///dir/subdir/folder/sub/song.mp3') + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + + result = self.consume_headers(result) + dir_data, result = self.consume_directory(result) + + self.assertEqual(('directory', 'folder/sub'), dir_data[0]) + self.assertEqual(('mtime', mtime('.')), dir_data[1]) + self.assertEqual(('begin', 'sub'), dir_data[2]) + + def test_tag_cache_suports_sub_directories(self): + track = Track(uri='file:///dir/subdir/folder/sub/song.mp3') + formated = self.translate(track) + result = translator.tracks_to_tag_cache_format([track], self.media_dir) + + result = self.consume_headers(result) + + dir_data, result = self.consume_directory(result) + song_list, result = self.consume_song_list(result) + self.assertEqual(len(song_list), 0) + self.assertEqual(len(result), 0) + + dir_data, result = self.consume_directory(dir_data) + song_list, result = self.consume_song_list(result) + self.assertEqual(len(result), 0) + self.assertEqual(len(song_list), 0) + + song_list, result = self.consume_song_list(dir_data) + self.assertEqual(len(result), 0) + self.assertEqual(formated, song_list) + + def test_tag_cache_supports_multiple_tracks(self): + tracks = [ + Track(uri='file:///dir/subdir/song1.mp3'), + Track(uri='file:///dir/subdir/song2.mp3'), + ] + + formated = [] + formated.extend(self.translate(tracks[0])) + formated.extend(self.translate(tracks[1])) + + result = translator.tracks_to_tag_cache_format(tracks, self.media_dir) + + result = self.consume_headers(result) + song_list, result = self.consume_song_list(result) + + self.assertEqual(formated, song_list) + self.assertEqual(len(result), 0) + + def test_tag_cache_supports_multiple_tracks_in_dirs(self): + tracks = [ + Track(uri='file:///dir/subdir/song1.mp3'), + Track(uri='file:///dir/subdir/folder/song2.mp3'), + ] + + formated = [] + formated.append(self.translate(tracks[0])) + formated.append(self.translate(tracks[1])) + + result = translator.tracks_to_tag_cache_format(tracks, self.media_dir) + + result = self.consume_headers(result) + dir_data, result = self.consume_directory(result) + song_list, song_result = self.consume_song_list(dir_data) + + self.assertEqual(formated[1], song_list) + self.assertEqual(len(song_result), 0) + + song_list, result = self.consume_song_list(result) + self.assertEqual(len(result), 0) + self.assertEqual(formated[0], song_list) + + +class TracksToDirectoryTreeTest(unittest.TestCase): + def setUp(self): + self.media_dir = '/root' + + def test_no_tracks_gives_emtpy_tree(self): + tree = translator.tracks_to_directory_tree([], self.media_dir) + self.assertEqual(tree, ({}, [])) + + def test_top_level_files(self): + tracks = [ + Track(uri='file:///root/file1.mp3'), + Track(uri='file:///root/file2.mp3'), + Track(uri='file:///root/file3.mp3'), + ] + tree = translator.tracks_to_directory_tree(tracks, self.media_dir) + self.assertEqual(tree, ({}, tracks)) + + def test_single_file_in_subdir(self): + tracks = [Track(uri='file:///root/dir/file1.mp3')] + tree = translator.tracks_to_directory_tree(tracks, self.media_dir) + expected = ({'dir': ({}, tracks)}, []) + self.assertEqual(tree, expected) + + def test_single_file_in_sub_subdir(self): + tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')] + tree = translator.tracks_to_directory_tree(tracks, self.media_dir) + expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, []) + self.assertEqual(tree, expected) + + def test_complex_file_structure(self): + tracks = [ + Track(uri='file:///root/file1.mp3'), + Track(uri='file:///root/dir1/file2.mp3'), + Track(uri='file:///root/dir1/file3.mp3'), + Track(uri='file:///root/dir2/file4.mp3'), + Track(uri='file:///root/dir2/sub/file5.mp3'), + ] + tree = translator.tracks_to_directory_tree(tracks, self.media_dir) + expected = ( + { + 'dir1': ({}, [tracks[1], tracks[2]]), + 'dir2': ( + { + 'dir2/sub': ({}, [tracks[4]]) + }, + [tracks[3]] + ), + }, + [tracks[0]] + ) + self.assertEqual(tree, expected) + + +expected_artists = [Artist(name='name')] +expected_albums = [ + Album(name='albumname', artists=expected_artists, num_tracks=2), + Album(name='albumname', num_tracks=2), +] +expected_tracks = [] + + +def generate_track(path, ident, album_id): + uri = 'local:track:%s' % path + track = Track( + uri=uri, name='trackname', artists=expected_artists, + album=expected_albums[album_id], track_no=1, date='2006', length=4000, + last_modified=1272319626) + expected_tracks.append(track) + + +generate_track('song1.mp3', 6, 0) +generate_track('song2.mp3', 7, 0) +generate_track('song3.mp3', 8, 1) +generate_track('subdir1/song4.mp3', 2, 0) +generate_track('subdir1/song5.mp3', 3, 0) +generate_track('subdir2/song6.mp3', 4, 1) +generate_track('subdir2/song7.mp3', 5, 1) +generate_track('subdir1/subsubdir/song8.mp3', 0, 0) +generate_track('subdir1/subsubdir/song9.mp3', 1, 1) + + +class MPDTagCacheToTracksTest(unittest.TestCase): + def test_emtpy_cache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('empty_tag_cache'), path_to_data_dir('')) + self.assertEqual(set(), tracks) + + def test_simple_cache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('simple_tag_cache'), path_to_data_dir('')) + track = Track( + uri='local:track:song1.mp3', name='trackname', + artists=expected_artists, track_no=1, album=expected_albums[0], + date='2006', length=4000, last_modified=1272319626) + self.assertEqual(set([track]), tracks) + + def test_advanced_cache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('advanced_tag_cache'), path_to_data_dir('')) + self.assertEqual(set(expected_tracks), tracks) + + def test_unicode_cache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('utf8_tag_cache'), path_to_data_dir('')) + + artists = [Artist(name='æøå')] + album = Album(name='æøå', artists=artists) + track = Track( + uri='local:track:song1.mp3', name='æøå', artists=artists, + composers=artists, performers=artists, genre='æøå', + album=album, length=4000, last_modified=1272319626, + comment='æøå&^`ൂ㔶') + + self.assertEqual(track, list(tracks)[0]) + + @unittest.SkipTest + def test_misencoded_cache(self): + # FIXME not sure if this can happen + pass + + def test_cache_with_blank_track_info(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('blank_tag_cache'), path_to_data_dir('')) + expected = Track( + uri='local:track:song1.mp3', length=4000, last_modified=1272319626) + self.assertEqual(set([expected]), tracks) + + def test_musicbrainz_tagcache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('musicbrainz_tag_cache'), path_to_data_dir('')) + artist = list(expected_tracks[0].artists)[0].copy( + musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897') + albumartist = list(expected_tracks[0].artists)[0].copy( + name='albumartistname', + musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897') + album = expected_tracks[0].album.copy( + artists=[albumartist], + musicbrainz_id='cb5f1603-d314-4c9c-91e5-e295cfb125d2') + track = expected_tracks[0].copy( + artists=[artist], album=album, + musicbrainz_id='90488461-8c1f-4a4e-826b-4c6dc70801f0') + + self.assertEqual(track, list(tracks)[0]) + + def test_albumartist_tag_cache(self): + tracks = translator.parse_mpd_tag_cache( + path_to_data_dir('albumartist_tag_cache'), path_to_data_dir('')) + artist = Artist(name='albumartistname') + album = expected_albums[0].copy(artists=[artist]) + track = Track( + uri='local:track:song1.mp3', name='trackname', + artists=expected_artists, track_no=1, album=album, date='2006', + length=4000, last_modified=1272319626) + self.assertEqual(track, list(tracks)[0]) diff --git a/tests/backends/local/translator_test.py b/tests/backends/local/translator_test.py index 5623c787..e5747f68 100644 --- a/tests/backends/local/translator_test.py +++ b/tests/backends/local/translator_test.py @@ -6,8 +6,7 @@ import os import tempfile import unittest -from mopidy.backends.local.translator import parse_m3u, parse_mpd_tag_cache -from mopidy.models import Track, Artist, Album +from mopidy.backends.local.translator import parse_m3u from mopidy.utils.path import path_to_uri from tests import path_to_data_dir @@ -89,106 +88,3 @@ class M3UToUriTest(unittest.TestCase): class URItoM3UTest(unittest.TestCase): pass - - -expected_artists = [Artist(name='name')] -expected_albums = [ - Album(name='albumname', artists=expected_artists, num_tracks=2), - Album(name='albumname', num_tracks=2), -] -expected_tracks = [] - - -def generate_track(path, ident, album_id): - uri = 'local:track:%s' % path - track = Track( - uri=uri, name='trackname', artists=expected_artists, - album=expected_albums[album_id], track_no=1, date='2006', length=4000, - last_modified=1272319626) - expected_tracks.append(track) - - -generate_track('song1.mp3', 6, 0) -generate_track('song2.mp3', 7, 0) -generate_track('song3.mp3', 8, 1) -generate_track('subdir1/song4.mp3', 2, 0) -generate_track('subdir1/song5.mp3', 3, 0) -generate_track('subdir2/song6.mp3', 4, 1) -generate_track('subdir2/song7.mp3', 5, 1) -generate_track('subdir1/subsubdir/song8.mp3', 0, 0) -generate_track('subdir1/subsubdir/song9.mp3', 1, 1) - - -class MPDTagCacheToTracksTest(unittest.TestCase): - def test_emtpy_cache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('empty_tag_cache'), path_to_data_dir('')) - self.assertEqual(set(), tracks) - - def test_simple_cache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('simple_tag_cache'), path_to_data_dir('')) - track = Track( - uri='local:track:song1.mp3', name='trackname', - artists=expected_artists, track_no=1, album=expected_albums[0], - date='2006', length=4000, last_modified=1272319626) - self.assertEqual(set([track]), tracks) - - def test_advanced_cache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('advanced_tag_cache'), path_to_data_dir('')) - self.assertEqual(set(expected_tracks), tracks) - - def test_unicode_cache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('utf8_tag_cache'), path_to_data_dir('')) - - artists = [Artist(name='æøå')] - album = Album(name='æøå', artists=artists) - track = Track( - uri='local:track:song1.mp3', name='æøå', artists=artists, - composers=artists, performers=artists, genre='æøå', - album=album, length=4000, last_modified=1272319626, - comment='æøå&^`ൂ㔶') - - self.assertEqual(track, list(tracks)[0]) - - @unittest.SkipTest - def test_misencoded_cache(self): - # FIXME not sure if this can happen - pass - - def test_cache_with_blank_track_info(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('blank_tag_cache'), path_to_data_dir('')) - expected = Track( - uri='local:track:song1.mp3', length=4000, last_modified=1272319626) - self.assertEqual(set([expected]), tracks) - - def test_musicbrainz_tagcache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('musicbrainz_tag_cache'), path_to_data_dir('')) - artist = list(expected_tracks[0].artists)[0].copy( - musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897') - albumartist = list(expected_tracks[0].artists)[0].copy( - name='albumartistname', - musicbrainz_id='7364dea6-ca9a-48e3-be01-b44ad0d19897') - album = expected_tracks[0].album.copy( - artists=[albumartist], - musicbrainz_id='cb5f1603-d314-4c9c-91e5-e295cfb125d2') - track = expected_tracks[0].copy( - artists=[artist], album=album, - musicbrainz_id='90488461-8c1f-4a4e-826b-4c6dc70801f0') - - self.assertEqual(track, list(tracks)[0]) - - def test_albumartist_tag_cache(self): - tracks = parse_mpd_tag_cache( - path_to_data_dir('albumartist_tag_cache'), path_to_data_dir('')) - artist = Artist(name='albumartistname') - album = expected_albums[0].copy(artists=[artist]) - track = Track( - uri='local:track:song1.mp3', name='trackname', - artists=expected_artists, track_no=1, album=album, date='2006', - length=4000, last_modified=1272319626) - self.assertEqual(track, list(tracks)[0]) diff --git a/tests/frontends/mpd/translator_test.py b/tests/frontends/mpd/translator_test.py index a6a2eaa9..1db10ab9 100644 --- a/tests/frontends/mpd/translator_test.py +++ b/tests/frontends/mpd/translator_test.py @@ -1,11 +1,10 @@ from __future__ import unicode_literals import datetime -import os import unittest -from mopidy.utils.path import mtime, uri_to_path -from mopidy.frontends.mpd import translator, protocol +from mopidy.utils.path import mtime +from mopidy.frontends.mpd import translator from mopidy.models import Album, Artist, TlTrack, Playlist, Track @@ -126,233 +125,3 @@ class PlaylistMpdFormatTest(unittest.TestCase): result = translator.playlist_to_mpd_format(playlist, 1, 2) self.assertEqual(len(result), 1) self.assertEqual(dict(result[0])['Track'], 2) - - -class TracksToTagCacheFormatTest(unittest.TestCase): - def setUp(self): - self.media_dir = '/dir/subdir' - mtime.set_fake_time(1234567) - - def tearDown(self): - mtime.undo_fake() - - def translate(self, track): - base_path = self.media_dir.encode('utf-8') - result = dict(translator.track_to_mpd_format(track)) - result['file'] = uri_to_path(result['file'])[len(base_path) + 1:] - result['key'] = os.path.basename(result['file']) - result['mtime'] = mtime('') - return translator.order_mpd_track_info(result.items()) - - def consume_headers(self, result): - self.assertEqual(('info_begin',), result[0]) - self.assertEqual(('mpd_version', protocol.VERSION), result[1]) - self.assertEqual(('fs_charset', protocol.ENCODING), result[2]) - self.assertEqual(('info_end',), result[3]) - return result[4:] - - def consume_song_list(self, result): - self.assertEqual(('songList begin',), result[0]) - for i, row in enumerate(result): - if row == ('songList end',): - return result[1:i], result[i + 1:] - self.fail("Couldn't find songList end in result") - - def consume_directory(self, result): - self.assertEqual('directory', result[0][0]) - self.assertEqual(('mtime', mtime('.')), result[1]) - self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2]) - directory = result[2][1] - for i, row in enumerate(result): - if row == ('end', directory): - return result[3:i], result[i + 1:] - self.fail("Couldn't find end %s in result" % directory) - - def test_empty_tag_cache_has_header(self): - result = translator.tracks_to_tag_cache_format([], self.media_dir) - result = self.consume_headers(result) - - def test_empty_tag_cache_has_song_list(self): - result = translator.tracks_to_tag_cache_format([], self.media_dir) - result = self.consume_headers(result) - song_list, result = self.consume_song_list(result) - - self.assertEqual(len(song_list), 0) - self.assertEqual(len(result), 0) - - def test_tag_cache_has_header(self): - track = Track(uri='file:///dir/subdir/song.mp3') - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - result = self.consume_headers(result) - - def test_tag_cache_has_song_list(self): - track = Track(uri='file:///dir/subdir/song.mp3') - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - result = self.consume_headers(result) - song_list, result = self.consume_song_list(result) - - self.assert_(song_list) - self.assertEqual(len(result), 0) - - def test_tag_cache_has_formated_track(self): - track = Track(uri='file:///dir/subdir/song.mp3') - formated = self.translate(track) - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - - result = self.consume_headers(result) - song_list, result = self.consume_song_list(result) - - self.assertEqual(formated, song_list) - self.assertEqual(len(result), 0) - - def test_tag_cache_has_formated_track_with_key_and_mtime(self): - track = Track(uri='file:///dir/subdir/song.mp3') - formated = self.translate(track) - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - - result = self.consume_headers(result) - song_list, result = self.consume_song_list(result) - - self.assertEqual(formated, song_list) - self.assertEqual(len(result), 0) - - def test_tag_cache_supports_directories(self): - track = Track(uri='file:///dir/subdir/folder/song.mp3') - formated = self.translate(track) - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - - result = self.consume_headers(result) - dir_data, result = self.consume_directory(result) - song_list, result = self.consume_song_list(result) - self.assertEqual(len(song_list), 0) - self.assertEqual(len(result), 0) - - song_list, result = self.consume_song_list(dir_data) - self.assertEqual(len(result), 0) - self.assertEqual(formated, song_list) - - def test_tag_cache_diretory_header_is_right(self): - track = Track(uri='file:///dir/subdir/folder/sub/song.mp3') - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - - result = self.consume_headers(result) - dir_data, result = self.consume_directory(result) - - self.assertEqual(('directory', 'folder/sub'), dir_data[0]) - self.assertEqual(('mtime', mtime('.')), dir_data[1]) - self.assertEqual(('begin', 'sub'), dir_data[2]) - - def test_tag_cache_suports_sub_directories(self): - track = Track(uri='file:///dir/subdir/folder/sub/song.mp3') - formated = self.translate(track) - result = translator.tracks_to_tag_cache_format([track], self.media_dir) - - result = self.consume_headers(result) - - dir_data, result = self.consume_directory(result) - song_list, result = self.consume_song_list(result) - self.assertEqual(len(song_list), 0) - self.assertEqual(len(result), 0) - - dir_data, result = self.consume_directory(dir_data) - song_list, result = self.consume_song_list(result) - self.assertEqual(len(result), 0) - self.assertEqual(len(song_list), 0) - - song_list, result = self.consume_song_list(dir_data) - self.assertEqual(len(result), 0) - self.assertEqual(formated, song_list) - - def test_tag_cache_supports_multiple_tracks(self): - tracks = [ - Track(uri='file:///dir/subdir/song1.mp3'), - Track(uri='file:///dir/subdir/song2.mp3'), - ] - - formated = [] - formated.extend(self.translate(tracks[0])) - formated.extend(self.translate(tracks[1])) - - result = translator.tracks_to_tag_cache_format(tracks, self.media_dir) - - result = self.consume_headers(result) - song_list, result = self.consume_song_list(result) - - self.assertEqual(formated, song_list) - self.assertEqual(len(result), 0) - - def test_tag_cache_supports_multiple_tracks_in_dirs(self): - tracks = [ - Track(uri='file:///dir/subdir/song1.mp3'), - Track(uri='file:///dir/subdir/folder/song2.mp3'), - ] - - formated = [] - formated.append(self.translate(tracks[0])) - formated.append(self.translate(tracks[1])) - - result = translator.tracks_to_tag_cache_format(tracks, self.media_dir) - - result = self.consume_headers(result) - dir_data, result = self.consume_directory(result) - song_list, song_result = self.consume_song_list(dir_data) - - self.assertEqual(formated[1], song_list) - self.assertEqual(len(song_result), 0) - - song_list, result = self.consume_song_list(result) - self.assertEqual(len(result), 0) - self.assertEqual(formated[0], song_list) - - -class TracksToDirectoryTreeTest(unittest.TestCase): - def setUp(self): - self.media_dir = '/root' - - def test_no_tracks_gives_emtpy_tree(self): - tree = translator.tracks_to_directory_tree([], self.media_dir) - self.assertEqual(tree, ({}, [])) - - def test_top_level_files(self): - tracks = [ - Track(uri='file:///root/file1.mp3'), - Track(uri='file:///root/file2.mp3'), - Track(uri='file:///root/file3.mp3'), - ] - tree = translator.tracks_to_directory_tree(tracks, self.media_dir) - self.assertEqual(tree, ({}, tracks)) - - def test_single_file_in_subdir(self): - tracks = [Track(uri='file:///root/dir/file1.mp3')] - tree = translator.tracks_to_directory_tree(tracks, self.media_dir) - expected = ({'dir': ({}, tracks)}, []) - self.assertEqual(tree, expected) - - def test_single_file_in_sub_subdir(self): - tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')] - tree = translator.tracks_to_directory_tree(tracks, self.media_dir) - expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, []) - self.assertEqual(tree, expected) - - def test_complex_file_structure(self): - tracks = [ - Track(uri='file:///root/file1.mp3'), - Track(uri='file:///root/dir1/file2.mp3'), - Track(uri='file:///root/dir1/file3.mp3'), - Track(uri='file:///root/dir2/file4.mp3'), - Track(uri='file:///root/dir2/sub/file5.mp3'), - ] - tree = translator.tracks_to_directory_tree(tracks, self.media_dir) - expected = ( - { - 'dir1': ({}, [tracks[1], tracks[2]]), - 'dir2': ( - { - 'dir2/sub': ({}, [tracks[4]]) - }, - [tracks[3]] - ), - }, - [tracks[0]] - ) - self.assertEqual(tree, expected)