m3u: Major refactoring, add default_encoding and default_extension settings.

This commit is contained in:
Thomas Kemmer 2016-01-01 20:27:00 +01:00
parent 811131f716
commit b2d1e1b4f7
9 changed files with 369 additions and 409 deletions

View File

@ -21,10 +21,11 @@ class Extension(ext.Extension):
def get_config_schema(self):
schema = super(Extension, self).get_config_schema()
schema['default_encoding'] = config.String()
schema['default_extension'] = config.String(choices=['.m3u', '.m3u8'])
schema['playlists_dir'] = config.Path(optional=True)
return schema
def setup(self, registry):
from .actor import M3UBackend
from .backend import M3UBackend
registry.add('backend', M3UBackend)

View File

@ -1,36 +0,0 @@
from __future__ import absolute_import, unicode_literals
import logging
import pykka
from mopidy import backend, m3u
from mopidy.internal import encoding, path
from mopidy.m3u.library import M3ULibraryProvider
from mopidy.m3u.playlists import M3UPlaylistsProvider
logger = logging.getLogger(__name__)
class M3UBackend(pykka.ThreadingActor, backend.Backend):
uri_schemes = ['m3u']
def __init__(self, config, audio):
super(M3UBackend, self).__init__()
self._config = config
if config['m3u']['playlists_dir'] is not None:
self._playlists_dir = config['m3u']['playlists_dir']
try:
path.get_or_create_dir(self._playlists_dir)
except EnvironmentError as error:
logger.warning(
'Could not create M3U playlists dir: %s',
encoding.locale_decode(error))
else:
self._playlists_dir = m3u.Extension.get_data_dir(config)
self.playlists = M3UPlaylistsProvider(backend=self)
self.library = M3ULibraryProvider(backend=self)

15
mopidy/m3u/backend.py Normal file
View File

@ -0,0 +1,15 @@
from __future__ import absolute_import, unicode_literals
import pykka
from mopidy import backend
from . import playlists
class M3UBackend(pykka.ThreadingActor, backend.Backend):
uri_schemes = ['m3u']
def __init__(self, config, audio):
super(M3UBackend, self).__init__()
self.playlists = playlists.M3UPlaylistsProvider(self, config)

View File

@ -1,3 +1,8 @@
[m3u]
enabled = true
default_encoding = latin-1
default_extension = .m3u
playlists_dir =

View File

@ -1,19 +0,0 @@
from __future__ import absolute_import, unicode_literals
import logging
from mopidy import backend
logger = logging.getLogger(__name__)
class M3ULibraryProvider(backend.LibraryProvider):
"""Library for looking up M3U playlists."""
def __init__(self, backend):
super(M3ULibraryProvider, self).__init__(backend)
def lookup(self, uri):
# TODO Lookup tracks in M3U playlist
return []

View File

