Merge pull request #707 into develop
Improve scan performance for remote filesystems
This commit is contained in:
commit
957036c899
@ -65,59 +65,59 @@ class ScanCommand(commands.Command):
|
||||
scan_timeout = config['local']['scan_timeout']
|
||||
flush_threshold = config['local']['scan_flush_threshold']
|
||||
excluded_file_extensions = config['local']['excluded_file_extensions']
|
||||
excluded_file_extensions = set(
|
||||
file_ext.lower() for file_ext in excluded_file_extensions)
|
||||
excluded_file_extensions = tuple(
|
||||
bytes(file_ext.lower()) for file_ext in excluded_file_extensions)
|
||||
|
||||
library = _get_library(args, config)
|
||||
|
||||
uri_path_mapping = {}
|
||||
uris_in_library = set()
|
||||
uris_to_update = set()
|
||||
uris_to_remove = set()
|
||||
|
||||
file_mtimes = path.find_mtimes(media_dir)
|
||||
logger.info('Found %d files in media_dir.', len(file_mtimes))
|
||||
|
||||
num_tracks = library.load()
|
||||
logger.info('Checking %d tracks from library.', num_tracks)
|
||||
|
||||
for track in library.begin():
|
||||
uri_path_mapping[track.uri] = translator.local_track_uri_to_path(
|
||||
track.uri, media_dir)
|
||||
try:
|
||||
stat = os.stat(uri_path_mapping[track.uri])
|
||||
if int(stat.st_mtime) > track.last_modified:
|
||||
uris_to_update.add(track.uri)
|
||||
uris_in_library.add(track.uri)
|
||||
except OSError:
|
||||
abspath = translator.local_track_uri_to_path(track.uri, media_dir)
|
||||
mtime = file_mtimes.pop(abspath, None)
|
||||
if mtime is None:
|
||||
logger.debug('Missing file %s', track.uri)
|
||||
uris_to_remove.add(track.uri)
|
||||
elif mtime > track.last_modified:
|
||||
uris_in_library.add(track.uri)
|
||||
|
||||
logger.info('Removing %d missing tracks.', len(uris_to_remove))
|
||||
for uri in uris_to_remove:
|
||||
library.remove(uri)
|
||||
|
||||
logger.info('Checking %s for unknown tracks.', media_dir)
|
||||
for relpath in path.find_files(media_dir):
|
||||
for abspath in file_mtimes:
|
||||
relpath = os.path.relpath(abspath, media_dir)
|
||||
uri = translator.path_to_local_track_uri(relpath)
|
||||
file_extension = os.path.splitext(relpath)[1]
|
||||
|
||||
if file_extension.lower() in excluded_file_extensions:
|
||||
if relpath.lower().endswith(excluded_file_extensions):
|
||||
logger.debug('Skipped %s: File extension excluded.', uri)
|
||||
continue
|
||||
|
||||
if uri not in uris_in_library:
|
||||
uris_to_update.add(uri)
|
||||
uri_path_mapping[uri] = os.path.join(media_dir, relpath)
|
||||
uris_to_update.add(uri)
|
||||
|
||||
logger.info('Found %d unknown tracks.', len(uris_to_update))
|
||||
logger.info(
|
||||
'Found %d tracks which need to be updated.', len(uris_to_update))
|
||||
logger.info('Scanning...')
|
||||
|
||||
uris_to_update = sorted(uris_to_update)[:args.limit]
|
||||
uris_to_update = sorted(uris_to_update, key=lambda v: v.lower())
|
||||
uris_to_update = uris_to_update[:args.limit]
|
||||
|
||||
scanner = scan.Scanner(scan_timeout)
|
||||
progress = _Progress(flush_threshold, len(uris_to_update))
|
||||
|
||||
for uri in uris_to_update:
|
||||
try:
|
||||
data = scanner.scan(path.path_to_uri(uri_path_mapping[uri]))
|
||||
relpath = translator.local_track_uri_to_path(uri, media_dir)
|
||||
file_uri = path.path_to_uri(os.path.join(media_dir, relpath))
|
||||
data = scanner.scan(file_uri)
|
||||
track = scan.audio_data_to_track(data).copy(uri=uri)
|
||||
library.add(track)
|
||||
logger.debug('Added %s', track.uri)
|
||||
|
||||
@ -7,6 +7,7 @@ playlists_dir = $XDG_DATA_DIR/mopidy/local/playlists
|
||||
scan_timeout = 1000
|
||||
scan_flush_threshold = 1000
|
||||
excluded_file_extensions =
|
||||
.directory
|
||||
.html
|
||||
.jpeg
|
||||
.jpg
|
||||
|
||||
@ -2,7 +2,10 @@ from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import Queue as queue
|
||||
import stat
|
||||
import string
|
||||
import threading
|
||||
import urllib
|
||||
import urlparse
|
||||
|
||||
@ -107,32 +110,81 @@ def expand_path(path):
|
||||
return path
|
||||
|
||||
|
||||
def find_files(path):
|
||||
def _find_worker(relative, hidden, done, work, results, errors):
|
||||
"""Worker thread for collecting stat() results.
|
||||
|
||||
:param str relative: directory to make results relative to
|
||||
:param bool hidden: if entries starting with . should be ignored
|
||||
:param threading.Event done: event indicating that all work has been done
|
||||
:param queue.Queue work: queue of paths to process
|
||||
:param dict results: shared dictionary for storing all the stat() results
|
||||
:param dict errors: shared dictionary for storing any per path errors
|
||||
"""
|
||||
Finds all files within a path.
|
||||
while not done.is_set():
|
||||
try:
|
||||
entry = work.get(block=False)
|
||||
except queue.Empty:
|
||||
continue
|
||||
|
||||
Directories and files with names starting with ``.`` are ignored.
|
||||
if relative:
|
||||
path = os.path.relpath(entry, relative)
|
||||
else:
|
||||
path = entry
|
||||
|
||||
:returns: yields the full path to files as bytestrings
|
||||
try:
|
||||
st = os.lstat(entry)
|
||||
if stat.S_ISDIR(st.st_mode):
|
||||
for e in os.listdir(entry):
|
||||
if hidden or not e.startswith(b'.'):
|
||||
work.put(os.path.join(entry, e))
|
||||
elif stat.S_ISREG(st.st_mode):
|
||||
results[path] = st
|
||||
else:
|
||||
errors[path] = 'Not a file or directory'
|
||||
except os.error as e:
|
||||
errors[path] = str(e)
|
||||
finally:
|
||||
work.task_done()
|
||||
|
||||
|
||||
def _find(root, thread_count=10, hidden=True, relative=False):
|
||||
"""Threaded find implementation that provides stat results for files.
|
||||
|
||||
Note that we do _not_ handle loops from bad sym/hardlinks in any way.
|
||||
|
||||
:param str root: root directory to search from, may not be a file
|
||||
:param int thread_count: number of workers to use, mainly useful to
|
||||
mitigate network lag when scanning on NFS etc.
|
||||
:param bool hidden: include files and directories starting with '.'?
|
||||
:param bool relative: if results should be relative to root or absolute
|
||||
"""
|
||||
if isinstance(path, unicode):
|
||||
path = path.encode('utf-8')
|
||||
threads = []
|
||||
results = {}
|
||||
errors = {}
|
||||
done = threading.Event()
|
||||
work = queue.Queue()
|
||||
work.put(os.path.abspath(root))
|
||||
|
||||
if os.path.isfile(path):
|
||||
return
|
||||
if not relative:
|
||||
root = None
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
|
||||
for dirname in dirnames:
|
||||
if dirname.startswith(b'.'):
|
||||
# Skip hidden dirs by modifying dirnames inplace
|
||||
dirnames.remove(dirname)
|
||||
for i in range(thread_count):
|
||||
t = threading.Thread(target=_find_worker,
|
||||
args=(root, hidden, done, work, results, errors))
|
||||
t.daemon = True
|
||||
t.start()
|
||||
threads.append(t)
|
||||
|
||||
for filename in filenames:
|
||||
if filename.startswith(b'.'):
|
||||
# Skip hidden files
|
||||
continue
|
||||
work.join()
|
||||
done.set()
|
||||
for t in threads:
|
||||
t.join()
|
||||
return results, errors
|
||||
|
||||
yield os.path.relpath(os.path.join(dirpath, filename), path)
|
||||
|
||||
def find_mtimes(root):
|
||||
results, errors = _find(root, hidden=False, relative=False)
|
||||
return dict((f, int(st.st_mtime)) for f, st in results.iteritems())
|
||||
|
||||
|
||||
def check_file_path_is_inside_base_dir(file_path, base_path):
|
||||
|
||||
@ -283,7 +283,7 @@ class ScannerTest(unittest.TestCase):
|
||||
|
||||
def find(self, path):
|
||||
media_dir = path_to_data_dir(path)
|
||||
for path in path_lib.find_files(media_dir):
|
||||
for path in path_lib.find_mtimes(media_dir):
|
||||
yield os.path.join(media_dir, path)
|
||||
|
||||
def scan(self, paths):
|
||||
|
||||
@ -11,7 +11,7 @@ import glib
|
||||
|
||||
from mopidy.utils import path
|
||||
|
||||
from tests import path_to_data_dir
|
||||
from tests import any_int, path_to_data_dir
|
||||
|
||||
|
||||
class GetOrCreateDirTest(unittest.TestCase):
|
||||
@ -210,23 +210,28 @@ class ExpandPathTest(unittest.TestCase):
|
||||
path.expand_path(b'/tmp/$XDG_INVALID_DIR/foo'))
|
||||
|
||||
|
||||
class FindFilesTest(unittest.TestCase):
|
||||
class FindMTimesTest(unittest.TestCase):
|
||||
maxDiff = None
|
||||
|
||||
def find(self, value):
|
||||
return list(path.find_files(path_to_data_dir(value)))
|
||||
return path.find_mtimes(path_to_data_dir(value))
|
||||
|
||||
def test_basic_dir(self):
|
||||
self.assert_(self.find(''))
|
||||
|
||||
def test_nonexistant_dir(self):
|
||||
self.assertEqual(self.find('does-not-exist'), [])
|
||||
self.assertEqual(self.find('does-not-exist'), {})
|
||||
|
||||
def test_file(self):
|
||||
self.assertEqual([], self.find('blank.mp3'))
|
||||
self.assertEqual({path_to_data_dir('blank.mp3'): any_int},
|
||||
self.find('blank.mp3'))
|
||||
|
||||
def test_files(self):
|
||||
files = self.find('find')
|
||||
expected = [b'foo/bar/file', b'foo/file', b'baz/file']
|
||||
self.assertItemsEqual(expected, files)
|
||||
mtimes = self.find('find')
|
||||
expected_files = [
|
||||
b'find/foo/bar/file', b'find/foo/file', b'find/baz/file']
|
||||
expected = {path_to_data_dir(p): any_int for p in expected_files}
|
||||
self.assertEqual(expected, mtimes)
|
||||
|
||||
def test_names_are_bytestrings(self):
|
||||
is_bytes = lambda f: isinstance(f, bytes)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user