diff --git a/docs/changelog.rst b/docs/changelog.rst index 650e402d..f06f291d 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -154,6 +154,13 @@ v0.20.0 (UNRELEASED) - Update scanner to operate with milliseconds for duration. + - Update scanner to use a custom src, typefind and decodebin. This allows us + to catch playlists before we try to decode them. + + - Refactored scanner to create a new pipeline per song, this is needed as + reseting decodebin is much slower than tearing it down and making a fresh + one. + - Add :meth:`mopidy.audio.AudioListener.tags_changed`. Notifies core when new tags are found. @@ -173,6 +180,12 @@ v0.20.0 (UNRELEASED) - Add workaround for volume not persisting across tracks on OS X. (Issue: :issue:`886`, PR: :issue:`958`) +- Improved missing plugin error reporting in scanner. + +- Introduced a new return type for the scanner, a named tuple with ``uri``, + ``tags``, ``duration``, ``seekable`` and ``mime``. Also added support for + checking seekable, and the initial MIME type guess. + **Stream backend** - Add basic tests for the stream library provider. diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 38b86437..3880d91a 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -1,16 +1,25 @@ from __future__ import absolute_import, division, unicode_literals -import time +import collections import pygst pygst.require('0.10') import gst # noqa +import gst.pbutils from mopidy import exceptions from mopidy.audio import utils from mopidy.utils import encoding +_missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description +_Result = collections.namedtuple( + 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime')) + +_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + + +# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)? class Scanner(object): """ Helper to get tags and other relevant info from URIs. @@ -21,29 +30,8 @@ class Scanner(object): """ def __init__(self, timeout=1000, proxy_config=None): - self._timeout_ms = timeout - - sink = gst.element_factory_make('fakesink') - - audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') - - def pad_added(src, pad): - return pad.link(sink.get_pad('sink')) - - def source_setup(element, source): - utils.setup_proxy(source, proxy_config or {}) - - self._uribin = gst.element_factory_make('uridecodebin') - self._uribin.set_property('caps', audio_caps) - self._uribin.connect('pad-added', pad_added) - self._uribin.connect('source-setup', source_setup) - - self._pipe = gst.element_factory_make('pipeline') - self._pipe.add(self._uribin) - self._pipe.add(sink) - - self._bus = self._pipe.get_bus() - self._bus.set_flushing(True) + self._timeout_ms = int(timeout) + self._proxy_config = proxy_config or {} def scan(self, uri): """ @@ -51,68 +39,124 @@ class Scanner(object): :param uri: URI of the resource to scan. :type event: string - :return: (tags, duration) pair. tags is a dictionary of lists for all - the tags we found and duration is the length of the URI in - milliseconds, or :class:`None` if the URI has no duration. + :return: A named tuple containing + ``(uri, tags, duration, seekable, mime)``. + ``tags`` is a dictionary of lists for all the tags we found. + ``duration`` is the length of the URI in milliseconds, or + :class:`None` if the URI has no duration. ``seekable`` is boolean. + indicating if a seek would succeed. """ - tags, duration = None, None + tags, duration, seekable, mime = None, None, None, None + pipeline = _setup_pipeline(uri, self._proxy_config) + try: - self._setup(uri) - tags = self._collect() - duration = self._query_duration() + _start_pipeline(pipeline) + tags, mime = _process(pipeline, self._timeout_ms) + duration = _query_duration(pipeline) + seekable = _query_seekable(pipeline) finally: - self._reset() + pipeline.set_state(gst.STATE_NULL) + del pipeline - return tags, duration + return _Result(uri, tags, duration, seekable, mime) - def _setup(self, uri): - """Primes the pipeline for collection.""" - self._pipe.set_state(gst.STATE_READY) - self._uribin.set_property(b'uri', uri) - self._bus.set_flushing(False) - result = self._pipe.set_state(gst.STATE_PAUSED) - if result == gst.STATE_CHANGE_NO_PREROLL: - # Live sources don't pre-roll, so set to playing to get data. - self._pipe.set_state(gst.STATE_PLAYING) - def _collect(self): - """Polls for messages to collect data.""" - start = time.time() - timeout_s = self._timeout_ms / 1000.0 - tags = {} +# Turns out it's _much_ faster to just create a new pipeline for every as +# decodebins and other elements don't seem to take well to being reused. +def _setup_pipeline(uri, proxy_config=None): + src = gst.element_make_from_uri(gst.URI_SRC, uri) + if not src: + raise exceptions.ScannerError('GStreamer can not open: %s' % uri) - while time.time() - start < timeout_s: - if not self._bus.have_pending(): - continue - message = self._bus.pop() + typefind = gst.element_factory_make('typefind') + decodebin = gst.element_factory_make('decodebin2') + sink = gst.element_factory_make('fakesink') - if message.type == gst.MESSAGE_ERROR: - raise exceptions.ScannerError( - encoding.locale_decode(message.parse_error()[0])) - elif message.type == gst.MESSAGE_EOS: - return tags - elif message.type == gst.MESSAGE_ASYNC_DONE: - if message.src == self._pipe: - return tags - elif message.type == gst.MESSAGE_TAG: - taglist = message.parse_tag() - # Note that this will only keep the last tag. - tags.update(utils.convert_taglist(taglist)) + pipeline = gst.element_factory_make('pipeline') + pipeline.add_many(src, typefind, decodebin, sink) + gst.element_link_many(src, typefind, decodebin) - raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) + if proxy_config: + utils.setup_proxy(src, proxy_config) - def _reset(self): - """Ensures we cleanup child elements and flush the bus.""" - self._bus.set_flushing(True) - self._pipe.set_state(gst.STATE_NULL) + decodebin.set_property('caps', _RAW_AUDIO) + decodebin.connect('pad-added', _pad_added, sink) + typefind.connect('have-type', _have_type, decodebin) - def _query_duration(self): - try: - duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0] - except gst.QueryError: - return None + return pipeline - if duration < 0: - return None - else: - return duration // gst.MSECOND + +def _have_type(element, probability, caps, decodebin): + decodebin.set_property('sink-caps', caps) + msg = gst.message_new_application(element, caps.get_structure(0)) + element.get_bus().post(msg) + + +def _pad_added(element, pad, sink): + return pad.link(sink.get_pad('sink')) + + +def _start_pipeline(pipeline): + if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_NO_PREROLL: + pipeline.set_state(gst.STATE_PLAYING) + + +def _query_duration(pipeline): + try: + duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0] + except gst.QueryError: + return None + + if duration < 0: + return None + else: + return duration // gst.MSECOND + + +def _query_seekable(pipeline): + query = gst.query_new_seeking(gst.FORMAT_TIME) + pipeline.query(query) + return query.parse_seeking()[1] + + +def _process(pipeline, timeout_ms): + clock = pipeline.get_clock() + bus = pipeline.get_bus() + timeout = timeout_ms * gst.MSECOND + tags, mime, missing_description = {}, None, None + + types = (gst.MESSAGE_ELEMENT | gst.MESSAGE_APPLICATION | gst.MESSAGE_ERROR + | gst.MESSAGE_EOS | gst.MESSAGE_ASYNC_DONE | gst.MESSAGE_TAG) + + start = clock.get_time() + while timeout > 0: + message = bus.timed_pop_filtered(timeout, types) + + if message is None: + break + elif message.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(message): + missing_description = encoding.locale_decode( + _missing_plugin_desc(message)) + elif message.type == gst.MESSAGE_APPLICATION: + mime = message.structure.get_name() + if mime.startswith('text/') or mime == 'application/xml': + return tags, mime + elif message.type == gst.MESSAGE_ERROR: + error = encoding.locale_decode(message.parse_error()[0]) + if missing_description: + error = '%s (%s)' % (missing_description, error) + raise exceptions.ScannerError(error) + elif message.type == gst.MESSAGE_EOS: + return tags, mime + elif message.type == gst.MESSAGE_ASYNC_DONE: + if message.src == pipeline: + return tags, mime + elif message.type == gst.MESSAGE_TAG: + taglist = message.parse_tag() + # Note that this will only keep the last tag. + tags.update(utils.convert_taglist(taglist)) + + timeout -= clock.get_time() - start + + raise exceptions.ScannerError('Timeout after %dms' % timeout_ms) diff --git a/mopidy/local/commands.py b/mopidy/local/commands.py index 279fda13..af8b0025 100644 --- a/mopidy/local/commands.py +++ b/mopidy/local/commands.py @@ -133,7 +133,8 @@ class ScanCommand(commands.Command): try: relpath = translator.local_track_uri_to_path(uri, media_dir) file_uri = path.path_to_uri(os.path.join(media_dir, relpath)) - tags, duration = scanner.scan(file_uri) + result = scanner.scan(file_uri) + tags, duration = result.tags, result.duration if duration < MIN_DURATION_MS: logger.warning('Failed %s: Track shorter than %dms', uri, MIN_DURATION_MS) diff --git a/mopidy/stream/actor.py b/mopidy/stream/actor.py index 58fd966a..47bfd58f 100644 --- a/mopidy/stream/actor.py +++ b/mopidy/stream/actor.py @@ -45,9 +45,9 @@ class StreamLibraryProvider(backend.LibraryProvider): return [Track(uri=uri)] try: - tags, duration = self._scanner.scan(uri) - track = utils.convert_tags_to_track(tags).copy( - uri=uri, length=duration) + result = self._scanner.scan(uri) + track = utils.convert_tags_to_track(result.tags).copy( + uri=uri, length=result.duration) except exceptions.ScannerError as e: logger.warning('Problem looking up %s: %s', uri, e) track = Track(uri=uri) diff --git a/tests/audio/test_scan.py b/tests/audio/test_scan.py index 50ec8352..b2937a3f 100644 --- a/tests/audio/test_scan.py +++ b/tests/audio/test_scan.py @@ -31,9 +31,9 @@ class ScannerTest(unittest.TestCase): uri = path_lib.path_to_uri(path) key = uri[len('file://'):] try: - tags, duration = scanner.scan(uri) - self.tags[key] = tags - self.durations[key] = duration + result = scanner.scan(uri) + self.tags[key] = result.tags + self.durations[key] = result.duration except exceptions.ScannerError as error: self.errors[key] = error