Merge pull request #1033 from adamcik/feature/scanner-with-typefind
Add typefinding / MIME guess to scan code.
This commit is contained in:
commit
30badf60b9
@ -154,6 +154,13 @@ v0.20.0 (UNRELEASED)
|
||||
|
||||
- Update scanner to operate with milliseconds for duration.
|
||||
|
||||
- Update scanner to use a custom src, typefind and decodebin. This allows us
|
||||
to catch playlists before we try to decode them.
|
||||
|
||||
- Refactored scanner to create a new pipeline per song, this is needed as
|
||||
reseting decodebin is much slower than tearing it down and making a fresh
|
||||
one.
|
||||
|
||||
- Add :meth:`mopidy.audio.AudioListener.tags_changed`. Notifies core when new tags
|
||||
are found.
|
||||
|
||||
@ -173,6 +180,12 @@ v0.20.0 (UNRELEASED)
|
||||
- Add workaround for volume not persisting across tracks on OS X.
|
||||
(Issue: :issue:`886`, PR: :issue:`958`)
|
||||
|
||||
- Improved missing plugin error reporting in scanner.
|
||||
|
||||
- Introduced a new return type for the scanner, a named tuple with ``uri``,
|
||||
``tags``, ``duration``, ``seekable`` and ``mime``. Also added support for
|
||||
checking seekable, and the initial MIME type guess.
|
||||
|
||||
**Stream backend**
|
||||
|
||||
- Add basic tests for the stream library provider.
|
||||
|
||||
@ -1,16 +1,25 @@
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
import time
|
||||
import collections
|
||||
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa
|
||||
import gst.pbutils
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.audio import utils
|
||||
from mopidy.utils import encoding
|
||||
|
||||
_missing_plugin_desc = gst.pbutils.missing_plugin_message_get_description
|
||||
|
||||
_Result = collections.namedtuple(
|
||||
'Result', ('uri', 'tags', 'duration', 'seekable', 'mime'))
|
||||
|
||||
_RAW_AUDIO = gst.Caps(b'audio/x-raw-int; audio/x-raw-float')
|
||||
|
||||
|
||||
# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)?
|
||||
class Scanner(object):
|
||||
"""
|
||||
Helper to get tags and other relevant info from URIs.
|
||||
@ -21,29 +30,8 @@ class Scanner(object):
|
||||
"""
|
||||
|
||||
def __init__(self, timeout=1000, proxy_config=None):
|
||||
self._timeout_ms = timeout
|
||||
|
||||
sink = gst.element_factory_make('fakesink')
|
||||
|
||||
audio_caps = gst.Caps(b'audio/x-raw-int; audio/x-raw-float')
|
||||
|
||||
def pad_added(src, pad):
|
||||
return pad.link(sink.get_pad('sink'))
|
||||
|
||||
def source_setup(element, source):
|
||||
utils.setup_proxy(source, proxy_config or {})
|
||||
|
||||
self._uribin = gst.element_factory_make('uridecodebin')
|
||||
self._uribin.set_property('caps', audio_caps)
|
||||
self._uribin.connect('pad-added', pad_added)
|
||||
self._uribin.connect('source-setup', source_setup)
|
||||
|
||||
self._pipe = gst.element_factory_make('pipeline')
|
||||
self._pipe.add(self._uribin)
|
||||
self._pipe.add(sink)
|
||||
|
||||
self._bus = self._pipe.get_bus()
|
||||
self._bus.set_flushing(True)
|
||||
self._timeout_ms = int(timeout)
|
||||
self._proxy_config = proxy_config or {}
|
||||
|
||||
def scan(self, uri):
|
||||
"""
|
||||
@ -51,68 +39,124 @@ class Scanner(object):
|
||||
|
||||
:param uri: URI of the resource to scan.
|
||||
:type event: string
|
||||
:return: (tags, duration) pair. tags is a dictionary of lists for all
|
||||
the tags we found and duration is the length of the URI in
|
||||
milliseconds, or :class:`None` if the URI has no duration.
|
||||
:return: A named tuple containing
|
||||
``(uri, tags, duration, seekable, mime)``.
|
||||
``tags`` is a dictionary of lists for all the tags we found.
|
||||
``duration`` is the length of the URI in milliseconds, or
|
||||
:class:`None` if the URI has no duration. ``seekable`` is boolean.
|
||||
indicating if a seek would succeed.
|
||||
"""
|
||||
tags, duration = None, None
|
||||
tags, duration, seekable, mime = None, None, None, None
|
||||
pipeline = _setup_pipeline(uri, self._proxy_config)
|
||||
|
||||
try:
|
||||
self._setup(uri)
|
||||
tags = self._collect()
|
||||
duration = self._query_duration()
|
||||
_start_pipeline(pipeline)
|
||||
tags, mime = _process(pipeline, self._timeout_ms)
|
||||
duration = _query_duration(pipeline)
|
||||
seekable = _query_seekable(pipeline)
|
||||
finally:
|
||||
self._reset()
|
||||
pipeline.set_state(gst.STATE_NULL)
|
||||
del pipeline
|
||||
|
||||
return tags, duration
|
||||
return _Result(uri, tags, duration, seekable, mime)
|
||||
|
||||
def _setup(self, uri):
|
||||
"""Primes the pipeline for collection."""
|
||||
self._pipe.set_state(gst.STATE_READY)
|
||||
self._uribin.set_property(b'uri', uri)
|
||||
self._bus.set_flushing(False)
|
||||
result = self._pipe.set_state(gst.STATE_PAUSED)
|
||||
if result == gst.STATE_CHANGE_NO_PREROLL:
|
||||
# Live sources don't pre-roll, so set to playing to get data.
|
||||
self._pipe.set_state(gst.STATE_PLAYING)
|
||||
|
||||
def _collect(self):
|
||||
"""Polls for messages to collect data."""
|
||||
start = time.time()
|
||||
timeout_s = self._timeout_ms / 1000.0
|
||||
tags = {}
|
||||
# Turns out it's _much_ faster to just create a new pipeline for every as
|
||||
# decodebins and other elements don't seem to take well to being reused.
|
||||
def _setup_pipeline(uri, proxy_config=None):
|
||||
src = gst.element_make_from_uri(gst.URI_SRC, uri)
|
||||
if not src:
|
||||
raise exceptions.ScannerError('GStreamer can not open: %s' % uri)
|
||||
|
||||
while time.time() - start < timeout_s:
|
||||
if not self._bus.have_pending():
|
||||
continue
|
||||
message = self._bus.pop()
|
||||
typefind = gst.element_factory_make('typefind')
|
||||
decodebin = gst.element_factory_make('decodebin2')
|
||||
sink = gst.element_factory_make('fakesink')
|
||||
|
||||
if message.type == gst.MESSAGE_ERROR:
|
||||
raise exceptions.ScannerError(
|
||||
encoding.locale_decode(message.parse_error()[0]))
|
||||
elif message.type == gst.MESSAGE_EOS:
|
||||
return tags
|
||||
elif message.type == gst.MESSAGE_ASYNC_DONE:
|
||||
if message.src == self._pipe:
|
||||
return tags
|
||||
elif message.type == gst.MESSAGE_TAG:
|
||||
taglist = message.parse_tag()
|
||||
# Note that this will only keep the last tag.
|
||||
tags.update(utils.convert_taglist(taglist))
|
||||
pipeline = gst.element_factory_make('pipeline')
|
||||
pipeline.add_many(src, typefind, decodebin, sink)
|
||||
gst.element_link_many(src, typefind, decodebin)
|
||||
|
||||
raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms)
|
||||
if proxy_config:
|
||||
utils.setup_proxy(src, proxy_config)
|
||||
|
||||
def _reset(self):
|
||||
"""Ensures we cleanup child elements and flush the bus."""
|
||||
self._bus.set_flushing(True)
|
||||
self._pipe.set_state(gst.STATE_NULL)
|
||||
decodebin.set_property('caps', _RAW_AUDIO)
|
||||
decodebin.connect('pad-added', _pad_added, sink)
|
||||
typefind.connect('have-type', _have_type, decodebin)
|
||||
|
||||
def _query_duration(self):
|
||||
try:
|
||||
duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0]
|
||||
except gst.QueryError:
|
||||
return None
|
||||
return pipeline
|
||||
|
||||
if duration < 0:
|
||||
return None
|
||||
else:
|
||||
return duration // gst.MSECOND
|
||||
|
||||
def _have_type(element, probability, caps, decodebin):
|
||||
decodebin.set_property('sink-caps', caps)
|
||||
msg = gst.message_new_application(element, caps.get_structure(0))
|
||||
element.get_bus().post(msg)
|
||||
|
||||
|
||||
def _pad_added(element, pad, sink):
|
||||
return pad.link(sink.get_pad('sink'))
|
||||
|
||||
|
||||
def _start_pipeline(pipeline):
|
||||
if pipeline.set_state(gst.STATE_PAUSED) == gst.STATE_CHANGE_NO_PREROLL:
|
||||
pipeline.set_state(gst.STATE_PLAYING)
|
||||
|
||||
|
||||
def _query_duration(pipeline):
|
||||
try:
|
||||
duration = pipeline.query_duration(gst.FORMAT_TIME, None)[0]
|
||||
except gst.QueryError:
|
||||
return None
|
||||
|
||||
if duration < 0:
|
||||
return None
|
||||
else:
|
||||
return duration // gst.MSECOND
|
||||
|
||||
|
||||
def _query_seekable(pipeline):
|
||||
query = gst.query_new_seeking(gst.FORMAT_TIME)
|
||||
pipeline.query(query)
|
||||
return query.parse_seeking()[1]
|
||||
|
||||
|
||||
def _process(pipeline, timeout_ms):
|
||||
clock = pipeline.get_clock()
|
||||
bus = pipeline.get_bus()
|
||||
timeout = timeout_ms * gst.MSECOND
|
||||
tags, mime, missing_description = {}, None, None
|
||||
|
||||
types = (gst.MESSAGE_ELEMENT | gst.MESSAGE_APPLICATION | gst.MESSAGE_ERROR
|
||||
| gst.MESSAGE_EOS | gst.MESSAGE_ASYNC_DONE | gst.MESSAGE_TAG)
|
||||
|
||||
start = clock.get_time()
|
||||
while timeout > 0:
|
||||
message = bus.timed_pop_filtered(timeout, types)
|
||||
|
||||
if message is None:
|
||||
break
|
||||
elif message.type == gst.MESSAGE_ELEMENT:
|
||||
if gst.pbutils.is_missing_plugin_message(message):
|
||||
missing_description = encoding.locale_decode(
|
||||
_missing_plugin_desc(message))
|
||||
elif message.type == gst.MESSAGE_APPLICATION:
|
||||
mime = message.structure.get_name()
|
||||
if mime.startswith('text/') or mime == 'application/xml':
|
||||
return tags, mime
|
||||
elif message.type == gst.MESSAGE_ERROR:
|
||||
error = encoding.locale_decode(message.parse_error()[0])
|
||||
if missing_description:
|
||||
error = '%s (%s)' % (missing_description, error)
|
||||
raise exceptions.ScannerError(error)
|
||||
elif message.type == gst.MESSAGE_EOS:
|
||||
return tags, mime
|
||||
elif message.type == gst.MESSAGE_ASYNC_DONE:
|
||||
if message.src == pipeline:
|
||||
return tags, mime
|
||||
elif message.type == gst.MESSAGE_TAG:
|
||||
taglist = message.parse_tag()
|
||||
# Note that this will only keep the last tag.
|
||||
tags.update(utils.convert_taglist(taglist))
|
||||
|
||||
timeout -= clock.get_time() - start
|
||||
|
||||
raise exceptions.ScannerError('Timeout after %dms' % timeout_ms)
|
||||
|
||||
@ -133,7 +133,8 @@ class ScanCommand(commands.Command):
|
||||
try:
|
||||
relpath = translator.local_track_uri_to_path(uri, media_dir)
|
||||
file_uri = path.path_to_uri(os.path.join(media_dir, relpath))
|
||||
tags, duration = scanner.scan(file_uri)
|
||||
result = scanner.scan(file_uri)
|
||||
tags, duration = result.tags, result.duration
|
||||
if duration < MIN_DURATION_MS:
|
||||
logger.warning('Failed %s: Track shorter than %dms',
|
||||
uri, MIN_DURATION_MS)
|
||||
|
||||
@ -45,9 +45,9 @@ class StreamLibraryProvider(backend.LibraryProvider):
|
||||
return [Track(uri=uri)]
|
||||
|
||||
try:
|
||||
tags, duration = self._scanner.scan(uri)
|
||||
track = utils.convert_tags_to_track(tags).copy(
|
||||
uri=uri, length=duration)
|
||||
result = self._scanner.scan(uri)
|
||||
track = utils.convert_tags_to_track(result.tags).copy(
|
||||
uri=uri, length=result.duration)
|
||||
except exceptions.ScannerError as e:
|
||||
logger.warning('Problem looking up %s: %s', uri, e)
|
||||
track = Track(uri=uri)
|
||||
|
||||
@ -31,9 +31,9 @@ class ScannerTest(unittest.TestCase):
|
||||
uri = path_lib.path_to_uri(path)
|
||||
key = uri[len('file://'):]
|
||||
try:
|
||||
tags, duration = scanner.scan(uri)
|
||||
self.tags[key] = tags
|
||||
self.durations[key] = duration
|
||||
result = scanner.scan(uri)
|
||||
self.tags[key] = result.tags
|
||||
self.durations[key] = result.duration
|
||||
except exceptions.ScannerError as error:
|
||||
self.errors[key] = error
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user