Merge pull request #1281 from jodal/fix/1250-nested-stream-playlists
Fix #1250: Unwrap nested stream playlists
This commit is contained in:
commit
c1c38472ff
@ -28,6 +28,9 @@ Bug fix release.
|
||||
- Core: Fix error in :meth:`~mopidy.core.TracklistController.get_eot_tlid`
|
||||
docstring. (Fixes: :issue:`1269`)
|
||||
|
||||
- Audio: Add ``timeout`` parameter to :meth:`~mopidy.audio.scan.Scanner.scan`.
|
||||
(Part of: :issue:`1250`, PR: :issue:`1281`)
|
||||
|
||||
- Extension support: Make :meth:`~mopidy.ext.Extension.get_cache_dir`,
|
||||
:meth:`~mopidy.ext.Extension.get_config_dir`, and
|
||||
:meth:`~mopidy.ext.Extension.get_data_dir` class methods, so they can be used
|
||||
@ -50,6 +53,11 @@ Bug fix release.
|
||||
services installed from packages that properly set :confval:`core/data_dir`,
|
||||
like the Debian and Arch pakages. (Fixes: :issue:`1259`, PR: :issue:`1266`)
|
||||
|
||||
- Stream: Expand nested playlists to find the stream URI. This used to work,
|
||||
but regressed in 1.1.0 with the extraction of stream playlist parsing from
|
||||
GStreamer to being handled by the Mopidy-Stream backend. (Fixes:
|
||||
:issue:`1250`, PR: :issue:`1281`)
|
||||
|
||||
- Stream: If "file" is present in the :confval:`stream/protocols` config value
|
||||
and the :ref:`ext-file` extension is enabled, we exited with an error because
|
||||
two extensions claimed the same URI scheme. We now log a warning recommending
|
||||
|
||||
@ -33,12 +33,15 @@ class Scanner(object):
|
||||
self._timeout_ms = int(timeout)
|
||||
self._proxy_config = proxy_config or {}
|
||||
|
||||
def scan(self, uri):
|
||||
def scan(self, uri, timeout=None):
|
||||
"""
|
||||
Scan the given uri collecting relevant metadata.
|
||||
|
||||
:param uri: URI of the resource to scan.
|
||||
:type event: string
|
||||
:type uri: string
|
||||
:param timeout: timeout for scanning a URI in ms. Defaults to the
|
||||
``timeout`` value used when creating the scanner.
|
||||
:type timeout: int
|
||||
:return: A named tuple containing
|
||||
``(uri, tags, duration, seekable, mime)``.
|
||||
``tags`` is a dictionary of lists for all the tags we found.
|
||||
@ -46,12 +49,13 @@ class Scanner(object):
|
||||
:class:`None` if the URI has no duration. ``seekable`` is boolean.
|
||||
indicating if a seek would succeed.
|
||||
"""
|
||||
timeout = int(timeout or self._timeout_ms)
|
||||
tags, duration, seekable, mime = None, None, None, None
|
||||
pipeline = _setup_pipeline(uri, self._proxy_config)
|
||||
|
||||
try:
|
||||
_start_pipeline(pipeline)
|
||||
tags, mime, have_audio = _process(pipeline, self._timeout_ms)
|
||||
tags, mime, have_audio = _process(pipeline, timeout)
|
||||
duration = _query_duration(pipeline)
|
||||
seekable = _query_seekable(pipeline)
|
||||
finally:
|
||||
@ -132,7 +136,10 @@ def _process(pipeline, timeout_ms):
|
||||
clock = pipeline.get_clock()
|
||||
bus = pipeline.get_bus()
|
||||
timeout = timeout_ms * gst.MSECOND
|
||||
tags, mime, have_audio, missing_message = {}, None, False, None
|
||||
tags = {}
|
||||
mime = None
|
||||
have_audio = False
|
||||
missing_message = None
|
||||
|
||||
types = (gst.MESSAGE_ELEMENT | gst.MESSAGE_APPLICATION | gst.MESSAGE_ERROR
|
||||
| gst.MESSAGE_EOS | gst.MESSAGE_ASYNC_DONE | gst.MESSAGE_TAG)
|
||||
|
||||
@ -1,9 +1,14 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import logging
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from mopidy import httpclient
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_requests_session(proxy_config, user_agent):
|
||||
proxy = httpclient.format_proxy(proxy_config)
|
||||
@ -14,3 +19,30 @@ def get_requests_session(proxy_config, user_agent):
|
||||
session.headers.update({'user-agent': full_user_agent})
|
||||
|
||||
return session
|
||||
|
||||
|
||||
def download(session, uri, timeout=1.0, chunk_size=4096):
|
||||
try:
|
||||
response = session.get(uri, stream=True, timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning('Download of %r failed due to connection timeout after '
|
||||
'%.3fs', uri, timeout)
|
||||
return None
|
||||
except requests.exceptions.InvalidSchema:
|
||||
logger.warning('%s has an unsupported schema.', uri)
|
||||
return None
|
||||
|
||||
content = []
|
||||
deadline = time.time() + timeout
|
||||
for chunk in response.iter_content(chunk_size):
|
||||
content.append(chunk)
|
||||
if time.time() > deadline:
|
||||
logger.warning('Download of %r failed due to download taking more '
|
||||
'than %.3fs', uri, timeout)
|
||||
return None
|
||||
|
||||
if not response.ok:
|
||||
logger.warning('Problem downloading %r: %s', uri, response.reason)
|
||||
return None
|
||||
|
||||
return b''.join(content)
|
||||
|
||||
@ -8,8 +8,6 @@ import urlparse
|
||||
|
||||
import pykka
|
||||
|
||||
import requests
|
||||
|
||||
from mopidy import audio as audio_lib, backend, exceptions, stream
|
||||
from mopidy.audio import scan, utils
|
||||
from mopidy.internal import http, playlists
|
||||
@ -77,59 +75,84 @@ class StreamPlaybackProvider(backend.PlaybackProvider):
|
||||
super(StreamPlaybackProvider, self).__init__(audio, backend)
|
||||
self._config = config
|
||||
self._scanner = backend._scanner
|
||||
|
||||
def translate_uri(self, uri):
|
||||
try:
|
||||
scan_result = self._scanner.scan(uri)
|
||||
except exceptions.ScannerError as e:
|
||||
logger.warning(
|
||||
'Problem scanning URI %s: %s', uri, e)
|
||||
return None
|
||||
|
||||
if not (scan_result.mime.startswith('text/') or
|
||||
scan_result.mime.startswith('application/')):
|
||||
return uri
|
||||
|
||||
content = self._download(uri)
|
||||
if content is None:
|
||||
return None
|
||||
|
||||
tracks = list(playlists.parse(content))
|
||||
if tracks:
|
||||
# TODO Test streams and return first that seems to be playable
|
||||
return tracks[0]
|
||||
|
||||
def _download(self, uri):
|
||||
timeout = self._config['stream']['timeout'] / 1000.0
|
||||
|
||||
session = http.get_requests_session(
|
||||
proxy_config=self._config['proxy'],
|
||||
self._session = http.get_requests_session(
|
||||
proxy_config=config['proxy'],
|
||||
user_agent='%s/%s' % (
|
||||
stream.Extension.dist_name, stream.Extension.version))
|
||||
|
||||
def translate_uri(self, uri):
|
||||
return _unwrap_stream(
|
||||
uri,
|
||||
timeout=self._config['stream']['timeout'],
|
||||
scanner=self._scanner,
|
||||
requests_session=self._session)
|
||||
|
||||
|
||||
def _unwrap_stream(uri, timeout, scanner, requests_session):
|
||||
"""
|
||||
Get a stream URI from a playlist URI, ``uri``.
|
||||
|
||||
Unwraps nested playlists until something that's not a playlist is found or
|
||||
the ``timeout`` is reached.
|
||||
"""
|
||||
|
||||
original_uri = uri
|
||||
seen_uris = set()
|
||||
deadline = time.time() + timeout
|
||||
|
||||
while time.time() < deadline:
|
||||
if uri in seen_uris:
|
||||
logger.info(
|
||||
'Unwrapping stream from URI (%s) failed: '
|
||||
'playlist referenced itself', uri)
|
||||
return None
|
||||
else:
|
||||
seen_uris.add(uri)
|
||||
|
||||
logger.debug('Unwrapping stream from URI: %s', uri)
|
||||
|
||||
try:
|
||||
response = session.get(
|
||||
uri, stream=True, timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning(
|
||||
'Download of stream playlist (%s) failed due to connection '
|
||||
'timeout after %.3fs', uri, timeout)
|
||||
return None
|
||||
|
||||
deadline = time.time() + timeout
|
||||
content = []
|
||||
for chunk in response.iter_content(4096):
|
||||
content.append(chunk)
|
||||
if time.time() > deadline:
|
||||
logger.warning(
|
||||
'Download of stream playlist (%s) failed due to download '
|
||||
'taking more than %.3fs', uri, timeout)
|
||||
scan_timeout = deadline - time.time()
|
||||
if scan_timeout < 0:
|
||||
logger.info(
|
||||
'Unwrapping stream from URI (%s) failed: '
|
||||
'timed out in %sms', uri, timeout)
|
||||
return None
|
||||
scan_result = scanner.scan(uri, timeout=scan_timeout)
|
||||
except exceptions.ScannerError as exc:
|
||||
logger.debug('GStreamer failed scanning URI (%s): %s', uri, exc)
|
||||
scan_result = None
|
||||
|
||||
if not response.ok:
|
||||
logger.warning(
|
||||
'Problem downloading stream playlist %s: %s',
|
||||
uri, response.reason)
|
||||
if scan_result is not None and not (
|
||||
scan_result.mime.startswith('text/') or
|
||||
scan_result.mime.startswith('application/')):
|
||||
logger.debug(
|
||||
'Unwrapped potential %s stream: %s', scan_result.mime, uri)
|
||||
return uri
|
||||
|
||||
download_timeout = deadline - time.time()
|
||||
if download_timeout < 0:
|
||||
logger.info(
|
||||
'Unwrapping stream from URI (%s) failed: timed out in %sms',
|
||||
uri, timeout)
|
||||
return None
|
||||
content = http.download(
|
||||
requests_session, uri, timeout=download_timeout)
|
||||
|
||||
if content is None:
|
||||
logger.info(
|
||||
'Unwrapping stream from URI (%s) failed: '
|
||||
'error downloading URI %s', original_uri, uri)
|
||||
return None
|
||||
|
||||
return b''.join(content)
|
||||
uris = playlists.parse(content)
|
||||
if not uris:
|
||||
logger.debug(
|
||||
'Failed parsing URI (%s) as playlist; found potential stream.',
|
||||
uri)
|
||||
return uri
|
||||
|
||||
# TODO Test streams and return first that seems to be playable
|
||||
logger.debug(
|
||||
'Parsed playlist (%s) and found new URI: %s', uri, uris[0])
|
||||
uri = uris[0]
|
||||
|
||||
63
tests/internal/test_http.py
Normal file
63
tests/internal/test_http.py
Normal file
@ -0,0 +1,63 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import mock
|
||||
|
||||
import pytest
|
||||
|
||||
import requests
|
||||
|
||||
import responses
|
||||
|
||||
from mopidy.internal import http
|
||||
|
||||
|
||||
TIMEOUT = 1000
|
||||
URI = 'http://example.com/foo.txt'
|
||||
BODY = "This is the contents of foo.txt."
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session():
|
||||
return requests.Session()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def session_mock():
|
||||
return mock.Mock(spec=requests.Session)
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_download_on_server_side_error(session, caplog):
|
||||
responses.add(responses.GET, URI, body=BODY, status=500)
|
||||
|
||||
result = http.download(session, URI)
|
||||
|
||||
assert result is None
|
||||
assert 'Problem downloading' in caplog.text()
|
||||
|
||||
|
||||
def test_download_times_out_if_connection_times_out(session_mock, caplog):
|
||||
session_mock.get.side_effect = requests.exceptions.Timeout
|
||||
|
||||
result = http.download(session_mock, URI, timeout=1.0)
|
||||
|
||||
session_mock.get.assert_called_once_with(URI, timeout=1.0, stream=True)
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of %r failed due to connection timeout after 1.000s' % URI
|
||||
in caplog.text())
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_download_times_out_if_download_is_slow(session, caplog):
|
||||
responses.add(responses.GET, URI, body=BODY, content_type='text/plain')
|
||||
|
||||
with mock.patch.object(http, 'time') as time_mock:
|
||||
time_mock.time.side_effect = [0, TIMEOUT + 1]
|
||||
|
||||
result = http.download(session, URI)
|
||||
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of %r failed due to download taking more than 1.000s' % URI
|
||||
in caplog.text())
|
||||
@ -4,8 +4,6 @@ import mock
|
||||
|
||||
import pytest
|
||||
|
||||
import requests
|
||||
|
||||
import responses
|
||||
|
||||
from mopidy import exceptions
|
||||
@ -14,7 +12,8 @@ from mopidy.stream import actor
|
||||
|
||||
|
||||
TIMEOUT = 1000
|
||||
URI = 'http://example.com/listen.m3u'
|
||||
PLAYLIST_URI = 'http://example.com/listen.m3u'
|
||||
STREAM_URI = 'http://example.com/stream.mp3'
|
||||
BODY = """
|
||||
#EXTM3U
|
||||
http://example.com/stream.mp3
|
||||
@ -39,9 +38,7 @@ def audio():
|
||||
|
||||
@pytest.fixture
|
||||
def scanner():
|
||||
scanner = mock.Mock(spec=scan.Scanner)
|
||||
scanner.scan.return_value.mime = 'text/foo'
|
||||
return scanner
|
||||
return mock.Mock(spec=scan.Scanner)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@ -57,89 +54,134 @@ def provider(audio, backend, config):
|
||||
return actor.StreamPlaybackProvider(audio, backend, config)
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_audio_stream_returns_same_uri(
|
||||
scanner, provider):
|
||||
class TestTranslateURI(object):
|
||||
|
||||
scanner.scan.return_value.mime = 'audio/ogg'
|
||||
@responses.activate
|
||||
def test_audio_stream_returns_same_uri(self, scanner, provider):
|
||||
scanner.scan.return_value.mime = 'audio/mpeg'
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
result = provider.translate_uri(STREAM_URI)
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == URI
|
||||
scanner.scan.assert_called_once_with(STREAM_URI, timeout=mock.ANY)
|
||||
assert result == STREAM_URI
|
||||
|
||||
@responses.activate
|
||||
def test_text_playlist_with_mpeg_stream(
|
||||
self, scanner, provider, caplog):
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_playlist_returns_first_uri_in_list(
|
||||
scanner, provider):
|
||||
scanner.scan.side_effect = [
|
||||
mock.Mock(mime='text/foo'), # scanning playlist
|
||||
mock.Mock(mime='audio/mpeg'), # scanning stream
|
||||
]
|
||||
responses.add(
|
||||
responses.GET, PLAYLIST_URI,
|
||||
body=BODY, content_type='audio/x-mpegurl')
|
||||
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='audio/x-mpegurl')
|
||||
result = provider.translate_uri(PLAYLIST_URI)
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
assert scanner.scan.mock_calls == [
|
||||
mock.call(PLAYLIST_URI, timeout=mock.ANY),
|
||||
mock.call(STREAM_URI, timeout=mock.ANY),
|
||||
]
|
||||
assert result == STREAM_URI
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == 'http://example.com/stream.mp3'
|
||||
assert responses.calls[0].request.headers['User-Agent'].startswith(
|
||||
'Mopidy-Stream/')
|
||||
# Check logging to ensure debuggability
|
||||
assert 'Unwrapping stream from URI: %s' % PLAYLIST_URI
|
||||
assert 'Parsed playlist (%s)' % PLAYLIST_URI in caplog.text()
|
||||
assert 'Unwrapping stream from URI: %s' % STREAM_URI
|
||||
assert (
|
||||
'Unwrapped potential audio/mpeg stream: %s' % STREAM_URI
|
||||
in caplog.text())
|
||||
|
||||
# Check proper Requests session setup
|
||||
assert responses.calls[0].request.headers['User-Agent'].startswith(
|
||||
'Mopidy-Stream/')
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_playlist_with_xml_mimetype(scanner, provider):
|
||||
scanner.scan.return_value.mime = 'application/xspf+xml'
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='application/xspf+xml')
|
||||
@responses.activate
|
||||
def test_xml_playlist_with_mpeg_stream(self, scanner, provider):
|
||||
scanner.scan.side_effect = [
|
||||
mock.Mock(mime='application/xspf+xml'), # scanning playlist
|
||||
mock.Mock(mime='audio/mpeg'), # scanning stream
|
||||
]
|
||||
responses.add(
|
||||
responses.GET, PLAYLIST_URI,
|
||||
body=BODY, content_type='application/xspf+xml')
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
result = provider.translate_uri(PLAYLIST_URI)
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == 'http://example.com/stream.mp3'
|
||||
assert scanner.scan.mock_calls == [
|
||||
mock.call(PLAYLIST_URI, timeout=mock.ANY),
|
||||
mock.call(STREAM_URI, timeout=mock.ANY),
|
||||
]
|
||||
assert result == STREAM_URI
|
||||
|
||||
@responses.activate
|
||||
def test_scan_fails_but_playlist_parsing_succeeds(
|
||||
self, scanner, provider, caplog):
|
||||
|
||||
def test_translate_uri_when_scanner_fails(scanner, provider, caplog):
|
||||
scanner.scan.side_effect = exceptions.ScannerError('foo failed')
|
||||
scanner.scan.side_effect = [
|
||||
exceptions.ScannerError('some failure'), # scanning playlist
|
||||
mock.Mock(mime='audio/mpeg'), # scanning stream
|
||||
]
|
||||
responses.add(
|
||||
responses.GET, PLAYLIST_URI,
|
||||
body=BODY, content_type='audio/x-mpegurl')
|
||||
|
||||
result = provider.translate_uri('bar')
|
||||
result = provider.translate_uri(PLAYLIST_URI)
|
||||
|
||||
assert result is None
|
||||
assert 'Problem scanning URI bar: foo failed' in caplog.text()
|
||||
assert 'Unwrapping stream from URI: %s' % PLAYLIST_URI
|
||||
assert (
|
||||
'GStreamer failed scanning URI (%s)' % PLAYLIST_URI
|
||||
in caplog.text())
|
||||
assert 'Parsed playlist (%s)' % PLAYLIST_URI in caplog.text()
|
||||
assert (
|
||||
'Unwrapped potential audio/mpeg stream: %s' % STREAM_URI
|
||||
in caplog.text())
|
||||
assert result == STREAM_URI
|
||||
|
||||
@responses.activate
|
||||
def test_scan_fails_and_playlist_parsing_fails(
|
||||
self, scanner, provider, caplog):
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_when_playlist_download_fails(provider, caplog):
|
||||
responses.add(responses.GET, URI, body=BODY, status=500)
|
||||
scanner.scan.side_effect = exceptions.ScannerError('some failure')
|
||||
responses.add(
|
||||
responses.GET, STREAM_URI,
|
||||
body=b'some audio data', content_type='audio/mpeg')
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
result = provider.translate_uri(STREAM_URI)
|
||||
|
||||
assert result is None
|
||||
assert 'Problem downloading stream playlist' in caplog.text()
|
||||
assert 'Unwrapping stream from URI: %s' % STREAM_URI
|
||||
assert (
|
||||
'GStreamer failed scanning URI (%s)' % STREAM_URI
|
||||
in caplog.text())
|
||||
assert (
|
||||
'Failed parsing URI (%s) as playlist; found potential stream.'
|
||||
% STREAM_URI in caplog.text())
|
||||
assert result == STREAM_URI
|
||||
|
||||
def test_failed_download_returns_none(self, provider, caplog):
|
||||
with mock.patch.object(actor, 'http') as http_mock:
|
||||
http_mock.download.return_value = None
|
||||
|
||||
def test_translate_uri_times_out_if_connection_times_out(provider, caplog):
|
||||
with mock.patch.object(actor.requests, 'Session') as session_mock:
|
||||
get_mock = session_mock.return_value.get
|
||||
get_mock.side_effect = requests.exceptions.Timeout
|
||||
result = provider.translate_uri(PLAYLIST_URI)
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
assert result is None
|
||||
|
||||
get_mock.assert_called_once_with(URI, timeout=1.0, stream=True)
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of stream playlist (%s) failed due to connection '
|
||||
'timeout after 1.000s' % URI in caplog.text())
|
||||
@responses.activate
|
||||
def test_playlist_references_itself(self, scanner, provider, caplog):
|
||||
scanner.scan.return_value.mime = 'text/foo'
|
||||
responses.add(
|
||||
responses.GET, PLAYLIST_URI,
|
||||
body=BODY.replace(STREAM_URI, PLAYLIST_URI),
|
||||
content_type='audio/x-mpegurl')
|
||||
|
||||
result = provider.translate_uri(PLAYLIST_URI)
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_times_out_if_download_is_slow(provider, caplog):
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='audio/x-mpegurl')
|
||||
|
||||
with mock.patch.object(actor, 'time') as time_mock:
|
||||
time_mock.time.side_effect = [0, TIMEOUT + 1]
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of stream playlist (%s) failed due to download taking '
|
||||
'more than 1.000s' % URI in caplog.text())
|
||||
assert 'Unwrapping stream from URI: %s' % PLAYLIST_URI in caplog.text()
|
||||
assert (
|
||||
'Parsed playlist (%s) and found new URI: %s'
|
||||
% (PLAYLIST_URI, PLAYLIST_URI)) in caplog.text()
|
||||
assert (
|
||||
'Unwrapping stream from URI (%s) failed: '
|
||||
'playlist referenced itself' % PLAYLIST_URI) in caplog.text()
|
||||
assert result is None
|
||||
|
||||
Loading…
Reference in New Issue
Block a user