@ -1,117 +1,117 @@
from __future__ import absolute_import, division, unicode_literals
from __future__ import absolute_import, unicode_literals
import glob
import io
import locale
import logging
import operator
import os
import re
import sys
from mopidy import backend
from mopidy.m3u import translator
from mopidy.models import Playlist, Ref
from . import Extension, translator
logger = logging.getLogger(__name__)
def log_environment_error(message, error):
if isinstance(error.strerror, bytes):
strerror = error.strerror.decode(locale.getpreferredencoding())
else:
strerror = error.strerror
logger.error('%s: %s', message, strerror)
class M3UPlaylistsProvider(backend.PlaylistsProvider):
# TODO: currently this only handles UNIX file systems
_invalid_filename_chars = re.compile(r'[/]')
def __init__(self, backend, config):
super(M3UPlaylistsProvider, self).__init__(backend)
def __init__(self, *args, **kwargs):
super(M3UPlaylistsProvider, self).__init__(*args, **kwargs)
self._playlists_dir = self.backend._playlists_dir
self._playlists = {}
self.refresh()
ext_config = config[Extension.ext_name]
if ext_config['playlists_dir'] is None:
self._playlists_dir = Extension.get_data_dir(config)
else:
self._playlists_dir = ext_config['playlists_dir']
self._default_encoding = ext_config['default_encoding']
self._default_extension = ext_config['default_extension']
def as_list(self):
refs = [
Ref.playlist(uri=pl.uri, name=pl.name)
for pl in self._playlists.values()]
return sorted(refs, key=operator.attrgetter('name'))
def get_items(self, uri):
playlist = self._playlists.get(uri)
if playlist is None:
return None
return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks]
result = []
for entry in os.listdir(self._playlists_dir):
if not entry.endswith((b'.m3u', b'.m3u8')):
continue
elif not os.path.isfile(self._abspath(entry)):
continue
else:
result.append(translator.path_to_ref(entry))
result.sort(key=operator.attrgetter('name'))
return result
def create(self, name):
playlist = self._save_m3u(Playlist(name=name))
self._playlists[playlist.uri] = playlist
logger.info('Created playlist %s', playlist.uri)
return playlist
path = translator.path_from_name(name.strip(), self._default_extension)
try:
with self._open(path, 'w'):
pass
mtime = os.path.getmtime(self._abspath(path))
except EnvironmentError as e:
log_environment_error('Error creating playlist %s' % name, e)
else:
return translator.playlist(path, [], mtime)
def delete(self, uri):
if uri in self._playlists:
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
if os.path.exists(path):
os.remove(path)
else:
logger.warning(
'Trying to delete missing playlist file %s', path)
del self._playlists[uri]
logger.info('Deleted playlist %s', uri)
path = translator.uri_to_path(uri)
try:
os.remove(self._abspath(path))
except EnvironmentError as e:
log_environment_error('Error deleting playlist %s' % uri, e)
def get_items(self, uri):
path = translator.uri_to_path(uri)
try:
with self._open(path, 'r') as fp:
items = translator.load_items(fp, self._playlists_dir)
except EnvironmentError as e:
log_environment_error('Error reading playlist %s' % uri, e)
else:
logger.warning('Trying to delete unknown playlist %s', uri)
return items
def lookup(self, uri):
return self._playlists.get(uri)
path = translator.uri_to_path(uri)
try:
with self._open(path, 'r') as fp:
items = translator.load_items(fp, self._playlists_dir)
mtime = os.path.getmtime(self._abspath(path))
except EnvironmentError as e:
log_environment_error('Error reading playlist %s' % uri, e)
else:
return translator.playlist(path, items, mtime)
def refresh(self):
playlists = {}
encoding = sys.getfilesystemencoding()
for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u*')):
relpath = os.path.basename(path)
uri = translator.path_to_playlist_uri(relpath)
name = os.path.splitext(relpath)[0].decode(encoding, 'replace')
tracks = translator.parse_m3u(path)
playlists[uri] = Playlist(uri=uri, name=name, tracks=tracks)
self._playlists = playlists
logger.info(
'Loaded %d M3U playlists from %s',
len(playlists), self._playlists_dir)
# TODO Trigger playlists_loaded event?
pass # nothing to do
def save(self, playlist):
assert playlist.uri, 'Cannot save playlist without URI'
assert playlist.uri in self._playlists, \
'Cannot save playlist with unknown URI: %s' % playlist.uri
original_uri = playlist.uri
playlist = self._save_m3u(playlist)
if playlist.uri != original_uri and original_uri in self._playlists:
self.delete(original_uri)
self._playlists[playlist.uri] = playlist
return playlist
def _sanitize_m3u_name(self, name, encoding=sys.getfilesystemencoding()):
name = self._invalid_filename_chars.sub('|', name.strip())
# make sure we end up with a valid path segment
name = name.encode(encoding, errors='replace')
name = os.path.basename(name) # paranoia?
name = name.decode(encoding)
return name
def _save_m3u(self, playlist, encoding=sys.getfilesystemencoding()):
if playlist.name:
name = self._sanitize_m3u_name(playlist.name, encoding)
uri = translator.path_to_playlist_uri(
name.encode(encoding) + b'.m3u')
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
elif playlist.uri:
uri = playlist.uri
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
name, _ = os.path.splitext(os.path.basename(path).decode(encoding))
path = translator.uri_to_path(playlist.uri)
name = translator.name_from_path(path)
try:
with self._open(path, 'w') as fp:
translator.dump_items(playlist.tracks, fp)
if playlist.name and playlist.name != name:
opath, ext = os.path.splitext(path)
path = translator.path_from_name(playlist.name.strip()) + ext
os.rename(self._abspath(opath + ext), self._abspath(path))
mtime = os.path.getmtime(self._abspath(path))
except EnvironmentError as e:
log_environment_error('Error saving playlist %s' % playlist.uri, e)
else:
raise ValueError('M3U playlist needs name or URI')
translator.save_m3u(path, playlist.tracks, 'latin1')
# assert playlist name matches file name/uri
return playlist.replace(uri=uri, name=name)
return translator.playlist(path, playlist.tracks, mtime)
def _abspath(self, path):
return os.path.join(self._playlists_dir, path)
def _open(self, path, mode='r'):
if path.endswith(b'.m3u8'):
encoding = 'utf-8'
else:
encoding = self._default_encoding
if not os.path.isabs(path):
path = os.path.join(self._playlists_dir, path)
return io.open(path, mode, encoding=encoding, errors='replace')

