utils: Update find to be threaded.

This is needed to speedup searches when using network mounted directories where
we are heavily IO bound.
This commit is contained in:
Thomas Adamcik 2014-02-26 23:10:08 +01:00
parent 590ce19148
commit 4e332da3ed

View File

@ -2,7 +2,10 @@ from __future__ import unicode_literals
import logging
import os
import Queue as queue
import stat
import string
import threading
import urllib
import urlparse
@ -107,6 +110,78 @@ def expand_path(path):
return path
def _find_worker(relative, hidden, done, work, results, errors):
"""Worker thread for collecting stat() results.
:param str relative: directory to make results relative to
:param bool hidden: if entries starting with . should be ignored
:param threading.Event done: event indicating that all work has been done
:param queue.Queue work: queue of paths to process
:param dict results: shared dictionary for storing all the stat() results
:param dict errors: shared dictionary for storing any per path errors
"""
while not done.is_set():
try:
entry = work.get(block=False)
except queue.Empty:
continue
if relative:
path = os.path.relpath(entry, relative)
else:
path = entry
try:
st = os.lstat(entry)
if stat.S_ISDIR(st.st_mode):
for e in os.listdir(entry):
if hidden or not e.startswith(b'.'):
work.put(os.path.join(entry, e))
elif stat.S_ISREG(st.st_mode):
results[path] = st
else:
errors[path] = 'Not a file or directory'
except os.error as e:
errors[path] = str(e)
finally:
work.task_done()
def _find(root, thread_count=10, hidden=True, relative=False):
"""Threaded find implementation that provides stat results for files.
Note that we do _not_ handle loops from bad sym/hardlinks in any way.
:param str root: root directory to search from, may no be a file
:param int thread_count: number of workers to use, mainly useful to
mitigate network lag when scanning on NFS etc.
:param bool hidden: include files and directory starting with '.'?
:param bool relative: if results should be relative to root or absolute
"""
threads = []
results = {}
errors = {}
done = threading.Event()
work = queue.Queue()
work.put(os.path.abspath(root))
if not relative:
root = None
for i in range(thread_count):
t = threading.Thread(target=_find_worker,
args=(root, hidden, done, work, results, errors))
t.daemon = True
t.start()
threads.append(t)
work.join()
done.set()
for t in threads:
t.join()
return results, errors
def find_files(path):
"""
Finds all files within a path.
@ -119,20 +194,10 @@ def find_files(path):
path = path.encode('utf-8')
if os.path.isfile(path):
return
return iter([])
for dirpath, dirnames, filenames in os.walk(path, followlinks=True):
for dirname in dirnames:
if dirname.startswith(b'.'):
# Skip hidden dirs by modifying dirnames inplace
dirnames.remove(dirname)
for filename in filenames:
if filename.startswith(b'.'):
# Skip hidden files
continue
yield os.path.relpath(os.path.join(dirpath, filename), path)
results, errors = _find(path, hidden=False, relative=True)
return results.iterkeys()
def check_file_path_is_inside_base_dir(file_path, base_path):