Merge branch 'feature/extm3u' of https://github.com/tkem/mopidy into develop
Conflicts: mopidy/backends/local/translator.py
This commit is contained in:
commit
c51fdc68a2
@ -6,7 +6,7 @@ import os
|
||||
import shutil
|
||||
|
||||
from mopidy.backends import base, listener
|
||||
from mopidy.models import Playlist, Track
|
||||
from mopidy.models import Playlist
|
||||
from mopidy.utils import formatting, path
|
||||
|
||||
from .translator import parse_m3u
|
||||
@ -50,9 +50,8 @@ class LocalPlaylistsProvider(base.BasePlaylistsProvider):
|
||||
uri = 'local:playlist:%s' % name
|
||||
|
||||
tracks = []
|
||||
for track_uri in parse_m3u(m3u, self._media_dir):
|
||||
# TODO: switch to having playlists being a list of uris
|
||||
tracks.append(Track(uri=track_uri))
|
||||
for track in parse_m3u(m3u, self._media_dir):
|
||||
tracks.append(track)
|
||||
|
||||
playlist = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
playlists.append(playlist)
|
||||
@ -91,10 +90,20 @@ class LocalPlaylistsProvider(base.BasePlaylistsProvider):
|
||||
path.check_file_path_is_inside_base_dir(file_path, self._playlists_dir)
|
||||
return file_path
|
||||
|
||||
def _write_m3u_extinf(self, file_handle, track):
|
||||
title = track.name.encode('latin-1', 'replace')
|
||||
runtime = track.length / 1000 if track.length else -1
|
||||
file_handle.write('#EXTINF:' + str(runtime) + ',' + title + '\n')
|
||||
|
||||
def _save_m3u(self, playlist):
|
||||
file_path = self._m3u_uri_to_path(playlist.uri)
|
||||
extended = any(track.name for track in playlist.tracks)
|
||||
with open(file_path, 'w') as file_handle:
|
||||
if extended:
|
||||
file_handle.write('#EXTM3U\n')
|
||||
for track in playlist.tracks:
|
||||
if extended and track.name:
|
||||
self._write_m3u_extinf(file_handle, track)
|
||||
file_handle.write(track.uri + '\n')
|
||||
|
||||
def _delete_m3u(self, uri):
|
||||
|
||||
@ -2,12 +2,16 @@ from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import urlparse
|
||||
import urllib
|
||||
|
||||
from mopidy.models import Track
|
||||
from mopidy.utils.encoding import locale_decode
|
||||
from mopidy.utils.path import path_to_uri, uri_to_path
|
||||
|
||||
M3U_EXTINF_RE = re.compile(r'#EXTINF:(-1|\d+),(.*)')
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -29,9 +33,22 @@ def path_to_local_track_uri(relpath):
|
||||
return b'local:track:%s' % urllib.quote(relpath)
|
||||
|
||||
|
||||
def m3u_extinf_to_track(line):
|
||||
"""Convert extended M3U directive to track template."""
|
||||
m = M3U_EXTINF_RE.match(line)
|
||||
if not m:
|
||||
logger.warning('Invalid extended M3U directive: %s', line)
|
||||
return Track()
|
||||
(runtime, title) = m.groups()
|
||||
if int(runtime) > 0:
|
||||
return Track(name=title, length=1000*int(runtime))
|
||||
else:
|
||||
return Track(name=title)
|
||||
|
||||
|
||||
def parse_m3u(file_path, media_dir):
|
||||
r"""
|
||||
Convert M3U file list of uris
|
||||
Convert M3U file list to list of tracks
|
||||
|
||||
Example M3U data::
|
||||
|
||||
@ -43,34 +60,53 @@ def parse_m3u(file_path, media_dir):
|
||||
http://www.example.com:8000/Listen.pls
|
||||
http://www.example.com/~user/Mine.mp3
|
||||
|
||||
Example extended M3U data::
|
||||
|
||||
#EXTM3U
|
||||
#EXTINF:123, Sample artist - Sample title
|
||||
Sample.mp3
|
||||
#EXTINF:321,Example Artist - Example title
|
||||
Greatest Hits\Example.ogg
|
||||
#EXTINF:-1,Radio XMP
|
||||
http://mp3stream.example.com:8000/
|
||||
|
||||
- Relative paths of songs should be with respect to location of M3U.
|
||||
- Paths are normaly platform specific.
|
||||
- Lines starting with # should be ignored.
|
||||
- Paths are normally platform specific.
|
||||
- Lines starting with # are ignored, except for extended M3U directives.
|
||||
- Track.name and Track.length are set from extended M3U directives.
|
||||
- m3u files are latin-1.
|
||||
- This function does not bother with Extended M3U directives.
|
||||
"""
|
||||
# TODO: uris as bytes
|
||||
uris = []
|
||||
tracks = []
|
||||
try:
|
||||
with open(file_path) as m3u:
|
||||
contents = m3u.readlines()
|
||||
except IOError as error:
|
||||
logger.warning('Couldn\'t open m3u: %s', locale_decode(error))
|
||||
return uris
|
||||
return tracks
|
||||
|
||||
if not contents:
|
||||
return tracks
|
||||
|
||||
extended = contents[0].decode('latin1').startswith('#EXTM3U')
|
||||
|
||||
track = Track()
|
||||
for line in contents:
|
||||
line = line.strip().decode('latin1')
|
||||
|
||||
if line.startswith('#'):
|
||||
if extended and line.startswith('#EXTINF'):
|
||||
track = m3u_extinf_to_track(line)
|
||||
continue
|
||||
|
||||
if urlparse.urlsplit(line).scheme:
|
||||
uris.append(line)
|
||||
tracks.append(track.copy(uri=line))
|
||||
elif os.path.normpath(line) == os.path.abspath(line):
|
||||
path = path_to_uri(line)
|
||||
uris.append(path)
|
||||
tracks.append(track.copy(uri=path))
|
||||
else:
|
||||
path = path_to_uri(os.path.join(media_dir, line))
|
||||
uris.append(path)
|
||||
tracks.append(track.copy(uri=path))
|
||||
|
||||
return uris
|
||||
track = Track()
|
||||
return tracks
|
||||
|
||||
@ -101,6 +101,18 @@ class LocalPlaylistsProviderTest(unittest.TestCase):
|
||||
|
||||
self.assertEqual(track.uri, contents.strip())
|
||||
|
||||
def test_extended_playlist_contents_is_written_to_disk(self):
|
||||
track = Track(uri=generate_song(1), name='Test', length=60000)
|
||||
playlist = self.core.playlists.create('test')
|
||||
playlist_path = os.path.join(self.playlists_dir, 'test.m3u')
|
||||
playlist = playlist.copy(tracks=[track])
|
||||
playlist = self.core.playlists.save(playlist)
|
||||
|
||||
with open(playlist_path) as playlist_file:
|
||||
contents = playlist_file.read().splitlines()
|
||||
|
||||
self.assertEqual(contents, ['#EXTM3U', '#EXTINF:60,Test', track.uri])
|
||||
|
||||
def test_playlists_are_loaded_at_startup(self):
|
||||
track = Track(uri='local:track:path2')
|
||||
playlist = self.core.playlists.create('test')
|
||||
|
||||
@ -7,6 +7,7 @@ import tempfile
|
||||
import unittest
|
||||
|
||||
from mopidy.backends.local.translator import parse_m3u
|
||||
from mopidy.models import Track
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
from tests import path_to_data_dir
|
||||
@ -18,29 +19,35 @@ encoded_path = path_to_data_dir('æøå.mp3')
|
||||
song1_uri = path_to_uri(song1_path)
|
||||
song2_uri = path_to_uri(song2_path)
|
||||
encoded_uri = path_to_uri(encoded_path)
|
||||
song1_track = Track(uri=song1_uri)
|
||||
song2_track = Track(uri=song2_uri)
|
||||
encoded_track = Track(uri=encoded_uri)
|
||||
song1_ext_track = song1_track.copy(name='song1')
|
||||
song2_ext_track = song2_track.copy(name='song2', length=60000)
|
||||
encoded_ext_track = encoded_track.copy(name='æøå')
|
||||
|
||||
|
||||
# FIXME use mock instead of tempfile.NamedTemporaryFile
|
||||
|
||||
|
||||
class M3UToUriTest(unittest.TestCase):
|
||||
def test_empty_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('empty.m3u'), data_dir)
|
||||
self.assertEqual([], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('empty.m3u'), data_dir)
|
||||
self.assertEqual([], tracks)
|
||||
|
||||
def test_basic_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('one.m3u'), data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('one.m3u'), data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
|
||||
def test_file_with_comment(self):
|
||||
uris = parse_m3u(path_to_data_dir('comment.m3u'), data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('comment.m3u'), data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
|
||||
def test_file_is_relative_to_correct_dir(self):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write('song1.mp3')
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -49,8 +56,8 @@ class M3UToUriTest(unittest.TestCase):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write(song1_path)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -61,8 +68,8 @@ class M3UToUriTest(unittest.TestCase):
|
||||
tmp.write('# comment \n')
|
||||
tmp.write(song2_path)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri, song2_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track, song2_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -71,19 +78,39 @@ class M3UToUriTest(unittest.TestCase):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write(song1_uri)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
|
||||
def test_encoding_is_latin1(self):
|
||||
uris = parse_m3u(path_to_data_dir('encoding.m3u'), data_dir)
|
||||
self.assertEqual([encoded_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('encoding.m3u'), data_dir)
|
||||
self.assertEqual([encoded_track], tracks)
|
||||
|
||||
def test_open_missing_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('non-existant.m3u'), data_dir)
|
||||
self.assertEqual([], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('non-existant.m3u'), data_dir)
|
||||
self.assertEqual([], tracks)
|
||||
|
||||
def test_empty_ext_file(self):
|
||||
tracks = parse_m3u(path_to_data_dir('empty-ext.m3u'), data_dir)
|
||||
self.assertEqual([], tracks)
|
||||
|
||||
def test_basic_ext_file(self):
|
||||
tracks = parse_m3u(path_to_data_dir('one-ext.m3u'), data_dir)
|
||||
self.assertEqual([song1_ext_track], tracks)
|
||||
|
||||
def test_multi_ext_file(self):
|
||||
tracks = parse_m3u(path_to_data_dir('two-ext.m3u'), data_dir)
|
||||
self.assertEqual([song1_ext_track, song2_ext_track], tracks)
|
||||
|
||||
def test_ext_file_with_comment(self):
|
||||
tracks = parse_m3u(path_to_data_dir('comment-ext.m3u'), data_dir)
|
||||
self.assertEqual([song1_ext_track], tracks)
|
||||
|
||||
def test_ext_encoding_is_latin1(self):
|
||||
tracks = parse_m3u(path_to_data_dir('encoding-ext.m3u'), data_dir)
|
||||
self.assertEqual([encoded_ext_track], tracks)
|
||||
|
||||
|
||||
class URItoM3UTest(unittest.TestCase):
|
||||
|
||||
5
tests/data/comment-ext.m3u
Normal file
5
tests/data/comment-ext.m3u
Normal file
@ -0,0 +1,5 @@
|
||||
#EXTM3U
|
||||
# test
|
||||
#EXTINF:-1,song1
|
||||
# test
|
||||
song1.mp3
|
||||
1
tests/data/empty-ext.m3u
Normal file
1
tests/data/empty-ext.m3u
Normal file
@ -0,0 +1 @@
|
||||
#EXTM3U
|
||||
3
tests/data/encoding-ext.m3u
Normal file
3
tests/data/encoding-ext.m3u
Normal file
@ -0,0 +1,3 @@
|
||||
#EXTM3U
|
||||
#EXTINF:-1,<2C><><EFBFBD>
|
||||
<EFBFBD><EFBFBD><EFBFBD>.mp3
|
||||
3
tests/data/one-ext.m3u
Normal file
3
tests/data/one-ext.m3u
Normal file
@ -0,0 +1,3 @@
|
||||
#EXTM3U
|
||||
#EXTINF:-1,song1
|
||||
song1.mp3
|
||||
5
tests/data/two-ext.m3u
Normal file
5
tests/data/two-ext.m3u
Normal file
@ -0,0 +1,5 @@
|
||||
#EXTM3U
|
||||
#EXTINF:-1,song1
|
||||
song1.mp3
|
||||
#EXTINF:60,song2
|
||||
song2.mp3
|
||||
Loading…
Reference in New Issue
Block a user