Merge remote branch 'adamcik/feature/file-scanner' into develop

This commit is contained in:
Stein Magnus Jodal 2010-10-31 01:16:30 +02:00
commit 61c8720158
27 changed files with 950 additions and 18 deletions

31
bin/mopidy-scan Executable file
View File

@ -0,0 +1,31 @@
#!/usr/bin/env python
if __name__ == '__main__':
import sys
from mopidy import settings
from mopidy.scanner import Scanner, translator
from mopidy.frontends.mpd.translator import tracks_to_tag_cache_format
tracks = []
def store(data):
track = translator(data)
tracks.append(track)
print >> sys.stderr, 'Added %s' % track.uri
def debug(uri, error):
print >> sys.stderr, 'Failed %s: %s' % (uri, error)
print >> sys.stderr, 'Scanning %s' % settings.LOCAL_MUSIC_FOLDER
scanner = Scanner(settings.LOCAL_MUSIC_FOLDER, store, debug)
scanner.start()
print >> sys.stderr, 'Done'
for a in tracks_to_tag_cache_format(tracks):
if len(a) == 1:
print a[0]
else:
print u': '.join([unicode(b) for b in a])

View File

@ -31,6 +31,13 @@ file::
BACKENDS = (u'mopidy.backends.local.LocalBackend',)
Previously this backend relied purely on ``tag_cache`` files from MPD, to
remedy this the command ``mopidy-scan`` has been added. This program will scan
your current ``LOCAL_MUSIC_FOLDER`` and build a MPD compatible ``tag_cache``.
Currently the command outputs the ``tag_cache`` to ``stdout``, this means that
you will need to run ``mopidy-scan > path/to/your/tag_cache`` to actually start
using your new cache.
You may also want to change some of the ``LOCAL_*`` settings. See
:mod:`mopidy.settings`, for a full list of available settings.

View File

@ -13,7 +13,7 @@ implement our own MPD server which is compatible with the numerous existing
import re
#: The MPD protocol uses UTF-8 for encoding all data.
ENCODING = u'utf-8'
ENCODING = u'UTF-8'
#: The MPD protocol uses ``\n`` as line terminator.
LINE_TERMINATOR = u'\n'

View File

