Add basic mpd tag cache parser and tests
This commit is contained in:
parent
7158056c26
commit
f36f64c6ca
@ -6,6 +6,8 @@ import urllib
|
||||
|
||||
logger = logging.getLogger('mopidy.utils')
|
||||
|
||||
from mopidy.models import Track, Artist, Album
|
||||
|
||||
def flatten(the_list):
|
||||
result = []
|
||||
for element in the_list:
|
||||
@ -133,3 +135,69 @@ def parse_m3u(file_path):
|
||||
uris.append('file://' + path)
|
||||
|
||||
return uris
|
||||
|
||||
def parse_mpd_tag_cache(tag_cache, music_dir=''):
|
||||
"""
|
||||
Converts a MPD tag_cache into a lists of tracks, artists and albums.
|
||||
"""
|
||||
with open(tag_cache) as library:
|
||||
contents = library.read()
|
||||
|
||||
tracks = set()
|
||||
artists = set()
|
||||
albums = set()
|
||||
current = {}
|
||||
state = None
|
||||
|
||||
for line in contents.split('\n'):
|
||||
if line == 'songList begin':
|
||||
state = 'songs'
|
||||
continue
|
||||
elif line == 'songList end':
|
||||
state = None
|
||||
continue
|
||||
elif not state:
|
||||
continue
|
||||
|
||||
key, value = line.split(': ', 1)
|
||||
|
||||
if key == 'key':
|
||||
_convert_mpd_data(current, tracks, artists, albums, music_dir)
|
||||
current.clear()
|
||||
|
||||
current[key.lower()] = value
|
||||
|
||||
_convert_mpd_data(current, tracks, artists, albums, music_dir)
|
||||
|
||||
return tracks, artists, albums
|
||||
|
||||
def _convert_mpd_data(data, tracks, artists, albums, music_dir):
|
||||
if not data:
|
||||
return
|
||||
|
||||
match = filter(lambda a: a.name == data['artist'], artists)
|
||||
if match:
|
||||
artist = match[0]
|
||||
else:
|
||||
artist = Artist(name=data['artist'])
|
||||
print 'adding %s' % artist
|
||||
artists.add(artist)
|
||||
|
||||
match = filter(lambda a: a.name == data['album'], albums)
|
||||
if match:
|
||||
album = match[0]
|
||||
else:
|
||||
num_tracks = int(data['track'].split('/')[1])
|
||||
album = Album(name=data['album'], artists=[artist], num_tracks=num_tracks)
|
||||
print 'adding %s' % album
|
||||
albums.add(album)
|
||||
|
||||
match = filter(lambda t: t.name == data['title'], tracks)
|
||||
if not match:
|
||||
path = os.path.join(music_dir, data['file'][1:])
|
||||
uri = 'file://' + urllib.pathname2url(path)
|
||||
track_no = int(data['track'].split('/')[0])
|
||||
track = Track(name=data['title'], artists=[artist], track_no=track_no,
|
||||
length=int(data['time'])*1000, uri=uri, album=album)
|
||||
print 'adding %s' % track
|
||||
tracks.add(track)
|
||||
|
||||
6
tests/data/empty_tag_cache
Normal file
6
tests/data/empty_tag_cache
Normal file
@ -0,0 +1,6 @@
|
||||
info_begin
|
||||
mpd_version: 0.14.2
|
||||
fs_charset: UTF-8
|
||||
info_end
|
||||
songList begin
|
||||
songList end
|
||||
15
tests/data/simple_tag_cache
Normal file
15
tests/data/simple_tag_cache
Normal file
@ -0,0 +1,15 @@
|
||||
info_begin
|
||||
mpd_version: 0.14.2
|
||||
fs_charset: UTF-8
|
||||
info_end
|
||||
songList begin
|
||||
key: song1.mp3
|
||||
file: /song1.mp3
|
||||
Time: 4
|
||||
Artist: name
|
||||
Title: trackname
|
||||
Album: albumname
|
||||
Track: 1/2
|
||||
Date: 2006
|
||||
mtime: 1272319626
|
||||
songList end
|
||||
@ -5,7 +5,8 @@ import tempfile
|
||||
import unittest
|
||||
import urllib
|
||||
|
||||
from mopidy.utils import parse_m3u
|
||||
from mopidy.utils import parse_m3u, parse_mpd_tag_cache
|
||||
from mopidy.models import Track, Artist, Album
|
||||
|
||||
def data(name):
|
||||
folder = os.path.dirname(__file__)
|
||||
@ -61,3 +62,45 @@ class M3UToUriTest(unittest.TestCase):
|
||||
def test_encoding_is_latin1(self):
|
||||
uris = parse_m3u(data('encoding.m3u'))
|
||||
self.assertEqual([encoded_uri], uris)
|
||||
|
||||
expected_artists = [Artist(name='name')]
|
||||
expected_albums = [Album(name='albumname', artists=expected_artists, num_tracks=2)]
|
||||
expected_tracks = []
|
||||
|
||||
def generate_track(path):
|
||||
uri = 'file://' + urllib.pathname2url(data(path))
|
||||
track = Track(name='trackname', artists=expected_artists, track_no=1,
|
||||
album=expected_albums[0], length=4000, uri=uri)
|
||||
expected_tracks.append(track)
|
||||
|
||||
generate_track('song1.mp3')
|
||||
generate_track('song2.mp3')
|
||||
generate_track('song3.mp3')
|
||||
generate_track('subdir1/song4.mp3')
|
||||
generate_track('subdir1/song5.mp3')
|
||||
generate_track('subdir2/song6.mp3')
|
||||
generate_track('subdir2/song7.mp3')
|
||||
generate_track('subdir1/subsubdir/song8.mp3')
|
||||
generate_track('subdir1/subsubdir/song9.mp3')
|
||||
|
||||
class MPDTagCacheToTracksTest(unittest.TestCase):
|
||||
def test_emtpy_cache(self):
|
||||
tracks, artists, albums = parse_mpd_tag_cache(data('empty_tag_cache'), data(''))
|
||||
self.assertEqual(set(), tracks)
|
||||
self.assertEqual(set(), artists)
|
||||
self.assertEqual(set(), albums)
|
||||
|
||||
def test_simple_cache(self):
|
||||
tracks, artists, albums = parse_mpd_tag_cache(data('simple_tag_cache'), data(''))
|
||||
|
||||
track1 = expected_tracks[0]
|
||||
track2 = list(tracks)[0]
|
||||
|
||||
self.assertEqual(track1.album._artists, track2.album._artists)
|
||||
self.assertEqual(track1.album.name, track2.album.name)
|
||||
self.assertEqual(track1.album.num_tracks, track2.album.num_tracks)
|
||||
self.assertEqual(track1.artists, track2.artists)
|
||||
self.assertEqual(track1, track2)
|
||||
|
||||
self.assertEqual(set(expected_artists), artists)
|
||||
self.assertEqual(set(expected_albums), albums)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user