diff --git a/mopidy/audio/scan.py b/mopidy/audio/scan.py index c4516531..50fb8700 100644 --- a/mopidy/audio/scan.py +++ b/mopidy/audio/scan.py @@ -14,10 +14,13 @@ from mopidy.utils import encoding _missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description -Result = collections.namedtuple( +_Result = collections.namedtuple( 'Result', ('uri', 'tags', 'duration', 'seekable', 'mime')) +_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') + +# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)? class Scanner(object): """ Helper to get tags and other relevant info from URIs. @@ -31,33 +34,6 @@ class Scanner(object): self._timeout_ms = timeout self._proxy_config = proxy_config or {} - sink = gst.element_factory_make('fakesink') - self._src = None - - def pad_added(src, pad): - return pad.link(sink.get_pad('sink')) - - def have_type(finder, probability, caps): - msg = gst.message_new_application(finder, caps.get_structure(0)) - finder.get_bus().post(msg) - - self._typefinder = gst.element_factory_make('typefind') - self._typefinder.connect('have-type', have_type) - - audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float') - self._decodebin = gst.element_factory_make('decodebin2') - self._decodebin.set_property('caps', audio_caps) - self._decodebin.connect('pad-added', pad_added) - - self._pipe = gst.element_factory_make('pipeline') - self._pipe.add(self._typefinder) - self._pipe.add(self._decodebin) - self._pipe.add(sink) - - self._typefinder.link(self._decodebin) - - self._bus = self._pipe.get_bus() - def scan(self, uri): """ Scan the given uri collecting relevant metadata. @@ -72,92 +48,109 @@ class Scanner(object): indicating if a seek would succeed. """ tags, duration, seekable, mime = None, None, None, None + pipeline = _setup_pipeline(uri, self._proxy_config) + try: - self._setup(uri) - tags, mime = self._collect() - duration = self._query_duration() - seekable = self._query_seekable() + _start_pipeline(pipeline) + tags, mime = _process(pipeline, self._timeout_ms / 1000.0) + duration = _query_duration(pipeline) + seekable = _query_seekable(pipeline) finally: - self._reset() + pipeline.set_state(gst.STATE_NULL) + del pipeline - return Result(uri, tags, duration, seekable, mime) + return _Result(uri, tags, duration, seekable, mime) - def _setup(self, uri): - """Primes the pipeline for collection.""" - protocol = gst.uri_get_protocol(uri) - if self._src and protocol not in self._src.get_protocols(): - self._src.unlink(self._typefinder) - self._pipe.remove(self._src) - self._src = None - if not self._src: - self._src = gst.element_make_from_uri(gst.URI_SRC, uri) - if not self._src: - raise exceptions.ScannerError('Could not find any elements to ' - 'handle %s URI.' % protocol) - utils.setup_proxy(self._src, self._proxy_config) - self._pipe.add(self._src) - self._src.link(self._typefinder) +# Turns out it's _much_ faster to just create a new pipeline for every as +# decodebins and other elements don't seem to take well to being reused. +def _setup_pipeline(uri, proxy_config=None): + src = gst.element_make_from_uri(gst.URI_SRC, uri) + if not src: + raise exceptions.ScannerError('GStreamer can not open: %s' % uri) - self._pipe.set_state(gst.STATE_READY) - self._src.set_uri(uri) + typefind = gst.element_factory_make('typefind') + decodebin = gst.element_factory_make('decodebin2') + sink = gst.element_factory_make('fakesink') - result = self._pipe.set_state(gst.STATE_PAUSED) - if result == gst.STATE_CHANGE_NO_PREROLL: - # Live sources don't pre-roll, so set to playing to get data. - self._pipe.set_state(gst.STATE_PLAYING) + pipeline = gst.element_factory_make('pipeline') + pipeline.add_many(src, typefind, decodebin, sink) + gst.element_link_many(src, typefind, decodebin) - def _collect(self): - """Polls for messages to collect data.""" - start = time.time() - timeout_s = self._timeout_ms / 1000.0 - tags, mime, missing_description = {}, None, None + if proxy_config: + utils.setup_proxy(src, proxy_config) - while time.time() - start < timeout_s: - if not self._bus.have_pending(): - continue - message = self._bus.pop() + decodebin.set_property('caps', _RAW_AUDIO) + decodebin.connect('pad-added', _pad_added, sink) + typefind.connect('have-type', _have_type, decodebin) - if message.type == gst.MESSAGE_ELEMENT: - if gst.pbutils.is_missing_plugin_message(message): - missing_description = encoding.locale_decode( - _missing_plugin_desc(message)) - elif message.type == gst.MESSAGE_APPLICATION: - mime = message.structure.get_name() - if mime.startswith('text/') or mime == 'application/xml': - return tags, mime - elif message.type == gst.MESSAGE_ERROR: - error = encoding.locale_decode(message.parse_error()[0]) - if missing_description: - error = '%s (%s)' % (missing_description, error) - raise exceptions.ScannerError(error) - elif message.type == gst.MESSAGE_EOS: + return pipeline + + +def _have_type(element, probability, caps, decodebin): + decodebin.set_property('sink-caps', caps) + msg = gst.message_new_application(element, caps.get_structure(0)) + element.get_bus().post(msg) + + +def _pad_added(element, pad, sink): + return pad.link(sink.get_pad('sink')) + + +def _start_pipeline(pipeline): + if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_NO_PREROLL: + pipeline.set_state(gst.STATE_PLAYING) + + +def _query_duration(pipeline): + try: + duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0] + except gst.QueryError: + return None + + if duration < 0: + return None + else: + return duration // gst.MSECOND + + +def _query_seekable(pipeline): + query = gst.query_new_seeking(gst.FORMAT_TIME) + pipeline.query(query) + return query.parse_seeking()[1] + + +def _process(pipeline, timeout): + start = time.time() + tags, mime, missing_description = {}, None, None + bus = pipeline.get_bus() + + while time.time() - start < timeout: + if not bus.have_pending(): + continue + message = bus.pop() + + if message.type == gst.MESSAGE_ELEMENT: + if gst.pbutils.is_missing_plugin_message(message): + missing_description = encoding.locale_decode( + _missing_plugin_desc(message)) + elif message.type == gst.MESSAGE_APPLICATION: + mime = message.structure.get_name() + if mime.startswith('text/') or mime == 'application/xml': return tags, mime - elif message.type == gst.MESSAGE_ASYNC_DONE: - if message.src == self._pipe: - return tags, mime - elif message.type == gst.MESSAGE_TAG: - taglist = message.parse_tag() - # Note that this will only keep the last tag. - tags.update(utils.convert_taglist(taglist)) + elif message.type == gst.MESSAGE_ERROR: + error = encoding.locale_decode(message.parse_error()[0]) + if missing_description: + error = '%s (%s)' % (missing_description, error) + raise exceptions.ScannerError(error) + elif message.type == gst.MESSAGE_EOS: + return tags, mime + elif message.type == gst.MESSAGE_ASYNC_DONE: + if message.src == pipeline: + return tags, mime + elif message.type == gst.MESSAGE_TAG: + taglist = message.parse_tag() + # Note that this will only keep the last tag. + tags.update(utils.convert_taglist(taglist)) - raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms) - - def _reset(self): - self._pipe.set_state(gst.STATE_NULL) - - def _query_duration(self): - try: - duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0] - except gst.QueryError: - return None - - if duration < 0: - return None - else: - return duration // gst.MSECOND - - def _query_seekable(self): - query = gst.query_new_seeking(gst.FORMAT_TIME) - self._pipe.query(query) - return query.parse_seeking()[1] + raise exceptions.ScannerError('Timeout after %dms' % (timeout * 1000))