diff --git a/mopidy/m3u/__init__.py b/mopidy/m3u/__init__.py index 06825932..df769c88 100644 --- a/mopidy/m3u/__init__.py +++ b/mopidy/m3u/__init__.py @@ -21,10 +21,11 @@ class Extension(ext.Extension): def get_config_schema(self): schema = super(Extension, self).get_config_schema() + schema['default_encoding'] = config.String() + schema['default_extension'] = config.String(choices=['.m3u', '.m3u8']) schema['playlists_dir'] = config.Path(optional=True) return schema def setup(self, registry): - from .actor import M3UBackend - + from .backend import M3UBackend registry.add('backend', M3UBackend) diff --git a/mopidy/m3u/actor.py b/mopidy/m3u/actor.py deleted file mode 100644 index 55257f87..00000000 --- a/mopidy/m3u/actor.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import logging - -import pykka - -from mopidy import backend, m3u -from mopidy.internal import encoding, path -from mopidy.m3u.library import M3ULibraryProvider -from mopidy.m3u.playlists import M3UPlaylistsProvider - - -logger = logging.getLogger(__name__) - - -class M3UBackend(pykka.ThreadingActor, backend.Backend): - uri_schemes = ['m3u'] - - def __init__(self, config, audio): - super(M3UBackend, self).__init__() - - self._config = config - - if config['m3u']['playlists_dir'] is not None: - self._playlists_dir = config['m3u']['playlists_dir'] - try: - path.get_or_create_dir(self._playlists_dir) - except EnvironmentError as error: - logger.warning( - 'Could not create M3U playlists dir: %s', - encoding.locale_decode(error)) - else: - self._playlists_dir = m3u.Extension.get_data_dir(config) - - self.playlists = M3UPlaylistsProvider(backend=self) - self.library = M3ULibraryProvider(backend=self) diff --git a/mopidy/m3u/backend.py b/mopidy/m3u/backend.py new file mode 100644 index 00000000..02719cc7 --- /dev/null +++ b/mopidy/m3u/backend.py @@ -0,0 +1,15 @@ +from __future__ import absolute_import, unicode_literals + +import pykka + +from mopidy import backend + +from . import playlists + + +class M3UBackend(pykka.ThreadingActor, backend.Backend): + uri_schemes = ['m3u'] + + def __init__(self, config, audio): + super(M3UBackend, self).__init__() + self.playlists = playlists.M3UPlaylistsProvider(self, config) diff --git a/mopidy/m3u/ext.conf b/mopidy/m3u/ext.conf index adc0d00a..e4e68ff8 100644 --- a/mopidy/m3u/ext.conf +++ b/mopidy/m3u/ext.conf @@ -1,3 +1,8 @@ [m3u] enabled = true + +default_encoding = latin-1 + +default_extension = .m3u + playlists_dir = diff --git a/mopidy/m3u/library.py b/mopidy/m3u/library.py deleted file mode 100644 index 291a6194..00000000 --- a/mopidy/m3u/library.py +++ /dev/null @@ -1,19 +0,0 @@ -from __future__ import absolute_import, unicode_literals - -import logging - -from mopidy import backend - -logger = logging.getLogger(__name__) - - -class M3ULibraryProvider(backend.LibraryProvider): - - """Library for looking up M3U playlists.""" - - def __init__(self, backend): - super(M3ULibraryProvider, self).__init__(backend) - - def lookup(self, uri): - # TODO Lookup tracks in M3U playlist - return [] diff --git a/mopidy/m3u/playlists.py b/mopidy/m3u/playlists.py index 3567f8aa..e2b35d1d 100644 --- a/mopidy/m3u/playlists.py +++ b/mopidy/m3u/playlists.py @@ -1,117 +1,117 @@ -from __future__ import absolute_import, division, unicode_literals +from __future__ import absolute_import, unicode_literals -import glob +import io +import locale import logging import operator import os -import re -import sys from mopidy import backend -from mopidy.m3u import translator -from mopidy.models import Playlist, Ref +from . import Extension, translator logger = logging.getLogger(__name__) +def log_environment_error(message, error): + if isinstance(error.strerror, bytes): + strerror = error.strerror.decode(locale.getpreferredencoding()) + else: + strerror = error.strerror + logger.error('%s: %s', message, strerror) + + class M3UPlaylistsProvider(backend.PlaylistsProvider): - # TODO: currently this only handles UNIX file systems - _invalid_filename_chars = re.compile(r'[/]') + def __init__(self, backend, config): + super(M3UPlaylistsProvider, self).__init__(backend) - def __init__(self, *args, **kwargs): - super(M3UPlaylistsProvider, self).__init__(*args, **kwargs) - - self._playlists_dir = self.backend._playlists_dir - self._playlists = {} - self.refresh() + ext_config = config[Extension.ext_name] + if ext_config['playlists_dir'] is None: + self._playlists_dir = Extension.get_data_dir(config) + else: + self._playlists_dir = ext_config['playlists_dir'] + self._default_encoding = ext_config['default_encoding'] + self._default_extension = ext_config['default_extension'] def as_list(self): - refs = [ - Ref.playlist(uri=pl.uri, name=pl.name) - for pl in self._playlists.values()] - return sorted(refs, key=operator.attrgetter('name')) - - def get_items(self, uri): - playlist = self._playlists.get(uri) - if playlist is None: - return None - return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks] + result = [] + for entry in os.listdir(self._playlists_dir): + if not entry.endswith((b'.m3u', b'.m3u8')): + continue + elif not os.path.isfile(self._abspath(entry)): + continue + else: + result.append(translator.path_to_ref(entry)) + result.sort(key=operator.attrgetter('name')) + return result def create(self, name): - playlist = self._save_m3u(Playlist(name=name)) - self._playlists[playlist.uri] = playlist - logger.info('Created playlist %s', playlist.uri) - return playlist + path = translator.path_from_name(name.strip(), self._default_extension) + try: + with self._open(path, 'w'): + pass + mtime = os.path.getmtime(self._abspath(path)) + except EnvironmentError as e: + log_environment_error('Error creating playlist %s' % name, e) + else: + return translator.playlist(path, [], mtime) def delete(self, uri): - if uri in self._playlists: - path = translator.playlist_uri_to_path(uri, self._playlists_dir) - if os.path.exists(path): - os.remove(path) - else: - logger.warning( - 'Trying to delete missing playlist file %s', path) - del self._playlists[uri] - logger.info('Deleted playlist %s', uri) + path = translator.uri_to_path(uri) + try: + os.remove(self._abspath(path)) + except EnvironmentError as e: + log_environment_error('Error deleting playlist %s' % uri, e) + + def get_items(self, uri): + path = translator.uri_to_path(uri) + try: + with self._open(path, 'r') as fp: + items = translator.load_items(fp, self._playlists_dir) + except EnvironmentError as e: + log_environment_error('Error reading playlist %s' % uri, e) else: - logger.warning('Trying to delete unknown playlist %s', uri) + return items def lookup(self, uri): - return self._playlists.get(uri) + path = translator.uri_to_path(uri) + try: + with self._open(path, 'r') as fp: + items = translator.load_items(fp, self._playlists_dir) + mtime = os.path.getmtime(self._abspath(path)) + except EnvironmentError as e: + log_environment_error('Error reading playlist %s' % uri, e) + else: + return translator.playlist(path, items, mtime) def refresh(self): - playlists = {} - - encoding = sys.getfilesystemencoding() - for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u*')): - relpath = os.path.basename(path) - uri = translator.path_to_playlist_uri(relpath) - name = os.path.splitext(relpath)[0].decode(encoding, 'replace') - tracks = translator.parse_m3u(path) - playlists[uri] = Playlist(uri=uri, name=name, tracks=tracks) - - self._playlists = playlists - - logger.info( - 'Loaded %d M3U playlists from %s', - len(playlists), self._playlists_dir) - - # TODO Trigger playlists_loaded event? + pass # nothing to do def save(self, playlist): - assert playlist.uri, 'Cannot save playlist without URI' - assert playlist.uri in self._playlists, \ - 'Cannot save playlist with unknown URI: %s' % playlist.uri - - original_uri = playlist.uri - playlist = self._save_m3u(playlist) - if playlist.uri != original_uri and original_uri in self._playlists: - self.delete(original_uri) - self._playlists[playlist.uri] = playlist - return playlist - - def _sanitize_m3u_name(self, name, encoding=sys.getfilesystemencoding()): - name = self._invalid_filename_chars.sub('|', name.strip()) - # make sure we end up with a valid path segment - name = name.encode(encoding, errors='replace') - name = os.path.basename(name) # paranoia? - name = name.decode(encoding) - return name - - def _save_m3u(self, playlist, encoding=sys.getfilesystemencoding()): - if playlist.name: - name = self._sanitize_m3u_name(playlist.name, encoding) - uri = translator.path_to_playlist_uri( - name.encode(encoding) + b'.m3u') - path = translator.playlist_uri_to_path(uri, self._playlists_dir) - elif playlist.uri: - uri = playlist.uri - path = translator.playlist_uri_to_path(uri, self._playlists_dir) - name, _ = os.path.splitext(os.path.basename(path).decode(encoding)) + path = translator.uri_to_path(playlist.uri) + name = translator.name_from_path(path) + try: + with self._open(path, 'w') as fp: + translator.dump_items(playlist.tracks, fp) + if playlist.name and playlist.name != name: + opath, ext = os.path.splitext(path) + path = translator.path_from_name(playlist.name.strip()) + ext + os.rename(self._abspath(opath + ext), self._abspath(path)) + mtime = os.path.getmtime(self._abspath(path)) + except EnvironmentError as e: + log_environment_error('Error saving playlist %s' % playlist.uri, e) else: - raise ValueError('M3U playlist needs name or URI') - translator.save_m3u(path, playlist.tracks, 'latin1') - # assert playlist name matches file name/uri - return playlist.replace(uri=uri, name=name) + return translator.playlist(path, playlist.tracks, mtime) + + def _abspath(self, path): + return os.path.join(self._playlists_dir, path) + + def _open(self, path, mode='r'): + if path.endswith(b'.m3u8'): + encoding = 'utf-8' + else: + encoding = self._default_encoding + if not os.path.isabs(path): + path = os.path.join(self._playlists_dir, path) + return io.open(path, mode, encoding=encoding, errors='replace') diff --git a/mopidy/m3u/translator.py b/mopidy/m3u/translator.py index 764cf84b..da74cc1b 100644 --- a/mopidy/m3u/translator.py +++ b/mopidy/m3u/translator.py @@ -1,130 +1,119 @@ -from __future__ import absolute_import, unicode_literals +from __future__ import absolute_import, print_function, unicode_literals -import codecs -import logging import os -import re -from mopidy import compat -from mopidy.compat import urllib -from mopidy.internal import encoding, path -from mopidy.models import Track +from mopidy import models + +from . import Extension + +try: + from urllib.parse import quote_from_bytes, unquote_to_bytes +except ImportError: + import urllib + + def quote_from_bytes(bytes, safe=b'/'): + # Python 3 returns Unicode string + return urllib.quote(bytes, safe).decode('utf-8') + + def unquote_to_bytes(string): + if isinstance(string, bytes): + return urllib.unquote(string) + else: + return urllib.unquote(string.encode('utf-8')) + +try: + from urllib.parse import urlsplit, urlunsplit +except ImportError: + from urlparse import urlsplit, urlunsplit -M3U_EXTINF_RE = re.compile(r'#EXTINF:(-1|\d+),(.*)') +try: + from os import fsencode, fsdecode +except ImportError: + import sys -logger = logging.getLogger(__name__) + # no 'surrogateescape' in Python 2; 'replace' for backward compatibility + def fsencode(filename, encoding=sys.getfilesystemencoding()): + return filename.encode(encoding, 'replace') + + def fsdecode(filename, encoding=sys.getfilesystemencoding()): + return filename.decode(encoding, 'replace') -def playlist_uri_to_path(uri, playlists_dir): - if not uri.startswith('m3u:'): - raise ValueError('Invalid URI %s' % uri) - file_path = path.uri_to_path(uri) - return os.path.join(playlists_dir, file_path) +def path_to_uri(path, scheme=Extension.ext_name): + """Convert file path to URI.""" + assert isinstance(path, bytes), 'Mopidy paths should be bytes' + uripath = quote_from_bytes(os.path.normpath(path)) + return urlunsplit((scheme, None, uripath, None, None)) -def path_to_playlist_uri(relpath): - """Convert path relative to playlists_dir to M3U URI.""" - if isinstance(relpath, compat.text_type): - relpath = relpath.encode('utf-8') - return b'm3u:%s' % urllib.parse.quote(relpath) +def uri_to_path(uri): + """Convert URI to file path.""" + # TODO: decide on Unicode vs. bytes for URIs + return unquote_to_bytes(urlsplit(uri).path) -def m3u_extinf_to_track(line): - """Convert extended M3U directive to track template.""" - m = M3U_EXTINF_RE.match(line) - if not m: - logger.warning('Invalid extended M3U directive: %s', line) - return Track() - (runtime, title) = m.groups() - if int(runtime) > 0: - return Track(name=title, length=1000 * int(runtime)) - else: - return Track(name=title) - - -def parse_m3u(file_path, media_dir=None): - r""" - Convert M3U file list to list of tracks - - Example M3U data:: - - # This is a comment - Alternative\Band - Song.mp3 - Classical\Other Band - New Song.mp3 - Stuff.mp3 - D:\More Music\Foo.mp3 - http://www.example.com:8000/Listen.pls - http://www.example.com/~user/Mine.mp3 - - Example extended M3U data:: - - #EXTM3U - #EXTINF:123, Sample artist - Sample title - Sample.mp3 - #EXTINF:321,Example Artist - Example title - Greatest Hits\Example.ogg - #EXTINF:-1,Radio XMP - http://mp3stream.example.com:8000/ - - - Relative paths of songs should be with respect to location of M3U. - - Paths are normally platform specific. - - Lines starting with # are ignored, except for extended M3U directives. - - Track.name and Track.length are set from extended M3U directives. - - m3u files are latin-1. - - m3u8 files are utf-8 - """ - # TODO: uris as bytes - file_encoding = 'utf-8' if file_path.endswith(b'.m3u8') else 'latin1' - - tracks = [] +def name_from_path(path): + """Extract name from file path.""" + name, _ = os.path.splitext(os.path.basename(path)) try: - with codecs.open(file_path, 'rb', file_encoding, 'replace') as m3u: - contents = m3u.readlines() - except IOError as error: - logger.warning('Couldn\'t open m3u: %s', encoding.locale_decode(error)) - return tracks + return fsdecode(name) + except UnicodeError: + return None - if not contents: - return tracks - # Strip newlines left by codecs - contents = [line.strip() for line in contents] +def path_from_name(name, ext=None, sep='|'): + """Convert name with optional extension to file path.""" + if ext: + return fsencode(name.replace(os.sep, sep) + ext) + else: + return fsencode(name.replace(os.sep, sep)) - extended = contents[0].startswith('#EXTM3U') - track = Track() - for line in contents: +def path_to_ref(path): + return models.Ref.playlist( + uri=path_to_uri(path), + name=name_from_path(path) + ) + + +def load_items(fp, basedir): + refs = [] + name = None + for line in filter(None, (line.strip() for line in fp)): if line.startswith('#'): - if extended and line.startswith('#EXTINF'): - track = m3u_extinf_to_track(line) + if line.startswith('#EXTINF:'): + name = line.partition(',')[2] continue - if not track.name: - name = os.path.basename(os.path.splitext(line)[0]) - track = track.replace(name=urllib.parse.unquote(name)) - if urllib.parse.urlsplit(line).scheme: - tracks.append(track.replace(uri=line)) - elif os.path.normpath(line) == os.path.abspath(line): - uri = path.path_to_uri(line) - tracks.append(track.replace(uri=uri)) - elif media_dir is not None: - uri = path.path_to_uri(os.path.join(media_dir, line)) - tracks.append(track.replace(uri=uri)) - - track = Track() - return tracks + elif not urlsplit(line).scheme: + path = os.path.join(basedir, fsencode(line)) + if not name: + name = name_from_path(path) + uri = path_to_uri(path, scheme='file') + else: + uri = line # do *not* extract name from (stream?) URI path + refs.append(models.Ref.track(uri=uri, name=name)) + name = None + return refs -def save_m3u(filename, tracks, encoding='latin1', errors='replace'): - extended = any(track.name for track in tracks) - # codecs.open() always uses binary mode, just being explicit here - with codecs.open(filename, 'wb', encoding, errors) as m3u: - if extended: - m3u.write('#EXTM3U' + os.linesep) - for track in tracks: - if extended and track.name: - m3u.write('#EXTINF:%d,%s%s' % ( - track.length // 1000 if track.length else -1, - track.name, - os.linesep)) - m3u.write(track.uri + os.linesep) +def dump_items(items, fp): + if any(item.name for item in items): + print('#EXTM3U', file=fp) + for item in items: + if item.name: + print('#EXTINF:-1,%s' % item.name, file=fp) + # TODO: convert file URIs to (relative) paths? + if isinstance(item.uri, bytes): + print(item.uri.decode('utf-8'), file=fp) + else: + print(item.uri, file=fp) + + +def playlist(path, items=[], mtime=None): + return models.Playlist( + uri=path_to_uri(path), + name=name_from_path(path), + tracks=[models.Track(uri=item.uri, name=item.name) for item in items], + last_modified=(int(mtime * 1000) if mtime else None) + ) diff --git a/tests/m3u/test_playlists.py b/tests/m3u/test_playlists.py index edebe65b..664da9e9 100644 --- a/tests/m3u/test_playlists.py +++ b/tests/m3u/test_playlists.py @@ -7,14 +7,12 @@ import platform import shutil import tempfile import unittest -import urllib import pykka from mopidy import core from mopidy.internal import deprecation -from mopidy.m3u import actor -from mopidy.m3u.translator import playlist_uri_to_path +from mopidy.m3u.backend import M3UBackend from mopidy.models import Playlist, Track from tests import dummy_audio, path_to_data_dir @@ -22,9 +20,12 @@ from tests.m3u import generate_song class M3UPlaylistsProviderTest(unittest.TestCase): - backend_class = actor.M3UBackend + backend_class = M3UBackend config = { 'm3u': { + 'enabled': True, + 'default_encoding': 'latin-1', + 'default_extension': '.m3u', 'playlists_dir': path_to_data_dir(''), } } @@ -34,7 +35,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): self.playlists_dir = self.config['m3u']['playlists_dir'] audio = dummy_audio.create_proxy() - backend = actor.M3UBackend.start( + backend = M3UBackend.start( config=self.config, audio=audio).proxy() self.core = core.Core(backends=[backend]) @@ -46,7 +47,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): def test_created_playlist_is_persisted(self): uri = 'm3u:test.m3u' - path = playlist_uri_to_path(uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') self.assertFalse(os.path.exists(path)) playlist = self.core.playlists.create('test') @@ -57,7 +58,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): def test_create_sanitizes_playlist_name(self): playlist = self.core.playlists.create(' ../../test FOO baR ') self.assertEqual('..|..|test FOO baR', playlist.name) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'..|..|test FOO baR.m3u') self.assertEqual(self.playlists_dir, os.path.dirname(path)) self.assertTrue(os.path.exists(path)) @@ -65,8 +66,8 @@ class M3UPlaylistsProviderTest(unittest.TestCase): uri1 = 'm3u:test1.m3u' uri2 = 'm3u:test2.m3u' - path1 = playlist_uri_to_path(uri1, self.playlists_dir) - path2 = playlist_uri_to_path(uri2, self.playlists_dir) + path1 = os.path.join(self.playlists_dir, b'test1.m3u') + path2 = os.path.join(self.playlists_dir, b'test2.m3u') playlist = self.core.playlists.create('test1') self.assertEqual('test1', playlist.name) @@ -82,7 +83,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): def test_deleted_playlist_is_removed(self): uri = 'm3u:test.m3u' - path = playlist_uri_to_path(uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') self.assertFalse(os.path.exists(path)) @@ -98,7 +99,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): track = Track(uri=generate_song(1)) playlist = self.core.playlists.create('test') playlist = self.core.playlists.save(playlist.replace(tracks=[track])) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') with open(path) as f: contents = f.read() @@ -109,32 +110,32 @@ class M3UPlaylistsProviderTest(unittest.TestCase): track = Track(uri=generate_song(1), name='Test', length=60000) playlist = self.core.playlists.create('test') playlist = self.core.playlists.save(playlist.replace(tracks=[track])) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') with open(path) as f: m3u = f.read().splitlines() - self.assertEqual(['#EXTM3U', '#EXTINF:60,Test', track.uri], m3u) + self.assertEqual(['#EXTM3U', '#EXTINF:-1,Test', track.uri], m3u) def test_latin1_playlist_contents_is_written_to_disk(self): track = Track(uri=generate_song(1), name='Test\x9f', length=60000) playlist = self.core.playlists.create('test') playlist = self.core.playlists.save(playlist.copy(tracks=[track])) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') with open(path, 'rb') as f: m3u = f.read().splitlines() - self.assertEqual([b'#EXTM3U', b'#EXTINF:60,Test\x9f', track.uri], m3u) + self.assertEqual([b'#EXTM3U', b'#EXTINF:-1,Test\x9f', track.uri], m3u) def test_utf8_playlist_contents_is_replaced_and_written_to_disk(self): track = Track(uri=generate_song(1), name='Test\u07b4', length=60000) playlist = self.core.playlists.create('test') playlist = self.core.playlists.save(playlist.copy(tracks=[track])) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') with open(path, 'rb') as f: m3u = f.read().splitlines() - self.assertEqual([b'#EXTM3U', b'#EXTINF:60,Test?', track.uri], m3u) + self.assertEqual([b'#EXTM3U', b'#EXTINF:-1,Test?', track.uri], m3u) def test_playlists_are_loaded_at_startup(self): track = Track(uri='dummy:track:path2') @@ -149,8 +150,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): self.assertEqual(track.uri, result.tracks[0].uri) def test_load_playlist_with_nonfilesystem_encoding_of_filename(self): - uri = 'm3u:%s.m3u' % urllib.quote('øæå'.encode('latin-1')) - path = playlist_uri_to_path(uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, 'øæå.m3u'.encode('latin-1')) with open(path, 'wb+') as f: f.write(b'#EXTM3U\n') @@ -198,7 +198,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase): playlist = self.core.playlists.create('test') self.assertEqual(playlist, self.core.playlists.lookup(playlist.uri)) - path = playlist_uri_to_path(playlist.uri, self.playlists_dir) + path = os.path.join(self.playlists_dir, b'test.m3u') self.assertTrue(os.path.exists(path)) os.remove(path) @@ -245,12 +245,9 @@ class M3UPlaylistsProviderTest(unittest.TestCase): def test_save_playlist_with_new_uri(self): uri = 'm3u:test.m3u' - - with self.assertRaises(AssertionError): - self.core.playlists.save(Playlist(uri=uri)) - - path = playlist_uri_to_path(uri, self.playlists_dir) - self.assertFalse(os.path.exists(path)) + self.core.playlists.save(Playlist(uri=uri)) + path = os.path.join(self.playlists_dir, b'test.m3u') + self.assertTrue(os.path.exists(path)) def test_playlist_with_unknown_track(self): track = Track(uri='file:///dev/null') diff --git a/tests/m3u/test_translator.py b/tests/m3u/test_translator.py index 88387cb3..35efed4c 100644 --- a/tests/m3u/test_translator.py +++ b/tests/m3u/test_translator.py @@ -2,137 +2,145 @@ from __future__ import absolute_import, unicode_literals -import os -import tempfile -import unittest +import io -from mopidy.internal import path from mopidy.m3u import translator -from mopidy.models import Track - -from tests import path_to_data_dir - -data_dir = path_to_data_dir('') -song1_path = path_to_data_dir('song1.mp3') -song2_path = path_to_data_dir('song2.mp3') -song3_path = path_to_data_dir('φοο.mp3') -encoded_path = path_to_data_dir('æøå.mp3') -song1_uri = path.path_to_uri(song1_path) -song2_uri = path.path_to_uri(song2_path) -song3_uri = path.path_to_uri(song3_path) -song4_uri = 'http://example.com/foo%20bar.mp3' -encoded_uri = path.path_to_uri(encoded_path) -song1_track = Track(name='song1', uri=song1_uri) -song2_track = Track(name='song2', uri=song2_uri) -song3_track = Track(name='φοο', uri=song3_uri) -song4_track = Track(name='foo bar', uri=song4_uri) -encoded_track = Track(name='æøå', uri=encoded_uri) -song1_ext_track = song1_track.replace(name='Song #1') -song2_ext_track = song2_track.replace(name='Song #2', length=60000) -encoded_ext_track = encoded_track.replace(name='æøå') +from mopidy.models import Playlist, Ref, Track -# FIXME use mock instead of tempfile.NamedTemporaryFile - -class M3UToUriTest(unittest.TestCase): - - def parse(self, name): - return translator.parse_m3u(name, data_dir) - - def test_empty_file(self): - tracks = self.parse(path_to_data_dir('empty.m3u')) - self.assertEqual([], tracks) - - def test_basic_file(self): - tracks = self.parse(path_to_data_dir('one.m3u')) - self.assertEqual([song1_track], tracks) - - def test_file_with_comment(self): - tracks = self.parse(path_to_data_dir('comment.m3u')) - self.assertEqual([song1_track], tracks) - - def test_file_is_relative_to_correct_dir(self): - with tempfile.NamedTemporaryFile(delete=False) as tmp: - tmp.write('song1.mp3') - try: - tracks = self.parse(tmp.name) - self.assertEqual([song1_track], tracks) - finally: - if os.path.exists(tmp.name): - os.remove(tmp.name) - - def test_file_with_absolute_files(self): - with tempfile.NamedTemporaryFile(delete=False) as tmp: - tmp.write(song1_path) - try: - tracks = self.parse(tmp.name) - self.assertEqual([song1_track], tracks) - finally: - if os.path.exists(tmp.name): - os.remove(tmp.name) - - def test_file_with_multiple_absolute_files(self): - with tempfile.NamedTemporaryFile(delete=False) as tmp: - tmp.write(song1_path + '\n') - tmp.write('# comment \n') - tmp.write(song2_path) - try: - tracks = self.parse(tmp.name) - self.assertEqual([song1_track, song2_track], tracks) - finally: - if os.path.exists(tmp.name): - os.remove(tmp.name) - - def test_file_with_uri(self): - with tempfile.NamedTemporaryFile(delete=False) as tmp: - tmp.write(song1_uri) - tmp.write('\n') - tmp.write(song4_uri) - try: - tracks = self.parse(tmp.name) - self.assertEqual([song1_track, song4_track], tracks) - finally: - if os.path.exists(tmp.name): - os.remove(tmp.name) - - def test_encoding_is_latin1(self): - tracks = self.parse(path_to_data_dir('encoding.m3u')) - self.assertEqual([encoded_track], tracks) - - def test_open_missing_file(self): - tracks = self.parse(path_to_data_dir('non-existant.m3u')) - self.assertEqual([], tracks) - - def test_empty_ext_file(self): - tracks = self.parse(path_to_data_dir('empty-ext.m3u')) - self.assertEqual([], tracks) - - def test_basic_ext_file(self): - tracks = self.parse(path_to_data_dir('one-ext.m3u')) - self.assertEqual([song1_ext_track], tracks) - - def test_multi_ext_file(self): - tracks = self.parse(path_to_data_dir('two-ext.m3u')) - self.assertEqual([song1_ext_track, song2_ext_track], tracks) - - def test_ext_file_with_comment(self): - tracks = self.parse(path_to_data_dir('comment-ext.m3u')) - self.assertEqual([song1_ext_track], tracks) - - def test_ext_encoding_is_latin1(self): - tracks = self.parse(path_to_data_dir('encoding-ext.m3u')) - self.assertEqual([encoded_ext_track], tracks) - - def test_m3u8_file(self): - with tempfile.NamedTemporaryFile(suffix='.m3u8', delete=False) as tmp: - tmp.write(song3_path) - try: - tracks = self.parse(tmp.name) - self.assertEqual([song3_track], tracks) - finally: - if os.path.exists(tmp.name): - os.remove(tmp.name) +def loads(s, basedir=b'.'): + return translator.load_items(io.StringIO(s), basedir=basedir) -class URItoM3UTest(unittest.TestCase): - pass +def dumps(items): + fp = io.StringIO() + translator.dump_items(items, fp) + return fp.getvalue() + + +def test_path_to_uri(): + from mopidy.m3u.translator import path_to_uri + + assert path_to_uri(b'test') == 'm3u:test' + assert path_to_uri(b'test.m3u') == 'm3u:test.m3u' + assert path_to_uri(b'./test.m3u') == 'm3u:test.m3u' + assert path_to_uri(b'foo/../test.m3u') == 'm3u:test.m3u' + assert path_to_uri(b'Test Playlist.m3u') == 'm3u:Test%20Playlist.m3u' + assert path_to_uri(b'test.mp3', scheme='file') == 'file:///test.mp3' + + +def test_latin1_path_to_uri(): + path = 'æøå.m3u'.encode('latin-1') + assert translator.path_to_uri(path) == 'm3u:%E6%F8%E5.m3u' + + +def test_utf8_path_to_uri(): + path = 'æøå.m3u'.encode('utf-8') + assert translator.path_to_uri(path) == 'm3u:%C3%A6%C3%B8%C3%A5.m3u' + + +def test_uri_to_path(): + from mopidy.m3u.translator import uri_to_path + + assert uri_to_path('m3u:test.m3u') == b'test.m3u' + assert uri_to_path(b'm3u:test.m3u') == b'test.m3u' + assert uri_to_path('m3u:Test%20Playlist.m3u') == b'Test Playlist.m3u' + assert uri_to_path(b'm3u:Test%20Playlist.m3u') == b'Test Playlist.m3u' + assert uri_to_path('m3u:%E6%F8%E5.m3u') == b'\xe6\xf8\xe5.m3u' + assert uri_to_path(b'm3u:%E6%F8%E5.m3u') == b'\xe6\xf8\xe5.m3u' + assert uri_to_path('file:///test.mp3') == b'/test.mp3' + assert uri_to_path(b'file:///test.mp3') == b'/test.mp3' + + +def test_name_from_path(): + from mopidy.m3u.translator import name_from_path + + assert name_from_path(b'test') == 'test' + assert name_from_path(b'test.m3u') == 'test' + assert name_from_path(b'../test.m3u') == 'test' + + +def test_path_from_name(): + from mopidy.m3u.translator import path_from_name + + assert path_from_name('test') == b'test' + assert path_from_name('test', '.m3u') == b'test.m3u' + assert path_from_name('foo/bar', sep='-') == b'foo-bar' + + +def test_path_to_ref(): + from mopidy.m3u.translator import path_to_ref + + assert path_to_ref(b'test.m3u') == Ref.playlist( + uri='m3u:test.m3u', name='test' + ) + assert path_to_ref(b'Test Playlist.m3u') == Ref.playlist( + uri='m3u:Test%20Playlist.m3u', name='Test Playlist' + ) + + +def test_load_items(): + assert loads('') == [] + + assert loads('test.mp3', basedir=b'/playlists') == [ + Ref.track(uri='file:///playlists/test.mp3', name='test') + ] + assert loads('../test.mp3', basedir=b'/playlists') == [ + Ref.track(uri='file:///test.mp3', name='test') + ] + assert loads('/test.mp3') == [ + Ref.track(uri='file:///test.mp3', name='test') + ] + assert loads('file:///test.mp3') == [ + Ref.track(uri='file:///test.mp3') + ] + assert loads('http://example.com/stream') == [ + Ref.track(uri='http://example.com/stream') + ] + + assert loads('#EXTM3U\n#EXTINF:42,Test\nfile:///test.mp3\n') == [ + Ref.track(uri='file:///test.mp3', name='Test') + ] + assert loads('#EXTM3U\n#EXTINF:-1,Test\nhttp://example.com/stream\n') == [ + Ref.track(uri='http://example.com/stream', name='Test') + ] + + +def test_dump_items(): + assert dumps([]) == '' + assert dumps([Ref.track(uri='file:///test.mp3')]) == ( + 'file:///test.mp3\n' + ) + assert dumps([Ref.track(uri='file:///test.mp3', name='test')]) == ( + '#EXTM3U\n' + '#EXTINF:-1,test\n' + 'file:///test.mp3\n' + ) + assert dumps([Track(uri='file:///test.mp3', name='test', length=42)]) == ( + '#EXTM3U\n' + '#EXTINF:-1,test\n' + 'file:///test.mp3\n' + ) + assert dumps([Track(uri='http://example.com/stream')]) == ( + 'http://example.com/stream\n' + ) + assert dumps([Track(uri='http://example.com/stream', name='Test')]) == ( + '#EXTM3U\n' + '#EXTINF:-1,Test\n' + 'http://example.com/stream\n' + ) + + +def test_playlist(): + from mopidy.m3u.translator import playlist + + assert playlist(b'test.m3u') == Playlist( + uri='m3u:test.m3u', + name='test' + ) + assert playlist(b'test.m3u', [Ref(uri='file:///test.mp3')], 1) == Playlist( + uri='m3u:test.m3u', + name='test', + tracks=[Track(uri='file:///test.mp3')], + last_modified=1000 + )