From 628c8280877e85ba6f027ac3a691a5830e0d2243 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Fri, 13 Mar 2015 00:18:50 +0100 Subject: [PATCH] audio: Recreate scan pipeline for each scan Turns out this code runs a lot faster when we fully destroy the decodebins between scans. And since going to NULL isn't enough I opted to just go for redoing the whole pipeline instead of adding and removing decodebins all the time. As part of this almost all the logic has been ripped out of the scan class and into internal functions. The external interface has been kept the same for now. But we could easily switch to `scan(uri, timeout=1000, proxy=None)` --- mopidy/audio/scan.py | 203 +++++++++++++++++++++---------------------- 1 file changed, 98 insertions(+), 105 deletions(-) diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index c4516531..50fb8700 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -14,10 +14,13 @@ from mopidy.utils import encoding _missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description -Result = collections.namedtuple( +_Result = collections.namedtuple( 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime')) +_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + +# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)? class Scanner(object): """ Helper to get tags and other relevant info from URIs. @@ -31,33 +34,6 @@ class Scanner(object): self._timeout_ms = timeout self._proxy_config = proxy_config or {} - sink = gst.element_factory_make('fakesink') - self._src = None - - def pad_added(src, pad): - return pad.link(sink.get_pad('sink')) - - def have_type(finder, probability, caps): - msg = gst.message_new_application(finder, caps.get_structure(0)) - finder.get_bus().post(msg) - - self._typefinder = gst.element_factory_make('typefind') - self._typefinder.connect('have-type', have_type) - - audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') - self._decodebin = gst.element_factory_make('decodebin2') - self._decodebin.set_property('caps', audio_caps) - self._decodebin.connect('pad-added', pad_added) - - self._pipe = gst.element_factory_make('pipeline') - self._pipe.add(self._typefinder) - self._pipe.add(self._decodebin) - self._pipe.add(sink) - - self._typefinder.link(self._decodebin) - - self._bus = self._pipe.get_bus() - def scan(self, uri): """ Scan the given uri collecting relevant metadata. @@ -72,92 +48,109 @@ class Scanner(object): indicating if a seek would succeed. """ tags, duration, seekable, mime = None, None, None, None + pipeline = _setup_pipeline(uri, self._proxy_config) + try: - self._setup(uri) - tags, mime = self._collect() - duration = self._query_duration() - seekable = self._query_seekable() + _start_pipeline(pipeline) + tags, mime = _process(pipeline, self._timeout_ms / 1000.0) + duration = _query_duration(pipeline) + seekable = _query_seekable(pipeline) finally: - self._reset() + pipeline.set_state(gst.STATE_NULL) + del pipeline - return Result(uri, tags, duration, seekable, mime) + return _Result(uri, tags, duration, seekable, mime) - def _setup(self, uri): - """Primes the pipeline for collection.""" - protocol = gst.uri_get_protocol(uri) - if self._src and protocol not in self._src.get_protocols(): - self._src.unlink(self._typefinder) - self._pipe.remove(self._src) - self._src = None - if not self._src: - self._src = gst.element_make_from_uri(gst.URI_SRC, uri) - if not self._src: - raise exceptions.ScannerError('Could not find any elements to ' - 'handle %s URI.' % protocol) - utils.setup_proxy(self._src, self._proxy_config) - self._pipe.add(self._src) - self._src.link(self._typefinder) +# Turns out it's _much_ faster to just create a new pipeline for every as +# decodebins and other elements don't seem to take well to being reused. +def _setup_pipeline(uri, proxy_config=None): + src = gst.element_make_from_uri(gst.URI_SRC, uri) + if not src: + raise exceptions.ScannerError('GStreamer can not open: %s' % uri) - self._pipe.set_state(gst.STATE_READY) - self._src.set_uri(uri) + typefind = gst.element_factory_make('typefind') + decodebin = gst.element_factory_make('decodebin2') + sink = gst.element_factory_make('fakesink') - result = self._pipe.set_state(gst.STATE_PAUSED) - if result == gst.STATE_CHANGE_NO_PREROLL: - # Live sources don't pre-roll, so set to playing to get data. - self._pipe.set_state(gst.STATE_PLAYING) + pipeline = gst.element_factory_make('pipeline') + pipeline.add_many(src, typefind, decodebin, sink) + gst.element_link_many(src, typefind, decodebin) - def _collect(self): - """Polls for messages to collect data.""" - start = time.time() - timeout_s = self._timeout_ms / 1000.0 - tags, mime, missing_description = {}, None, None + if proxy_config: + utils.setup_proxy(src, proxy_config) - while time.time() - start < timeout_s: - if not self._bus.have_pending(): - continue - message = self._bus.pop() + decodebin.set_property('caps', _RAW_AUDIO) + decodebin.connect('pad-added', _pad_added, sink) + typefind.connect('have-type', _have_type, decodebin) - if message.type == gst.MESSAGE_ELEMENT: - if gst.pbutils.is_missing_plugin_message(message): - missing_description = encoding.locale_decode( - _missing_plugin_desc(message)) - elif message.type == gst.MESSAGE_APPLICATION: - mime = message.structure.get_name() - if mime.startswith('text/') or mime == 'application/xml': - return tags, mime - elif message.type == gst.MESSAGE_ERROR: - error = encoding.locale_decode(message.parse_error()[0]) - if missing_description: - error = '%s (%s)' % (missing_description, error) - raise exceptions.ScannerError(error) - elif message.type == gst.MESSAGE_EOS: + return pipeline + + +def _have_type(element, probability, caps, decodebin): + decodebin.set_property('sink-caps', caps) + msg = gst.message_new_application(element, caps.get_structure(0)) + element.get_bus().post(msg) + + +def _pad_added(element, pad, sink): + return pad.link(sink.get_pad('sink')) + + +def _start_pipeline(pipeline): + if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_NO_PREROLL: + pipeline.set_state(gst.STATE_PLAYING) + + +def _query_duration(pipeline): + try: + duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0] + except gst.QueryError: + return None + + if duration < 0: + return None + else: + return duration // gst.MSECOND + + +def _query_seekable(pipeline): + query = gst.query_new_seeking(gst.FORMAT_TIME) + pipeline.query(query) + return query.parse_seeking()[1] + + +def _process(pipeline, timeout): + start = time.time() + tags, mime, missing_description = {}, None, None + bus = pipeline.get_bus() + + while time.time() - start < timeout: + if not bus.have_pending(): + continue + message = bus.pop() + + if message.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(message): + missing_description = encoding.locale_decode( + _missing_plugin_desc(message)) + elif message.type == gst.MESSAGE_APPLICATION: + mime = message.structure.get_name() + if mime.startswith('text/') or mime == 'application/xml': return tags, mime - elif message.type == gst.MESSAGE_ASYNC_DONE: - if message.src == self._pipe: - return tags, mime - elif message.type == gst.MESSAGE_TAG: - taglist = message.parse_tag() - # Note that this will only keep the last tag. - tags.update(utils.convert_taglist(taglist)) + elif message.type == gst.MESSAGE_ERROR: + error = encoding.locale_decode(message.parse_error()[0]) + if missing_description: + error = '%s (%s)' % (missing_description, error) + raise exceptions.ScannerError(error) + elif message.type == gst.MESSAGE_EOS: + return tags, mime + elif message.type == gst.MESSAGE_ASYNC_DONE: + if message.src == pipeline: + return tags, mime + elif message.type == gst.MESSAGE_TAG: + taglist = message.parse_tag() + # Note that this will only keep the last tag. + tags.update(utils.convert_taglist(taglist)) - raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) - - def _reset(self): - self._pipe.set_state(gst.STATE_NULL) - - def _query_duration(self): - try: - duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0] - except gst.QueryError: - return None - - if duration < 0: - return None - else: - return duration // gst.MSECOND - - def _query_seekable(self): - query = gst.query_new_seeking(gst.FORMAT_TIME) - self._pipe.query(query) - return query.parse_seeking()[1] + raise exceptions.ScannerError('Timeout after %dms' % (timeout * 1000))