parent
fe8d6aa4e8
commit
67d4dac862
@ -1,6 +1,5 @@
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
|
||||
import copy
|
||||
import glob
|
||||
import logging
|
||||
import operator
|
||||
@ -20,65 +19,50 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
|
||||
super(M3UPlaylistsProvider, self).__init__(*args, **kwargs)
|
||||
|
||||
self._playlists_dir = self.backend._config['m3u']['playlists_dir']
|
||||
self._playlists = []
|
||||
self._playlists = {}
|
||||
self.refresh()
|
||||
|
||||
@property
|
||||
def playlists(self):
|
||||
return copy.copy(self._playlists)
|
||||
return sorted(
|
||||
self._playlists.values(), key=operator.attrgetter('name'))
|
||||
|
||||
@playlists.setter
|
||||
def playlists(self, playlists):
|
||||
self._playlists = playlists
|
||||
self._playlists = {playlist.uri: playlist for playlist in playlists}
|
||||
|
||||
def create(self, name):
|
||||
playlist = self._save_m3u(Playlist(name=name))
|
||||
old_playlist = self.lookup(playlist.uri)
|
||||
if old_playlist is not None:
|
||||
index = self._playlists.index(old_playlist)
|
||||
self._playlists[index] = playlist
|
||||
else:
|
||||
self._playlists.append(playlist)
|
||||
self._playlists.sort(key=operator.attrgetter('name'))
|
||||
self._playlists[playlist.uri] = playlist
|
||||
logger.info('Created playlist %s', playlist.uri)
|
||||
return playlist
|
||||
|
||||
def delete(self, uri):
|
||||
playlist = self.lookup(uri)
|
||||
if not playlist:
|
||||
logger.warn('Trying to delete unknown playlist %s', uri)
|
||||
return
|
||||
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
if uri in self._playlists:
|
||||
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
else:
|
||||
logger.warn('Trying to delete missing playlist file %s', path)
|
||||
del self._playlists[uri]
|
||||
else:
|
||||
logger.warn('Trying to delete missing playlist file %s', path)
|
||||
self._playlists.remove(playlist)
|
||||
logger.warn('Trying to delete unknown playlist %s', uri)
|
||||
|
||||
def lookup(self, uri):
|
||||
# TODO: store as {uri: playlist} when get_playlists() gets
|
||||
# implemented
|
||||
for playlist in self._playlists:
|
||||
if playlist.uri == uri:
|
||||
return playlist
|
||||
return self._playlists.get(uri)
|
||||
|
||||
def refresh(self):
|
||||
playlists = []
|
||||
playlists = {}
|
||||
|
||||
encoding = sys.getfilesystemencoding()
|
||||
for path in glob.glob(os.path.join(self._playlists_dir, b'*.m3u')):
|
||||
relpath = os.path.basename(path)
|
||||
name = os.path.splitext(relpath)[0].decode(encoding)
|
||||
uri = translator.path_to_playlist_uri(relpath)
|
||||
name = os.path.splitext(relpath)[0].decode(encoding)
|
||||
tracks = translator.parse_m3u(path)
|
||||
playlists[uri] = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
|
||||
tracks = []
|
||||
for track in translator.parse_m3u(path):
|
||||
tracks.append(track)
|
||||
|
||||
playlist = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
playlists.append(playlist)
|
||||
|
||||
self.playlists = sorted(playlists, key=operator.attrgetter('name'))
|
||||
self._playlists = playlists
|
||||
|
||||
logger.info(
|
||||
'Loaded %d M3U playlists from %s',
|
||||
@ -86,28 +70,14 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
|
||||
|
||||
def save(self, playlist):
|
||||
assert playlist.uri, 'Cannot save playlist without URI'
|
||||
assert playlist.uri in self._playlists, \
|
||||
'Cannot save playlist with unknown URI: %s' % playlist.uri
|
||||
|
||||
uri = playlist.uri
|
||||
# TODO: require existing (created) playlist - currently, this
|
||||
# is a *should* in https://docs.mopidy.com/en/latest/api/core/
|
||||
try:
|
||||
index = self._playlists.index(self.lookup(uri))
|
||||
except ValueError:
|
||||
logger.warn('Saving playlist with new URI %s', uri)
|
||||
index = -1
|
||||
|
||||
original_uri = playlist.uri
|
||||
playlist = self._save_m3u(playlist)
|
||||
if index >= 0 and uri != playlist.uri:
|
||||
path = translator.playlist_uri_to_path(uri, self._playlists_dir)
|
||||
if os.path.exists(path):
|
||||
os.remove(path)
|
||||
else:
|
||||
logger.warn('Trying to delete missing playlist file %s', path)
|
||||
if index >= 0:
|
||||
self._playlists[index] = playlist
|
||||
else:
|
||||
self._playlists.append(playlist)
|
||||
self._playlists.sort(key=operator.attrgetter('name'))
|
||||
if playlist.uri != original_uri and original_uri in self._playlists:
|
||||
self.delete(original_uri)
|
||||
self._playlists[playlist.uri] = playlist
|
||||
return playlist
|
||||
|
||||
def _write_m3u_extinf(self, file_handle, track):
|
||||
|
||||
@ -192,14 +192,6 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
|
||||
self.backend.playlists.playlists = [Playlist(name='a'), playlist]
|
||||
self.assertEqual([playlist], self.core.playlists.filter(name='b'))
|
||||
|
||||
def test_filter_by_name_returns_multiple_matches(self):
|
||||
playlist = Playlist(name='b')
|
||||
self.backend.playlists.playlists = [
|
||||
playlist, Playlist(name='a'), Playlist(name='b')]
|
||||
playlists = self.core.playlists.filter(name='b')
|
||||
self.assertIn(playlist, playlists)
|
||||
self.assertEqual(2, len(playlists))
|
||||
|
||||
def test_filter_by_name_returns_no_matches(self):
|
||||
self.backend.playlists.playlists = [
|
||||
Playlist(name='a'), Playlist(name='b')]
|
||||
@ -241,14 +233,13 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
|
||||
self.assertIn(playlist2, self.core.playlists.playlists)
|
||||
|
||||
def test_save_playlist_with_new_uri(self):
|
||||
# you *should* not do this
|
||||
uri = 'm3u:test.m3u'
|
||||
playlist = self.core.playlists.save(Playlist(uri=uri))
|
||||
self.assertIn(playlist, self.core.playlists.playlists)
|
||||
self.assertEqual(uri, playlist.uri)
|
||||
self.assertEqual('test', playlist.name)
|
||||
path = playlist_uri_to_path(playlist.uri, self.playlists_dir)
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
with self.assertRaises(AssertionError):
|
||||
self.core.playlists.save(Playlist(uri=uri))
|
||||
|
||||
path = playlist_uri_to_path(uri, self.playlists_dir)
|
||||
self.assertFalse(os.path.exists(path))
|
||||
|
||||
def test_playlist_with_unknown_track(self):
|
||||
track = Track(uri='file:///dev/null')
|
||||
|
||||
Loading…
Reference in New Issue
Block a user