From f4e6956bb749045b35179f99357c551f44d0dfda Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 11 Mar 2015 22:58:41 +0100 Subject: [PATCH 01/13] audio: Catch missing plugins in scanner for better error messages --- mopidy/audio/scan.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 38b86437..c3eec941 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -5,11 +5,14 @@ import time import pygst pygst.require('0.10') import gst # noqa +import gst.pbutils from mopidy import exceptions from mopidy.audio import utils from mopidy.utils import encoding +_missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description + class Scanner(object): """ @@ -86,7 +89,11 @@ class Scanner(object): continue message = self._bus.pop() - if message.type == gst.MESSAGE_ERROR: + if message.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(message): + description = _missing_plugin_desc(message) + raise exceptions.ScannerError(description) + elif message.type == gst.MESSAGE_ERROR: raise exceptions.ScannerError( encoding.locale_decode(message.parse_error()[0])) elif message.type == gst.MESSAGE_EOS: From cee73b5501b7956ebc6cc5a5712fc31c3ff30523 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 11 Mar 2015 23:09:14 +0100 Subject: [PATCH 02/13] audio: Add support for checking seekable state in scanner Return type of scanner changed to a named tuple with (uri, tags, duration, seekable). This should help with #872 and the related "live" issues. Tests, local scan and stream metadata lookup have been updated to account for the changes. --- mopidy/audio/scan.py | 22 +++++++++++++++++----- mopidy/local/commands.py | 3 ++- mopidy/stream/actor.py | 6 +++--- tests/audio/test_scan.py | 6 +++--- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index c3eec941..d443b8bd 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -1,5 +1,6 @@ from __future__ import absolute_import, division, unicode_literals +import collections import time import pygst @@ -13,6 +14,9 @@ from mopidy.utils import encoding _missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description +Result = collections.namedtuple( + 'Result', ('uri', 'tags', 'duration', 'seekable')) + class Scanner(object): """ @@ -54,19 +58,22 @@ class Scanner(object): :param uri: URI of the resource to scan. :type event: string - :return: (tags, duration) pair. tags is a dictionary of lists for all - the tags we found and duration is the length of the URI in - milliseconds, or :class:`None` if the URI has no duration. + :return: A named tuple containing ``(uri, tags, duration, seekable)``. + ``tags`` is a dictionary of lists for all the tags we found. + ``duration`` is the length of the URI in milliseconds, or + :class:`None` if the URI has no duration. ``seekable`` is boolean + indicating if a seek would succeed. """ - tags, duration = None, None + tags, duration, seekable = None, None, None try: self._setup(uri) tags = self._collect() duration = self._query_duration() + seekable = self._query_seekable() finally: self._reset() - return tags, duration + return Result(uri, tags, duration, seekable) def _setup(self, uri): """Primes the pipeline for collection.""" @@ -123,3 +130,8 @@ class Scanner(object): return None else: return duration // gst.MSECOND + + def _query_seekable(self): + query = gst.query_new_seeking(gst.FORMAT_TIME) + self._pipe.query(query) + return query.parse_seeking()[1] diff --git a/mopidy/local/commands.py b/mopidy/local/commands.py index 279fda13..af8b0025 100644 --- a/mopidy/local/commands.py +++ b/mopidy/local/commands.py @@ -133,7 +133,8 @@ class ScanCommand(commands.Command): try: relpath = translator.local_track_uri_to_path(uri, media_dir) file_uri = path.path_to_uri(os.path.join(media_dir, relpath)) - tags, duration = scanner.scan(file_uri) + result = scanner.scan(file_uri) + tags, duration = result.tags, result.duration if duration < MIN_DURATION_MS: logger.warning('Failed %s: Track shorter than %dms', uri, MIN_DURATION_MS) diff --git a/mopidy/stream/actor.py b/mopidy/stream/actor.py index 58fd966a..47bfd58f 100644 --- a/mopidy/stream/actor.py +++ b/mopidy/stream/actor.py @@ -45,9 +45,9 @@ class StreamLibraryProvider(backend.LibraryProvider): return [Track(uri=uri)] try: - tags, duration = self._scanner.scan(uri) - track = utils.convert_tags_to_track(tags).copy( - uri=uri, length=duration) + result = self._scanner.scan(uri) + track = utils.convert_tags_to_track(result.tags).copy( + uri=uri, length=result.duration) except exceptions.ScannerError as e: logger.warning('Problem looking up %s: %s', uri, e) track = Track(uri=uri) diff --git a/tests/audio/test_scan.py b/tests/audio/test_scan.py index 50ec8352..b2937a3f 100644 --- a/tests/audio/test_scan.py +++ b/tests/audio/test_scan.py @@ -31,9 +31,9 @@ class ScannerTest(unittest.TestCase): uri = path_lib.path_to_uri(path) key = uri[len('file://'):] try: - tags, duration = scanner.scan(uri) - self.tags[key] = tags - self.durations[key] = duration + result = scanner.scan(uri) + self.tags[key] = result.tags + self.durations[key] = result.duration except exceptions.ScannerError as error: self.errors[key] = error From ccd3753b30514a02245f923c39d77a008bb8c847 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 11 Mar 2015 23:14:24 +0100 Subject: [PATCH 03/13] audio: Switch to decodebin2 in scanner and handle our own sources This is needed to be able to put in our own typefind and catch playlists before they make it to the decoder. --- mopidy/audio/scan.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index d443b8bd..c4fca6ad 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -29,24 +29,21 @@ class Scanner(object): def __init__(self, timeout=1000, proxy_config=None): self._timeout_ms = timeout + self._proxy_config = proxy_config or {} sink = gst.element_factory_make('fakesink') - - audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + self._src = None def pad_added(src, pad): return pad.link(sink.get_pad('sink')) - def source_setup(element, source): - utils.setup_proxy(source, proxy_config or {}) - - self._uribin = gst.element_factory_make('uridecodebin') - self._uribin.set_property('caps', audio_caps) - self._uribin.connect('pad-added', pad_added) - self._uribin.connect('source-setup', source_setup) + audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + self._decodebin = gst.element_factory_make('decodebin2') + self._decodebin.set_property('caps', audio_caps) + self._decodebin.connect('pad-added', pad_added) self._pipe = gst.element_factory_make('pipeline') - self._pipe.add(self._uribin) + self._pipe.add(self._decodebin) self._pipe.add(sink) self._bus = self._pipe.get_bus() @@ -78,8 +75,16 @@ class Scanner(object): def _setup(self, uri): """Primes the pipeline for collection.""" self._pipe.set_state(gst.STATE_READY) - self._uribin.set_property(b'uri', uri) + + self._src = gst.element_make_from_uri(gst.URI_SRC, uri) + utils.setup_proxy(self._src, self._proxy_config) + + self._pipe.add(self._src) + self._src.sync_state_with_parent() + self._src.link(self._decodebin) + self._bus.set_flushing(False) + result = self._pipe.set_state(gst.STATE_PAUSED) if result == gst.STATE_CHANGE_NO_PREROLL: # Live sources don't pre-roll, so set to playing to get data. @@ -119,6 +124,9 @@ class Scanner(object): """Ensures we cleanup child elements and flush the bus.""" self._bus.set_flushing(True) self._pipe.set_state(gst.STATE_NULL) + self._src.unlink(self._decodebin) + self._pipe.remove(self._src) + self._src = None def _query_duration(self): try: From cd579ff7bbd36b8a69fad05080d6a661b043cc95 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 11 Mar 2015 23:20:30 +0100 Subject: [PATCH 04/13] audio: Going to NULL already handles the flushing for us --- mopidy/audio/scan.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index c4fca6ad..84477def 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -47,7 +47,6 @@ class Scanner(object): self._pipe.add(sink) self._bus = self._pipe.get_bus() - self._bus.set_flushing(True) def scan(self, uri): """ @@ -83,8 +82,6 @@ class Scanner(object): self._src.sync_state_with_parent() self._src.link(self._decodebin) - self._bus.set_flushing(False) - result = self._pipe.set_state(gst.STATE_PAUSED) if result == gst.STATE_CHANGE_NO_PREROLL: # Live sources don't pre-roll, so set to playing to get data. @@ -121,8 +118,7 @@ class Scanner(object): raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) def _reset(self): - """Ensures we cleanup child elements and flush the bus.""" - self._bus.set_flushing(True) + """Ensures we cleanup child elements.""" self._pipe.set_state(gst.STATE_NULL) self._src.unlink(self._decodebin) self._pipe.remove(self._src) From 24cceb69ebf2453b7f8cbd1f6c3126c39f2fd885 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 11 Mar 2015 23:21:41 +0100 Subject: [PATCH 05/13] audio: Going to ready is pointless in this code. --- mopidy/audio/scan.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 84477def..359f31cf 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -73,11 +73,8 @@ class Scanner(object): def _setup(self, uri): """Primes the pipeline for collection.""" - self._pipe.set_state(gst.STATE_READY) - self._src = gst.element_make_from_uri(gst.URI_SRC, uri) utils.setup_proxy(self._src, self._proxy_config) - self._pipe.add(self._src) self._src.sync_state_with_parent() self._src.link(self._decodebin) From c93eaad7ed53e9175a5b88636c1009ca10ae7804 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Thu, 12 Mar 2015 00:16:02 +0100 Subject: [PATCH 06/13] audio: Try and reuse source when we can --- mopidy/audio/scan.py | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 359f31cf..87e60076 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -73,11 +73,21 @@ class Scanner(object): def _setup(self, uri): """Primes the pipeline for collection.""" - self._src = gst.element_make_from_uri(gst.URI_SRC, uri) - utils.setup_proxy(self._src, self._proxy_config) - self._pipe.add(self._src) - self._src.sync_state_with_parent() - self._src.link(self._decodebin) + protocol = gst.uri_get_protocol(uri) + if self._src and protocol not in self._src.get_protocols(): + self._src.unlink(self._decodebin) + self._pipe.remove(self._src) + self._src = None + + if not self._src: + self._src = gst.element_make_from_uri(gst.URI_SRC, uri) + utils.setup_proxy(self._src, self._proxy_config) + self._pipe.add(self._src) + self._src.sync_state_with_parent() + self._src.link(self._decodebin) + + self._pipe.set_state(gst.STATE_READY) + self._src.set_uri(uri) result = self._pipe.set_state(gst.STATE_PAUSED) if result == gst.STATE_CHANGE_NO_PREROLL: @@ -115,11 +125,7 @@ class Scanner(object): raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) def _reset(self): - """Ensures we cleanup child elements.""" self._pipe.set_state(gst.STATE_NULL) - self._src.unlink(self._decodebin) - self._pipe.remove(self._src) - self._src = None def _query_duration(self): try: From 837f2de62985232d9b7140e334ac70816d54933b Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Thu, 12 Mar 2015 01:06:36 +0100 Subject: [PATCH 07/13] audio: Add typefinder to scanner and add mime to result This should allow us to move playlist handling out of GStreamer as we will short circuit for text/* and application/xml now. --- mopidy/audio/scan.py | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 87e60076..39cf172e 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -15,7 +15,7 @@ from mopidy.utils import encoding _missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description Result = collections.namedtuple( - 'Result', ('uri', 'tags', 'duration', 'seekable')) + 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime')) class Scanner(object): @@ -37,15 +37,25 @@ class Scanner(object): def pad_added(src, pad): return pad.link(sink.get_pad('sink')) + def have_type(finder, probability, caps): + msg = gst.message_new_application(finder, caps.get_structure(0)) + finder.get_bus().post(msg) + + self._typefinder = gst.element_factory_make('typefind') + self._typefinder.connect('have-type', have_type) + audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') self._decodebin = gst.element_factory_make('decodebin2') self._decodebin.set_property('caps', audio_caps) self._decodebin.connect('pad-added', pad_added) self._pipe = gst.element_factory_make('pipeline') + self._pipe.add(self._typefinder) self._pipe.add(self._decodebin) self._pipe.add(sink) + self._typefinder.link(self._decodebin) + self._bus = self._pipe.get_bus() def scan(self, uri): @@ -54,28 +64,29 @@ class Scanner(object): :param uri: URI of the resource to scan. :type event: string - :return: A named tuple containing ``(uri, tags, duration, seekable)``. + :return: A named tuple containing + ``(uri, tags, duration, seekable, mime)``. ``tags`` is a dictionary of lists for all the tags we found. ``duration`` is the length of the URI in milliseconds, or - :class:`None` if the URI has no duration. ``seekable`` is boolean + :class:`None` if the URI has no duration. ``seekable`` is boolean. indicating if a seek would succeed. """ - tags, duration, seekable = None, None, None + tags, duration, seekable, mime = None, None, None, None try: self._setup(uri) - tags = self._collect() + tags, mime = self._collect() duration = self._query_duration() seekable = self._query_seekable() finally: self._reset() - return Result(uri, tags, duration, seekable) + return Result(uri, tags, duration, seekable, mime) def _setup(self, uri): """Primes the pipeline for collection.""" protocol = gst.uri_get_protocol(uri) if self._src and protocol not in self._src.get_protocols(): - self._src.unlink(self._decodebin) + self._src.unlink(self._typefinder) self._pipe.remove(self._src) self._src = None @@ -83,8 +94,7 @@ class Scanner(object): self._src = gst.element_make_from_uri(gst.URI_SRC, uri) utils.setup_proxy(self._src, self._proxy_config) self._pipe.add(self._src) - self._src.sync_state_with_parent() - self._src.link(self._decodebin) + self._src.link(self._typefinder) self._pipe.set_state(gst.STATE_READY) self._src.set_uri(uri) @@ -98,7 +108,7 @@ class Scanner(object): """Polls for messages to collect data.""" start = time.time() timeout_s = self._timeout_ms / 1000.0 - tags = {} + tags, mime = {}, None while time.time() - start < timeout_s: if not self._bus.have_pending(): @@ -109,14 +119,18 @@ class Scanner(object): if gst.pbutils.is_missing_plugin_message(message): description = _missing_plugin_desc(message) raise exceptions.ScannerError(description) + elif message.type == gst.MESSAGE_APPLICATION: + mime = message.structure.get_name() + if mime.startswith('text/') or mime == 'application/xml': + return tags, mime elif message.type == gst.MESSAGE_ERROR: raise exceptions.ScannerError( encoding.locale_decode(message.parse_error()[0])) elif message.type == gst.MESSAGE_EOS: - return tags + return tags, mime elif message.type == gst.MESSAGE_ASYNC_DONE: if message.src == self._pipe: - return tags + return tags, mime elif message.type == gst.MESSAGE_TAG: taglist = message.parse_tag() # Note that this will only keep the last tag. From 9c9d05be36616f528c9d79d7bc54e48d2e938283 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Thu, 12 Mar 2015 21:55:17 +0100 Subject: [PATCH 08/13] audio: Only warn about missing plugin on errors --- mopidy/audio/scan.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 39cf172e..ed8e9eb9 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -108,7 +108,7 @@ class Scanner(object): """Polls for messages to collect data.""" start = time.time() timeout_s = self._timeout_ms / 1000.0 - tags, mime = {}, None + tags, mime, missing_description = {}, None, None while time.time() - start < timeout_s: if not self._bus.have_pending(): @@ -117,15 +117,17 @@ class Scanner(object): if message.type == gst.MESSAGE_ELEMENT: if gst.pbutils.is_missing_plugin_message(message): - description = _missing_plugin_desc(message) - raise exceptions.ScannerError(description) + missing_description = encoding.locale_decode( + _missing_plugin_desc(message)) elif message.type == gst.MESSAGE_APPLICATION: mime = message.structure.get_name() if mime.startswith('text/') or mime == 'application/xml': return tags, mime elif message.type == gst.MESSAGE_ERROR: - raise exceptions.ScannerError( - encoding.locale_decode(message.parse_error()[0])) + error = encoding.locale_decode(message.parse_error()[0]) + if missing_description: + error = '%s (%s)' % (missing_description, error) + raise exceptions.ScannerError(error) elif message.type == gst.MESSAGE_EOS: return tags, mime elif message.type == gst.MESSAGE_ASYNC_DONE: From 411bae5a56aaeb5e59e57bdb5939aa76f398511d Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Thu, 12 Mar 2015 21:58:27 +0100 Subject: [PATCH 09/13] audio: Raise error for unknown protocol types --- mopidy/audio/scan.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index ed8e9eb9..c4516531 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -92,6 +92,9 @@ class Scanner(object): if not self._src: self._src = gst.element_make_from_uri(gst.URI_SRC, uri) + if not self._src: + raise exceptions.ScannerError('Could not find any elements to ' + 'handle %s URI.' % protocol) utils.setup_proxy(self._src, self._proxy_config) self._pipe.add(self._src) self._src.link(self._typefinder) From 628c8280877e85ba6f027ac3a691a5830e0d2243 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Fri, 13 Mar 2015 00:18:50 +0100 Subject: [PATCH 10/13] audio: Recreate scan pipeline for each scan Turns out this code runs a lot faster when we fully destroy the decodebins between scans. And since going to NULL isn't enough I opted to just go for redoing the whole pipeline instead of adding and removing decodebins all the time. As part of this almost all the logic has been ripped out of the scan class and into internal functions. The external interface has been kept the same for now. But we could easily switch to `scan(uri, timeout=1000, proxy=None)` --- mopidy/audio/scan.py | 203 +++++++++++++++++++++---------------------- 1 file changed, 98 insertions(+), 105 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index c4516531..50fb8700 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -14,10 +14,13 @@ from mopidy.utils import encoding _missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description -Result = collections.namedtuple( +_Result = collections.namedtuple( 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime')) +_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + +# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)? class Scanner(object): """ Helper to get tags and other relevant info from URIs. @@ -31,33 +34,6 @@ class Scanner(object): self._timeout_ms = timeout self._proxy_config = proxy_config or {} - sink = gst.element_factory_make('fakesink') - self._src = None - - def pad_added(src, pad): - return pad.link(sink.get_pad('sink')) - - def have_type(finder, probability, caps): - msg = gst.message_new_application(finder, caps.get_structure(0)) - finder.get_bus().post(msg) - - self._typefinder = gst.element_factory_make('typefind') - self._typefinder.connect('have-type', have_type) - - audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') - self._decodebin = gst.element_factory_make('decodebin2') - self._decodebin.set_property('caps', audio_caps) - self._decodebin.connect('pad-added', pad_added) - - self._pipe = gst.element_factory_make('pipeline') - self._pipe.add(self._typefinder) - self._pipe.add(self._decodebin) - self._pipe.add(sink) - - self._typefinder.link(self._decodebin) - - self._bus = self._pipe.get_bus() - def scan(self, uri): """ Scan the given uri collecting relevant metadata. @@ -72,92 +48,109 @@ class Scanner(object): indicating if a seek would succeed. """ tags, duration, seekable, mime = None, None, None, None + pipeline = _setup_pipeline(uri, self._proxy_config) + try: - self._setup(uri) - tags, mime = self._collect() - duration = self._query_duration() - seekable = self._query_seekable() + _start_pipeline(pipeline) + tags, mime = _process(pipeline, self._timeout_ms / 1000.0) + duration = _query_duration(pipeline) + seekable = _query_seekable(pipeline) finally: - self._reset() + pipeline.set_state(gst.STATE_NULL) + del pipeline - return Result(uri, tags, duration, seekable, mime) + return _Result(uri, tags, duration, seekable, mime) - def _setup(self, uri): - """Primes the pipeline for collection.""" - protocol = gst.uri_get_protocol(uri) - if self._src and protocol not in self._src.get_protocols(): - self._src.unlink(self._typefinder) - self._pipe.remove(self._src) - self._src = None - if not self._src: - self._src = gst.element_make_from_uri(gst.URI_SRC, uri) - if not self._src: - raise exceptions.ScannerError('Could not find any elements to ' - 'handle %s URI.' % protocol) - utils.setup_proxy(self._src, self._proxy_config) - self._pipe.add(self._src) - self._src.link(self._typefinder) +# Turns out it's _much_ faster to just create a new pipeline for every as +# decodebins and other elements don't seem to take well to being reused. +def _setup_pipeline(uri, proxy_config=None): + src = gst.element_make_from_uri(gst.URI_SRC, uri) + if not src: + raise exceptions.ScannerError('GStreamer can not open: %s' % uri) - self._pipe.set_state(gst.STATE_READY) - self._src.set_uri(uri) + typefind = gst.element_factory_make('typefind') + decodebin = gst.element_factory_make('decodebin2') + sink = gst.element_factory_make('fakesink') - result = self._pipe.set_state(gst.STATE_PAUSED) - if result == gst.STATE_CHANGE_NO_PREROLL: - # Live sources don't pre-roll, so set to playing to get data. - self._pipe.set_state(gst.STATE_PLAYING) + pipeline = gst.element_factory_make('pipeline') + pipeline.add_many(src, typefind, decodebin, sink) + gst.element_link_many(src, typefind, decodebin) - def _collect(self): - """Polls for messages to collect data.""" - start = time.time() - timeout_s = self._timeout_ms / 1000.0 - tags, mime, missing_description = {}, None, None + if proxy_config: + utils.setup_proxy(src, proxy_config) - while time.time() - start < timeout_s: - if not self._bus.have_pending(): - continue - message = self._bus.pop() + decodebin.set_property('caps', _RAW_AUDIO) + decodebin.connect('pad-added', _pad_added, sink) + typefind.connect('have-type', _have_type, decodebin) - if message.type == gst.MESSAGE_ELEMENT: - if gst.pbutils.is_missing_plugin_message(message): - missing_description = encoding.locale_decode( - _missing_plugin_desc(message)) - elif message.type == gst.MESSAGE_APPLICATION: - mime = message.structure.get_name() - if mime.startswith('text/') or mime == 'application/xml': - return tags, mime - elif message.type == gst.MESSAGE_ERROR: - error = encoding.locale_decode(message.parse_error()[0]) - if missing_description: - error = '%s (%s)' % (missing_description, error) - raise exceptions.ScannerError(error) - elif message.type == gst.MESSAGE_EOS: + return pipeline + + +def _have_type(element, probability, caps, decodebin): + decodebin.set_property('sink-caps', caps) + msg = gst.message_new_application(element, caps.get_structure(0)) + element.get_bus().post(msg) + + +def _pad_added(element, pad, sink): + return pad.link(sink.get_pad('sink')) + + +def _start_pipeline(pipeline): + if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_NO_PREROLL: + pipeline.set_state(gst.STATE_PLAYING) + + +def _query_duration(pipeline): + try: + duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0] + except gst.QueryError: + return None + + if duration < 0: + return None + else: + return duration // gst.MSECOND + + +def _query_seekable(pipeline): + query = gst.query_new_seeking(gst.FORMAT_TIME) + pipeline.query(query) + return query.parse_seeking()[1] + + +def _process(pipeline, timeout): + start = time.time() + tags, mime, missing_description = {}, None, None + bus = pipeline.get_bus() + + while time.time() - start < timeout: + if not bus.have_pending(): + continue + message = bus.pop() + + if message.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(message): + missing_description = encoding.locale_decode( + _missing_plugin_desc(message)) + elif message.type == gst.MESSAGE_APPLICATION: + mime = message.structure.get_name() + if mime.startswith('text/') or mime == 'application/xml': return tags, mime - elif message.type == gst.MESSAGE_ASYNC_DONE: - if message.src == self._pipe: - return tags, mime - elif message.type == gst.MESSAGE_TAG: - taglist = message.parse_tag() - # Note that this will only keep the last tag. - tags.update(utils.convert_taglist(taglist)) + elif message.type == gst.MESSAGE_ERROR: + error = encoding.locale_decode(message.parse_error()[0]) + if missing_description: + error = '%s (%s)' % (missing_description, error) + raise exceptions.ScannerError(error) + elif message.type == gst.MESSAGE_EOS: + return tags, mime + elif message.type == gst.MESSAGE_ASYNC_DONE: + if message.src == pipeline: + return tags, mime + elif message.type == gst.MESSAGE_TAG: + taglist = message.parse_tag() + # Note that this will only keep the last tag. + tags.update(utils.convert_taglist(taglist)) - raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) - - def _reset(self): - self._pipe.set_state(gst.STATE_NULL) - - def _query_duration(self): - try: - duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0] - except gst.QueryError: - return None - - if duration < 0: - return None - else: - return duration // gst.MSECOND - - def _query_seekable(self): - query = gst.query_new_seeking(gst.FORMAT_TIME) - self._pipe.query(query) - return query.parse_seeking()[1] + raise exceptions.ScannerError('Timeout after %dms' % (timeout * 1000)) From 9e8b3263abf5fa8529dedeb46392f8f19eb723f4 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Fri, 13 Mar 2015 22:36:35 +0100 Subject: [PATCH 11/13] audio: Use timed pop for message loop and gst clocks --- mopidy/audio/scan.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index 50fb8700..cbf4c170 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -1,7 +1,6 @@ from __future__ import absolute_import, division, unicode_literals import collections -import time import pygst pygst.require('0.10') @@ -31,7 +30,7 @@ class Scanner(object): """ def __init__(self, timeout=1000, proxy_config=None): - self._timeout_ms = timeout + self._timeout_ms = int(timeout) self._proxy_config = proxy_config or {} def scan(self, uri): @@ -52,7 +51,7 @@ class Scanner(object): try: _start_pipeline(pipeline) - tags, mime = _process(pipeline, self._timeout_ms / 1000.0) + tags, mime = _process(pipeline, self._timeout_ms) duration = _query_duration(pipeline) seekable = _query_seekable(pipeline) finally: @@ -120,17 +119,19 @@ def _query_seekable(pipeline): return query.parse_seeking()[1] -def _process(pipeline, timeout): - start = time.time() - tags, mime, missing_description = {}, None, None +def _process(pipeline, timeout_ms): + clock = pipeline.get_clock() bus = pipeline.get_bus() + timeout = timeout_ms * gst.MSECOND + tags, mime, missing_description = {}, None, None - while time.time() - start < timeout: - if not bus.have_pending(): - continue - message = bus.pop() + start = clock.get_time() + while timeout > 0: + message = bus.timed_pop(timeout) - if message.type == gst.MESSAGE_ELEMENT: + if message is None: + break + elif message.type == gst.MESSAGE_ELEMENT: if gst.pbutils.is_missing_plugin_message(message): missing_description = encoding.locale_decode( _missing_plugin_desc(message)) @@ -153,4 +154,6 @@ def _process(pipeline, timeout): # Note that this will only keep the last tag. tags.update(utils.convert_taglist(taglist)) - raise exceptions.ScannerError('Timeout after %dms' % (timeout * 1000)) + timeout -= clock.get_time() - start + + raise exceptions.ScannerError('Timeout after %dms' % timeout_ms) From faab0b755af9ceb92b2f80b6a9654a670cf38f19 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Fri, 13 Mar 2015 22:39:52 +0100 Subject: [PATCH 12/13] audio: Filter for messages we care about, rest will be dropped --- mopidy/audio/scan.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index cbf4c170..3880d91a 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -125,9 +125,12 @@ def _process(pipeline, timeout_ms): timeout = timeout_ms * gst.MSECOND tags, mime, missing_description = {}, None, None + types = (gst.MESSAGE_ELEMENT | gst.MESSAGE_APPLICATION | gst.MESSAGE_ERROR + | gst.MESSAGE_EOS | gst.MESSAGE_ASYNC_DONE | gst.MESSAGE_TAG) + start = clock.get_time() while timeout > 0: - message = bus.timed_pop(timeout) + message = bus.timed_pop_filtered(timeout, types) if message is None: break From 6b7f9b4899555c8b2badadc4e2016e0f5ec3ee4d Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Fri, 13 Mar 2015 22:45:57 +0100 Subject: [PATCH 13/13] docs: Add changelog for the scanner improvements --- docs/changelog.rst | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/docs/changelog.rst b/docs/changelog.rst index 9e3fb9d2..c5808833 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -144,6 +144,13 @@ v0.20.0 (UNRELEASED) - Update scanner to operate with milliseconds for duration. + - Update scanner to use a custom src, typefind and decodebin. This allows us + to catch playlists before we try to decode them. + + - Refactored scanner to create a new pipeline per song, this is needed as + reseting decodebin is much slower than tearing it down and making a fresh + one. + - Add :meth:`mopidy.audio.AudioListener.tags_changed`. Notifies core when new tags are found. @@ -163,6 +170,12 @@ v0.20.0 (UNRELEASED) - Add workaround for volume not persisting across tracks on OS X. (Issue: :issue:`886`, PR: :issue:`958`) +- Improved missing plugin error reporting in scanner. + +- Introduced a new return type for the scanner, a named tuple with ``uri``, + ``tags``, ``duration``, ``seekable`` and ``mime``. Also added support for + checking seekable, and the initial MIME type guess. + **Stream backend** - Add basic tests for the stream library provider.