basic EXTM3U playlist support
This commit is contained in:
parent
e90c3eaf19
commit
a5c02c1034
@ -6,7 +6,7 @@ import os
|
||||
import shutil
|
||||
|
||||
from mopidy.backends import base, listener
|
||||
from mopidy.models import Playlist, Track
|
||||
from mopidy.models import Playlist
|
||||
from mopidy.utils import formatting, path
|
||||
|
||||
from .translator import parse_m3u
|
||||
@ -50,9 +50,8 @@ class LocalPlaylistsProvider(base.BasePlaylistsProvider):
|
||||
uri = 'local:playlist:%s' % name
|
||||
|
||||
tracks = []
|
||||
for track_uri in parse_m3u(m3u, self._media_dir):
|
||||
# TODO: switch to having playlists being a list of uris
|
||||
tracks.append(Track(uri=track_uri))
|
||||
for track in parse_m3u(m3u, self._media_dir):
|
||||
tracks.append(track)
|
||||
|
||||
playlist = Playlist(uri=uri, name=name, tracks=tracks)
|
||||
playlists.append(playlist)
|
||||
|
||||
@ -2,12 +2,16 @@ from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import urlparse
|
||||
import urllib
|
||||
|
||||
from mopidy.models import Track
|
||||
from mopidy.utils.encoding import locale_decode
|
||||
from mopidy.utils.path import path_to_uri, uri_to_path
|
||||
|
||||
EXTINF_RE = re.compile(r'^#EXTINF:\s*(-1|\d+)\s*,\s*(.+?)\s*$')
|
||||
|
||||
logger = logging.getLogger('mopidy.backends.local')
|
||||
|
||||
|
||||
@ -29,9 +33,22 @@ def path_to_local_track_uri(relpath):
|
||||
return b'local:track:%s' % urllib.quote(relpath)
|
||||
|
||||
|
||||
def extm3u_directive_to_track(line):
|
||||
"""Convert extended M3U directive to track template."""
|
||||
m = EXTINF_RE.match(line)
|
||||
if not m:
|
||||
logger.warning('Invalid extended M3U directive: %s', line)
|
||||
return Track()
|
||||
(runtime, title) = m.groups()
|
||||
if int(runtime) > 0:
|
||||
return Track(name=title, length=1000*int(runtime))
|
||||
else:
|
||||
return Track(name=title)
|
||||
|
||||
|
||||
def parse_m3u(file_path, media_dir):
|
||||
r"""
|
||||
Convert M3U file list of uris
|
||||
Convert M3U file list to list of tracks
|
||||
|
||||
Example M3U data::
|
||||
|
||||
@ -43,34 +60,50 @@ def parse_m3u(file_path, media_dir):
|
||||
http://www.example.com:8000/Listen.pls
|
||||
http://www.example.com/~user/Mine.mp3
|
||||
|
||||
Example extended M3U data::
|
||||
|
||||
#EXTM3U
|
||||
#EXTINF:123, Sample artist - Sample title
|
||||
Sample.mp3
|
||||
#EXTINF:321,Example Artist - Example title
|
||||
Greatest Hits\Example.ogg
|
||||
#EXTINF:-1,Radio XMP
|
||||
http://mp3stream.example.com:8000/
|
||||
|
||||
- Relative paths of songs should be with respect to location of M3U.
|
||||
- Paths are normaly platform specific.
|
||||
- Lines starting with # should be ignored.
|
||||
- Paths are normally platform specific.
|
||||
- Lines starting with # are ignored, except for extended M3U directives.
|
||||
- Track.name and Track.length are set from extended M3U directives.
|
||||
- m3u files are latin-1.
|
||||
- This function does not bother with Extended M3U directives.
|
||||
"""
|
||||
# TODO: uris as bytes
|
||||
uris = []
|
||||
tracks = []
|
||||
try:
|
||||
with open(file_path) as m3u:
|
||||
contents = m3u.readlines()
|
||||
except IOError as error:
|
||||
logger.warning('Couldn\'t open m3u: %s', locale_decode(error))
|
||||
return uris
|
||||
return tracks
|
||||
|
||||
extm3u = contents and contents[0].decode('latin1').startswith('#EXTM3U')
|
||||
|
||||
track = Track()
|
||||
for line in contents:
|
||||
line = line.strip().decode('latin1')
|
||||
|
||||
if line.startswith('#'):
|
||||
if extm3u and line.startswith('#EXTINF'):
|
||||
track = extm3u_directive_to_track(line)
|
||||
continue
|
||||
|
||||
if urlparse.urlsplit(line).scheme:
|
||||
uris.append(line)
|
||||
tracks.append(track.copy(uri=line))
|
||||
elif os.path.normpath(line) == os.path.abspath(line):
|
||||
path = path_to_uri(line)
|
||||
uris.append(path)
|
||||
tracks.append(track.copy(uri=path))
|
||||
else:
|
||||
path = path_to_uri(os.path.join(media_dir, line))
|
||||
uris.append(path)
|
||||
tracks.append(track.copy(uri=path))
|
||||
|
||||
return uris
|
||||
track = Track()
|
||||
return tracks
|
||||
|
||||
@ -7,6 +7,7 @@ import tempfile
|
||||
import unittest
|
||||
|
||||
from mopidy.backends.local.translator import parse_m3u
|
||||
from mopidy.models import Track
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
from tests import path_to_data_dir
|
||||
@ -18,29 +19,32 @@ encoded_path = path_to_data_dir('æøå.mp3')
|
||||
song1_uri = path_to_uri(song1_path)
|
||||
song2_uri = path_to_uri(song2_path)
|
||||
encoded_uri = path_to_uri(encoded_path)
|
||||
song1_track = Track(uri=song1_uri)
|
||||
song2_track = Track(uri=song2_uri)
|
||||
encoded_track = Track(uri=encoded_uri)
|
||||
|
||||
|
||||
# FIXME use mock instead of tempfile.NamedTemporaryFile
|
||||
|
||||
|
||||
class M3UToUriTest(unittest.TestCase):
|
||||
def test_empty_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('empty.m3u'), data_dir)
|
||||
self.assertEqual([], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('empty.m3u'), data_dir)
|
||||
self.assertEqual([], tracks)
|
||||
|
||||
def test_basic_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('one.m3u'), data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('one.m3u'), data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
|
||||
def test_file_with_comment(self):
|
||||
uris = parse_m3u(path_to_data_dir('comment.m3u'), data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('comment.m3u'), data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
|
||||
def test_file_is_relative_to_correct_dir(self):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write('song1.mp3')
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -49,8 +53,8 @@ class M3UToUriTest(unittest.TestCase):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write(song1_path)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -61,8 +65,8 @@ class M3UToUriTest(unittest.TestCase):
|
||||
tmp.write('# comment \n')
|
||||
tmp.write(song2_path)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri, song2_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track, song2_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
@ -71,19 +75,19 @@ class M3UToUriTest(unittest.TestCase):
|
||||
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||
tmp.write(song1_uri)
|
||||
try:
|
||||
uris = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_uri], uris)
|
||||
tracks = parse_m3u(tmp.name, data_dir)
|
||||
self.assertEqual([song1_track], tracks)
|
||||
finally:
|
||||
if os.path.exists(tmp.name):
|
||||
os.remove(tmp.name)
|
||||
|
||||
def test_encoding_is_latin1(self):
|
||||
uris = parse_m3u(path_to_data_dir('encoding.m3u'), data_dir)
|
||||
self.assertEqual([encoded_uri], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('encoding.m3u'), data_dir)
|
||||
self.assertEqual([encoded_track], tracks)
|
||||
|
||||
def test_open_missing_file(self):
|
||||
uris = parse_m3u(path_to_data_dir('non-existant.m3u'), data_dir)
|
||||
self.assertEqual([], uris)
|
||||
tracks = parse_m3u(path_to_data_dir('non-existant.m3u'), data_dir)
|
||||
self.assertEqual([], tracks)
|
||||
|
||||
|
||||
class URItoM3UTest(unittest.TestCase):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user