Merge pull request #1239 from jodal/feature/stream-playlists
stream/audio: Make stream backend parse radio playlists itself
This commit is contained in:
commit
61bb7df64d
@ -10,9 +10,11 @@ flake8-import-order
|
||||
|
||||
# Mock dependencies in tests
|
||||
mock
|
||||
responses
|
||||
|
||||
# Test runners
|
||||
pytest
|
||||
pytest-capturelog
|
||||
pytest-cov
|
||||
pytest-xdist
|
||||
tox
|
||||
|
||||
@ -7,6 +7,11 @@ This changelog is used to track all major changes to Mopidy.
|
||||
v1.1.0 (UNRELEASED)
|
||||
===================
|
||||
|
||||
Dependencies
|
||||
------------
|
||||
|
||||
- Mopidy now requires Requests.
|
||||
|
||||
Core API
|
||||
--------
|
||||
|
||||
@ -83,6 +88,12 @@ MPD frontend
|
||||
- Track data now include the ``Last-Modified`` field if set on the track model.
|
||||
(Fixes: :issue:`1218`, PR: :issue:`1219`)
|
||||
|
||||
Stream backend
|
||||
--------------
|
||||
|
||||
- Move stream playlist parsing from GStreamer to the stream backend. (Fixes:
|
||||
:issue:`671`)
|
||||
|
||||
Local backend
|
||||
-------------
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ import gst.pbutils # noqa
|
||||
import pykka
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.audio import playlists, utils
|
||||
from mopidy.audio import icy, utils
|
||||
from mopidy.audio.constants import PlaybackState
|
||||
from mopidy.audio.listener import AudioListener
|
||||
from mopidy.internal import deprecation, process
|
||||
@ -26,8 +26,7 @@ logger = logging.getLogger(__name__)
|
||||
# set_state on a pipeline.
|
||||
gst_logger = logging.getLogger('mopidy.audio.gst')
|
||||
|
||||
playlists.register_typefinders()
|
||||
playlists.register_elements()
|
||||
icy.register()
|
||||
|
||||
_GST_STATE_MAPPING = {
|
||||
gst.STATE_PLAYING: PlaybackState.PLAYING,
|
||||
|
||||
63
mopidy/audio/icy.py
Normal file
63
mopidy/audio/icy.py
Normal file
@ -0,0 +1,63 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import gobject
|
||||
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa
|
||||
|
||||
|
||||
class IcySrc(gst.Bin, gst.URIHandler):
|
||||
__gstdetails__ = ('IcySrc',
|
||||
'Src',
|
||||
'HTTP src wrapper for icy:// support.',
|
||||
'Mopidy')
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_new_any())
|
||||
|
||||
__gsttemplates__ = (srcpad_template,)
|
||||
|
||||
def __init__(self):
|
||||
super(IcySrc, self).__init__()
|
||||
self._httpsrc = gst.element_make_from_uri(gst.URI_SRC, 'http://')
|
||||
try:
|
||||
self._httpsrc.set_property('iradio-mode', True)
|
||||
except TypeError:
|
||||
pass
|
||||
self.add(self._httpsrc)
|
||||
|
||||
self._srcpad = gst.GhostPad('src', self._httpsrc.get_pad('src'))
|
||||
self.add_pad(self._srcpad)
|
||||
|
||||
@classmethod
|
||||
def do_get_type_full(cls):
|
||||
return gst.URI_SRC
|
||||
|
||||
@classmethod
|
||||
def do_get_protocols_full(cls):
|
||||
return [b'icy', b'icyx']
|
||||
|
||||
def do_set_uri(self, uri):
|
||||
if uri.startswith('icy://'):
|
||||
return self._httpsrc.set_uri(b'http://' + uri[len('icy://'):])
|
||||
elif uri.startswith('icyx://'):
|
||||
return self._httpsrc.set_uri(b'https://' + uri[len('icyx://'):])
|
||||
else:
|
||||
return False
|
||||
|
||||
def do_get_uri(self):
|
||||
uri = self._httpsrc.get_uri()
|
||||
if uri.startswith('http://'):
|
||||
return b'icy://' + uri[len('http://'):]
|
||||
else:
|
||||
return b'icyx://' + uri[len('https://'):]
|
||||
|
||||
|
||||
def register():
|
||||
# Only register icy if gst install can't handle it on it's own.
|
||||
if not gst.element_make_from_uri(gst.URI_SRC, 'icy://'):
|
||||
gobject.type_register(IcySrc)
|
||||
gst.element_register(
|
||||
IcySrc, IcySrc.__name__.lower(), gst.RANK_MARGINAL)
|
||||
@ -1,420 +0,0 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import io
|
||||
|
||||
import gobject
|
||||
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa
|
||||
|
||||
from mopidy.compat import configparser
|
||||
|
||||
try:
|
||||
import xml.etree.cElementTree as elementtree
|
||||
except ImportError:
|
||||
import xml.etree.ElementTree as elementtree
|
||||
|
||||
|
||||
# TODO: make detect_FOO_header reusable in general mopidy code.
|
||||
# i.e. give it just a "peek" like function.
|
||||
def detect_m3u_header(typefind):
|
||||
return typefind.peek(0, 7).upper() == b'#EXTM3U'
|
||||
|
||||
|
||||
def detect_pls_header(typefind):
|
||||
return typefind.peek(0, 10).lower() == b'[playlist]'
|
||||
|
||||
|
||||
def detect_xspf_header(typefind):
|
||||
data = typefind.peek(0, 150)
|
||||
if b'xspf' not in data.lower():
|
||||
return False
|
||||
|
||||
try:
|
||||
data = io.BytesIO(data)
|
||||
for event, element in elementtree.iterparse(data, events=(b'start',)):
|
||||
return element.tag.lower() == '{http://xspf.org/ns/0/}playlist'
|
||||
except elementtree.ParseError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def detect_asx_header(typefind):
|
||||
data = typefind.peek(0, 50)
|
||||
if b'asx' not in data.lower():
|
||||
return False
|
||||
|
||||
try:
|
||||
data = io.BytesIO(data)
|
||||
for event, element in elementtree.iterparse(data, events=(b'start',)):
|
||||
return element.tag.lower() == 'asx'
|
||||
except elementtree.ParseError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def parse_m3u(data):
|
||||
# TODO: convert non URIs to file URIs.
|
||||
found_header = False
|
||||
for line in data.readlines():
|
||||
if found_header or line.startswith(b'#EXTM3U'):
|
||||
found_header = True
|
||||
else:
|
||||
continue
|
||||
if not line.startswith(b'#') and line.strip():
|
||||
yield line.strip()
|
||||
|
||||
|
||||
def parse_pls(data):
|
||||
# TODO: convert non URIs to file URIs.
|
||||
try:
|
||||
cp = configparser.RawConfigParser()
|
||||
cp.readfp(data)
|
||||
except configparser.Error:
|
||||
return
|
||||
|
||||
for section in cp.sections():
|
||||
if section.lower() != 'playlist':
|
||||
continue
|
||||
for i in range(cp.getint(section, 'numberofentries')):
|
||||
yield cp.get(section, 'file%d' % (i + 1))
|
||||
|
||||
|
||||
def parse_xspf(data):
|
||||
try:
|
||||
# Last element will be root.
|
||||
for event, element in elementtree.iterparse(data):
|
||||
element.tag = element.tag.lower() # normalize
|
||||
except elementtree.ParseError:
|
||||
return
|
||||
|
||||
ns = 'http://xspf.org/ns/0/'
|
||||
for track in element.iterfind('{%s}tracklist/{%s}track' % (ns, ns)):
|
||||
yield track.findtext('{%s}location' % ns)
|
||||
|
||||
|
||||
def parse_asx(data):
|
||||
try:
|
||||
# Last element will be root.
|
||||
for event, element in elementtree.iterparse(data):
|
||||
element.tag = element.tag.lower() # normalize
|
||||
except elementtree.ParseError:
|
||||
return
|
||||
|
||||
for ref in element.findall('entry/ref[@href]'):
|
||||
yield ref.get('href', '').strip()
|
||||
|
||||
for entry in element.findall('entry[@href]'):
|
||||
yield entry.get('href', '').strip()
|
||||
|
||||
|
||||
def parse_urilist(data):
|
||||
for line in data.readlines():
|
||||
if not line.startswith('#') and gst.uri_is_valid(line.strip()):
|
||||
yield line
|
||||
|
||||
|
||||
def playlist_typefinder(typefind, func, caps):
|
||||
if func(typefind):
|
||||
typefind.suggest(gst.TYPE_FIND_MAXIMUM, caps)
|
||||
|
||||
|
||||
def register_typefind(mimetype, func, extensions):
|
||||
caps = gst.caps_from_string(mimetype)
|
||||
gst.type_find_register(mimetype, gst.RANK_PRIMARY, playlist_typefinder,
|
||||
extensions, caps, func, caps)
|
||||
|
||||
|
||||
def register_typefinders():
|
||||
register_typefind('audio/x-mpegurl', detect_m3u_header, [b'm3u', b'm3u8'])
|
||||
register_typefind('audio/x-scpls', detect_pls_header, [b'pls'])
|
||||
register_typefind('application/xspf+xml', detect_xspf_header, [b'xspf'])
|
||||
# NOTE: seems we can't use video/x-ms-asf which is the correct mime for asx
|
||||
# as it is shared with asf for streaming videos :/
|
||||
register_typefind('audio/x-ms-asx', detect_asx_header, [b'asx'])
|
||||
|
||||
|
||||
class BasePlaylistElement(gst.Bin):
|
||||
|
||||
"""Base class for creating GStreamer elements for playlist support.
|
||||
|
||||
This element performs the following steps:
|
||||
|
||||
1. Initializes src and sink pads for the element.
|
||||
2. Collects data from the sink until EOS is reached.
|
||||
3. Passes the collected data to :meth:`convert` to get a list of URIs.
|
||||
4. Passes the list of URIs to :meth:`handle`, default handling is to pass
|
||||
the URIs to the src element as a uri-list.
|
||||
5. If handle returned true, the EOS consumed and nothing more happens, if
|
||||
it is not consumed it flows on to the next element downstream, which is
|
||||
likely our uri-list consumer which needs the EOS to know we are done
|
||||
sending URIs.
|
||||
"""
|
||||
|
||||
sinkpad_template = None
|
||||
"""GStreamer pad template to use for sink, must be overriden."""
|
||||
|
||||
srcpad_template = None
|
||||
"""GStreamer pad template to use for src, must be overriden."""
|
||||
|
||||
ghost_srcpad = False
|
||||
"""Indicates if src pad should be ghosted or not."""
|
||||
|
||||
def __init__(self):
|
||||
"""Sets up src and sink pads plus behaviour."""
|
||||
super(BasePlaylistElement, self).__init__()
|
||||
self._data = io.BytesIO()
|
||||
self._done = False
|
||||
|
||||
self.sinkpad = gst.Pad(self.sinkpad_template)
|
||||
self.sinkpad.set_chain_function(self._chain)
|
||||
self.sinkpad.set_event_function(self._event)
|
||||
self.add_pad(self.sinkpad)
|
||||
|
||||
if self.ghost_srcpad:
|
||||
self.srcpad = gst.ghost_pad_new_notarget('src', gst.PAD_SRC)
|
||||
else:
|
||||
self.srcpad = gst.Pad(self.srcpad_template)
|
||||
self.add_pad(self.srcpad)
|
||||
|
||||
def convert(self, data):
|
||||
"""Convert the data we have colleted to URIs.
|
||||
|
||||
:param data: collected data buffer
|
||||
:type data: :class:`io.BytesIO`
|
||||
:returns: iterable or generator of URIs
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def handle(self, uris):
|
||||
"""Do something useful with the URIs.
|
||||
|
||||
:param uris: list of URIs
|
||||
:type uris: :type:`list`
|
||||
:returns: boolean indicating if EOS should be consumed
|
||||
"""
|
||||
# TODO: handle unicode uris which we can get out of elementtree
|
||||
self.srcpad.push(gst.Buffer('\n'.join(uris)))
|
||||
return False
|
||||
|
||||
def _chain(self, pad, buf):
|
||||
if not self._done:
|
||||
self._data.write(buf.data)
|
||||
return gst.FLOW_OK
|
||||
return gst.FLOW_EOS
|
||||
|
||||
def _event(self, pad, event):
|
||||
if event.type == gst.EVENT_NEWSEGMENT:
|
||||
return True
|
||||
|
||||
if event.type == gst.EVENT_EOS:
|
||||
self._done = True
|
||||
self._data.seek(0)
|
||||
if self.handle(list(self.convert(self._data))):
|
||||
return True
|
||||
|
||||
# Ensure we handle remaining events in a sane way.
|
||||
return pad.event_default(event)
|
||||
|
||||
|
||||
class M3uDecoder(BasePlaylistElement):
|
||||
__gstdetails__ = ('M3U Decoder',
|
||||
'Decoder',
|
||||
'Convert .m3u to text/uri-list',
|
||||
'Mopidy')
|
||||
|
||||
sinkpad_template = gst.PadTemplate(
|
||||
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('audio/x-mpegurl'))
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('text/uri-list'))
|
||||
|
||||
__gsttemplates__ = (sinkpad_template, srcpad_template)
|
||||
|
||||
def convert(self, data):
|
||||
return parse_m3u(data)
|
||||
|
||||
|
||||
class PlsDecoder(BasePlaylistElement):
|
||||
__gstdetails__ = ('PLS Decoder',
|
||||
'Decoder',
|
||||
'Convert .pls to text/uri-list',
|
||||
'Mopidy')
|
||||
|
||||
sinkpad_template = gst.PadTemplate(
|
||||
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('audio/x-scpls'))
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('text/uri-list'))
|
||||
|
||||
__gsttemplates__ = (sinkpad_template, srcpad_template)
|
||||
|
||||
def convert(self, data):
|
||||
return parse_pls(data)
|
||||
|
||||
|
||||
class XspfDecoder(BasePlaylistElement):
|
||||
__gstdetails__ = ('XSPF Decoder',
|
||||
'Decoder',
|
||||
'Convert .pls to text/uri-list',
|
||||
'Mopidy')
|
||||
|
||||
sinkpad_template = gst.PadTemplate(
|
||||
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('application/xspf+xml'))
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('text/uri-list'))
|
||||
|
||||
__gsttemplates__ = (sinkpad_template, srcpad_template)
|
||||
|
||||
def convert(self, data):
|
||||
return parse_xspf(data)
|
||||
|
||||
|
||||
class AsxDecoder(BasePlaylistElement):
|
||||
__gstdetails__ = ('ASX Decoder',
|
||||
'Decoder',
|
||||
'Convert .asx to text/uri-list',
|
||||
'Mopidy')
|
||||
|
||||
sinkpad_template = gst.PadTemplate(
|
||||
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('audio/x-ms-asx'))
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('text/uri-list'))
|
||||
|
||||
__gsttemplates__ = (sinkpad_template, srcpad_template)
|
||||
|
||||
def convert(self, data):
|
||||
return parse_asx(data)
|
||||
|
||||
|
||||
class UriListElement(BasePlaylistElement):
|
||||
__gstdetails__ = ('URIListDemuxer',
|
||||
'Demuxer',
|
||||
'Convert a text/uri-list to a stream',
|
||||
'Mopidy')
|
||||
|
||||
sinkpad_template = gst.PadTemplate(
|
||||
'sink', gst.PAD_SINK, gst.PAD_ALWAYS,
|
||||
gst.caps_from_string('text/uri-list'))
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_new_any())
|
||||
|
||||
ghost_srcpad = True # We need to hook this up to our internal decodebin
|
||||
|
||||
__gsttemplates__ = (sinkpad_template, srcpad_template)
|
||||
|
||||
def __init__(self):
|
||||
super(UriListElement, self).__init__()
|
||||
self.uridecodebin = gst.element_factory_make('uridecodebin')
|
||||
self.uridecodebin.connect('pad-added', self.pad_added)
|
||||
# Limit to anycaps so we get a single stream out, letting other
|
||||
# elements downstream figure out actual muxing
|
||||
self.uridecodebin.set_property('caps', gst.caps_new_any())
|
||||
|
||||
def pad_added(self, src, pad):
|
||||
self.srcpad.set_target(pad)
|
||||
pad.add_event_probe(self.pad_event)
|
||||
|
||||
def pad_event(self, pad, event):
|
||||
if event.has_name('urilist-played'):
|
||||
error = gst.GError(gst.RESOURCE_ERROR, gst.RESOURCE_ERROR_FAILED,
|
||||
b'Nested playlists not supported.')
|
||||
message = b'Playlists pointing to other playlists is not supported'
|
||||
self.post_message(gst.message_new_error(self, error, message))
|
||||
return 1 # GST_PAD_PROBE_OK
|
||||
|
||||
def handle(self, uris):
|
||||
struct = gst.Structure('urilist-played')
|
||||
event = gst.event_new_custom(gst.EVENT_CUSTOM_UPSTREAM, struct)
|
||||
self.sinkpad.push_event(event)
|
||||
|
||||
# TODO: hookup about to finish and errors to rest of URIs so we
|
||||
# round robin, only giving up once all have been tried.
|
||||
# TODO: uris could be empty.
|
||||
self.add(self.uridecodebin)
|
||||
self.uridecodebin.set_state(gst.STATE_READY)
|
||||
self.uridecodebin.set_property('uri', uris[0])
|
||||
self.uridecodebin.sync_state_with_parent()
|
||||
return True # Make sure we consume the EOS that triggered us.
|
||||
|
||||
def convert(self, data):
|
||||
return parse_urilist(data)
|
||||
|
||||
|
||||
class IcySrc(gst.Bin, gst.URIHandler):
|
||||
__gstdetails__ = ('IcySrc',
|
||||
'Src',
|
||||
'HTTP src wrapper for icy:// support.',
|
||||
'Mopidy')
|
||||
|
||||
srcpad_template = gst.PadTemplate(
|
||||
'src', gst.PAD_SRC, gst.PAD_ALWAYS,
|
||||
gst.caps_new_any())
|
||||
|
||||
__gsttemplates__ = (srcpad_template,)
|
||||
|
||||
def __init__(self):
|
||||
super(IcySrc, self).__init__()
|
||||
self._httpsrc = gst.element_make_from_uri(gst.URI_SRC, 'http://')
|
||||
try:
|
||||
self._httpsrc.set_property('iradio-mode', True)
|
||||
except TypeError:
|
||||
pass
|
||||
self.add(self._httpsrc)
|
||||
|
||||
self._srcpad = gst.GhostPad('src', self._httpsrc.get_pad('src'))
|
||||
self.add_pad(self._srcpad)
|
||||
|
||||
@classmethod
|
||||
def do_get_type_full(cls):
|
||||
return gst.URI_SRC
|
||||
|
||||
@classmethod
|
||||
def do_get_protocols_full(cls):
|
||||
return [b'icy', b'icyx']
|
||||
|
||||
def do_set_uri(self, uri):
|
||||
if uri.startswith('icy://'):
|
||||
return self._httpsrc.set_uri(b'http://' + uri[len('icy://'):])
|
||||
elif uri.startswith('icyx://'):
|
||||
return self._httpsrc.set_uri(b'https://' + uri[len('icyx://'):])
|
||||
else:
|
||||
return False
|
||||
|
||||
def do_get_uri(self):
|
||||
uri = self._httpsrc.get_uri()
|
||||
if uri.startswith('http://'):
|
||||
return b'icy://' + uri[len('http://'):]
|
||||
else:
|
||||
return b'icyx://' + uri[len('https://'):]
|
||||
|
||||
|
||||
def register_element(element_class):
|
||||
gobject.type_register(element_class)
|
||||
gst.element_register(
|
||||
element_class, element_class.__name__.lower(), gst.RANK_MARGINAL)
|
||||
|
||||
|
||||
def register_elements():
|
||||
register_element(M3uDecoder)
|
||||
register_element(PlsDecoder)
|
||||
register_element(XspfDecoder)
|
||||
register_element(AsxDecoder)
|
||||
register_element(UriListElement)
|
||||
|
||||
# Only register icy if gst install can't handle it on it's own.
|
||||
if not gst.element_make_from_uri(gst.URI_SRC, 'icy://'):
|
||||
register_element(IcySrc)
|
||||
@ -1,8 +1,13 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import logging
|
||||
|
||||
from mopidy import listener, models
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Backend(object):
|
||||
|
||||
"""Backend API
|
||||
@ -238,6 +243,9 @@ class PlaybackProvider(object):
|
||||
:rtype: :class:`True` if successful, else :class:`False`
|
||||
"""
|
||||
uri = self.translate_uri(track.uri)
|
||||
if uri != track.uri:
|
||||
logger.debug(
|
||||
'Backend translated URI from %s to %s', track.uri, uri)
|
||||
if not uri:
|
||||
return False
|
||||
self.audio.set_uri(uri).get()
|
||||
|
||||
16
mopidy/internal/http.py
Normal file
16
mopidy/internal/http.py
Normal file
@ -0,0 +1,16 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import requests
|
||||
|
||||
from mopidy import httpclient
|
||||
|
||||
|
||||
def get_requests_session(proxy_config, user_agent):
|
||||
proxy = httpclient.format_proxy(proxy_config)
|
||||
full_user_agent = httpclient.format_user_agent(user_agent)
|
||||
|
||||
session = requests.Session()
|
||||
session.proxies.update({'http': proxy, 'https': proxy})
|
||||
session.headers.update({'user-agent': full_user_agent})
|
||||
|
||||
return session
|
||||
132
mopidy/internal/playlists.py
Normal file
132
mopidy/internal/playlists.py
Normal file
@ -0,0 +1,132 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import io
|
||||
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa
|
||||
|
||||
from mopidy.compat import configparser
|
||||
from mopidy.internal import validation
|
||||
|
||||
try:
|
||||
import xml.etree.cElementTree as elementtree
|
||||
except ImportError:
|
||||
import xml.etree.ElementTree as elementtree
|
||||
|
||||
|
||||
def parse(data):
|
||||
handlers = {
|
||||
detect_extm3u_header: parse_extm3u,
|
||||
detect_pls_header: parse_pls,
|
||||
detect_asx_header: parse_asx,
|
||||
detect_xspf_header: parse_xspf,
|
||||
}
|
||||
for detector, parser in handlers.items():
|
||||
if detector(data):
|
||||
return list(parser(data))
|
||||
return parse_urilist(data) # Fallback
|
||||
|
||||
|
||||
def detect_extm3u_header(data):
|
||||
return data[0:7].upper() == b'#EXTM3U'
|
||||
|
||||
|
||||
def detect_pls_header(data):
|
||||
return data[0:10].lower() == b'[playlist]'
|
||||
|
||||
|
||||
def detect_xspf_header(data):
|
||||
data = data[0:150]
|
||||
if b'xspf' not in data.lower():
|
||||
return False
|
||||
|
||||
try:
|
||||
data = io.BytesIO(data)
|
||||
for event, element in elementtree.iterparse(data, events=(b'start',)):
|
||||
return element.tag.lower() == '{http://xspf.org/ns/0/}playlist'
|
||||
except elementtree.ParseError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def detect_asx_header(data):
|
||||
data = data[0:50]
|
||||
if b'asx' not in data.lower():
|
||||
return False
|
||||
|
||||
try:
|
||||
data = io.BytesIO(data)
|
||||
for event, element in elementtree.iterparse(data, events=(b'start',)):
|
||||
return element.tag.lower() == 'asx'
|
||||
except elementtree.ParseError:
|
||||
pass
|
||||
return False
|
||||
|
||||
|
||||
def parse_extm3u(data):
|
||||
# TODO: convert non URIs to file URIs.
|
||||
found_header = False
|
||||
for line in data.splitlines():
|
||||
if found_header or line.startswith(b'#EXTM3U'):
|
||||
found_header = True
|
||||
else:
|
||||
continue
|
||||
if not line.startswith(b'#') and line.strip():
|
||||
yield line.strip()
|
||||
|
||||
|
||||
def parse_pls(data):
|
||||
# TODO: convert non URIs to file URIs.
|
||||
try:
|
||||
cp = configparser.RawConfigParser()
|
||||
cp.readfp(io.BytesIO(data))
|
||||
except configparser.Error:
|
||||
return
|
||||
|
||||
for section in cp.sections():
|
||||
if section.lower() != 'playlist':
|
||||
continue
|
||||
for i in range(cp.getint(section, 'numberofentries')):
|
||||
yield cp.get(section, 'file%d' % (i + 1))
|
||||
|
||||
|
||||
def parse_xspf(data):
|
||||
try:
|
||||
# Last element will be root.
|
||||
for event, element in elementtree.iterparse(io.BytesIO(data)):
|
||||
element.tag = element.tag.lower() # normalize
|
||||
except elementtree.ParseError:
|
||||
return
|
||||
|
||||
ns = 'http://xspf.org/ns/0/'
|
||||
for track in element.iterfind('{%s}tracklist/{%s}track' % (ns, ns)):
|
||||
yield track.findtext('{%s}location' % ns)
|
||||
|
||||
|
||||
def parse_asx(data):
|
||||
try:
|
||||
# Last element will be root.
|
||||
for event, element in elementtree.iterparse(io.BytesIO(data)):
|
||||
element.tag = element.tag.lower() # normalize
|
||||
except elementtree.ParseError:
|
||||
return
|
||||
|
||||
for ref in element.findall('entry/ref[@href]'):
|
||||
yield ref.get('href', '').strip()
|
||||
|
||||
for entry in element.findall('entry[@href]'):
|
||||
yield entry.get('href', '').strip()
|
||||
|
||||
|
||||
def parse_urilist(data):
|
||||
result = []
|
||||
for line in data.splitlines():
|
||||
if not line.strip() or line.startswith('#'):
|
||||
continue
|
||||
try:
|
||||
validation.check_uri(line)
|
||||
except ValueError:
|
||||
return []
|
||||
result.append(line)
|
||||
return result
|
||||
@ -3,12 +3,16 @@ from __future__ import absolute_import, unicode_literals
|
||||
import fnmatch
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
import urlparse
|
||||
|
||||
import pykka
|
||||
|
||||
from mopidy import audio as audio_lib, backend, exceptions
|
||||
import requests
|
||||
|
||||
from mopidy import audio as audio_lib, backend, exceptions, stream
|
||||
from mopidy.audio import scan, utils
|
||||
from mopidy.internal import http, playlists
|
||||
from mopidy.models import Track
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -19,11 +23,14 @@ class StreamBackend(pykka.ThreadingActor, backend.Backend):
|
||||
def __init__(self, config, audio):
|
||||
super(StreamBackend, self).__init__()
|
||||
|
||||
self._scanner = scan.Scanner(
|
||||
timeout=config['stream']['timeout'],
|
||||
proxy_config=config['proxy'])
|
||||
|
||||
self.library = StreamLibraryProvider(
|
||||
backend=self, timeout=config['stream']['timeout'],
|
||||
blacklist=config['stream']['metadata_blacklist'],
|
||||
proxy=config['proxy'])
|
||||
self.playback = backend.PlaybackProvider(audio=audio, backend=self)
|
||||
backend=self, blacklist=config['stream']['metadata_blacklist'])
|
||||
self.playback = StreamPlaybackProvider(
|
||||
audio=audio, backend=self, config=config)
|
||||
self.playlists = None
|
||||
|
||||
self.uri_schemes = audio_lib.supported_uri_schemes(
|
||||
@ -32,9 +39,9 @@ class StreamBackend(pykka.ThreadingActor, backend.Backend):
|
||||
|
||||
class StreamLibraryProvider(backend.LibraryProvider):
|
||||
|
||||
def __init__(self, backend, timeout, blacklist, proxy):
|
||||
def __init__(self, backend, blacklist):
|
||||
super(StreamLibraryProvider, self).__init__(backend)
|
||||
self._scanner = scan.Scanner(timeout=timeout, proxy_config=proxy)
|
||||
self._scanner = backend._scanner
|
||||
self._blacklist_re = re.compile(
|
||||
r'^(%s)$' % '|'.join(fnmatch.translate(u) for u in blacklist))
|
||||
|
||||
@ -55,3 +62,67 @@ class StreamLibraryProvider(backend.LibraryProvider):
|
||||
track = Track(uri=uri)
|
||||
|
||||
return [track]
|
||||
|
||||
|
||||
class StreamPlaybackProvider(backend.PlaybackProvider):
|
||||
|
||||
def __init__(self, audio, backend, config):
|
||||
super(StreamPlaybackProvider, self).__init__(audio, backend)
|
||||
self._config = config
|
||||
self._scanner = backend._scanner
|
||||
|
||||
def translate_uri(self, uri):
|
||||
try:
|
||||
scan_result = self._scanner.scan(uri)
|
||||
except exceptions.ScannerError as e:
|
||||
logger.warning(
|
||||
'Problem scanning URI %s: %s', uri, e)
|
||||
return None
|
||||
|
||||
if not (scan_result.mime.startswith('text/') or
|
||||
scan_result.mime.startswith('application/')):
|
||||
return uri
|
||||
|
||||
content = self._download(uri)
|
||||
if content is None:
|
||||
return None
|
||||
|
||||
tracks = list(playlists.parse(content))
|
||||
if tracks:
|
||||
# TODO Test streams and return first that seems to be playable
|
||||
return tracks[0]
|
||||
|
||||
def _download(self, uri):
|
||||
timeout = self._config['stream']['timeout'] / 1000.0
|
||||
|
||||
session = http.get_requests_session(
|
||||
proxy_config=self._config['proxy'],
|
||||
user_agent='%s/%s' % (
|
||||
stream.Extension.dist_name, stream.Extension.version))
|
||||
|
||||
try:
|
||||
response = session.get(
|
||||
uri, stream=True, timeout=timeout)
|
||||
except requests.exceptions.Timeout:
|
||||
logger.warning(
|
||||
'Download of stream playlist (%s) failed due to connection '
|
||||
'timeout after %.3fs', uri, timeout)
|
||||
return None
|
||||
|
||||
deadline = time.time() + timeout
|
||||
content = []
|
||||
for chunk in response.iter_content(4096):
|
||||
content.append(chunk)
|
||||
if time.time() > deadline:
|
||||
logger.warning(
|
||||
'Download of stream playlist (%s) failed due to download '
|
||||
'taking more than %.3fs', uri, timeout)
|
||||
return None
|
||||
|
||||
if not response.ok:
|
||||
logger.warning(
|
||||
'Problem downloading stream playlist %s: %s',
|
||||
uri, response.reason)
|
||||
return None
|
||||
|
||||
return b''.join(content)
|
||||
|
||||
3
setup.py
3
setup.py
@ -24,8 +24,9 @@ setup(
|
||||
zip_safe=False,
|
||||
include_package_data=True,
|
||||
install_requires=[
|
||||
'setuptools',
|
||||
'Pykka >= 1.1',
|
||||
'requests',
|
||||
'setuptools',
|
||||
'tornado >= 2.3',
|
||||
],
|
||||
extras_require={'http': []},
|
||||
|
||||
@ -2,28 +2,39 @@
|
||||
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import io
|
||||
import unittest
|
||||
|
||||
from mopidy.audio import playlists
|
||||
import pytest
|
||||
|
||||
from mopidy.internal import playlists
|
||||
|
||||
|
||||
BAD = b'foobarbaz'
|
||||
|
||||
M3U = b"""#EXTM3U
|
||||
EXTM3U = b"""#EXTM3U
|
||||
#EXTINF:123, Sample artist - Sample title
|
||||
file:///tmp/foo
|
||||
#EXTINF:321,Example Artist - Example \xc5\xa7\xc5\x95
|
||||
file:///tmp/bar
|
||||
|
||||
#EXTINF:213,Some Artist - Other title
|
||||
file:///tmp/baz
|
||||
"""
|
||||
|
||||
URILIST = b"""
|
||||
file:///tmp/foo
|
||||
# a comment
|
||||
file:///tmp/bar
|
||||
|
||||
file:///tmp/baz
|
||||
"""
|
||||
|
||||
PLS = b"""[Playlist]
|
||||
NumberOfEntries=3
|
||||
File1=file:///tmp/foo
|
||||
Title1=Sample Title
|
||||
Length1=123
|
||||
|
||||
File2=file:///tmp/bar
|
||||
Title2=Example \xc5\xa7\xc5\x95
|
||||
Length2=321
|
||||
@ -76,14 +87,20 @@ XSPF = b"""<?xml version="1.0" encoding="UTF-8"?>
|
||||
</playlist>
|
||||
"""
|
||||
|
||||
EXPECTED = [b'file:///tmp/foo', b'file:///tmp/bar', b'file:///tmp/baz']
|
||||
|
||||
class TypeFind(object):
|
||||
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def peek(self, start, end):
|
||||
return self.data[start:end]
|
||||
@pytest.mark.parametrize('data,result', [
|
||||
(BAD, []),
|
||||
(URILIST, EXPECTED),
|
||||
(EXTM3U, EXPECTED),
|
||||
(PLS, EXPECTED),
|
||||
(ASX, EXPECTED),
|
||||
(SIMPLE_ASX, EXPECTED),
|
||||
(XSPF, EXPECTED),
|
||||
])
|
||||
def test_parse(data, result):
|
||||
assert playlists.parse(data) == result
|
||||
|
||||
|
||||
class BasePlaylistTest(object):
|
||||
@ -93,26 +110,25 @@ class BasePlaylistTest(object):
|
||||
parse = None
|
||||
|
||||
def test_detect_valid_header(self):
|
||||
self.assertTrue(self.detect(TypeFind(self.valid)))
|
||||
self.assertTrue(self.detect(self.valid))
|
||||
|
||||
def test_detect_invalid_header(self):
|
||||
self.assertFalse(self.detect(TypeFind(self.invalid)))
|
||||
self.assertFalse(self.detect(self.invalid))
|
||||
|
||||
def test_parse_valid_playlist(self):
|
||||
uris = list(self.parse(io.BytesIO(self.valid)))
|
||||
expected = [b'file:///tmp/foo', b'file:///tmp/bar', b'file:///tmp/baz']
|
||||
self.assertEqual(uris, expected)
|
||||
uris = list(self.parse(self.valid))
|
||||
self.assertEqual(uris, EXPECTED)
|
||||
|
||||
def test_parse_invalid_playlist(self):
|
||||
uris = list(self.parse(io.BytesIO(self.invalid)))
|
||||
uris = list(self.parse(self.invalid))
|
||||
self.assertEqual(uris, [])
|
||||
|
||||
|
||||
class M3uPlaylistTest(BasePlaylistTest, unittest.TestCase):
|
||||
valid = M3U
|
||||
class ExtM3uPlaylistTest(BasePlaylistTest, unittest.TestCase):
|
||||
valid = EXTM3U
|
||||
invalid = BAD
|
||||
detect = staticmethod(playlists.detect_m3u_header)
|
||||
parse = staticmethod(playlists.parse_m3u)
|
||||
detect = staticmethod(playlists.detect_extm3u_header)
|
||||
parse = staticmethod(playlists.parse_extm3u)
|
||||
|
||||
|
||||
class PlsPlaylistTest(BasePlaylistTest, unittest.TestCase):
|
||||
@ -141,3 +157,17 @@ class XspfPlaylistTest(BasePlaylistTest, unittest.TestCase):
|
||||
invalid = BAD
|
||||
detect = staticmethod(playlists.detect_xspf_header)
|
||||
parse = staticmethod(playlists.parse_xspf)
|
||||
|
||||
|
||||
class UriListPlaylistTest(unittest.TestCase):
|
||||
valid = URILIST
|
||||
invalid = BAD
|
||||
parse = staticmethod(playlists.parse_urilist)
|
||||
|
||||
def test_parse_valid_playlist(self):
|
||||
uris = list(self.parse(self.valid))
|
||||
self.assertEqual(uris, EXPECTED)
|
||||
|
||||
def test_parse_invalid_playlist(self):
|
||||
uris = list(self.parse(self.invalid))
|
||||
self.assertEqual(uris, [])
|
||||
@ -1,16 +1,10 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import unittest
|
||||
|
||||
import gobject
|
||||
gobject.threads_init()
|
||||
|
||||
import mock
|
||||
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa: pygst magic is needed to import correct gst
|
||||
import pytest
|
||||
|
||||
from mopidy.audio import scan
|
||||
from mopidy.internal import path
|
||||
from mopidy.models import Track
|
||||
from mopidy.stream import actor
|
||||
@ -18,27 +12,44 @@ from mopidy.stream import actor
|
||||
from tests import path_to_data_dir
|
||||
|
||||
|
||||
class LibraryProviderTest(unittest.TestCase):
|
||||
@pytest.fixture
|
||||
def scanner():
|
||||
return scan.Scanner(timeout=100, proxy_config={})
|
||||
|
||||
def setUp(self): # noqa: N802
|
||||
self.backend = mock.Mock()
|
||||
self.backend.uri_schemes = ['file']
|
||||
self.uri = path.path_to_uri(path_to_data_dir('song1.wav'))
|
||||
|
||||
def test_lookup_ignores_unknown_scheme(self):
|
||||
library = actor.StreamLibraryProvider(self.backend, 1000, [], {})
|
||||
self.assertFalse(library.lookup('http://example.com'))
|
||||
@pytest.fixture
|
||||
def backend(scanner):
|
||||
backend = mock.Mock()
|
||||
backend.uri_schemes = ['file']
|
||||
backend._scanner = scanner
|
||||
return backend
|
||||
|
||||
def test_lookup_respects_blacklist(self):
|
||||
library = actor.StreamLibraryProvider(self.backend, 10, [self.uri], {})
|
||||
self.assertEqual([Track(uri=self.uri)], library.lookup(self.uri))
|
||||
|
||||
def test_lookup_respects_blacklist_globbing(self):
|
||||
blacklist = [path.path_to_uri(path_to_data_dir('')) + '*']
|
||||
library = actor.StreamLibraryProvider(self.backend, 100, blacklist, {})
|
||||
self.assertEqual([Track(uri=self.uri)], library.lookup(self.uri))
|
||||
@pytest.fixture
|
||||
def track_uri():
|
||||
return path.path_to_uri(path_to_data_dir('song1.wav'))
|
||||
|
||||
def test_lookup_converts_uri_metadata_to_track(self):
|
||||
library = actor.StreamLibraryProvider(self.backend, 100, [], {})
|
||||
self.assertEqual([Track(length=4406, uri=self.uri)],
|
||||
library.lookup(self.uri))
|
||||
|
||||
def test_lookup_ignores_unknown_scheme(backend):
|
||||
library = actor.StreamLibraryProvider(backend, [])
|
||||
|
||||
assert library.lookup('http://example.com') == []
|
||||
|
||||
|
||||
def test_lookup_respects_blacklist(backend, track_uri):
|
||||
library = actor.StreamLibraryProvider(backend, [track_uri])
|
||||
|
||||
assert library.lookup(track_uri) == [Track(uri=track_uri)]
|
||||
|
||||
|
||||
def test_lookup_respects_blacklist_globbing(backend, track_uri):
|
||||
blacklist = [path.path_to_uri(path_to_data_dir('')) + '*']
|
||||
library = actor.StreamLibraryProvider(backend, blacklist)
|
||||
|
||||
assert library.lookup(track_uri) == [Track(uri=track_uri)]
|
||||
|
||||
|
||||
def test_lookup_converts_uri_metadata_to_track(backend, track_uri):
|
||||
library = actor.StreamLibraryProvider(backend, [])
|
||||
|
||||
assert library.lookup(track_uri) == [Track(length=4406, uri=track_uri)]
|
||||
|
||||
145
tests/stream/test_playback.py
Normal file
145
tests/stream/test_playback.py
Normal file
@ -0,0 +1,145 @@
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import mock
|
||||
|
||||
import pytest
|
||||
|
||||
import requests
|
||||
|
||||
import responses
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.audio import scan
|
||||
from mopidy.stream import actor
|
||||
|
||||
|
||||
TIMEOUT = 1000
|
||||
URI = 'http://example.com/listen.m3u'
|
||||
BODY = """
|
||||
#EXTM3U
|
||||
http://example.com/stream.mp3
|
||||
http://foo.bar/baz
|
||||
""".strip()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def config():
|
||||
return {
|
||||
'proxy': {},
|
||||
'stream': {
|
||||
'timeout': TIMEOUT,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def audio():
|
||||
return mock.Mock()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def scanner():
|
||||
scanner = mock.Mock(spec=scan.Scanner)
|
||||
scanner.scan.return_value.mime = 'text/foo'
|
||||
return scanner
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def backend(scanner):
|
||||
backend = mock.Mock()
|
||||
backend.uri_schemes = ['file']
|
||||
backend._scanner = scanner
|
||||
return backend
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def provider(audio, backend, config):
|
||||
return actor.StreamPlaybackProvider(audio, backend, config)
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_audio_stream_returns_same_uri(
|
||||
scanner, provider):
|
||||
|
||||
scanner.scan.return_value.mime = 'audio/ogg'
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == URI
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_playlist_returns_first_uri_in_list(
|
||||
scanner, provider):
|
||||
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='audio/x-mpegurl')
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == 'http://example.com/stream.mp3'
|
||||
assert responses.calls[0].request.headers['User-Agent'].startswith(
|
||||
'Mopidy-Stream/')
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_of_playlist_with_xml_mimetype(scanner, provider):
|
||||
scanner.scan.return_value.mime = 'application/xspf+xml'
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='application/xspf+xml')
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
scanner.scan.assert_called_once_with(URI)
|
||||
assert result == 'http://example.com/stream.mp3'
|
||||
|
||||
|
||||
def test_translate_uri_when_scanner_fails(scanner, provider, caplog):
|
||||
scanner.scan.side_effect = exceptions.ScannerError('foo failed')
|
||||
|
||||
result = provider.translate_uri('bar')
|
||||
|
||||
assert result is None
|
||||
assert 'Problem scanning URI bar: foo failed' in caplog.text()
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_when_playlist_download_fails(provider, caplog):
|
||||
responses.add(responses.GET, URI, body=BODY, status=500)
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
assert result is None
|
||||
assert 'Problem downloading stream playlist' in caplog.text()
|
||||
|
||||
|
||||
def test_translate_uri_times_out_if_connection_times_out(provider, caplog):
|
||||
with mock.patch.object(actor.requests, 'Session') as session_mock:
|
||||
get_mock = session_mock.return_value.get
|
||||
get_mock.side_effect = requests.exceptions.Timeout
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
get_mock.assert_called_once_with(URI, timeout=1.0, stream=True)
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of stream playlist (%s) failed due to connection '
|
||||
'timeout after 1.000s' % URI in caplog.text())
|
||||
|
||||
|
||||
@responses.activate
|
||||
def test_translate_uri_times_out_if_download_is_slow(provider, caplog):
|
||||
responses.add(
|
||||
responses.GET, URI, body=BODY, content_type='audio/x-mpegurl')
|
||||
|
||||
with mock.patch.object(actor, 'time') as time_mock:
|
||||
time_mock.time.side_effect = [0, TIMEOUT + 1]
|
||||
|
||||
result = provider.translate_uri(URI)
|
||||
|
||||
assert result is None
|
||||
assert (
|
||||
'Download of stream playlist (%s) failed due to download taking '
|
||||
'more than 1.000s' % URI in caplog.text())
|
||||
Loading…
Reference in New Issue
Block a user