@ -1,4 +1,12 @@
def track_to_mpd_format(track, position=None, cpid=None):
import os
import re
from mopidy import settings
from mopidy.utils.path import mtime as get_mtime
from mopidy.frontends.mpd import protocol
from mopidy.utils.path import path_to_uri, uri_to_path, split_path
def track_to_mpd_format(track, position=None, cpid=None, key=False, mtime=False):
"""
Format track for output to MPD client.
@ -8,12 +16,16 @@ def track_to_mpd_format(track, position=None, cpid=None):
:type position: integer
:param cpid: track's CPID (current playlist ID)
:type cpid: integer
:param key: if we should set key
:type key: boolean
:param mtime: if we should set mtime
:type mtime: boolean
:rtype: list of two-tuples
"""
result = [
('file', track.uri or ''),
('Time', track.length and (track.length // 1000) or 0),
('Artist', track_artists_to_mpd_format(track)),
('Artist', artists_to_mpd_format(track.artists)),
('Title', track.name or ''),
('Album', track.album and track.album.name or ''),
('Date', track.date or ''),
@ -23,20 +35,43 @@ def track_to_mpd_format(track, position=None, cpid=None):
track.track_no, track.album.num_tracks)))
else:
result.append(('Track', track.track_no))
if track.album is not None and track.album.artists:
artists = artists_to_mpd_format(track.album.artists)
result.append(('AlbumArtist', artists))
if position is not None and cpid is not None:
result.append(('Pos', position))
result.append(('Id', cpid))
if key and track.uri:
result.insert(0, ('key', os.path.basename(uri_to_path(track.uri))))
if mtime and track.uri:
result.append(('mtime', get_mtime(uri_to_path(track.uri))))
return result
def track_artists_to_mpd_format(track):
MPD_KEY_ORDER = '''
key file Time Artist AlbumArtist Title Album Track Date MUSICBRAINZ_ALBUMID
MUSICBRAINZ_ALBUMARTISTID MUSICBRAINZ_ARTISTID MUSICBRAINZ_TRACKID mtime
'''.split()
def order_mpd_track_info(result):
"""
Order results from :func:`mopidy.frontends.mpd.translator.track_to_mpd_format`
so that it matches MPD's ordering. Simply a cosmetic fix for easier
diffing of tag_caches.
:param result: the track info
:type result: list of tuples
:rtype: list of tuples
"""
return sorted(result, key=lambda i: MPD_KEY_ORDER.index(i[0]))
def artists_to_mpd_format(artists):
"""
Format track artists for output to MPD client.
:param track: the track
:type track: :class:`mopidy.models.Track`
:param artists: the artists
:type track: array of :class:`mopidy.models.Artist`
:rtype: string
"""
artists = track.artists
artists.sort(key=lambda a: a.name)
return u', '.join([a.name for a in artists])
@ -72,3 +107,58 @@ def playlist_to_mpd_format(playlist, *args, **kwargs):
Arguments as for :func:`tracks_to_mpd_format`, except the first one.
"""
return tracks_to_mpd_format(playlist.tracks, *args, **kwargs)
def tracks_to_tag_cache_format(tracks):
"""
Format list of tracks for output to MPD tag cache
:param tracks: the tracks
:type tracks: list of :class:`mopidy.models.Track`
:rtype: list of lists of two-tuples
"""
result = [
('info_begin',),
('mpd_version', protocol.VERSION),
('fs_charset', protocol.ENCODING),
('info_end',)
]
tracks.sort(key=lambda t: t.uri)
_add_to_tag_cache(result, *tracks_to_directory_tree(tracks))
return result
def _add_to_tag_cache(result, folders, files):
for path, entry in folders.items():
name = os.path.split(path)[1]
music_folder = os.path.expanduser(settings.LOCAL_MUSIC_FOLDER)
mtime = get_mtime(os.path.join(music_folder, path))
result.append(('directory', path))
result.append(('mtime', mtime))
result.append(('begin', name))
_add_to_tag_cache(result, *entry)
result.append(('end', name))
result.append(('songList begin',))
for track in files:
track_result = track_to_mpd_format(track, key=True, mtime=True)
track_result = order_mpd_track_info(track_result)
result.extend(track_result)
result.append(('songList end',))
def tracks_to_directory_tree(tracks):
directories = ({}, [])
for track in tracks:
path = u''
current = directories
local_folder = os.path.expanduser(settings.LOCAL_MUSIC_FOLDER)
track_path = uri_to_path(track.uri)
track_path = re.sub('^' + re.escape(local_folder), '', track_path)
track_dir = os.path.dirname(track_path)
for part in split_path(track_dir):
path = os.path.join(path, part)
if path not in current[0]:
current[0][path] = ({}, [])
current = current[0][path]
current[1].append(track)
return directories

122
mopidy/scanner.py Normal file
View File

@ -0,0 +1,122 @@
import gobject
gobject.threads_init()
import pygst
pygst.require('0.10')
import gst
from os.path import abspath
import datetime
import sys
import threading
from mopidy.utils.path import path_to_uri, find_files
from mopidy.models import Track, Artist, Album
def translator(data):
albumartist_kwargs = {}
album_kwargs = {}
artist_kwargs = {}
track_kwargs = {}
if 'album' in data:
album_kwargs['name'] = data['album']
if 'track-count' in data:
album_kwargs['num_tracks'] = data['track-count']
if 'artist' in data:
artist_kwargs['name'] =data['artist']
if 'date' in data:
date = data['date']
date = datetime.date(date.year, date.month, date.day)
track_kwargs['date'] = date
if 'title' in data:
track_kwargs['name'] = data['title']
if 'track-number' in data:
track_kwargs['track_no'] = data['track-number']
if 'album-artist' in data:
albumartist_kwargs['name'] = data['album-artist']
if albumartist_kwargs:
album_kwargs['artists'] = [Artist(**albumartist_kwargs)]
track_kwargs['uri'] = data['uri']
track_kwargs['length'] = data['duration']
track_kwargs['album'] = Album(**album_kwargs)
track_kwargs['artists'] = [Artist(**artist_kwargs)]
return Track(**track_kwargs)
class Scanner(object):
def __init__(self, folder, data_callback, error_callback=None):
self.uris = [path_to_uri(f) for f in find_files(folder)]
self.data_callback = data_callback
self.error_callback = error_callback
self.loop = gobject.MainLoop()
caps = gst.Caps('audio/x-raw-int')
fakesink = gst.element_factory_make('fakesink')
pad = fakesink.get_pad('sink')
self.uribin = gst.element_factory_make('uridecodebin')
self.uribin.connect('pad-added', self.process_new_pad, pad)
self.uribin.set_property('caps', caps)
self.pipe = gst.element_factory_make('pipeline')
self.pipe.add(fakesink)
self.pipe.add(self.uribin)
bus = self.pipe.get_bus()
bus.add_signal_watch()
bus.connect('message::tag', self.process_tags)
bus.connect('message::error', self.process_error)
def process_new_pad(self, source, pad, target_pad):
pad.link(target_pad)
def process_tags(self, bus, message):
data = message.parse_tag()
data = dict([(k, data[k]) for k in data.keys()])
data['uri'] = unicode(self.uribin.get_property('uri'))
data['duration'] = self.get_duration()
self.data_callback(data)
self.next_uri()
def process_error(self, bus, message):
if self.error_callback:
uri = self.uribin.get_property('uri')
errors = message.parse_error()
self.error_callback(uri, errors)
self.next_uri()
def get_duration(self):
self.pipe.get_state()
try:
return self.pipe.query_duration(
gst.FORMAT_TIME, None)[0] // gst.MSECOND
except gst.QueryError:
return None
def next_uri(self):
if not self.uris:
return self.stop()
self.pipe.set_state(gst.STATE_NULL)
self.uribin.set_property('uri', self.uris.pop())
self.pipe.set_state(gst.STATE_PAUSED)
def start(self):
if not self.uris:
return
self.next_uri()
self.loop.run()
def stop(self):
self.pipe.set_state(gst.STATE_NULL)
self.loop.quit()

View File

@ -1,6 +1,7 @@
import logging
import os
import sys
import re
import urllib
logger = logging.getLogger('mopidy.utils.path')
@ -26,3 +27,53 @@ def path_to_uri(*paths):
if sys.platform == 'win32':
return 'file:' + urllib.pathname2url(path)
return 'file://' + urllib.pathname2url(path)
def uri_to_path(uri):
if sys.platform == 'win32':
path = urllib.url2pathname(re.sub('^file:', '', uri))
else:
path = urllib.url2pathname(re.sub('^file://', '', uri))
return path.encode('latin1').decode('utf-8') # Undo double encoding
def split_path(path):
parts = []
while True:
path, part = os.path.split(path)
if part:
parts.insert(0, part)
if not path or path == '/':
break
return parts
def find_files(path):
path = os.path.expanduser(path)
if os.path.isfile(path):
filename = os.path.abspath(path)
if not isinstance(filename, unicode):
filename = filename.decode('utf-8')
yield filename
else:
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
dirpath = os.path.abspath(dirpath)
filename = os.path.join(dirpath, filename)
if not isinstance(filename, unicode):
filename = filename.decode('utf-8')
yield filename
class Mtime(object):
def __init__(self):
self.fake = None
def __call__(self, path):
if self.fake is not None:
return self.fake
return int(os.stat(path).st_mtime)
def set_fake_time(self, time):
self.fake = time
def undo_fake(self):
self.fake = None
mtime = Mtime()

View File

@ -80,7 +80,7 @@ setup(
package_data={'mopidy': ['backends/libspotify/spotify_appkey.key']},
cmdclass=cmdclasses,
data_files=data_files,
scripts=['bin/mopidy'],
scripts=['bin/mopidy', 'bin/mopidy-scan'],
url='http://www.mopidy.com/',
license='Apache License, Version 2.0',
description='MPD server with Spotify support',

Binary file not shown.

View File

@ -0,0 +1 @@
../sample.mp3

View File

@ -0,0 +1 @@
../sample.mp3

View File

@ -0,0 +1 @@
../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1 @@
../../sample.mp3

View File

@ -0,0 +1,81 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
directory: subdir1
mtime: 1288121499
begin: subdir1
songList begin
key: song4.mp3
file: subdir1/song4.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song5.mp3
file: subdir1/song5.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end
end: subdir1
directory: subdir2
mtime: 1288121499
begin: subdir2
songList begin
key: song6.mp3
file: subdir2/song6.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song7.mp3
file: subdir2/song7.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end
end: subdir2
songList begin
key: song1.mp3
file: /song1.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song2.mp3
file: /song2.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
key: song3.mp3
file: /song3.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end

0
tests/data/scanner/empty/.gitignore vendored Normal file
View File

View File

@ -0,0 +1,6 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
songList begin
songList end

Binary file not shown.

After

Width:  |  Height:  |  Size: 176 B

Binary file not shown.

View File

@ -0,0 +1 @@
../sample.mp3

View File

@ -0,0 +1,15 @@
info_begin
mpd_version: 0.15.4
fs_charset: UTF-8
info_end
songList begin
key: song1.mp3
file: /song1.mp3
Time: 5
Artist: name
Title: trackname
Album: albumname
Track: 01/02
Date: 2006
mtime: 1288121370
songList end

View File

@ -1,11 +1,24 @@
import datetime as dt
import os
import unittest
from mopidy.frontends.mpd import translator
from mopidy import settings
from mopidy.utils.path import mtime
from mopidy.frontends.mpd import translator, protocol
from mopidy.models import Album, Artist, Playlist, Track
from tests import data_folder, SkipTest
class TrackMpdFormatTest(unittest.TestCase):
def test_mpd_format_for_empty_track(self):
def setUp(self):
settings.LOCAL_MUSIC_FOLDER = '/dir/subdir'
mtime.set_fake_time(1234567)
def tearDown(self):
settings.runtime.clear()
mtime.undo_fake()
def test_track_to_mpd_format_for_empty_track(self):
result = translator.track_to_mpd_format(Track())
self.assert_(('file', '') in result)
self.assert_(('Time', 0) in result)
@ -14,13 +27,43 @@ class TrackMpdFormatTest(unittest.TestCase):
self.assert_(('Album', '') in result)
self.assert_(('Track', 0) in result)
self.assert_(('Date', '') in result)
self.assertEqual(len(result), 7)
def test_mpd_format_for_nonempty_track(self):
def test_track_to_mpd_format_with_position(self):
result = translator.track_to_mpd_format(Track(), position=1)
self.assert_(('Pos', 1) not in result)
def test_track_to_mpd_format_with_cpid(self):
result = translator.track_to_mpd_format(Track(), cpid=1)
self.assert_(('Id', 1) not in result)
def test_track_to_mpd_format_with_position_and_cpid(self):
result = translator.track_to_mpd_format(Track(), position=1, cpid=2)
self.assert_(('Pos', 1) in result)
self.assert_(('Id', 2) in result)
def test_track_to_mpd_format_with_key(self):
track = Track(uri='file:///dir/subdir/file.mp3')
result = translator.track_to_mpd_format(track, key=True)
self.assert_(('key', 'file.mp3') in result)
def test_track_to_mpd_format_with_key_not_uri_encoded(self):
track = Track(uri='file:///dir/subdir/file%20test.mp3')
result = translator.track_to_mpd_format(track, key=True)
self.assert_(('key', 'file test.mp3') in result)
def test_track_to_mpd_format_with_mtime(self):
uri = translator.path_to_uri(data_folder('blank.mp3'))
result = translator.track_to_mpd_format(Track(uri=uri), mtime=True)
self.assert_(('mtime', 1234567) in result)
def test_track_to_mpd_format_for_nonempty_track(self):
track = Track(
uri=u'a uri',
artists=[Artist(name=u'an artist')],
name=u'a name',
album=Album(name=u'an album', num_tracks=13),
album=Album(name=u'an album', num_tracks=13,
artists=[Artist(name=u'an other artist')]),
track_no=7,
date=dt.date(1977, 1, 1),
length=137000,
@ -31,15 +74,17 @@ class TrackMpdFormatTest(unittest.TestCase):
self.assert_(('Artist', 'an artist') in result)
self.assert_(('Title', 'a name') in result)
self.assert_(('Album', 'an album') in result)
self.assert_(('AlbumArtist', 'an other artist') in result)
self.assert_(('Track', '7/13') in result)
self.assert_(('Date', dt.date(1977, 1, 1)) in result)
self.assert_(('Pos', 9) in result)
self.assert_(('Id', 122) in result)
self.assertEqual(len(result), 10)
def test_mpd_format_artists(self):
track = Track(artists=[Artist(name=u'ABBA'), Artist(name=u'Beatles')])
self.assertEqual(translator.track_artists_to_mpd_format(track),
u'ABBA, Beatles')
def test_artists_to_mpd_format(self):
artists = [Artist(name=u'ABBA'), Artist(name=u'Beatles')]
translated = translator.artists_to_mpd_format(artists)
self.assertEqual(translated, u'ABBA, Beatles')
class PlaylistMpdFormatTest(unittest.TestCase):
@ -55,3 +100,234 @@ class PlaylistMpdFormatTest(unittest.TestCase):
result = translator.playlist_to_mpd_format(playlist, 1, 2)
self.assertEqual(len(result), 1)
self.assertEqual(dict(result[0])['Track'], 2)
class TracksToTagCacheFormatTest(unittest.TestCase):
def setUp(self):
settings.LOCAL_MUSIC_FOLDER = '/dir/subdir'
mtime.set_fake_time(1234567)
def tearDown(self):
settings.runtime.clear()
mtime.undo_fake()
def translate(self, track):
result = translator.track_to_mpd_format(track, key=True, mtime=True)
return translator.order_mpd_track_info(result)
def consume_headers(self, result):
self.assertEqual(('info_begin',), result[0])
self.assertEqual(('mpd_version', protocol.VERSION), result[1])
self.assertEqual(('fs_charset', protocol.ENCODING), result[2])
self.assertEqual(('info_end',), result[3])
return result[4:]
def consume_song_list(self, result):
self.assertEqual(('songList begin',), result[0])
for i, row in enumerate(result):
if row == ('songList end',):
return result[1:i], result[i+1:]
self.fail("Couldn't find songList end in result")
def consume_directory(self, result):
self.assertEqual('directory', result[0][0])
self.assertEqual(('mtime', mtime('.')), result[1])
self.assertEqual(('begin', os.path.split(result[0][1])[1]), result[2])
directory = result[2][1]
for i, row in enumerate(result):
if row == ('end', directory):
return result[3:i], result[i+1:]
self.fail("Couldn't find end %s in result" % directory)
def test_empty_tag_cache_has_header(self):
result = translator.tracks_to_tag_cache_format([])
result = self.consume_headers(result)
def test_empty_tag_cache_has_song_list(self):
result = translator.tracks_to_tag_cache_format([])
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
def test_tag_cache_has_header(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
def test_tag_cache_has_song_list(self):
track = Track(uri='file:///dir/subdir/song.mp3')
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assert_(song_list)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(song_list, formated)
self.assertEqual(len(result), 0)
def test_tag_cache_has_formated_track_with_key_and_mtime(self):
track = Track(uri='file:///dir/subdir/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(song_list, formated)
self.assertEqual(len(result), 0)
def test_tag_cache_suports_directories(self):
track = Track(uri='file:///dir/subdir/folder/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
folder, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
song_list, result = self.consume_song_list(folder)
self.assertEqual(len(result), 0)
self.assertEqual(song_list, formated)
def test_tag_cache_diretory_header_is_right(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
folder, result = self.consume_directory(result)
self.assertEqual(('directory', 'folder/sub'), folder[0])
self.assertEqual(('mtime', mtime('.')), folder[1])
self.assertEqual(('begin', 'sub'), folder[2])
def test_tag_cache_suports_sub_directories(self):
track = Track(uri='file:///dir/subdir/folder/sub/song.mp3')
formated = self.translate(track)
result = translator.tracks_to_tag_cache_format([track])
result = self.consume_headers(result)
folder, result = self.consume_directory(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(song_list), 0)
self.assertEqual(len(result), 0)
folder, result = self.consume_directory(folder)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(len(song_list), 0)
song_list, result = self.consume_song_list(folder)
self.assertEqual(len(result), 0)
self.assertEqual(song_list, formated)
def test_tag_cache_supports_multiple_tracks(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/song2.mp3'),
]
formated = []
formated.extend(self.translate(tracks[0]))
formated.extend(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks)
result = self.consume_headers(result)
song_list, result = self.consume_song_list(result)
self.assertEqual(song_list, formated)
self.assertEqual(len(result), 0)
def test_tag_cache_supports_multiple_tracks_in_dirs(self):
tracks = [
Track(uri='file:///dir/subdir/song1.mp3'),
Track(uri='file:///dir/subdir/folder/song2.mp3'),
]
formated = []
formated.append(self.translate(tracks[0]))
formated.append(self.translate(tracks[1]))
result = translator.tracks_to_tag_cache_format(tracks)
result = self.consume_headers(result)
folder, result = self.consume_directory(result)
song_list, song_result = self.consume_song_list(folder)
self.assertEqual(song_list, formated[1])
self.assertEqual(len(song_result), 0)
song_list, result = self.consume_song_list(result)
self.assertEqual(len(result), 0)
self.assertEqual(song_list, formated[0])
class TracksToDirectoryTreeTest(unittest.TestCase):
def setUp(self):
settings.LOCAL_MUSIC_FOLDER = '/root/'
def tearDown(self):
settings.runtime.clear()
def test_no_tracks_gives_emtpy_tree(self):
tree = translator.tracks_to_directory_tree([])
self.assertEqual(tree, ({}, []))
def test_top_level_files(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/file2.mp3'),
Track(uri='file:///root/file3.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks)
self.assertEqual(tree, ({}, tracks))
def test_single_file_in_subdir(self):
tracks = [Track(uri='file:///root/dir/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks)
expected = ({'dir': ({}, tracks)}, [])
self.assertEqual(tree, expected)
def test_single_file_in_sub_subdir(self):
tracks = [Track(uri='file:///root/dir1/dir2/file1.mp3')]
tree = translator.tracks_to_directory_tree(tracks)
expected = ({'dir1': ({'dir1/dir2': ({}, tracks)}, [])}, [])
self.assertEqual(tree, expected)
def test_complex_file_structure(self):
tracks = [
Track(uri='file:///root/file1.mp3'),
Track(uri='file:///root/dir1/file2.mp3'),
Track(uri='file:///root/dir1/file3.mp3'),
Track(uri='file:///root/dir2/file4.mp3'),
Track(uri='file:///root/dir2/sub/file5.mp3'),
]
tree = translator.tracks_to_directory_tree(tracks)
expected = (
{
'dir1': ({}, [tracks[1], tracks[2]]),
'dir2': (
{
'dir2/sub': ({}, [tracks[4]])
},
[tracks[3]]
),
},
[tracks[0]]
)
self.assertEqual(tree, expected)

158
tests/scanner_test.py Normal file
View File

@ -0,0 +1,158 @@
import unittest
from datetime import date
from mopidy.scanner import Scanner, translator
from mopidy.models import Track, Artist, Album
from tests import data_folder
class FakeGstDate(object):
def __init__(self, year, month, day):
self.year = year
self.month = month
self.day = day
class TranslatorTest(unittest.TestCase):
def setUp(self):
self.data = {
'uri': 'uri',
'album': u'albumname',
'track-number': 1,
'artist': u'name',
'album-artist': 'albumartistname',
'title': u'trackname',
'track-count': 2,
'date': FakeGstDate(2006, 1, 1,),
'container-format': u'ID3 tag',
'duration': 4531,
}
self.album = {
'name': 'albumname',
'num_tracks': 2,
}
self.artist = {
'name': 'name',
}
self.albumartist = {
'name': 'albumartistname',
}
self.track = {
'uri': 'uri',
'name': 'trackname',
'date': date(2006, 1, 1),
'track_no': 1,
'length': 4531,
}
def build_track(self):
if self.albumartist:
self.album['artists'] = [Artist(**self.albumartist)]
self.track['album'] = Album(**self.album)
self.track['artists'] = [Artist(**self.artist)]
return Track(**self.track)
def check(self):
expected = self.build_track()
actual = translator(self.data)
self.assertEqual(expected, actual)
def test_basic_data(self):
self.check()
def test_missing_track_number(self):
del self.data['track-number']
del self.track['track_no']
self.check()
def test_missing_track_count(self):
del self.data['track-count']
del self.album['num_tracks']
self.check()
def test_missing_track_name(self):
del self.data['title']
del self.track['name']
self.check()
def test_missing_album_name(self):
del self.data['album']
del self.album['name']
self.check()
def test_missing_artist_name(self):
del self.data['artist']
del self.artist['name']
self.check()
def test_missing_album_artist(self):
del self.data['album-artist']
del self.albumartist['name']
self.check()
def test_missing_date(self):
del self.data['date']
del self.track['date']
self.check()
class ScannerTest(unittest.TestCase):
def setUp(self):
self.errors = {}
self.data = {}
def scan(self, path):
scanner = Scanner(data_folder(path),
self.data_callback, self.error_callback)
scanner.start()
def check(self, name, key, value):
name = data_folder(name)
self.assertEqual(self.data[name][key], value)
def data_callback(self, data):
uri = data['uri'][len('file://'):]
self.data[uri] = data
def error_callback(self, uri, errors):
uri = uri[len('file://'):]
self.errors[uri] = errors
def test_data_is_set(self):
self.scan('scanner/simple')
self.assert_(self.data)
def test_errors_is_not_set(self):
self.scan('scanner/simple')
self.assert_(not self.errors)
def test_uri_is_set(self):
self.scan('scanner/simple')
self.check('scanner/simple/song1.mp3', 'uri', 'file://'
+ data_folder('scanner/simple/song1.mp3'))
def test_duration_is_set(self):
self.scan('scanner/simple')
self.check('scanner/simple/song1.mp3', 'duration', 4680)
def test_artist_is_set(self):
self.scan('scanner/simple')
self.check('scanner/simple/song1.mp3', 'artist', 'name')
def test_album_is_set(self):
self.scan('scanner/simple')
self.check('scanner/simple/song1.mp3', 'album', 'albumname')
def test_track_is_set(self):
self.scan('scanner/simple')
self.check('scanner/simple/song1.mp3', 'title', 'trackname')
def test_nonexistant_folder_does_not_fail(self):
self.scan('scanner/does-not-exist')
self.assert_(not self.errors)
def test_other_media_is_ignored(self):
self.scan('scanner/image')
self.assert_(self.errors)

View File

@ -6,9 +6,10 @@ import sys
import tempfile
import unittest
from mopidy.utils.path import get_or_create_folder, path_to_uri
from mopidy.utils.path import (get_or_create_folder, mtime,
path_to_uri, uri_to_path, split_path, find_files)
from tests import SkipTest
from tests import SkipTest, data_folder
class GetOrCreateFolderTest(unittest.TestCase):
def setUp(self):
@ -69,3 +70,87 @@ class PathToFileURITest(unittest.TestCase):
else:
result = path_to_uri(u'/tmp/æøå')
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')
class UriToPathTest(unittest.TestCase):
def test_simple_uri(self):
if sys.platform == 'win32':
result = uri_to_path('file:///C://WINDOWS/clock.avi')
self.assertEqual(result, u'C:/WINDOWS/clock.avi')
else:
result = uri_to_path('file:///etc/fstab')
self.assertEqual(result, u'/etc/fstab')
def test_space_in_uri(self):
if sys.platform == 'win32':
result = uri_to_path('file:///C://test%20this')
self.assertEqual(result, u'C:/test this')
else:
result = uri_to_path(u'file:///tmp/test%20this')
self.assertEqual(result, u'/tmp/test this')
def test_unicode_in_uri(self):
if sys.platform == 'win32':
result = uri_to_path( 'file:///C://%C3%A6%C3%B8%C3%A5')
self.assertEqual(result, u'C:/æøå')
else:
result = uri_to_path(u'file:///tmp/%C3%A6%C3%B8%C3%A5')
self.assertEqual(result, u'/tmp/æøå')
class SplitPathTest(unittest.TestCase):
def test_empty_path(self):
self.assertEqual([], split_path(''))
def test_single_folder(self):
self.assertEqual(['foo'], split_path('foo'))
def test_folders(self):
self.assertEqual(['foo', 'bar', 'baz'], split_path('foo/bar/baz'))
def test_folders(self):
self.assertEqual(['foo', 'bar', 'baz'], split_path('foo/bar/baz'))
def test_initial_slash_is_ignored(self):
self.assertEqual(['foo', 'bar', 'baz'], split_path('/foo/bar/baz'))
def test_only_slash(self):
self.assertEqual([], split_path('/'))
class FindFilesTest(unittest.TestCase):
def find(self, path):
return list(find_files(data_folder(path)))
def test_basic_folder(self):
self.assert_(self.find(''))
def test_nonexistant_folder(self):
self.assertEqual(self.find('does-not-exist'), [])
def test_file(self):
files = self.find('blank.mp3')
self.assertEqual(len(files), 1)
self.assert_(files[0], data_folder('blank.mp3'))
def test_names_are_unicode(self):
is_unicode = lambda f: isinstance(f, unicode)
for name in self.find(''):
self.assert_(is_unicode(name),
'%s is not unicode object' % repr(name))
def test_expanduser(self):
raise SkipTest
class MtimeTest(unittest.TestCase):
def tearDown(self):
mtime.undo_fake()
def test_mtime_of_current_dir(self):
mtime_dir = int(os.stat('.').st_mtime)
self.assertEqual(mtime_dir, mtime('.'))
def test_fake_time_is_returned(self):
mtime.set_fake_time(123456)
self.assertEqual(mtime('.'), 123456)