View File

@ -1,130 +1,119 @@
from __future__ import absolute_import, unicode_literals
from __future__ import absolute_import, print_function, unicode_literals
import codecs
import logging
import os
import re
from mopidy import compat
from mopidy.compat import urllib
from mopidy.internal import encoding, path
from mopidy.models import Track
from mopidy import models
from . import Extension
try:
from urllib.parse import quote_from_bytes, unquote_to_bytes
except ImportError:
import urllib
def quote_from_bytes(bytes, safe=b'/'):
# Python 3 returns Unicode string
return urllib.quote(bytes, safe).decode('utf-8')
def unquote_to_bytes(string):
if isinstance(string, bytes):
return urllib.unquote(string)
else:
return urllib.unquote(string.encode('utf-8'))
try:
from urllib.parse import urlsplit, urlunsplit
except ImportError:
from urlparse import urlsplit, urlunsplit
M3U_EXTINF_RE = re.compile(r'#EXTINF:(-1|\d+),(.*)')
try:
from os import fsencode, fsdecode
except ImportError:
import sys
logger = logging.getLogger(__name__)
# no 'surrogateescape' in Python 2; 'replace' for backward compatibility
def fsencode(filename, encoding=sys.getfilesystemencoding()):
return filename.encode(encoding, 'replace')
def fsdecode(filename, encoding=sys.getfilesystemencoding()):
return filename.decode(encoding, 'replace')
def playlist_uri_to_path(uri, playlists_dir):
if not uri.startswith('m3u:'):
raise ValueError('Invalid URI %s' % uri)
file_path = path.uri_to_path(uri)
return os.path.join(playlists_dir, file_path)
def path_to_uri(path, scheme=Extension.ext_name):
"""Convert file path to URI."""
assert isinstance(path, bytes), 'Mopidy paths should be bytes'
uripath = quote_from_bytes(os.path.normpath(path))
return urlunsplit((scheme, None, uripath, None, None))
def path_to_playlist_uri(relpath):
"""Convert path relative to playlists_dir to M3U URI."""
if isinstance(relpath, compat.text_type):
relpath = relpath.encode('utf-8')
return b'm3u:%s' % urllib.parse.quote(relpath)
def uri_to_path(uri):
"""Convert URI to file path."""
# TODO: decide on Unicode vs. bytes for URIs
return unquote_to_bytes(urlsplit(uri).path)
def m3u_extinf_to_track(line):
"""Convert extended M3U directive to track template."""
m = M3U_EXTINF_RE.match(line)
if not m:
logger.warning('Invalid extended M3U directive: %s', line)
return Track()
(runtime, title) = m.groups()
if int(runtime) > 0:
return Track(name=title, length=1000 * int(runtime))
else:
return Track(name=title)
def parse_m3u(file_path, media_dir=None):
r"""
Convert M3U file list to list of tracks
Example M3U data::
# This is a comment
Alternative\Band - Song.mp3
Classical\Other Band - New Song.mp3
Stuff.mp3
D:\More Music\Foo.mp3
http://www.example.com:8000/Listen.pls
http://www.example.com/~user/Mine.mp3
Example extended M3U data::
#EXTM3U
#EXTINF:123, Sample artist - Sample title
Sample.mp3
#EXTINF:321,Example Artist - Example title
Greatest Hits\Example.ogg
#EXTINF:-1,Radio XMP
http://mp3stream.example.com:8000/
- Relative paths of songs should be with respect to location of M3U.
- Paths are normally platform specific.
- Lines starting with # are ignored, except for extended M3U directives.
- Track.name and Track.length are set from extended M3U directives.
- m3u files are latin-1.
- m3u8 files are utf-8
"""
# TODO: uris as bytes
file_encoding = 'utf-8' if file_path.endswith(b'.m3u8') else 'latin1'
tracks = []
def name_from_path(path):
"""Extract name from file path."""
name, _ = os.path.splitext(os.path.basename(path))
try:
with codecs.open(file_path, 'rb', file_encoding, 'replace') as m3u:
contents = m3u.readlines()
except IOError as error:
logger.warning('Couldn\'t open m3u: %s', encoding.locale_decode(error))
return tracks
return fsdecode(name)
except UnicodeError:
return None
if not contents:
return tracks
# Strip newlines left by codecs
contents = [line.strip() for line in contents]
def path_from_name(name, ext=None, sep='|'):
"""Convert name with optional extension to file path."""
if ext:
return fsencode(name.replace(os.sep, sep) + ext)
else:
return fsencode(name.replace(os.sep, sep))
extended = contents[0].startswith('#EXTM3U')
track = Track()
for line in contents:
def path_to_ref(path):
return models.Ref.playlist(
uri=path_to_uri(path),
name=name_from_path(path)
)
def load_items(fp, basedir):
refs = []
name = None
for line in filter(None, (line.strip() for line in fp)):
if line.startswith('#'):
if extended and line.startswith('#EXTINF'):
track = m3u_extinf_to_track(line)
if line.startswith('#EXTINF:'):
name = line.partition(',')[2]
continue
if not track.name:
name = os.path.basename(os.path.splitext(line)[0])
track = track.replace(name=urllib.parse.unquote(name))
if urllib.parse.urlsplit(line).scheme:
tracks.append(track.replace(uri=line))
elif os.path.normpath(line) == os.path.abspath(line):
uri = path.path_to_uri(line)
tracks.append(track.replace(uri=uri))
elif media_dir is not None:
uri = path.path_to_uri(os.path.join(media_dir, line))
tracks.append(track.replace(uri=uri))
track = Track()
return tracks
elif not urlsplit(line).scheme:
path = os.path.join(basedir, fsencode(line))
if not name:
name = name_from_path(path)
uri = path_to_uri(path, scheme='file')
else:
uri = line # do *not* extract name from (stream?) URI path
refs.append(models.Ref.track(uri=uri, name=name))
name = None
return refs
def save_m3u(filename, tracks, encoding='latin1', errors='replace'):
extended = any(track.name for track in tracks)
# codecs.open() always uses binary mode, just being explicit here
with codecs.open(filename, 'wb', encoding, errors) as m3u:
if extended:
m3u.write('#EXTM3U' + os.linesep)
for track in tracks:
if extended and track.name:
m3u.write('#EXTINF:%d,%s%s' % (
track.length // 1000 if track.length else -1,
track.name,
os.linesep))
m3u.write(track.uri + os.linesep)
def dump_items(items, fp):
if any(item.name for item in items):
print('#EXTM3U', file=fp)
for item in items:
if item.name:
print('#EXTINF:-1,%s' % item.name, file=fp)
# TODO: convert file URIs to (relative) paths?
if isinstance(item.uri, bytes):
print(item.uri.decode('utf-8'), file=fp)
else:
print(item.uri, file=fp)
def playlist(path, items=[], mtime=None):
return models.Playlist(
uri=path_to_uri(path),
name=name_from_path(path),
tracks=[models.Track(uri=item.uri, name=item.name) for item in items],
last_modified=(int(mtime * 1000) if mtime else None)
)

