diff --git a/mopidy/backends/local/__init__.py b/mopidy/backends/local/__init__.py index 5c6fec47..9d1e655f 100644 --- a/mopidy/backends/local/__init__.py +++ b/mopidy/backends/local/__init__.py @@ -21,7 +21,8 @@ class Extension(ext.Extension): schema['media_dir'] = config.Path() schema['playlists_dir'] = config.Path() schema['tag_cache_file'] = config.Path() - schema['scan_timeout'] = config.Integer(minimum=0) + schema['scan_timeout'] = config.Integer( + minimum=1000, maximum=1000*60*60) return schema def validate_environment(self): diff --git a/mopidy/exceptions.py b/mopidy/exceptions.py index 2c53e3e4..025d8fad 100644 --- a/mopidy/exceptions.py +++ b/mopidy/exceptions.py @@ -18,3 +18,7 @@ class MopidyException(Exception): class ExtensionError(MopidyException): pass + + +class ScannerError(MopidyException): + pass diff --git a/mopidy/scanner.py b/mopidy/scanner.py index 4acf49ba..3d2e6ddc 100644 --- a/mopidy/scanner.py +++ b/mopidy/scanner.py @@ -19,8 +19,9 @@ sys.argv[1:] = [] import pygst pygst.require('0.10') import gst +import gst.pbutils -from mopidy import config as config_lib, ext +from mopidy import config as config_lib, exceptions, ext from mopidy.models import Track, Artist, Album from mopidy.utils import log, path, versioning @@ -95,26 +96,18 @@ def main(): uris_update.add(uri) logging.info('Found %d new or modified tracks.', len(uris_update)) - - def store(data): - track = translator(data) - local_updater.add(track) - logging.debug('Added %s', track.uri) - - def debug(uri, error, debug): - logging.warning('Failed %s: %s', uri, error) - logging.debug('Debug info for %s: %s', uri, debug) - - scan_timeout = config['local']['scan_timeout'] - logging.info('Scanning new and modified tracks.') - # TODO: just pass the library in instead? - scanner = Scanner(uris_update, store, debug, scan_timeout) - try: - scanner.start() - except KeyboardInterrupt: - scanner.stop() - raise + + scanner = Scanner(config['local']['scan_timeout']) + for uri in uris_update: + try: + data = scanner.scan(uri) + data[b'mtime'] = os.path.getmtime(path.uri_to_path(uri)) + track = translator(data) + local_updater.add(track) + logging.debug('Added %s', track.uri) + except exceptions.ScannerError as error: + logging.warning('Failed %s: %s', uri, error) logging.info('Done scanning; commiting changes.') local_updater.commit() @@ -186,125 +179,43 @@ def translator(data): class Scanner(object): - def __init__( - self, uris, data_callback, error_callback=None, scan_timeout=1000): - self.data = {} - self.uris = iter(uris) - self.data_callback = data_callback - self.error_callback = error_callback - self.scan_timeout = scan_timeout - self.loop = gobject.MainLoop() - self.timeout_id = None - - self.fakesink = gst.element_factory_make('fakesink') - self.fakesink.set_property('signal-handoffs', True) - self.fakesink.connect('handoff', self.process_handoff) - - self.uribin = gst.element_factory_make('uridecodebin') - self.uribin.set_property( - 'caps', gst.Caps(b'audio/x-raw-int; audio/x-raw-float')) - self.uribin.connect('pad-added', self.process_new_pad) - - self.pipe = gst.element_factory_make('pipeline') - self.pipe.add(self.uribin) - self.pipe.add(self.fakesink) - - bus = self.pipe.get_bus() - bus.add_signal_watch() - bus.connect('message::application', self.process_application) - bus.connect('message::tag', self.process_tags) - bus.connect('message::error', self.process_error) - - def process_handoff(self, fakesink, buffer_, pad): - # When this function is called the first buffer has reached the end of - # the pipeline, and we can continue with the next track. Since we're - # in another thread, we send a message back to the main thread using - # the bus. - structure = gst.Structure('handoff') - message = gst.message_new_application(fakesink, structure) - bus = self.pipe.get_bus() - bus.post(message) - - def process_new_pad(self, source, pad): - pad.link(self.fakesink.get_pad('sink')) - - def process_application(self, bus, message): - if message.src != self.fakesink: - return - - if message.structure.get_name() != 'handoff': - return - - uri = unicode(self.uribin.get_property('uri')) - self.data['uri'] = uri - self.data['mtime'] = os.path.getmtime(path.uri_to_path(uri)) - self.data[gst.TAG_DURATION] = self.get_duration() + def __init__(self, timeout=1000): + self.discoverer = gst.pbutils.Discoverer(timeout * 1000000) + def scan(self, uri): try: - self.data_callback(self.data) - self.next_uri() - except KeyboardInterrupt: - self.stop() + info = self.discoverer.discover_uri(uri) + except gobject.GError as e: + # Loosing traceback is non-issue since this is from C code. + raise exceptions.ScannerError(e) - def process_tags(self, bus, message): - taglist = message.parse_tag() + data = {} + audio_streams = info.get_audio_streams() - for key in taglist.keys(): - # XXX: For some crazy reason some wma files spit out lists here, - # not sure if this is due to better data in headers or wma being - # stupid. So ugly hack for now :/ - if type(taglist[key]) is list: - self.data[key] = taglist[key][0] - else: - self.data[key] = taglist[key] + if not audio_streams: + raise exceptions.ScannerError('Did not find any audio streams.') - def process_error(self, bus, message): - if self.error_callback: - uri = self.uribin.get_property('uri') - error, debug = message.parse_error() - self.error_callback(uri, error, debug) - self.next_uri() + for stream in audio_streams: + taglist = stream.get_tags() + if not taglist: + continue + for key in taglist.keys(): + # XXX: For some crazy reason some wma files spit out lists + # here, not sure if this is due to better data in headers or + # wma being stupid. So ugly hack for now :/ + if type(taglist[key]) is list: + data[key] = taglist[key][0] + else: + data[key] = taglist[key] - def process_timeout(self): - if self.error_callback: - uri = self.uribin.get_property('uri') - self.error_callback( - uri, 'Scan timed out after %d ms' % self.scan_timeout, None) - self.next_uri() - return False + # Never trust metadata for these fields: + data[b'uri'] = uri + data[b'duration'] = info.get_duration() // gst.MSECOND - def get_duration(self): - self.pipe.get_state() # Block until state change is done. - try: - return self.pipe.query_duration( - gst.FORMAT_TIME, None)[0] // gst.MSECOND - except gst.QueryError: - return None + if data[b'duration'] == 0: + raise exceptions.ScannerError('Rejecting zero length audio.') - def next_uri(self): - self.data = {} - if self.timeout_id: - gobject.source_remove(self.timeout_id) - self.timeout_id = None - try: - uri = next(self.uris) - except StopIteration: - self.stop() - return False - self.pipe.set_state(gst.STATE_NULL) - self.uribin.set_property('uri', uri) - self.timeout_id = gobject.timeout_add( - self.scan_timeout, self.process_timeout) - self.pipe.set_state(gst.STATE_PLAYING) - return True - - def start(self): - if self.next_uri(): - self.loop.run() - - def stop(self): - self.pipe.set_state(gst.STATE_NULL) - self.loop.quit() + return data if __name__ == '__main__': diff --git a/tests/scanner_test.py b/tests/scanner_test.py index ca007533..deae4835 100644 --- a/tests/scanner_test.py +++ b/tests/scanner_test.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import unittest +from mopidy import exceptions from mopidy.scanner import Scanner, translator from mopidy.models import Track, Artist, Album from mopidy.utils import path as path_lib @@ -150,21 +151,18 @@ class ScannerTest(unittest.TestCase): def scan(self, path): paths = path_lib.find_files(path_to_data_dir(path)) uris = (path_lib.path_to_uri(p) for p in paths) - scanner = Scanner(uris, self.data_callback, self.error_callback) - scanner.start() + scanner = Scanner() + for uri in uris: + key = uri[len('file://'):] + try: + self.data[key] = scanner.scan(uri) + except exceptions.ScannerError as error: + self.errors[key] = error def check(self, name, key, value): name = path_to_data_dir(name) self.assertEqual(self.data[name][key], value) - def data_callback(self, data): - uri = data['uri'][len('file://'):] - self.data[uri] = data - - def error_callback(self, uri, error, debug): - uri = uri[len('file://'):] - self.errors[uri] = (error, debug) - def test_data_is_set(self): self.scan('scanner/simple') self.assert_(self.data) @@ -210,7 +208,7 @@ class ScannerTest(unittest.TestCase): self.scan('scanner/image') self.assert_(self.errors) - def test_log_file_is_ignored(self): + def test_log_file_that_gst_thinks_is_mpeg_1_is_ignored(self): self.scan('scanner/example.log') self.assert_(self.errors)