Fix #937: Local playlists refactoring.
This commit is contained in:
parent
7520b13aa1
commit
dd54fdb086
@ -3,15 +3,14 @@ from __future__ import absolute_import, division, unicode_literals
|
||||
import glob
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
from mopidy import backend
|
||||
from mopidy.models import Playlist
|
||||
from mopidy.utils import formatting, path
|
||||
|
||||
from .translator import local_playlist_uri_to_path, path_to_local_playlist_uri
|
||||
from .translator import parse_m3u
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -23,18 +22,27 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider):
|
||||
self.refresh()
|
||||
|
||||
def create(self, name):
|
||||
name = formatting.slugify(name)
|
||||
uri = 'local:playlist:%s.m3u' % name
|
||||
playlist = Playlist(uri=uri, name=name)
|
||||
return self.save(playlist)
|
||||
playlist = self._save_m3u(Playlist(name=name))
|
||||
old_playlist = self.lookup(playlist.uri)
|
||||
if old_playlist is not None:
|
||||
index = self._playlists.index(old_playlist)
|
||||
self._playlists[index] = playlist
|
||||
else:
|
||||
self._playlists.append(playlist)
|
||||
logger.info('Created playlist %s', playlist.uri)
|
||||
return playlist
|
||||
|
||||
def delete(self, uri):
|
||||
playlist = self.lookup(uri)
|
||||
if not playlist:
|
||||
logger.warn('Trying to delete unknown playlist %s', uri)
|
||||
return
|
||||
|
||||
path = local_playlist_uri_to_path(uri, self._playlists_dir)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
else:
|
||||
logger.warn('Trying to delete missing playlist file %s', path)
|
||||
self._playlists.remove(playlist)
|
||||
self._delete_m3u(playlist.uri)
|
||||
|
||||
def lookup(self, uri):
|
||||
# TODO: store as {uri: playlist}?
|
||||
@ -45,12 +53,14 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider):
|
||||
def refresh(self):
|
||||
playlists = []
|
||||
|
||||
for m3u in glob.glob(os.path.join(self._playlists_dir, '*.m3u')):
|
||||
name = os.path.splitext(os.path.basename(m3u))[0]
|
||||
uri = 'local:playlist:%s' % name
|
||||
encoding = sys.getfilesystemencoding()
|
||||
for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u')):
|
||||
relpath = os.path.basename(path)
|
||||
name = os.path.splitext(relpath)[0].decode(encoding)
|
||||
uri = path_to_local_playlist_uri(relpath)
|
||||
|
||||
tracks = []
|
||||
for track in parse_m3u(m3u, self._media_dir):
|
||||
for track in parse_m3u(path, self._media_dir):
|
||||
tracks.append(track)
|
||||
|
||||
playlist = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
@ -67,38 +77,53 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider):
|
||||
def save(self, playlist):
|
||||
assert playlist.uri, 'Cannot save playlist without URI'
|
||||
|
||||
old_playlist = self.lookup(playlist.uri)
|
||||
uri = playlist.uri
|
||||
# TODO: require existing (created) playlist - currently, this
|
||||
# is a *should* in https://docs.mopidy.com/en/latest/api/core/
|
||||
try:
|
||||
index = self._playlists.index(self.lookup(uri))
|
||||
except ValueError:
|
||||
logger.warn('Saving playlist with new URI %s', uri)
|
||||
index = -1
|
||||
|
||||
if old_playlist and playlist.name != old_playlist.name:
|
||||
playlist = playlist.copy(name=formatting.slugify(playlist.name))
|
||||
playlist = self._rename_m3u(playlist)
|
||||
|
||||
self._save_m3u(playlist)
|
||||
|
||||
if old_playlist is not None:
|
||||
index = self._playlists.index(old_playlist)
|
||||
playlist = self._save_m3u(playlist)
|
||||
if index >= 0 and uri != playlist.uri:
|
||||
path = local_playlist_uri_to_path(uri, self._playlists_dir)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
else:
|
||||
logger.warn('Trying to delete missing playlist file %s', path)
|
||||
if index >= 0:
|
||||
self._playlists[index] = playlist
|
||||
else:
|
||||
self._playlists.append(playlist)
|
||||
|
||||
return playlist
|
||||
|
||||
def _m3u_uri_to_path(self, uri):
|
||||
# TODO: create uri handling helpers for local uri types.
|
||||
file_path = path.uri_to_path(uri).split(':', 1)[1]
|
||||
file_path = os.path.join(self._playlists_dir, file_path)
|
||||
path.check_file_path_is_inside_base_dir(file_path, self._playlists_dir)
|
||||
return file_path
|
||||
|
||||
def _write_m3u_extinf(self, file_handle, track):
|
||||
title = track.name.encode('latin-1', 'replace')
|
||||
runtime = track.length // 1000 if track.length else -1
|
||||
file_handle.write('#EXTINF:' + str(runtime) + ',' + title + '\n')
|
||||
|
||||
def _save_m3u(self, playlist):
|
||||
file_path = self._m3u_uri_to_path(playlist.uri)
|
||||
def _sanitize_m3u_name(self, name, encoding=sys.getfilesystemencoding()):
|
||||
name = name.encode(encoding, errors='replace')
|
||||
name = os.path.basename(name)
|
||||
name = name.decode(encoding)
|
||||
return name
|
||||
|
||||
def _save_m3u(self, playlist, encoding=sys.getfilesystemencoding()):
|
||||
if playlist.name:
|
||||
name = self._sanitize_m3u_name(playlist.name, encoding)
|
||||
uri = path_to_local_playlist_uri(name.encode(encoding) + b'.m3u')
|
||||
path = local_playlist_uri_to_path(uri, self._playlists_dir)
|
||||
elif playlist.uri:
|
||||
uri = playlist.uri
|
||||
path = local_playlist_uri_to_path(uri, self._playlists_dir)
|
||||
name, _ = os.path.splitext(os.path.basename(path).decode(encoding))
|
||||
else:
|
||||
raise ValueError('M3U playlist needs name or URI')
|
||||
extended = any(track.name for track in playlist.tracks)
|
||||
with open(file_path, 'w') as file_handle:
|
||||
|
||||
with open(path, 'w') as file_handle:
|
||||
if extended:
|
||||
file_handle.write('#EXTM3U\n')
|
||||
for track in playlist.tracks:
|
||||
@ -106,17 +131,5 @@ class LocalPlaylistsProvider(backend.PlaylistsProvider):
|
||||
self._write_m3u_extinf(file_handle, track)
|
||||
file_handle.write(track.uri + '\n')
|
||||
|
||||
def _delete_m3u(self, uri):
|
||||
file_path = self._m3u_uri_to_path(uri)
|
||||
if os.path.exists(file_path):
|
||||
os.remove(file_path)
|
||||
|
||||
def _rename_m3u(self, playlist):
|
||||
dst_name = formatting.slugify(playlist.name)
|
||||
dst_uri = 'local:playlist:%s.m3u' % dst_name
|
||||
|
||||
src_file_path = self._m3u_uri_to_path(playlist.uri)
|
||||
dst_file_path = self._m3u_uri_to_path(dst_uri)
|
||||
|
||||
shutil.move(src_file_path, dst_file_path)
|
||||
return playlist.copy(uri=dst_uri)
|
||||
# assert playlist name matches file name/uri
|
||||
return playlist.copy(uri=uri, name=name)
|
||||
|
||||
@ -37,8 +37,15 @@ def local_track_uri_to_path(uri, media_dir):
|
||||
return os.path.join(media_dir, file_path)
|
||||
|
||||
|
||||
def local_playlist_uri_to_path(uri, playlists_dir):
|
||||
if not uri.startswith('local:playlist:'):
|
||||
raise ValueError('Invalid URI %s' % uri)
|
||||
file_path = uri_to_path(uri).split(b':', 1)[1]
|
||||
return os.path.join(playlists_dir, file_path)
|
||||
|
||||
|
||||
def path_to_local_track_uri(relpath):
|
||||
"""Convert path releative to media_dir to local track URI."""
|
||||
"""Convert path relative to media_dir to local track URI."""
|
||||
if isinstance(relpath, compat.text_type):
|
||||
relpath = relpath.encode('utf-8')
|
||||
return b'local:track:%s' % urllib.quote(relpath)
|
||||
@ -51,6 +58,13 @@ def path_to_local_directory_uri(relpath):
|
||||
return b'local:directory:%s' % urllib.quote(relpath)
|
||||
|
||||
|
||||
def path_to_local_playlist_uri(relpath):
|
||||
"""Convert path relative to playlists_dir to local playlist URI."""
|
||||
if isinstance(relpath, compat.text_type):
|
||||
relpath = relpath.encode('utf-8')
|
||||
return b'local:playlist:%s' % urllib.quote(relpath)
|
||||
|
||||
|
||||
def m3u_extinf_to_track(line):
|
||||
"""Convert extended M3U directive to track template."""
|
||||
m = M3U_EXTINF_RE.match(line)
|
||||
|
||||
@ -9,6 +9,7 @@ import pykka
|
||||
|
||||
from mopidy import core
|
||||
from mopidy.local import actor
|
||||
from mopidy.local.translator import local_playlist_uri_to_path
|
||||
from mopidy.models import Playlist, Track
|
||||
|
||||
from tests import dummy_audio, path_to_data_dir
|
||||
@ -41,49 +42,50 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
shutil.rmtree(self.playlists_dir)
|
||||
|
||||
def test_created_playlist_is_persisted(self):
|
||||
path = os.path.join(self.playlists_dir, 'test.m3u')
|
||||
uri = 'local:playlist:test.m3u'
|
||||
path = local_playlist_uri_to_path(uri, self.playlists_dir)
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
self.core.playlists.create('test')
|
||||
playlist = self.core.playlists.create('test')
|
||||
self.assertEqual('test', playlist.name)
|
||||
self.assertEqual(uri, playlist.uri)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_create_slugifies_playlist_name(self):
|
||||
path = os.path.join(self.playlists_dir, 'test-foo-bar.m3u')
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
playlist = self.core.playlists.create('test FOO baR')
|
||||
self.assertEqual('test-foo-bar', playlist.name)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_create_slugifies_names_which_tries_to_change_directory(self):
|
||||
path = os.path.join(self.playlists_dir, 'test-foo-bar.m3u')
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
def test_create_sanitizes_playlist_name(self):
|
||||
playlist = self.core.playlists.create('../../test FOO baR')
|
||||
self.assertEqual('test-foo-bar', playlist.name)
|
||||
self.assertEqual('test FOO baR', playlist.name)
|
||||
path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
self.assertEqual(self.playlists_dir, os.path.dirname(path))
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_saved_playlist_is_persisted(self):
|
||||
path1 = os.path.join(self.playlists_dir, 'test1.m3u')
|
||||
path2 = os.path.join(self.playlists_dir, 'test2-foo-bar.m3u')
|
||||
uri1 = 'local:playlist:test1.m3u'
|
||||
uri2 = 'local:playlist:test2.m3u'
|
||||
|
||||
path1 = local_playlist_uri_to_path(uri1, self.playlists_dir)
|
||||
path2 = local_playlist_uri_to_path(uri2, self.playlists_dir)
|
||||
|
||||
playlist = self.core.playlists.create('test1')
|
||||
|
||||
self.assertEqual('test1', playlist.name)
|
||||
self.assertEqual(uri1, playlist.uri)
|
||||
self.assertTrue(os.path.exists(path1))
|
||||
self.assertFalse(os.path.exists(path2))
|
||||
|
||||
playlist = playlist.copy(name='test2 FOO baR')
|
||||
playlist = self.core.playlists.save(playlist)
|
||||
|
||||
self.assertEqual('test2-foo-bar', playlist.name)
|
||||
playlist = self.core.playlists.save(playlist.copy(name='test2'))
|
||||
self.assertEqual('test2', playlist.name)
|
||||
self.assertEqual(uri2, playlist.uri)
|
||||
self.assertFalse(os.path.exists(path1))
|
||||
self.assertTrue(os.path.exists(path2))
|
||||
|
||||
def test_deleted_playlist_is_removed(self):
|
||||
path = os.path.join(self.playlists_dir, 'test.m3u')
|
||||
uri = 'local:playlist:test.m3u'
|
||||
path = local_playlist_uri_to_path(uri, self.playlists_dir)
|
||||
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
playlist = self.core.playlists.create('test')
|
||||
self.assertEqual('test', playlist.name)
|
||||
self.assertEqual(uri, playlist.uri)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
self.core.playlists.delete(playlist.uri)
|
||||
@ -92,24 +94,22 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
def test_playlist_contents_is_written_to_disk(self):
|
||||
track = Track(uri=generate_song(1))
|
||||
playlist = self.core.playlists.create('test')
|
||||
playlist_path = os.path.join(self.playlists_dir, 'test.m3u')
|
||||
playlist = playlist.copy(tracks=[track])
|
||||
playlist = self.core.playlists.save(playlist)
|
||||
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
|
||||
path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
|
||||
with open(playlist_path) as playlist_file:
|
||||
contents = playlist_file.read()
|
||||
with open(path) as f:
|
||||
contents = f.read()
|
||||
|
||||
self.assertEqual(track.uri, contents.strip())
|
||||
|
||||
def test_extended_playlist_contents_is_written_to_disk(self):
|
||||
track = Track(uri=generate_song(1), name='Test', length=60000)
|
||||
playlist = self.core.playlists.create('test')
|
||||
playlist_path = os.path.join(self.playlists_dir, 'test.m3u')
|
||||
playlist = playlist.copy(tracks=[track])
|
||||
playlist = self.core.playlists.save(playlist)
|
||||
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
|
||||
path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
|
||||
with open(playlist_path) as playlist_file:
|
||||
contents = playlist_file.read().splitlines()
|
||||
with open(path) as f:
|
||||
contents = f.read().splitlines()
|
||||
|
||||
self.assertEqual(contents, ['#EXTM3U', '#EXTINF:60,Test', track.uri])
|
||||
|
||||
@ -123,7 +123,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
|
||||
self.assert_(backend.playlists.playlists)
|
||||
self.assertEqual(
|
||||
'local:playlist:test', backend.playlists.playlists[0].uri)
|
||||
playlist.uri, backend.playlists.playlists[0].uri)
|
||||
self.assertEqual(
|
||||
playlist.name, backend.playlists.playlists[0].name)
|
||||
self.assertEqual(
|
||||
@ -154,7 +154,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
self.assert_(not self.core.playlists.playlists)
|
||||
|
||||
def test_delete_non_existant_playlist(self):
|
||||
self.core.playlists.delete('file:///unknown/playlist')
|
||||
self.core.playlists.delete('local:playlist:unknown')
|
||||
|
||||
def test_delete_playlist_removes_it_from_the_collection(self):
|
||||
playlist = self.core.playlists.create('test')
|
||||
@ -164,6 +164,19 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
|
||||
self.assertNotIn(playlist, self.core.playlists.playlists)
|
||||
|
||||
def test_delete_playlist_without_file(self):
|
||||
playlist = self.core.playlists.create('test')
|
||||
self.assertIn(playlist, self.core.playlists.playlists)
|
||||
|
||||
path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
os.remove(path)
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
self.core.playlists.delete(playlist.uri)
|
||||
self.assertNotIn(playlist, self.core.playlists.playlists)
|
||||
|
||||
def test_filter_without_criteria(self):
|
||||
self.assertEqual(
|
||||
self.core.playlists.playlists, self.core.playlists.filter())
|
||||
@ -201,9 +214,13 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
|
||||
self.assertEqual(original_playlist, looked_up_playlist)
|
||||
|
||||
@unittest.SkipTest
|
||||
def test_refresh(self):
|
||||
pass
|
||||
playlist = self.core.playlists.create('test')
|
||||
self.assertIn(playlist, self.core.playlists.playlists)
|
||||
|
||||
self.core.playlists.refresh()
|
||||
|
||||
self.assertIn(playlist, self.core.playlists.playlists)
|
||||
|
||||
def test_save_replaces_existing_playlist_with_updated_playlist(self):
|
||||
playlist1 = self.core.playlists.create('test1')
|
||||
@ -214,6 +231,27 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
self.assertNotIn(playlist1, self.core.playlists.playlists)
|
||||
self.assertIn(playlist2, self.core.playlists.playlists)
|
||||
|
||||
def test_create_replaces_existing_playlist_with_updated_playlist(self):
|
||||
track = Track(uri=generate_song(1))
|
||||
playlist1 = self.core.playlists.create('test')
|
||||
playlist1 = self.core.playlists.save(playlist1.copy(tracks=[track]))
|
||||
self.assertIn(playlist1, self.core.playlists.playlists)
|
||||
|
||||
playlist2 = self.core.playlists.create('test')
|
||||
self.assertEqual(playlist1.uri, playlist2.uri)
|
||||
self.assertNotIn(playlist1, self.core.playlists.playlists)
|
||||
self.assertIn(playlist2, self.core.playlists.playlists)
|
||||
|
||||
def test_save_playlist_with_new_uri(self):
|
||||
# you *should* not do this
|
||||
uri = 'local:playlist:test.m3u'
|
||||
playlist = self.core.playlists.save(Playlist(uri=uri))
|
||||
self.assertIn(playlist, self.core.playlists.playlists)
|
||||
self.assertEqual(uri, playlist.uri)
|
||||
self.assertEqual('test', playlist.name)
|
||||
path = local_playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
def test_playlist_with_unknown_track(self):
|
||||
track = Track(uri='file:///dev/null')
|
||||
playlist = self.core.playlists.create('test')
|
||||
@ -224,7 +262,7 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
|
||||
self.assert_(backend.playlists.playlists)
|
||||
self.assertEqual(
|
||||
'local:playlist:test', backend.playlists.playlists[0].uri)
|
||||
'local:playlist:test.m3u', backend.playlists.playlists[0].uri)
|
||||
self.assertEqual(
|
||||
playlist.name, backend.playlists.playlists[0].name)
|
||||
self.assertEqual(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user