utils: Remove find_uris and update find_files

- find_uris is no more
- find_files now returns file paths relative to path being searched
- find_files now only works on directories
- find_files tests have been updated to reflect changes
- local scanner has gotten a minimal update to reflect this alteration
This commit is contained in:
Thomas Adamcik 2013-11-27 22:49:07 +01:00
parent c025b87076
commit 603b57ef3c
9 changed files with 40 additions and 66 deletions

View File

@ -68,12 +68,13 @@ class ScanCommand(commands.Command):
local_updater.remove(uri)
logger.info('Checking %s for unknown tracks.', media_dir)
for uri in path.find_uris(media_dir):
file_extension = os.path.splitext(path.uri_to_path(uri))[1]
for relpath in path.find_files(media_dir):
file_extension = os.path.splitext(relpath)[1]
if file_extension.lower() in excluded_file_extensions:
logger.debug('Skipped %s: File extension excluded.', uri)
continue
uri = path.path_to_uri(os.path.join(media_dir, relpath))
if uri not in uris_library:
uris_update.add(uri)

View File

@ -119,26 +119,20 @@ def find_files(path):
path = path.encode('utf-8')
if os.path.isfile(path):
if not os.path.basename(path).startswith(b'.'):
yield path
else:
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith(b'.'):
# Skip hidden dirs by modifying dirnames inplace
dirnames.remove(dirname)
return
for filename in filenames:
if filename.startswith(b'.'):
# Skip hidden files
continue
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith(b'.'):
# Skip hidden dirs by modifying dirnames inplace
dirnames.remove(dirname)
yield os.path.join(dirpath, filename)
for filename in filenames:
if filename.startswith(b'.'):
# Skip hidden files
continue
def find_uris(path):
for p in find_files(path):
yield path_to_uri(p)
yield os.path.relpath(os.path.join(dirpath, filename), path)
def check_file_path_is_inside_base_dir(file_path, base_path):

View File

@ -1,5 +1,6 @@
from __future__ import unicode_literals
import os
import unittest
from mopidy import exceptions
@ -240,11 +241,15 @@ class ScannerTest(unittest.TestCase):
self.errors = {}
self.data = {}
def scan(self, path):
paths = path_lib.find_files(path_to_data_dir(path))
uris = (path_lib.path_to_uri(p) for p in paths)
def find(self, path):
media_dir = path_to_data_dir(path)
for path in path_lib.find_files(media_dir):
yield os.path.join(media_dir, path)
def scan(self, paths):
scanner = scan.Scanner()
for uri in uris:
for path in paths:
uri = path_lib.path_to_uri(path)
key = uri[len('file://'):]
try:
self.data[key] = scanner.scan(uri)
@ -256,15 +261,15 @@ class ScannerTest(unittest.TestCase):
self.assertEqual(self.data[name][key], value)
def test_data_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.assert_(self.data)
def test_errors_is_not_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.assert_(not self.errors)
def test_uri_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check(
'scanner/simple/song1.mp3', 'uri',
'file://%s' % path_to_data_dir('scanner/simple/song1.mp3'))
@ -273,39 +278,39 @@ class ScannerTest(unittest.TestCase):
'file://%s' % path_to_data_dir('scanner/simple/song1.ogg'))
def test_duration_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'duration', 4680000000)
self.check('scanner/simple/song1.ogg', 'duration', 4680000000)
def test_artist_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'artist', 'name')
self.check('scanner/simple/song1.ogg', 'artist', 'name')
def test_album_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'album', 'albumname')
self.check('scanner/simple/song1.ogg', 'album', 'albumname')
def test_track_is_set(self):
self.scan('scanner/simple')
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'title', 'trackname')
self.check('scanner/simple/song1.ogg', 'title', 'trackname')
def test_nonexistant_dir_does_not_fail(self):
self.scan('scanner/does-not-exist')
self.scan(self.find('scanner/does-not-exist'))
self.assert_(not self.errors)
def test_other_media_is_ignored(self):
self.scan('scanner/image')
self.scan(self.find('scanner/image'))
self.assert_(self.errors)
def test_log_file_that_gst_thinks_is_mpeg_1_is_ignored(self):
self.scan('scanner/example.log')
self.scan([path_to_data_dir('scanner/example.log')])
self.assert_(self.errors)
def test_empty_wav_file_is_ignored(self):
self.scan('scanner/empty.wav')
self.scan([path_to_data_dir('scanner/empty.wav')])
self.assert_(self.errors)
@unittest.SkipTest

0
tests/data/find/baz/file Normal file
View File

View File

0
tests/data/find/foo/file Normal file
View File

View File

@ -221,9 +221,12 @@ class FindFilesTest(unittest.TestCase):
self.assertEqual(self.find('does-not-exist'), [])
def test_file(self):
files = self.find('blank.mp3')
self.assertEqual(len(files), 1)
self.assertEqual(files[0], path_to_data_dir('blank.mp3'))
self.assertEqual([], self.find('blank.mp3'))
def test_files(self):
files = self.find('find')
excepted = [b'foo/bar/file', b'foo/file', b'baz/file']
self.assertItemsEqual(excepted, files)
def test_names_are_bytestrings(self):
is_bytes = lambda f: isinstance(f, bytes)
@ -231,35 +234,6 @@ class FindFilesTest(unittest.TestCase):
self.assert_(
is_bytes(name), '%s is not bytes object' % repr(name))
def test_ignores_hidden_dirs(self):
self.assertEqual(self.find('.hidden'), [])
def test_ignores_hidden_files(self):
self.assertEqual(self.find('.blank.mp3'), [])
class FindUrisTest(unittest.TestCase):
def find(self, value):
return list(path.find_uris(path_to_data_dir(value)))
def test_basic_dir(self):
self.assert_(self.find(''))
def test_nonexistant_dir(self):
self.assertEqual(self.find('does-not-exist'), [])
def test_file(self):
uris = self.find('blank.mp3')
expected = path.path_to_uri(path_to_data_dir('blank.mp3'))
self.assertEqual(len(uris), 1)
self.assertEqual(uris[0], expected)
def test_ignores_hidden_dirs(self):
self.assertEqual(self.find('.hidden'), [])
def test_ignores_hidden_files(self):
self.assertEqual(self.find('.blank.mp3'), [])
# TODO: kill this in favour of just os.path.getmtime + mocks
class MtimeTest(unittest.TestCase):