View File

@ -7,14 +7,12 @@ import platform
import shutil
import tempfile
import unittest
import urllib
import pykka
from mopidy import core
from mopidy.internal import deprecation
from mopidy.m3u import actor
from mopidy.m3u.translator import playlist_uri_to_path
from mopidy.m3u.backend import M3UBackend
from mopidy.models import Playlist, Track
from tests import dummy_audio, path_to_data_dir
@ -22,9 +20,12 @@ from tests.m3u import generate_song
class M3UPlaylistsProviderTest(unittest.TestCase):
backend_class = actor.M3UBackend
backend_class = M3UBackend
config = {
'm3u': {
'enabled': True,
'default_encoding': 'latin-1',
'default_extension': '.m3u',
'playlists_dir': path_to_data_dir(''),
}
}
@ -34,7 +35,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
self.playlists_dir = self.config['m3u']['playlists_dir']
audio = dummy_audio.create_proxy()
backend = actor.M3UBackend.start(
backend = M3UBackend.start(
config=self.config, audio=audio).proxy()
self.core = core.Core(backends=[backend])
@ -46,7 +47,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_created_playlist_is_persisted(self):
uri = 'm3u:test.m3u'
path = playlist_uri_to_path(uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
self.assertFalse(os.path.exists(path))
playlist = self.core.playlists.create('test')
@ -57,7 +58,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_create_sanitizes_playlist_name(self):
playlist = self.core.playlists.create(' ../../test FOO baR ')
self.assertEqual('..|..|test FOO baR', playlist.name)
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'..|..|test FOO baR.m3u')
self.assertEqual(self.playlists_dir, os.path.dirname(path))
self.assertTrue(os.path.exists(path))
@ -65,8 +66,8 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
uri1 = 'm3u:test1.m3u'
uri2 = 'm3u:test2.m3u'
path1 = playlist_uri_to_path(uri1, self.playlists_dir)
path2 = playlist_uri_to_path(uri2, self.playlists_dir)
path1 = os.path.join(self.playlists_dir, b'test1.m3u')
path2 = os.path.join(self.playlists_dir, b'test2.m3u')
playlist = self.core.playlists.create('test1')
self.assertEqual('test1', playlist.name)
@ -82,7 +83,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_deleted_playlist_is_removed(self):
uri = 'm3u:test.m3u'
path = playlist_uri_to_path(uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
self.assertFalse(os.path.exists(path))
@ -98,7 +99,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
track = Track(uri=generate_song(1))
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.replace(tracks=[track]))
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path) as f:
contents = f.read()
@ -109,32 +110,32 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
track = Track(uri=generate_song(1), name='Test', length=60000)
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.replace(tracks=[track]))
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path) as f:
m3u = f.read().splitlines()
self.assertEqual(['#EXTM3U', '#EXTINF:60,Test', track.uri], m3u)
self.assertEqual(['#EXTM3U', '#EXTINF:-1,Test', track.uri], m3u)
def test_latin1_playlist_contents_is_written_to_disk(self):
track = Track(uri=generate_song(1), name='Test\x9f', length=60000)
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path, 'rb') as f:
m3u = f.read().splitlines()
self.assertEqual([b'#EXTM3U', b'#EXTINF:60,Test\x9f', track.uri], m3u)
self.assertEqual([b'#EXTM3U', b'#EXTINF:-1,Test\x9f', track.uri], m3u)
def test_utf8_playlist_contents_is_replaced_and_written_to_disk(self):
track = Track(uri=generate_song(1), name='Test\u07b4', length=60000)
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path, 'rb') as f:
m3u = f.read().splitlines()
self.assertEqual([b'#EXTM3U', b'#EXTINF:60,Test?', track.uri], m3u)
self.assertEqual([b'#EXTM3U', b'#EXTINF:-1,Test?', track.uri], m3u)
def test_playlists_are_loaded_at_startup(self):
track = Track(uri='dummy:track:path2')
@ -149,8 +150,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
self.assertEqual(track.uri, result.tracks[0].uri)
def test_load_playlist_with_nonfilesystem_encoding_of_filename(self):
uri = 'm3u:%s.m3u' % urllib.quote('øæå'.encode('latin-1'))
path = playlist_uri_to_path(uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, 'øæå.m3u'.encode('latin-1'))
with open(path, 'wb+') as f:
f.write(b'#EXTM3U\n')
@ -198,7 +198,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
playlist = self.core.playlists.create('test')
self.assertEqual(playlist, self.core.playlists.lookup(playlist.uri))
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
path = os.path.join(self.playlists_dir, b'test.m3u')
self.assertTrue(os.path.exists(path))
os.remove(path)
@ -245,12 +245,9 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_save_playlist_with_new_uri(self):
uri = 'm3u:test.m3u'
with self.assertRaises(AssertionError):
self.core.playlists.save(Playlist(uri=uri))
path = playlist_uri_to_path(uri, self.playlists_dir)
self.assertFalse(os.path.exists(path))
self.core.playlists.save(Playlist(uri=uri))
path = os.path.join(self.playlists_dir, b'test.m3u')
self.assertTrue(os.path.exists(path))
def test_playlist_with_unknown_track(self):
track = Track(uri='file:///dev/null')

