Merge pull request #538 from adamcik/feature/gst-discoverer

scanner: Switch to gst.pbutils.Discoverer and refactor API
This commit is contained in:
Stein Magnus Jodal 2013-10-20 07:03:47 -07:00
commit e6712bac93
4 changed files with 57 additions and 143 deletions

View File

@ -21,7 +21,8 @@ class Extension(ext.Extension):
schema['media_dir'] = config.Path() schema['media_dir'] = config.Path()
schema['playlists_dir'] = config.Path() schema['playlists_dir'] = config.Path()
schema['tag_cache_file'] = config.Path() schema['tag_cache_file'] = config.Path()
schema['scan_timeout'] = config.Integer(minimum=0) schema['scan_timeout'] = config.Integer(
minimum=1000, maximum=1000*60*60)
return schema return schema
def validate_environment(self): def validate_environment(self):

View File

@ -18,3 +18,7 @@ class MopidyException(Exception):
class ExtensionError(MopidyException): class ExtensionError(MopidyException):
pass pass
class ScannerError(MopidyException):
pass

View File

@ -19,8 +19,9 @@ sys.argv[1:] = []
import pygst import pygst
pygst.require('0.10') pygst.require('0.10')
import gst import gst
import gst.pbutils
from mopidy import config as config_lib, ext from mopidy import config as config_lib, exceptions, ext
from mopidy.models import Track, Artist, Album from mopidy.models import Track, Artist, Album
from mopidy.utils import log, path, versioning from mopidy.utils import log, path, versioning
@ -95,26 +96,18 @@ def main():
uris_update.add(uri) uris_update.add(uri)
logging.info('Found %d new or modified tracks.', len(uris_update)) logging.info('Found %d new or modified tracks.', len(uris_update))
def store(data):
track = translator(data)
local_updater.add(track)
logging.debug('Added %s', track.uri)
def debug(uri, error, debug):
logging.warning('Failed %s: %s', uri, error)
logging.debug('Debug info for %s: %s', uri, debug)
scan_timeout = config['local']['scan_timeout']
logging.info('Scanning new and modified tracks.') logging.info('Scanning new and modified tracks.')
# TODO: just pass the library in instead?
scanner = Scanner(uris_update, store, debug, scan_timeout) scanner = Scanner(config['local']['scan_timeout'])
try: for uri in uris_update:
scanner.start() try:
except KeyboardInterrupt: data = scanner.scan(uri)
scanner.stop() data[b'mtime'] = os.path.getmtime(path.uri_to_path(uri))
raise track = translator(data)
local_updater.add(track)
logging.debug('Added %s', track.uri)
except exceptions.ScannerError as error:
logging.warning('Failed %s: %s', uri, error)
logging.info('Done scanning; commiting changes.') logging.info('Done scanning; commiting changes.')
local_updater.commit() local_updater.commit()
@ -186,125 +179,43 @@ def translator(data):
class Scanner(object): class Scanner(object):
def __init__( def __init__(self, timeout=1000):
self, uris, data_callback, error_callback=None, scan_timeout=1000): self.discoverer = gst.pbutils.Discoverer(timeout * 1000000)
self.data = {}
self.uris = iter(uris)
self.data_callback = data_callback
self.error_callback = error_callback
self.scan_timeout = scan_timeout
self.loop = gobject.MainLoop()
self.timeout_id = None
self.fakesink = gst.element_factory_make('fakesink')
self.fakesink.set_property('signal-handoffs', True)
self.fakesink.connect('handoff', self.process_handoff)
self.uribin = gst.element_factory_make('uridecodebin')
self.uribin.set_property(
'caps', gst.Caps(b'audio/x-raw-int; audio/x-raw-float'))
self.uribin.connect('pad-added', self.process_new_pad)
self.pipe = gst.element_factory_make('pipeline')
self.pipe.add(self.uribin)
self.pipe.add(self.fakesink)
bus = self.pipe.get_bus()
bus.add_signal_watch()
bus.connect('message::application', self.process_application)
bus.connect('message::tag', self.process_tags)
bus.connect('message::error', self.process_error)
def process_handoff(self, fakesink, buffer_, pad):
# When this function is called the first buffer has reached the end of
# the pipeline, and we can continue with the next track. Since we're
# in another thread, we send a message back to the main thread using
# the bus.
structure = gst.Structure('handoff')
message = gst.message_new_application(fakesink, structure)
bus = self.pipe.get_bus()
bus.post(message)
def process_new_pad(self, source, pad):
pad.link(self.fakesink.get_pad('sink'))
def process_application(self, bus, message):
if message.src != self.fakesink:
return
if message.structure.get_name() != 'handoff':
return
uri = unicode(self.uribin.get_property('uri'))
self.data['uri'] = uri
self.data['mtime'] = os.path.getmtime(path.uri_to_path(uri))
self.data[gst.TAG_DURATION] = self.get_duration()
def scan(self, uri):
try: try:
self.data_callback(self.data) info = self.discoverer.discover_uri(uri)
self.next_uri() except gobject.GError as e:
except KeyboardInterrupt: # Loosing traceback is non-issue since this is from C code.
self.stop() raise exceptions.ScannerError(e)
def process_tags(self, bus, message): data = {}
taglist = message.parse_tag() audio_streams = info.get_audio_streams()
for key in taglist.keys(): if not audio_streams:
# XXX: For some crazy reason some wma files spit out lists here, raise exceptions.ScannerError('Did not find any audio streams.')
# not sure if this is due to better data in headers or wma being
# stupid. So ugly hack for now :/
if type(taglist[key]) is list:
self.data[key] = taglist[key][0]
else:
self.data[key] = taglist[key]
def process_error(self, bus, message): for stream in audio_streams:
if self.error_callback: taglist = stream.get_tags()
uri = self.uribin.get_property('uri') if not taglist:
error, debug = message.parse_error() continue
self.error_callback(uri, error, debug) for key in taglist.keys():
self.next_uri() # XXX: For some crazy reason some wma files spit out lists
# here, not sure if this is due to better data in headers or
# wma being stupid. So ugly hack for now :/
if type(taglist[key]) is list:
data[key] = taglist[key][0]
else:
data[key] = taglist[key]
def process_timeout(self): # Never trust metadata for these fields:
if self.error_callback: data[b'uri'] = uri
uri = self.uribin.get_property('uri') data[b'duration'] = info.get_duration() // gst.MSECOND
self.error_callback(
uri, 'Scan timed out after %d ms' % self.scan_timeout, None)
self.next_uri()
return False
def get_duration(self): if data[b'duration'] == 0:
self.pipe.get_state() # Block until state change is done. raise exceptions.ScannerError('Rejecting zero length audio.')
try:
return self.pipe.query_duration(
gst.FORMAT_TIME, None)[0] // gst.MSECOND
except gst.QueryError:
return None
def next_uri(self): return data
self.data = {}
if self.timeout_id:
gobject.source_remove(self.timeout_id)
self.timeout_id = None
try:
uri = next(self.uris)
except StopIteration:
self.stop()
return False
self.pipe.set_state(gst.STATE_NULL)
self.uribin.set_property('uri', uri)
self.timeout_id = gobject.timeout_add(
self.scan_timeout, self.process_timeout)
self.pipe.set_state(gst.STATE_PLAYING)
return True
def start(self):
if self.next_uri():
self.loop.run()
def stop(self):
self.pipe.set_state(gst.STATE_NULL)
self.loop.quit()
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals
import unittest import unittest
from mopidy import exceptions
from mopidy.scanner import Scanner, translator from mopidy.scanner import Scanner, translator
from mopidy.models import Track, Artist, Album from mopidy.models import Track, Artist, Album
from mopidy.utils import path as path_lib from mopidy.utils import path as path_lib
@ -150,21 +151,18 @@ class ScannerTest(unittest.TestCase):
def scan(self, path): def scan(self, path):
paths = path_lib.find_files(path_to_data_dir(path)) paths = path_lib.find_files(path_to_data_dir(path))
uris = (path_lib.path_to_uri(p) for p in paths) uris = (path_lib.path_to_uri(p) for p in paths)
scanner = Scanner(uris, self.data_callback, self.error_callback) scanner = Scanner()
scanner.start() for uri in uris:
key = uri[len('file://'):]
try:
self.data[key] = scanner.scan(uri)
except exceptions.ScannerError as error:
self.errors[key] = error
def check(self, name, key, value): def check(self, name, key, value):
name = path_to_data_dir(name) name = path_to_data_dir(name)
self.assertEqual(self.data[name][key], value) self.assertEqual(self.data[name][key], value)
def data_callback(self, data):
uri = data['uri'][len('file://'):]
self.data[uri] = data
def error_callback(self, uri, error, debug):
uri = uri[len('file://'):]
self.errors[uri] = (error, debug)
def test_data_is_set(self): def test_data_is_set(self):
self.scan('scanner/simple') self.scan('scanner/simple')
self.assert_(self.data) self.assert_(self.data)
@ -210,7 +208,7 @@ class ScannerTest(unittest.TestCase):
self.scan('scanner/image') self.scan('scanner/image')
self.assert_(self.errors) self.assert_(self.errors)
def test_log_file_is_ignored(self): def test_log_file_that_gst_thinks_is_mpeg_1_is_ignored(self):
self.scan('scanner/example.log') self.scan('scanner/example.log')
self.assert_(self.errors) self.assert_(self.errors)