audio: Recreate scan pipeline for each scan

It turns out this code runs a lot faster when we fully destroy the decodebins
between scans. Since setting the pipeline state to NULL isn't enough for that,
I opted to rebuild the whole pipeline for each scan instead of adding and
removing decodebins all the time.

As part of this, almost all of the logic has been moved out of the Scanner
class and into internal module-level functions. The external interface has been
kept the same for now, but we could easily switch to
`scan(uri, timeout=1000, proxy=None)`.
This commit is contained in:
Thomas Adamcik 2015-03-13 00:18:50 +01:00
parent 411bae5a56
commit 628c828087

View File

@ -14,10 +14,13 @@ from mopidy.utils import encoding
_missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description
Result = collections.namedtuple(
_Result = collections.namedtuple(
'Result', ('uri', 'tags', 'duration', 'seekable', 'mime'))
_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float')
# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)?
class Scanner(object):
"""
Helper to get tags and other relevant info from URIs.
@ -31,33 +34,6 @@ class Scanner(object):
self._timeout_ms = timeout
self._proxy_config = proxy_config or {}
sink = gst.element_factory_make('fakesink')
self._src = None
def pad_added(src, pad):
return pad.link(sink.get_pad('sink'))
def have_type(finder, probability, caps):
msg = gst.message_new_application(finder, caps.get_structure(0))
finder.get_bus().post(msg)
self._typefinder = gst.element_factory_make('typefind')
self._typefinder.connect('have-type', have_type)
audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float')
self._decodebin = gst.element_factory_make('decodebin2')
self._decodebin.set_property('caps', audio_caps)
self._decodebin.connect('pad-added', pad_added)
self._pipe = gst.element_factory_make('pipeline')
self._pipe.add(self._typefinder)
self._pipe.add(self._decodebin)
self._pipe.add(sink)
self._typefinder.link(self._decodebin)
self._bus = self._pipe.get_bus()
def scan(self, uri):
"""
Scan the given uri collecting relevant metadata.
@ -72,92 +48,109 @@ class Scanner(object):
indicating if a seek would succeed.
"""
tags, duration, seekable, mime = None, None, None, None
pipeline = _setup_pipeline(uri, self._proxy_config)
try:
self._setup(uri)
tags, mime = self._collect()
duration = self._query_duration()
seekable = self._query_seekable()
_start_pipeline(pipeline)
tags, mime = _process(pipeline, self._timeout_ms / 1000.0)
duration = _query_duration(pipeline)
seekable = _query_seekable(pipeline)
finally:
self._reset()
pipeline.set_state(gst.STATE_NULL)
del pipeline
return Result(uri, tags, duration, seekable, mime)
return _Result(uri, tags, duration, seekable, mime)
def _setup(self, uri):
"""Primes the pipeline for collection."""
protocol = gst.uri_get_protocol(uri)
if self._src and protocol not in self._src.get_protocols():
self._src.unlink(self._typefinder)
self._pipe.remove(self._src)
self._src = None
if not self._src:
self._src = gst.element_make_from_uri(gst.URI_SRC, uri)
if not self._src:
raise exceptions.ScannerError('Could not find any elements to '
'handle %s URI.' % protocol)
utils.setup_proxy(self._src, self._proxy_config)
self._pipe.add(self._src)
self._src.link(self._typefinder)
# Turns out it's _much_ faster to just create a new pipeline for every scan,
# as decodebins and other elements don't seem to take well to being reused.
def _setup_pipeline(uri, proxy_config=None):
src = gst.element_make_from_uri(gst.URI_SRC, uri)
if not src:
raise exceptions.ScannerError('GStreamer can not open: %s' % uri)
self._pipe.set_state(gst.STATE_READY)
self._src.set_uri(uri)
typefind = gst.element_factory_make('typefind')
decodebin = gst.element_factory_make('decodebin2')
sink = gst.element_factory_make('fakesink')
result = self._pipe.set_state(gst.STATE_PAUSED)
if result == gst.STATE_CHANGE_NO_PREROLL:
# Live sources don't pre-roll, so set to playing to get data.
self._pipe.set_state(gst.STATE_PLAYING)
pipeline = gst.element_factory_make('pipeline')
pipeline.add_many(src, typefind, decodebin, sink)
gst.element_link_many(src, typefind, decodebin)
def _collect(self):
"""Polls for messages to collect data."""
start = time.time()
timeout_s = self._timeout_ms / 1000.0
tags, mime, missing_description = {}, None, None
if proxy_config:
utils.setup_proxy(src, proxy_config)
while time.time() - start < timeout_s:
if not self._bus.have_pending():
continue
message = self._bus.pop()
decodebin.set_property('caps', _RAW_AUDIO)
decodebin.connect('pad-added', _pad_added, sink)
typefind.connect('have-type', _have_type, decodebin)
if message.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(message):
missing_description = encoding.locale_decode(
_missing_plugin_desc(message))
elif message.type == gst.MESSAGE_APPLICATION:
mime = message.structure.get_name()
if mime.startswith('text/') or mime == 'application/xml':
return tags, mime
elif message.type == gst.MESSAGE_ERROR:
error = encoding.locale_decode(message.parse_error()[0])
if missing_description:
error = '%s (%s)' % (missing_description, error)
raise exceptions.ScannerError(error)
elif message.type == gst.MESSAGE_EOS:
return pipeline
def _have_type(element, probability, caps, decodebin):
    """Signal handler for a typefinder's 'have-type' signal.

    Forwards the detected caps to ``decodebin`` through its 'sink-caps'
    property, then posts an application message on the bus of ``element``
    carrying the first structure of ``caps``.

    :param element: the element that emitted the signal; it is used both as
        the message source and to look up the bus.
    :param probability: unused; only present to match the signal signature.
    :param caps: the caps reported for the stream.
    :param decodebin: the decodebin that should be told about the caps.
    """
    decodebin.set_property('sink-caps', caps)
    structure = caps.get_structure(0)
    message = gst.message_new_application(element, structure)
    element.get_bus().post(message)
def _pad_added(element, pad, sink):
    """Signal handler for 'pad-added': link the new pad to the sink.

    :param element: unused; only present to match the signal signature.
    :param pad: the newly added pad.
    :param sink: the element whose pad named 'sink' receives the link.
    :returns: whatever ``pad.link()`` returns.
    """
    target = sink.get_pad('sink')
    return pad.link(target)
def _start_pipeline(pipeline):
    """Bring ``pipeline`` to PAUSED, or to PLAYING if it cannot pre-roll.

    :param pipeline: the pipeline to start.
    """
    result = pipeline.set_state(gst.STATE_PAUSED)
    if result != gst.STATE_CHANGE_NO_PREROLL:
        return
    # Live sources don't pre-roll, so set to playing to get data.
    pipeline.set_state(gst.STATE_PLAYING)
def _query_duration(pipeline):
    """Ask ``pipeline`` for its duration in time format.

    :param pipeline: the pipeline to query.
    :returns: the reported duration floor-divided by ``gst.MSECOND``, or
        ``None`` if the query raises ``gst.QueryError`` or reports a
        negative duration.
    """
    try:
        length = pipeline.query_duration(gst.FORMAT_TIME, None)[0]
    except gst.QueryError:
        return None

    # A negative value is treated the same as a failed query.
    return None if length < 0 else length // gst.MSECOND
def _query_seekable(pipeline):
    """Run a time-format seeking query against ``pipeline``.

    :param pipeline: the pipeline to query.
    :returns: the second field of the parsed seeking query, i.e. the flag
        indicating if a seek would succeed.
    """
    seeking = gst.query_new_seeking(gst.FORMAT_TIME)
    # The return value of query() is ignored; only the parsed result is used.
    pipeline.query(seeking)
    parsed = seeking.parse_seeking()
    return parsed[1]
def _process(pipeline, timeout):
start = time.time()
tags, mime, missing_description = {}, None, None
bus = pipeline.get_bus()
while time.time() - start < timeout:
if not bus.have_pending():
continue
message = bus.pop()
if message.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(message):
missing_description = encoding.locale_decode(
_missing_plugin_desc(message))
elif message.type == gst.MESSAGE_APPLICATION:
mime = message.structure.get_name()
if mime.startswith('text/') or mime == 'application/xml':
return tags, mime
elif message.type == gst.MESSAGE_ASYNC_DONE:
if message.src == self._pipe:
return tags, mime
elif message.type == gst.MESSAGE_TAG:
taglist = message.parse_tag()
# Note that this will only keep the last tag.
tags.update(utils.convert_taglist(taglist))
elif message.type == gst.MESSAGE_ERROR:
error = encoding.locale_decode(message.parse_error()[0])
if missing_description:
error = '%s (%s)' % (missing_description, error)
raise exceptions.ScannerError(error)
elif message.type == gst.MESSAGE_EOS:
return tags, mime
elif message.type == gst.MESSAGE_ASYNC_DONE:
if message.src == pipeline:
return tags, mime
elif message.type == gst.MESSAGE_TAG:
taglist = message.parse_tag()
# Note that this will only keep the last tag.
tags.update(utils.convert_taglist(taglist))
raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms)
def _reset(self):
self._pipe.set_state(gst.STATE_NULL)
def _query_duration(self):
try:
duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0]
except gst.QueryError:
return None
if duration < 0:
return None
else:
return duration // gst.MSECOND
def _query_seekable(self):
query = gst.query_new_seeking(gst.FORMAT_TIME)
self._pipe.query(query)
return query.parse_seeking()[1]
raise exceptions.ScannerError('Timeout after %dms' % (timeout * 1000))