View File

@ -2,137 +2,145 @@
from __future__ import absolute_import, unicode_literals
import os
import tempfile
import unittest
import io
from mopidy.internal import path
from mopidy.m3u import translator
from mopidy.models import Track
from tests import path_to_data_dir
data_dir = path_to_data_dir('')
song1_path = path_to_data_dir('song1.mp3')
song2_path = path_to_data_dir('song2.mp3')
song3_path = path_to_data_dir('φοο.mp3')
encoded_path = path_to_data_dir('æøå.mp3')
song1_uri = path.path_to_uri(song1_path)
song2_uri = path.path_to_uri(song2_path)
song3_uri = path.path_to_uri(song3_path)
song4_uri = 'http://example.com/foo%20bar.mp3'
encoded_uri = path.path_to_uri(encoded_path)
song1_track = Track(name='song1', uri=song1_uri)
song2_track = Track(name='song2', uri=song2_uri)
song3_track = Track(name='φοο', uri=song3_uri)
song4_track = Track(name='foo bar', uri=song4_uri)
encoded_track = Track(name='æøå', uri=encoded_uri)
song1_ext_track = song1_track.replace(name='Song #1')
song2_ext_track = song2_track.replace(name='Song #2', length=60000)
encoded_ext_track = encoded_track.replace(name='æøå')
from mopidy.models import Playlist, Ref, Track
# FIXME use mock instead of tempfile.NamedTemporaryFile
class M3UToUriTest(unittest.TestCase):
def parse(self, name):
return translator.parse_m3u(name, data_dir)
def test_empty_file(self):
tracks = self.parse(path_to_data_dir('empty.m3u'))
self.assertEqual([], tracks)
def test_basic_file(self):
tracks = self.parse(path_to_data_dir('one.m3u'))
self.assertEqual([song1_track], tracks)
def test_file_with_comment(self):
tracks = self.parse(path_to_data_dir('comment.m3u'))
self.assertEqual([song1_track], tracks)
def test_file_is_relative_to_correct_dir(self):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write('song1.mp3')
try:
tracks = self.parse(tmp.name)
self.assertEqual([song1_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def test_file_with_absolute_files(self):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(song1_path)
try:
tracks = self.parse(tmp.name)
self.assertEqual([song1_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def test_file_with_multiple_absolute_files(self):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(song1_path + '\n')
tmp.write('# comment \n')
tmp.write(song2_path)
try:
tracks = self.parse(tmp.name)
self.assertEqual([song1_track, song2_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def test_file_with_uri(self):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(song1_uri)
tmp.write('\n')
tmp.write(song4_uri)
try:
tracks = self.parse(tmp.name)
self.assertEqual([song1_track, song4_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def test_encoding_is_latin1(self):
tracks = self.parse(path_to_data_dir('encoding.m3u'))
self.assertEqual([encoded_track], tracks)
def test_open_missing_file(self):
tracks = self.parse(path_to_data_dir('non-existant.m3u'))
self.assertEqual([], tracks)
def test_empty_ext_file(self):
tracks = self.parse(path_to_data_dir('empty-ext.m3u'))
self.assertEqual([], tracks)
def test_basic_ext_file(self):
tracks = self.parse(path_to_data_dir('one-ext.m3u'))
self.assertEqual([song1_ext_track], tracks)
def test_multi_ext_file(self):
tracks = self.parse(path_to_data_dir('two-ext.m3u'))
self.assertEqual([song1_ext_track, song2_ext_track], tracks)
def test_ext_file_with_comment(self):
tracks = self.parse(path_to_data_dir('comment-ext.m3u'))
self.assertEqual([song1_ext_track], tracks)
def test_ext_encoding_is_latin1(self):
tracks = self.parse(path_to_data_dir('encoding-ext.m3u'))
self.assertEqual([encoded_ext_track], tracks)
def test_m3u8_file(self):
with tempfile.NamedTemporaryFile(suffix='.m3u8', delete=False) as tmp:
tmp.write(song3_path)
try:
tracks = self.parse(tmp.name)
self.assertEqual([song3_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def loads(s, basedir=b'.'):
return translator.load_items(io.StringIO(s), basedir=basedir)
class URItoM3UTest(unittest.TestCase):
pass
def dumps(items):
fp = io.StringIO()
translator.dump_items(items, fp)
return fp.getvalue()
def test_path_to_uri():
from mopidy.m3u.translator import path_to_uri
assert path_to_uri(b'test') == 'm3u:test'
assert path_to_uri(b'test.m3u') == 'm3u:test.m3u'
assert path_to_uri(b'./test.m3u') == 'm3u:test.m3u'
assert path_to_uri(b'foo/../test.m3u') == 'm3u:test.m3u'
assert path_to_uri(b'Test Playlist.m3u') == 'm3u:Test%20Playlist.m3u'
assert path_to_uri(b'test.mp3', scheme='file') == 'file:///test.mp3'
def test_latin1_path_to_uri():
path = 'æøå.m3u'.encode('latin-1')
assert translator.path_to_uri(path) == 'm3u:%E6%F8%E5.m3u'
def test_utf8_path_to_uri():
path = 'æøå.m3u'.encode('utf-8')
assert translator.path_to_uri(path) == 'm3u:%C3%A6%C3%B8%C3%A5.m3u'
def test_uri_to_path():
from mopidy.m3u.translator import uri_to_path
assert uri_to_path('m3u:test.m3u') == b'test.m3u'
assert uri_to_path(b'm3u:test.m3u') == b'test.m3u'
assert uri_to_path('m3u:Test%20Playlist.m3u') == b'Test Playlist.m3u'
assert uri_to_path(b'm3u:Test%20Playlist.m3u') == b'Test Playlist.m3u'
assert uri_to_path('m3u:%E6%F8%E5.m3u') == b'\xe6\xf8\xe5.m3u'
assert uri_to_path(b'm3u:%E6%F8%E5.m3u') == b'\xe6\xf8\xe5.m3u'
assert uri_to_path('file:///test.mp3') == b'/test.mp3'
assert uri_to_path(b'file:///test.mp3') == b'/test.mp3'
def test_name_from_path():
from mopidy.m3u.translator import name_from_path
assert name_from_path(b'test') == 'test'
assert name_from_path(b'test.m3u') == 'test'
assert name_from_path(b'../test.m3u') == 'test'
def test_path_from_name():
from mopidy.m3u.translator import path_from_name
assert path_from_name('test') == b'test'
assert path_from_name('test', '.m3u') == b'test.m3u'
assert path_from_name('foo/bar', sep='-') == b'foo-bar'
def test_path_to_ref():
from mopidy.m3u.translator import path_to_ref
assert path_to_ref(b'test.m3u') == Ref.playlist(
uri='m3u:test.m3u', name='test'
)
assert path_to_ref(b'Test Playlist.m3u') == Ref.playlist(
uri='m3u:Test%20Playlist.m3u', name='Test Playlist'
)
def test_load_items():
assert loads('') == []
assert loads('test.mp3', basedir=b'/playlists') == [
Ref.track(uri='file:///playlists/test.mp3', name='test')
]
assert loads('../test.mp3', basedir=b'/playlists') == [
Ref.track(uri='file:///test.mp3', name='test')
]
assert loads('/test.mp3') == [
Ref.track(uri='file:///test.mp3', name='test')
]
assert loads('file:///test.mp3') == [
Ref.track(uri='file:///test.mp3')
]
assert loads('http://example.com/stream') == [
Ref.track(uri='http://example.com/stream')
]
assert loads('#EXTM3U\n#EXTINF:42,Test\nfile:///test.mp3\n') == [
Ref.track(uri='file:///test.mp3', name='Test')
]
assert loads('#EXTM3U\n#EXTINF:-1,Test\nhttp://example.com/stream\n') == [
Ref.track(uri='http://example.com/stream', name='Test')
]
def test_dump_items():
assert dumps([]) == ''
assert dumps([Ref.track(uri='file:///test.mp3')]) == (
'file:///test.mp3\n'
)
assert dumps([Ref.track(uri='file:///test.mp3', name='test')]) == (
'#EXTM3U\n'
'#EXTINF:-1,test\n'
'file:///test.mp3\n'
)
assert dumps([Track(uri='file:///test.mp3', name='test', length=42)]) == (
'#EXTM3U\n'
'#EXTINF:-1,test\n'
'file:///test.mp3\n'
)
assert dumps([Track(uri='http://example.com/stream')]) == (
'http://example.com/stream\n'
)
assert dumps([Track(uri='http://example.com/stream', name='Test')]) == (
'#EXTM3U\n'
'#EXTINF:-1,Test\n'
'http://example.com/stream\n'
)
def test_playlist():
from mopidy.m3u.translator import playlist
assert playlist(b'test.m3u') == Playlist(
uri='m3u:test.m3u',
name='test'
)
assert playlist(b'test.m3u', [Ref(uri='file:///test.mp3')], 1) == Playlist(
uri='m3u:test.m3u',
name='test',
tracks=[Track(uri='file:///test.mp3')],
last_modified=1000
)