diff --git a/mopidy/local/playlists.py b/mopidy/local/playlists.py index deeae2b5..10a97b39 100644 --- a/mopidy/local/playlists.py +++ b/mopidy/local/playlists.py @@ -3,15 +3,14 @@ from __future__ import absolute_import, division, unicode_literals import glob import logging import os -import shutil +import sys from mopidy import backend from mopidy.models import Playlist -from mopidy.utils import formatting, path +from .translator import local_playlist_uri_to_path, path_to_local_playlist_uri from .translator import parse_m3u - logger = logging.getLogger(__name__) @@ -23,18 +22,27 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider): self.refresh() def create(self, name): - name = formatting.slugify(name) - uri = 'local:playlist:%s.m3u' % name - playlist = Playlist(uri=uri, name=name) - return self.save(playlist) + playlist = self._save_m3u(Playlist(name=name)) + old_playlist = self.lookup(playlist.uri) + if old_playlist is not None: + index = self._playlists.index(old_playlist) + self._playlists[index] = playlist + else: + self._playlists.append(playlist) + logger.info('Created playlist %s', playlist.uri) + return playlist def delete(self, uri): playlist = self.lookup(uri) if not playlist: + logger.warn('Trying to delete unknown playlist %s', uri) return - + path = local_playlist_uri_to_path(uri, self._playlists_dir) + if os.path.exists(path): + os.remove(path) + else: + logger.warn('Trying to delete missing playlist file %s', path) self._playlists.remove(playlist) - self._delete_m3u(playlist.uri) def lookup(self, uri): # TODO: store as {uri: playlist}? @@ -45,12 +53,14 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider): def refresh(self): playlists = [] - for m3u in glob.glob(os.path.join(self._playlists_dir, '*.m3u')): - name = os.path.splitext(os.path.basename(m3u))[0] - uri = 'local:playlist:%s' % name + encoding = sys.getfilesystemencoding() + for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u')): + relpath = os.path.basename(path) + name = os.path.splitext(relpath)[0].decode(encoding) + uri = path_to_local_playlist_uri(relpath) tracks = [] - for track in parse_m3u(m3u, self._media_dir): + for track in parse_m3u(path, self._media_dir): tracks.append(track) playlist = Playlist(uri=uri, name=name, tracks=tracks) @@ -67,38 +77,53 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider): def save(self, playlist): assert playlist.uri, 'Cannot save playlist without URI' - old_playlist = self.lookup(playlist.uri) + uri = playlist.uri + # TODO: require existing (created) playlist - currently, this + # is a *should* in https://docs.mopidy.com/en/latest/api/core/ + try: + index = self._playlists.index(self.lookup(uri)) + except ValueError: + logger.warn('Saving playlist with new URI %s', uri) + index = -1 - if old_playlist and playlist.name != old_playlist.name: - playlist = playlist.copy(name=formatting.slugify(playlist.name)) - playlist = self._rename_m3u(playlist) - - self._save_m3u(playlist) - - if old_playlist is not None: - index = self._playlists.index(old_playlist) + playlist = self._save_m3u(playlist) + if index >= 0 and uri != playlist.uri: + path = local_playlist_uri_to_path(uri, self._playlists_dir) + if os.path.exists(path): + os.remove(path) + else: + logger.warn('Trying to delete missing playlist file %s', path) + if index >= 0: self._playlists[index] = playlist else: self._playlists.append(playlist) - return playlist - def _m3u_uri_to_path(self, uri): - # TODO: create uri handling helpers for local uri types. - file_path = path.uri_to_path(uri).split(':', 1)[1] - file_path = os.path.join(self._playlists_dir, file_path) - path.check_file_path_is_inside_base_dir(file_path, self._playlists_dir) - return file_path - def _write_m3u_extinf(self, file_handle, track): title = track.name.encode('latin-1', 'replace') runtime = track.length // 1000 if track.length else -1 file_handle.write('#EXTINF:' + str(runtime) + ',' + title + '\n') - def _save_m3u(self, playlist): - file_path = self._m3u_uri_to_path(playlist.uri) + def _sanitize_m3u_name(self, name, encoding=sys.getfilesystemencoding()): + name = name.encode(encoding, errors='replace') + name = os.path.basename(name) + name = name.decode(encoding) + return name + + def _save_m3u(self, playlist, encoding=sys.getfilesystemencoding()): + if playlist.name: + name = self._sanitize_m3u_name(playlist.name, encoding) + uri = path_to_local_playlist_uri(name.encode(encoding) + b'.m3u') + path = local_playlist_uri_to_path(uri, self._playlists_dir) + elif playlist.uri: + uri = playlist.uri + path = local_playlist_uri_to_path(uri, self._playlists_dir) + name, _ = os.path.splitext(os.path.basename(path).decode(encoding)) + else: + raise ValueError('M3U playlist needs name or URI') extended = any(track.name for track in playlist.tracks) - with open(file_path, 'w') as file_handle: + + with open(path, 'w') as file_handle: if extended: file_handle.write('#EXTM3U\n') for track in playlist.tracks: @@ -106,17 +131,5 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider): self._write_m3u_extinf(file_handle, track) file_handle.write(track.uri + '\n') - def _delete_m3u(self, uri): - file_path = self._m3u_uri_to_path(uri) - if os.path.exists(file_path): - os.remove(file_path) - - def _rename_m3u(self, playlist): - dst_name = formatting.slugify(playlist.name) - dst_uri = 'local:playlist:%s.m3u' % dst_name - - src_file_path = self._m3u_uri_to_path(playlist.uri) - dst_file_path = self._m3u_uri_to_path(dst_uri) - - shutil.move(src_file_path, dst_file_path) - return playlist.copy(uri=dst_uri) + # assert playlist name matches file name/uri + return playlist.copy(uri=uri, name=name) diff --git a/mopidy/local/translator.py b/mopidy/local/translator.py index ab9fc28f..d0c19c27 100644 --- a/mopidy/local/translator.py +++ b/mopidy/local/translator.py @@ -37,8 +37,15 @@ def local_track_uri_to_path(uri, media_dir): return os.path.join(media_dir, file_path) +def local_playlist_uri_to_path(uri, playlists_dir): + if not uri.startswith('local:playlist:'): + raise ValueError('Invalid URI %s' % uri) + file_path = uri_to_path(uri).split(b':', 1)[1] + return os.path.join(playlists_dir, file_path) + + def path_to_local_track_uri(relpath): - """Convert path releative to media_dir to local track URI.""" + """Convert path relative to media_dir to local track URI.""" if isinstance(relpath, compat.text_type): relpath = relpath.encode('utf-8') return b'local:track:%s' % urllib.quote(relpath) @@ -51,6 +58,13 @@ def path_to_local_directory_uri(relpath): return b'local:directory:%s' % urllib.quote(relpath) +def path_to_local_playlist_uri(relpath): + """Convert path relative to playlists_dir to local playlist URI.""" + if isinstance(relpath, compat.text_type): + relpath = relpath.encode('utf-8') + return b'local:playlist:%s' % urllib.quote(relpath) + + def m3u_extinf_to_track(line): """Convert extended M3U directive to track template.""" m = M3U_EXTINF_RE.match(line) diff --git a/tests/local/test_playlists.py b/tests/local/test_playlists.py index 3e9c280e..d52fed82 100644 --- a/tests/local/test_playlists.py +++ b/tests/local/test_playlists.py @@ -9,6 +9,7 @@ import pykka from mopidy import core from mopidy.local import actor +from mopidy.local.translator import local_playlist_uri_to_path from mopidy.models import Playlist, Track from tests import dummy_audio, path_to_data_dir @@ -41,49 +42,50 @@ class LocalPlaylistsProviderTest(unittest.TestCase): shutil.rmtree(self.playlists_dir) def test_created_playlist_is_persisted(self): - path = os.path.join(self.playlists_dir, 'test.m3u') + uri = 'local:playlist:test.m3u' + path = local_playlist_uri_to_path(uri, self.playlists_dir) self.assertFalse(os.path.exists(path)) - self.core.playlists.create('test') + playlist = self.core.playlists.create('test') + self.assertEqual('test', playlist.name) + self.assertEqual(uri, playlist.uri) self.assertTrue(os.path.exists(path)) - def test_create_slugifies_playlist_name(self): - path = os.path.join(self.playlists_dir, 'test-foo-bar.m3u') - self.assertFalse(os.path.exists(path)) - - playlist = self.core.playlists.create('test FOO baR') - self.assertEqual('test-foo-bar', playlist.name) - self.assertTrue(os.path.exists(path)) - - def test_create_slugifies_names_which_tries_to_change_directory(self): - path = os.path.join(self.playlists_dir, 'test-foo-bar.m3u') - self.assertFalse(os.path.exists(path)) - + def test_create_sanitizes_playlist_name(self): playlist = self.core.playlists.create('../../test FOO baR') - self.assertEqual('test-foo-bar', playlist.name) + self.assertEqual('test FOO baR', playlist.name) + path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir) + self.assertEqual(self.playlists_dir, os.path.dirname(path)) self.assertTrue(os.path.exists(path)) def test_saved_playlist_is_persisted(self): - path1 = os.path.join(self.playlists_dir, 'test1.m3u') - path2 = os.path.join(self.playlists_dir, 'test2-foo-bar.m3u') + uri1 = 'local:playlist:test1.m3u' + uri2 = 'local:playlist:test2.m3u' + + path1 = local_playlist_uri_to_path(uri1, self.playlists_dir) + path2 = local_playlist_uri_to_path(uri2, self.playlists_dir) playlist = self.core.playlists.create('test1') - + self.assertEqual('test1', playlist.name) + self.assertEqual(uri1, playlist.uri) self.assertTrue(os.path.exists(path1)) self.assertFalse(os.path.exists(path2)) - playlist = playlist.copy(name='test2 FOO baR') - playlist = self.core.playlists.save(playlist) - - self.assertEqual('test2-foo-bar', playlist.name) + playlist = self.core.playlists.save(playlist.copy(name='test2')) + self.assertEqual('test2', playlist.name) + self.assertEqual(uri2, playlist.uri) self.assertFalse(os.path.exists(path1)) self.assertTrue(os.path.exists(path2)) def test_deleted_playlist_is_removed(self): - path = os.path.join(self.playlists_dir, 'test.m3u') + uri = 'local:playlist:test.m3u' + path = local_playlist_uri_to_path(uri, self.playlists_dir) + self.assertFalse(os.path.exists(path)) playlist = self.core.playlists.create('test') + self.assertEqual('test', playlist.name) + self.assertEqual(uri, playlist.uri) self.assertTrue(os.path.exists(path)) self.core.playlists.delete(playlist.uri) @@ -92,24 +94,22 @@ class LocalPlaylistsProviderTest(unittest.TestCase): def test_playlist_contents_is_written_to_disk(self): track = Track(uri=generate_song(1)) playlist = self.core.playlists.create('test') - playlist_path = os.path.join(self.playlists_dir, 'test.m3u') - playlist = playlist.copy(tracks=[track]) - playlist = self.core.playlists.save(playlist) + playlist = self.core.playlists.save(playlist.copy(tracks=[track])) + path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir) - with open(playlist_path) as playlist_file: - contents = playlist_file.read() + with open(path) as f: + contents = f.read() self.assertEqual(track.uri, contents.strip()) def test_extended_playlist_contents_is_written_to_disk(self): track = Track(uri=generate_song(1), name='Test', length=60000) playlist = self.core.playlists.create('test') - playlist_path = os.path.join(self.playlists_dir, 'test.m3u') - playlist = playlist.copy(tracks=[track]) - playlist = self.core.playlists.save(playlist) + playlist = self.core.playlists.save(playlist.copy(tracks=[track])) + path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir) - with open(playlist_path) as playlist_file: - contents = playlist_file.read().splitlines() + with open(path) as f: + contents = f.read().splitlines() self.assertEqual(contents, ['#EXTM3U', '#EXTINF:60,Test', track.uri]) @@ -123,7 +123,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assert_(backend.playlists.playlists) self.assertEqual( - 'local:playlist:test', backend.playlists.playlists[0].uri) + playlist.uri, backend.playlists.playlists[0].uri) self.assertEqual( playlist.name, backend.playlists.playlists[0].name) self.assertEqual( @@ -154,7 +154,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assert_(not self.core.playlists.playlists) def test_delete_non_existant_playlist(self): - self.core.playlists.delete('file:///unknown/playlist') + self.core.playlists.delete('local:playlist:unknown') def test_delete_playlist_removes_it_from_the_collection(self): playlist = self.core.playlists.create('test') @@ -164,6 +164,19 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assertNotIn(playlist, self.core.playlists.playlists) + def test_delete_playlist_without_file(self): + playlist = self.core.playlists.create('test') + self.assertIn(playlist, self.core.playlists.playlists) + + path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir) + self.assertTrue(os.path.exists(path)) + + os.remove(path) + self.assertFalse(os.path.exists(path)) + + self.core.playlists.delete(playlist.uri) + self.assertNotIn(playlist, self.core.playlists.playlists) + def test_filter_without_criteria(self): self.assertEqual( self.core.playlists.playlists, self.core.playlists.filter()) @@ -201,9 +214,13 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assertEqual(original_playlist, looked_up_playlist) - @unittest.SkipTest def test_refresh(self): - pass + playlist = self.core.playlists.create('test') + self.assertIn(playlist, self.core.playlists.playlists) + + self.core.playlists.refresh() + + self.assertIn(playlist, self.core.playlists.playlists) def test_save_replaces_existing_playlist_with_updated_playlist(self): playlist1 = self.core.playlists.create('test1') @@ -214,6 +231,27 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assertNotIn(playlist1, self.core.playlists.playlists) self.assertIn(playlist2, self.core.playlists.playlists) + def test_create_replaces_existing_playlist_with_updated_playlist(self): + track = Track(uri=generate_song(1)) + playlist1 = self.core.playlists.create('test') + playlist1 = self.core.playlists.save(playlist1.copy(tracks=[track])) + self.assertIn(playlist1, self.core.playlists.playlists) + + playlist2 = self.core.playlists.create('test') + self.assertEqual(playlist1.uri, playlist2.uri) + self.assertNotIn(playlist1, self.core.playlists.playlists) + self.assertIn(playlist2, self.core.playlists.playlists) + + def test_save_playlist_with_new_uri(self): + # you *should* not do this + uri = 'local:playlist:test.m3u' + playlist = self.core.playlists.save(Playlist(uri=uri)) + self.assertIn(playlist, self.core.playlists.playlists) + self.assertEqual(uri, playlist.uri) + self.assertEqual('test', playlist.name) + path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir) + self.assertTrue(os.path.exists(path)) + def test_playlist_with_unknown_track(self): track = Track(uri='file:///dev/null') playlist = self.core.playlists.create('test') @@ -224,7 +262,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase): self.assert_(backend.playlists.playlists) self.assertEqual( - 'local:playlist:test', backend.playlists.playlists[0].uri) + 'local:playlist:test.m3u', backend.playlists.playlists[0].uri) self.assertEqual( playlist.name, backend.playlists.playlists[0].name) self.assertEqual(