From 4e332da3ed21b227227db2e5e2404c2f9c81b3f2 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 26 Feb 2014 23:10:08 +0100 Subject: [PATCH 1/4] utils: Update find to be threaded. This is needed to speedup searches when using network mounted directories where we are heavily IO bound. --- mopidy/utils/path.py | 91 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 78 insertions(+), 13 deletions(-) diff --git a/mopidy/utils/path.py b/mopidy/utils/path.py index 29e8077e..66891f38 100644 --- a/mopidy/utils/path.py +++ b/mopidy/utils/path.py @@ -2,7 +2,10 @@ from __future__ import unicode_literals import logging import os +import Queue as queue +import stat import string +import threading import urllib import urlparse @@ -107,6 +110,78 @@ def expand_path(path): return path +def _find_worker(relative, hidden, done, work, results, errors): + """Worker thread for collecting stat() results. + + :param str relative: directory to make results relative to + :param bool hidden: if entries starting with . should be ignored + :param threading.Event done: event indicating that all work has been done + :param queue.Queue work: queue of paths to process + :param dict results: shared dictionary for storing all the stat() results + :param dict errors: shared dictionary for storing any per path errors + """ + while not done.is_set(): + try: + entry = work.get(block=False) + except queue.Empty: + continue + + if relative: + path = os.path.relpath(entry, relative) + else: + path = entry + + try: + st = os.lstat(entry) + if stat.S_ISDIR(st.st_mode): + for e in os.listdir(entry): + if hidden or not e.startswith(b'.'): + work.put(os.path.join(entry, e)) + elif stat.S_ISREG(st.st_mode): + results[path] = st + else: + errors[path] = 'Not a file or directory' + except os.error as e: + errors[path] = str(e) + finally: + work.task_done() + + +def _find(root, thread_count=10, hidden=True, relative=False): + """Threaded find implementation that provides stat results for files. + + Note that we do _not_ handle loops from bad sym/hardlinks in any way. + + :param str root: root directory to search from, may no be a file + :param int thread_count: number of workers to use, mainly useful to + mitigate network lag when scanning on NFS etc. + :param bool hidden: include files and directory starting with '.'? + :param bool relative: if results should be relative to root or absolute + """ + threads = [] + results = {} + errors = {} + done = threading.Event() + work = queue.Queue() + work.put(os.path.abspath(root)) + + if not relative: + root = None + + for i in range(thread_count): + t = threading.Thread(target=_find_worker, + args=(root, hidden, done, work, results, errors)) + t.daemon = True + t.start() + threads.append(t) + + work.join() + done.set() + for t in threads: + t.join() + return results, errors + + def find_files(path): """ Finds all files within a path. @@ -119,20 +194,10 @@ def find_files(path): path = path.encode('utf-8') if os.path.isfile(path): - return + return iter([]) - for dirpath, dirnames, filenames in os.walk(path, followlinks=True): - for dirname in dirnames: - if dirname.startswith(b'.'): - # Skip hidden dirs by modifying dirnames inplace - dirnames.remove(dirname) - - for filename in filenames: - if filename.startswith(b'.'): - # Skip hidden files - continue - - yield os.path.relpath(os.path.join(dirpath, filename), path) + results, errors = _find(path, hidden=False, relative=True) + return results.iterkeys() def check_file_path_is_inside_base_dir(file_path, base_path): From 5b7a6065e733200eeff8964a974542e6f5be1467 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 26 Feb 2014 23:27:44 +0100 Subject: [PATCH 2/4] util: Add find_mtimes helper --- mopidy/utils/path.py | 5 +++++ tests/utils/test_path.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/mopidy/utils/path.py b/mopidy/utils/path.py index 66891f38..51a611f0 100644 --- a/mopidy/utils/path.py +++ b/mopidy/utils/path.py @@ -200,6 +200,11 @@ def find_files(path): return results.iterkeys() +def find_mtimes(root): + results, errors = _find(root, hidden=False, relative=False) + return dict((f, int(st.st_mtime)) for f, st in results.iteritems()) + + def check_file_path_is_inside_base_dir(file_path, base_path): assert not file_path.endswith(os.sep), ( 'File path %s cannot end with a path separator' % file_path) diff --git a/tests/utils/test_path.py b/tests/utils/test_path.py index 3accab39..9793db3f 100644 --- a/tests/utils/test_path.py +++ b/tests/utils/test_path.py @@ -11,7 +11,7 @@ import glib from mopidy.utils import path -from tests import path_to_data_dir +from tests import any_int, path_to_data_dir class GetOrCreateDirTest(unittest.TestCase): @@ -235,6 +235,36 @@ class FindFilesTest(unittest.TestCase): is_bytes(name), '%s is not bytes object' % repr(name)) +class FindMTimesTest(unittest.TestCase): + maxDiff = None + + def find(self, value): + return path.find_mtimes(path_to_data_dir(value)) + + def test_basic_dir(self): + self.assert_(self.find('')) + + def test_nonexistant_dir(self): + self.assertEqual(self.find('does-not-exist'), {}) + + def test_file(self): + self.assertEqual({path_to_data_dir('blank.mp3'): any_int}, + self.find('blank.mp3')) + + def test_files(self): + mtimes = self.find('find') + expected_files = [ + b'find/foo/bar/file', b'find/foo/file', b'find/baz/file'] + expected = {path_to_data_dir(p): any_int for p in expected_files} + self.assertEqual(expected, mtimes) + + def test_names_are_bytestrings(self): + is_bytes = lambda f: isinstance(f, bytes) + for name in self.find(''): + self.assert_( + is_bytes(name), '%s is not bytes object' % repr(name)) + + # TODO: kill this in favour of just os.path.getmtime + mocks class MtimeTest(unittest.TestCase): def tearDown(self): From 78f727b9bef93b3cfe55255f5041a5e46bee5ab1 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 26 Feb 2014 23:31:11 +0100 Subject: [PATCH 3/4] local: Update how scanner work is done to improve remote FS support. This changes the scanner to prefetch all the files+mtimes using our new threaded finder. Hopefully making scanning against remote filesystems somewhat less painful. --- mopidy/local/commands.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/mopidy/local/commands.py b/mopidy/local/commands.py index fb8f5368..e5290049 100644 --- a/mopidy/local/commands.py +++ b/mopidy/local/commands.py @@ -70,32 +70,31 @@ class ScanCommand(commands.Command): library = _get_library(args, config) - uri_path_mapping = {} uris_in_library = set() uris_to_update = set() uris_to_remove = set() + file_mtimes = path.find_mtimes(media_dir) + logger.info('Found %d files in media_dir.', len(file_mtimes)) + num_tracks = library.load() logger.info('Checking %d tracks from library.', num_tracks) for track in library.begin(): - uri_path_mapping[track.uri] = translator.local_track_uri_to_path( - track.uri, media_dir) - try: - stat = os.stat(uri_path_mapping[track.uri]) - if int(stat.st_mtime) > track.last_modified: - uris_to_update.add(track.uri) - uris_in_library.add(track.uri) - except OSError: + abspath = translator.local_track_uri_to_path(track.uri, media_dir) + mtime = file_mtimes.pop(abspath, None) + if mtime is None: logger.debug('Missing file %s', track.uri) uris_to_remove.add(track.uri) + elif mtime > track.last_modified: + uris_in_library.add(track.uri) logger.info('Removing %d missing tracks.', len(uris_to_remove)) for uri in uris_to_remove: library.remove(uri) - logger.info('Checking %s for unknown tracks.', media_dir) - for relpath in path.find_files(media_dir): + for abspath in file_mtimes: + relpath = os.path.relpath(abspath, media_dir) uri = translator.path_to_local_track_uri(relpath) file_extension = os.path.splitext(relpath)[1] @@ -103,21 +102,23 @@ class ScanCommand(commands.Command): logger.debug('Skipped %s: File extension excluded.', uri) continue - if uri not in uris_in_library: - uris_to_update.add(uri) - uri_path_mapping[uri] = os.path.join(media_dir, relpath) + uris_to_update.add(uri) - logger.info('Found %d unknown tracks.', len(uris_to_update)) + logger.info( + 'Found %d tracks which need to be updated.', len(uris_to_update)) logger.info('Scanning...') - uris_to_update = sorted(uris_to_update)[:args.limit] + uris_to_update = sorted(uris_to_update, key=lambda v: v.lower()) + uris_to_update = uris_to_update[:args.limit] scanner = scan.Scanner(scan_timeout) progress = _Progress(flush_threshold, len(uris_to_update)) for uri in uris_to_update: try: - data = scanner.scan(path.path_to_uri(uri_path_mapping[uri])) + relpath = translator.local_track_uri_to_path(uri, media_dir) + file_uri = path.path_to_uri(os.path.join(media_dir, relpath)) + data = scanner.scan(file_uri) track = scan.audio_data_to_track(data).copy(uri=uri) library.add(track) logger.debug('Added %s', track.uri) From 7386dd30b6bf98d15759843f34aadb25bc955d74 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 26 Feb 2014 23:53:12 +0100 Subject: [PATCH 4/4] utils: Remove old find_files --- mopidy/local/commands.py | 7 +++---- mopidy/local/ext.conf | 1 + mopidy/utils/path.py | 18 ------------------ tests/audio/test_scan.py | 2 +- tests/utils/test_path.py | 25 ------------------------- 5 files changed, 5 insertions(+), 48 deletions(-) diff --git a/mopidy/local/commands.py b/mopidy/local/commands.py index e5290049..acff543d 100644 --- a/mopidy/local/commands.py +++ b/mopidy/local/commands.py @@ -65,8 +65,8 @@ class ScanCommand(commands.Command): scan_timeout = config['local']['scan_timeout'] flush_threshold = config['local']['scan_flush_threshold'] excluded_file_extensions = config['local']['excluded_file_extensions'] - excluded_file_extensions = set( - file_ext.lower() for file_ext in excluded_file_extensions) + excluded_file_extensions = tuple( + bytes(file_ext.lower()) for file_ext in excluded_file_extensions) library = _get_library(args, config) @@ -96,9 +96,8 @@ class ScanCommand(commands.Command): for abspath in file_mtimes: relpath = os.path.relpath(abspath, media_dir) uri = translator.path_to_local_track_uri(relpath) - file_extension = os.path.splitext(relpath)[1] - if file_extension.lower() in excluded_file_extensions: + if relpath.lower().endswith(excluded_file_extensions): logger.debug('Skipped %s: File extension excluded.', uri) continue diff --git a/mopidy/local/ext.conf b/mopidy/local/ext.conf index 8f1e860c..9a0f19f1 100644 --- a/mopidy/local/ext.conf +++ b/mopidy/local/ext.conf @@ -7,6 +7,7 @@ playlists_dir = $XDG_DATA_DIR/mopidy/local/playlists scan_timeout = 1000 scan_flush_threshold = 1000 excluded_file_extensions = + .directory .html .jpeg .jpg diff --git a/mopidy/utils/path.py b/mopidy/utils/path.py index 51a611f0..a2205c4f 100644 --- a/mopidy/utils/path.py +++ b/mopidy/utils/path.py @@ -182,24 +182,6 @@ def _find(root, thread_count=10, hidden=True, relative=False): return results, errors -def find_files(path): - """ - Finds all files within a path. - - Directories and files with names starting with ``.`` is ignored. - - :returns: yields the full path to files as bytestrings - """ - if isinstance(path, unicode): - path = path.encode('utf-8') - - if os.path.isfile(path): - return iter([]) - - results, errors = _find(path, hidden=False, relative=True) - return results.iterkeys() - - def find_mtimes(root): results, errors = _find(root, hidden=False, relative=False) return dict((f, int(st.st_mtime)) for f, st in results.iteritems()) diff --git a/tests/audio/test_scan.py b/tests/audio/test_scan.py index cc44ecd6..8d40b5f5 100644 --- a/tests/audio/test_scan.py +++ b/tests/audio/test_scan.py @@ -283,7 +283,7 @@ class ScannerTest(unittest.TestCase): def find(self, path): media_dir = path_to_data_dir(path) - for path in path_lib.find_files(media_dir): + for path in path_lib.find_mtimes(media_dir): yield os.path.join(media_dir, path) def scan(self, paths): diff --git a/tests/utils/test_path.py b/tests/utils/test_path.py index 9793db3f..078cdb20 100644 --- a/tests/utils/test_path.py +++ b/tests/utils/test_path.py @@ -210,31 +210,6 @@ class ExpandPathTest(unittest.TestCase): path.expand_path(b'/tmp/$XDG_INVALID_DIR/foo')) -class FindFilesTest(unittest.TestCase): - def find(self, value): - return list(path.find_files(path_to_data_dir(value))) - - def test_basic_dir(self): - self.assert_(self.find('')) - - def test_nonexistant_dir(self): - self.assertEqual(self.find('does-not-exist'), []) - - def test_file(self): - self.assertEqual([], self.find('blank.mp3')) - - def test_files(self): - files = self.find('find') - expected = [b'foo/bar/file', b'foo/file', b'baz/file'] - self.assertItemsEqual(expected, files) - - def test_names_are_bytestrings(self): - is_bytes = lambda f: isinstance(f, bytes) - for name in self.find(''): - self.assert_( - is_bytes(name), '%s is not bytes object' % repr(name)) - - class FindMTimesTest(unittest.TestCase): maxDiff = None