Merge remote branch 'adamcik/gstreamer'
This commit is contained in:
commit
3047aeb14a
@ -10,14 +10,14 @@ sys.path.insert(0,
|
|||||||
|
|
||||||
from mopidy import get_version, settings, SettingsError
|
from mopidy import get_version, settings, SettingsError
|
||||||
from mopidy.process import CoreProcess
|
from mopidy.process import CoreProcess
|
||||||
from mopidy.utils import get_class, get_or_create_dotdir
|
from mopidy.utils import get_class, get_or_create_folder
|
||||||
|
|
||||||
logger = logging.getLogger('mopidy.main')
|
logger = logging.getLogger('mopidy.main')
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
options = _parse_options()
|
options = _parse_options()
|
||||||
_setup_logging(options.verbosity_level)
|
_setup_logging(options.verbosity_level)
|
||||||
get_or_create_dotdir('~/.mopidy/')
|
get_or_create_folder('~/.mopidy/')
|
||||||
core_queue = multiprocessing.Queue()
|
core_queue = multiprocessing.Queue()
|
||||||
get_class(settings.SERVER)(core_queue)
|
get_class(settings.SERVER)(core_queue)
|
||||||
core = CoreProcess(core_queue)
|
core = CoreProcess(core_queue)
|
||||||
|
|||||||
@ -188,7 +188,7 @@ class GStreamerLibraryController(BaseLibraryController):
|
|||||||
self.refresh()
|
self.refresh()
|
||||||
|
|
||||||
def refresh(self, uri=None):
|
def refresh(self, uri=None):
|
||||||
tracks, artists, albums = parse_mpd_tag_cache(settings.TAG_CACHE,
|
tracks = parse_mpd_tag_cache(settings.TAG_CACHE,
|
||||||
settings.MUSIC_FOLDER)
|
settings.MUSIC_FOLDER)
|
||||||
|
|
||||||
for track in tracks:
|
for track in tracks:
|
||||||
@ -223,6 +223,7 @@ class GStreamerLibraryController(BaseLibraryController):
|
|||||||
q = query.strip().lower()
|
q = query.strip().lower()
|
||||||
library_tracks = self._uri_mapping.values()
|
library_tracks = self._uri_mapping.values()
|
||||||
|
|
||||||
|
# FIXME this is bound to be slow for large libraries
|
||||||
track_filter = lambda t: q in t.name.lower()
|
track_filter = lambda t: q in t.name.lower()
|
||||||
album_filter = lambda t: q in getattr(t, 'album', Album()).name.lower()
|
album_filter = lambda t: q in getattr(t, 'album', Album()).name.lower()
|
||||||
artist_filter = lambda t: filter(lambda a: q in a.name.lower(),
|
artist_filter = lambda t: filter(lambda a: q in a.name.lower(),
|
||||||
|
|||||||
@ -2,6 +2,7 @@ import logging
|
|||||||
from multiprocessing.reduction import reduce_connection
|
from multiprocessing.reduction import reduce_connection
|
||||||
import os
|
import os
|
||||||
import pickle
|
import pickle
|
||||||
|
import sys
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
logger = logging.getLogger('mopidy.utils')
|
logger = logging.getLogger('mopidy.utils')
|
||||||
@ -25,12 +26,19 @@ def get_class(name):
|
|||||||
class_object = getattr(module, class_name)
|
class_object = getattr(module, class_name)
|
||||||
return class_object
|
return class_object
|
||||||
|
|
||||||
def get_or_create_dotdir(dotdir):
|
def get_or_create_folder(folder):
|
||||||
dotdir = os.path.expanduser(dotdir)
|
folder = os.path.expanduser(folder)
|
||||||
if not os.path.isdir(dotdir):
|
if not os.path.isdir(folder):
|
||||||
logger.info(u'Creating %s', dotdir)
|
logger.info(u'Creating %s', folder)
|
||||||
os.mkdir(dotdir, 0755)
|
os.mkdir(folder, 0755)
|
||||||
return dotdir
|
return folder
|
||||||
|
|
||||||
|
def path_to_uri(*paths):
|
||||||
|
path = os.path.join(*paths)
|
||||||
|
path = path.encode('utf-8')
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
return 'file:' + urllib.pathname2url(path)
|
||||||
|
return 'file://' + urllib.pathname2url(path)
|
||||||
|
|
||||||
def indent(string, places=4, linebreak='\n'):
|
def indent(string, places=4, linebreak='\n'):
|
||||||
lines = string.split(linebreak)
|
lines = string.split(linebreak)
|
||||||
@ -136,9 +144,8 @@ def parse_m3u(file_path):
|
|||||||
if line.startswith('file://'):
|
if line.startswith('file://'):
|
||||||
uris.append(line)
|
uris.append(line)
|
||||||
else:
|
else:
|
||||||
path = os.path.join(folder, line)
|
path = path_to_uri(folder, line)
|
||||||
path = urllib.pathname2url(path.encode('utf-8'))
|
uris.append(path)
|
||||||
uris.append('file://' + path)
|
|
||||||
|
|
||||||
return uris
|
return uris
|
||||||
|
|
||||||
@ -147,15 +154,13 @@ def parse_mpd_tag_cache(tag_cache, music_dir=''):
|
|||||||
Converts a MPD tag_cache into a lists of tracks, artists and albums.
|
Converts a MPD tag_cache into a lists of tracks, artists and albums.
|
||||||
"""
|
"""
|
||||||
tracks = set()
|
tracks = set()
|
||||||
artists = set()
|
|
||||||
albums = set()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(tag_cache) as library:
|
with open(tag_cache) as library:
|
||||||
contents = library.read()
|
contents = library.read()
|
||||||
except IOError, e:
|
except IOError, e:
|
||||||
logger.error('Could not open tag cache: %s', e)
|
logger.error('Could not open tag cache: %s', e)
|
||||||
return tracks, artists, albums
|
return tracks
|
||||||
|
|
||||||
current = {}
|
current = {}
|
||||||
state = None
|
state = None
|
||||||
@ -173,16 +178,16 @@ def parse_mpd_tag_cache(tag_cache, music_dir=''):
|
|||||||
key, value = line.split(': ', 1)
|
key, value = line.split(': ', 1)
|
||||||
|
|
||||||
if key == 'key':
|
if key == 'key':
|
||||||
_convert_mpd_data(current, tracks, artists, albums, music_dir)
|
_convert_mpd_data(current, tracks, music_dir)
|
||||||
current.clear()
|
current.clear()
|
||||||
|
|
||||||
current[key.lower()] = value
|
current[key.lower()] = value
|
||||||
|
|
||||||
_convert_mpd_data(current, tracks, artists, albums, music_dir)
|
_convert_mpd_data(current, tracks, music_dir)
|
||||||
|
|
||||||
return tracks, artists, albums
|
return tracks
|
||||||
|
|
||||||
def _convert_mpd_data(data, tracks, artists, albums, music_dir):
|
def _convert_mpd_data(data, tracks, music_dir):
|
||||||
if not data:
|
if not data:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -195,14 +200,12 @@ def _convert_mpd_data(data, tracks, artists, albums, music_dir):
|
|||||||
|
|
||||||
if 'artist' in data:
|
if 'artist' in data:
|
||||||
artist = Artist(name=data['artist'])
|
artist = Artist(name=data['artist'])
|
||||||
artists.add(artist)
|
|
||||||
track_kwargs['artists'] = [artist]
|
track_kwargs['artists'] = [artist]
|
||||||
album_kwargs['artists'] = [artist]
|
album_kwargs['artists'] = [artist]
|
||||||
|
|
||||||
if 'album' in data:
|
if 'album' in data:
|
||||||
album_kwargs['name'] = data['album']
|
album_kwargs['name'] = data['album']
|
||||||
album = Album(**album_kwargs)
|
album = Album(**album_kwargs)
|
||||||
albums.add(album)
|
|
||||||
track_kwargs['album'] = album
|
track_kwargs['album'] = album
|
||||||
|
|
||||||
if 'title' in data:
|
if 'title' in data:
|
||||||
@ -213,7 +216,7 @@ def _convert_mpd_data(data, tracks, artists, albums, music_dir):
|
|||||||
else:
|
else:
|
||||||
path = os.path.join(music_dir, data['file'])
|
path = os.path.join(music_dir, data['file'])
|
||||||
|
|
||||||
track_kwargs['uri'] = 'file://' + urllib.pathname2url(path)
|
track_kwargs['uri'] = path_to_uri(path)
|
||||||
track_kwargs['length'] = int(data.get('time', 0)) * 1000
|
track_kwargs['length'] = int(data.get('time', 0)) * 1000
|
||||||
|
|
||||||
track = Track(**track_kwargs)
|
track = Track(**track_kwargs)
|
||||||
|
|||||||
@ -77,7 +77,8 @@ class BaseCurrentPlaylistControllerTest(object):
|
|||||||
|
|
||||||
@populate_playlist
|
@populate_playlist
|
||||||
def test_get_by_uri_raises_error_for_invalid_id(self):
|
def test_get_by_uri_raises_error_for_invalid_id(self):
|
||||||
self.assertRaises(LookupError, lambda: self.controller.get(uri='foobar'))
|
test = lambda: self.controller.get(uri='foobar')
|
||||||
|
self.assertRaises(LookupError, test)
|
||||||
|
|
||||||
@populate_playlist
|
@populate_playlist
|
||||||
def test_clear(self):
|
def test_clear(self):
|
||||||
@ -232,7 +233,6 @@ class BaseCurrentPlaylistControllerTest(object):
|
|||||||
|
|
||||||
@populate_playlist
|
@populate_playlist
|
||||||
def test_move_group_invalid_group(self):
|
def test_move_group_invalid_group(self):
|
||||||
tracks = len(self.controller.playlist.tracks)
|
|
||||||
test = lambda: self.controller.move(2, 1, 0)
|
test = lambda: self.controller.move(2, 1, 0)
|
||||||
self.assertRaises(AssertionError, test)
|
self.assertRaises(AssertionError, test)
|
||||||
|
|
||||||
@ -606,11 +606,11 @@ class BasePlaybackControllerTest(object):
|
|||||||
self.assertEqual(self.playback.current_playlist_position, None)
|
self.assertEqual(self.playback.current_playlist_position, None)
|
||||||
|
|
||||||
def test_new_playlist_loaded_callback_gets_called(self):
|
def test_new_playlist_loaded_callback_gets_called(self):
|
||||||
new_playlist_loaded_callback = self.playback.new_playlist_loaded_callback
|
callback = self.playback.new_playlist_loaded_callback
|
||||||
|
|
||||||
def wrapper():
|
def wrapper():
|
||||||
wrapper.called = True
|
wrapper.called = True
|
||||||
return new_playlist_loaded_callback()
|
return callback()
|
||||||
wrapper.called = False
|
wrapper.called = False
|
||||||
|
|
||||||
self.playback.new_playlist_loaded_callback = wrapper
|
self.playback.new_playlist_loaded_callback = wrapper
|
||||||
@ -628,7 +628,7 @@ class BasePlaybackControllerTest(object):
|
|||||||
event.set()
|
event.set()
|
||||||
return result
|
return result
|
||||||
|
|
||||||
self.playback.end_of_track_callback= wrapper
|
self.playback.end_of_track_callback = wrapper
|
||||||
|
|
||||||
self.playback.play()
|
self.playback.play()
|
||||||
self.playback.seek(self.tracks[0].length - 10)
|
self.playback.seek(self.tracks[0].length - 10)
|
||||||
@ -978,6 +978,7 @@ class BaseStoredPlaylistsControllerTest(object):
|
|||||||
def test_create_in_playlists(self):
|
def test_create_in_playlists(self):
|
||||||
playlist = self.stored.create('test')
|
playlist = self.stored.create('test')
|
||||||
self.assert_(self.stored.playlists)
|
self.assert_(self.stored.playlists)
|
||||||
|
self.assert_(playlist in self.stored.playlists)
|
||||||
|
|
||||||
def test_playlists_empty_to_start_with(self):
|
def test_playlists_empty_to_start_with(self):
|
||||||
self.assert_(not self.stored.playlists)
|
self.assert_(not self.stored.playlists)
|
||||||
@ -991,7 +992,7 @@ class BaseStoredPlaylistsControllerTest(object):
|
|||||||
self.assert_(not self.stored.playlists)
|
self.assert_(not self.stored.playlists)
|
||||||
|
|
||||||
def test_get_without_criteria(self):
|
def test_get_without_criteria(self):
|
||||||
test = lambda: self.stored.get()
|
test = self.stored.get
|
||||||
self.assertRaises(LookupError, test)
|
self.assertRaises(LookupError, test)
|
||||||
|
|
||||||
def test_get_with_wrong_cirteria(self):
|
def test_get_with_wrong_cirteria(self):
|
||||||
@ -1065,15 +1066,15 @@ class BaseStoredPlaylistsControllerTest(object):
|
|||||||
|
|
||||||
|
|
||||||
class BaseLibraryControllerTest(object):
|
class BaseLibraryControllerTest(object):
|
||||||
tracks = []
|
|
||||||
|
|
||||||
artists = [Artist(name='artist1'), Artist(name='artist2'), Artist()]
|
artists = [Artist(name='artist1'), Artist(name='artist2'), Artist()]
|
||||||
albums = [Album(name='album1', artists=artists[:1]),
|
albums = [Album(name='album1', artists=artists[:1]),
|
||||||
Album(name='album2', artists=artists[1:2]),
|
Album(name='album2', artists=artists[1:2]),
|
||||||
Album()]
|
Album()]
|
||||||
tracks = [Track(name='track1', length=4000, artists=artists[:1], album=albums[0], uri='file://' + data_folder('uri1')),
|
tracks = [Track(name='track1', length=4000, artists=artists[:1],
|
||||||
Track(name='track2', length=4000, artists=artists[1:2], album=albums[1], uri='file://' + data_folder('uri2')),
|
album=albums[0], uri='file://' + data_folder('uri1')),
|
||||||
Track()]
|
Track(name='track2', length=4000, artists=artists[1:2],
|
||||||
|
album=albums[1], uri='file://' + data_folder('uri2')),
|
||||||
|
Track()]
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.backend = self.backend_class(mixer=DummyMixer())
|
self.backend = self.backend_class(mixer=DummyMixer())
|
||||||
|
|||||||
@ -1,31 +1,35 @@
|
|||||||
import unittest
|
import unittest
|
||||||
import os
|
import os
|
||||||
import urllib
|
|
||||||
|
|
||||||
from mopidy import settings
|
from mopidy import settings
|
||||||
from mopidy.backends.gstreamer import GStreamerBackend
|
from mopidy.backends.gstreamer import GStreamerBackend
|
||||||
from mopidy.mixers.dummy import DummyMixer
|
from mopidy.mixers.dummy import DummyMixer
|
||||||
from mopidy.models import Playlist, Track
|
from mopidy.models import Playlist, Track
|
||||||
|
from mopidy.utils import path_to_uri
|
||||||
|
|
||||||
from tests.backends.base import *
|
from tests.backends.base import *
|
||||||
from tests import SkipTest, data_folder
|
from tests import SkipTest, data_folder
|
||||||
|
|
||||||
song = data_folder('song%s.wav')
|
song = data_folder('song%s.wav')
|
||||||
generate_song = lambda i: 'file://' + urllib.pathname2url(song % i)
|
generate_song = lambda i: path_to_uri(song % i)
|
||||||
|
|
||||||
# FIXME can be switched to generic test
|
# FIXME can be switched to generic test
|
||||||
class GStreamerCurrentPlaylistHandlerTest(BaseCurrentPlaylistControllerTest, unittest.TestCase):
|
class GStreamerCurrentPlaylistControllerTest(BaseCurrentPlaylistControllerTest,
|
||||||
tracks = [Track(uri=generate_song(i), id=i, length=4464) for i in range(1, 4)]
|
unittest.TestCase):
|
||||||
|
tracks = [Track(uri=generate_song(i), id=i, length=4464)
|
||||||
|
for i in range(1, 4)]
|
||||||
|
|
||||||
backend_class = GStreamerBackend
|
backend_class = GStreamerBackend
|
||||||
|
|
||||||
|
|
||||||
class GStreamerPlaybackControllerTest(BasePlaybackControllerTest, unittest.TestCase):
|
class GStreamerPlaybackControllerTest(BasePlaybackControllerTest,
|
||||||
tracks = [Track(uri=generate_song(i), id=i, length=4464) for i in range(1, 4)]
|
unittest.TestCase):
|
||||||
|
tracks = [Track(uri=generate_song(i), id=i, length=4464)
|
||||||
|
for i in range(1, 4)]
|
||||||
backend_class = GStreamerBackend
|
backend_class = GStreamerBackend
|
||||||
|
|
||||||
def add_track(self, file):
|
def add_track(self, path):
|
||||||
uri = 'file://' + urllib.pathname2url(data_folder(file))
|
uri = path_to_uri(data_folder(path))
|
||||||
track = Track(uri=uri, id=1, length=4464)
|
track = Track(uri=uri, id=1, length=4464)
|
||||||
self.backend.current_playlist.add(track)
|
self.backend.current_playlist.add(track)
|
||||||
|
|
||||||
@ -48,7 +52,7 @@ class GStreamerPlaybackControllerTest(BasePlaybackControllerTest, unittest.TestC
|
|||||||
self.assertEqual(self.playback.state, self.playback.PLAYING)
|
self.assertEqual(self.playback.state, self.playback.PLAYING)
|
||||||
|
|
||||||
|
|
||||||
class GStreamerBackendStoredPlaylistsControllerTest(BaseStoredPlaylistsControllerTest,
|
class GStreamerStoredPlaylistsControllerTest(BaseStoredPlaylistsControllerTest,
|
||||||
unittest.TestCase):
|
unittest.TestCase):
|
||||||
|
|
||||||
backend_class = GStreamerBackend
|
backend_class = GStreamerBackend
|
||||||
@ -68,8 +72,8 @@ class GStreamerBackendStoredPlaylistsControllerTest(BaseStoredPlaylistsControlle
|
|||||||
def test_deleted_playlist_get_removed(self):
|
def test_deleted_playlist_get_removed(self):
|
||||||
playlist = self.stored.create('test')
|
playlist = self.stored.create('test')
|
||||||
self.stored.delete(playlist)
|
self.stored.delete(playlist)
|
||||||
file = os.path.join(settings.PLAYLIST_FOLDER, 'test.m3u')
|
path = os.path.join(settings.PLAYLIST_FOLDER, 'test.m3u')
|
||||||
self.assert_(not os.path.exists(file))
|
self.assert_(not os.path.exists(path))
|
||||||
|
|
||||||
def test_renamed_playlist_gets_moved(self):
|
def test_renamed_playlist_gets_moved(self):
|
||||||
playlist = self.stored.create('test')
|
playlist = self.stored.create('test')
|
||||||
@ -84,20 +88,18 @@ class GStreamerBackendStoredPlaylistsControllerTest(BaseStoredPlaylistsControlle
|
|||||||
track = Track(uri=generate_song(1))
|
track = Track(uri=generate_song(1))
|
||||||
uri = track.uri[len('file://'):]
|
uri = track.uri[len('file://'):]
|
||||||
playlist = Playlist(tracks=[track], name='test')
|
playlist = Playlist(tracks=[track], name='test')
|
||||||
file_path = os.path.join(settings.PLAYLIST_FOLDER, 'test.m3u')
|
path = os.path.join(settings.PLAYLIST_FOLDER, 'test.m3u')
|
||||||
|
|
||||||
self.stored.save(playlist)
|
self.stored.save(playlist)
|
||||||
|
|
||||||
with open(file_path) as file:
|
with open(path) as playlist_file:
|
||||||
contents = file.read()
|
contents = playlist_file.read()
|
||||||
|
|
||||||
self.assertEqual(uri, contents.strip())
|
self.assertEqual(uri, contents.strip())
|
||||||
|
|
||||||
def test_playlists_are_loaded_at_startup(self):
|
def test_playlists_are_loaded_at_startup(self):
|
||||||
track = Track(uri=generate_song(1))
|
track = Track(uri=generate_song(1))
|
||||||
uri = track.uri[len('file://'):]
|
|
||||||
playlist = Playlist(tracks=[track], name='test')
|
playlist = Playlist(tracks=[track], name='test')
|
||||||
file_path = os.path.join(settings.PLAYLIST_FOLDER, 'test.m3u')
|
|
||||||
|
|
||||||
self.stored.save(playlist)
|
self.stored.save(playlist)
|
||||||
|
|
||||||
@ -122,7 +124,7 @@ class GStreamerBackendStoredPlaylistsControllerTest(BaseStoredPlaylistsControlle
|
|||||||
raise SkipTest
|
raise SkipTest
|
||||||
|
|
||||||
|
|
||||||
class GStreamerBackendLibraryControllerTest(BaseLibraryControllerTest,
|
class GStreamerLibraryControllerTest(BaseLibraryControllerTest,
|
||||||
unittest.TestCase):
|
unittest.TestCase):
|
||||||
|
|
||||||
backend_class = GStreamerBackend
|
backend_class = GStreamerBackend
|
||||||
@ -132,12 +134,12 @@ class GStreamerBackendLibraryControllerTest(BaseLibraryControllerTest,
|
|||||||
self.original_music_folder = settings.MUSIC_FOLDER
|
self.original_music_folder = settings.MUSIC_FOLDER
|
||||||
settings.TAG_CACHE = data_folder('library_tag_cache')
|
settings.TAG_CACHE = data_folder('library_tag_cache')
|
||||||
settings.MUSIC_FOLDER = data_folder('')
|
settings.MUSIC_FOLDER = data_folder('')
|
||||||
super(GStreamerBackendLibraryControllerTest, self).setUp()
|
super(GStreamerLibraryControllerTest, self).setUp()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
settings.TAG_CACHE = self.original_tag_cache
|
settings.TAG_CACHE = self.original_tag_cache
|
||||||
settings.MUSIC_FOLDER= self.original_music_folder
|
settings.MUSIC_FOLDER = self.original_music_folder
|
||||||
super(GStreamerBackendLibraryControllerTest, self).tearDown()
|
super(GStreamerLibraryControllerTest, self).tearDown()
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
@ -165,8 +165,10 @@ class AlbumTest(unittest.TestCase):
|
|||||||
self.assertNotEqual(hash(album1), hash(album2))
|
self.assertNotEqual(hash(album1), hash(album2))
|
||||||
|
|
||||||
def test_ne(self):
|
def test_ne(self):
|
||||||
album1 = Album(name=u'name1', uri=u'uri1', artists=[Artist(name=u'name1')], num_tracks=1)
|
album1 = Album(name=u'name1', uri=u'uri1',
|
||||||
album2 = Album(name=u'name2', uri=u'uri2', artists=[Artist(name=u'name2')], num_tracks=2)
|
artists=[Artist(name=u'name1')], num_tracks=1)
|
||||||
|
album2 = Album(name=u'name2', uri=u'uri2',
|
||||||
|
artists=[Artist(name=u'name2')], num_tracks=2)
|
||||||
self.assertNotEqual(album1, album2)
|
self.assertNotEqual(album1, album2)
|
||||||
self.assertNotEqual(hash(album1), hash(album2))
|
self.assertNotEqual(hash(album1), hash(album2))
|
||||||
|
|
||||||
@ -221,9 +223,9 @@ class TrackTest(unittest.TestCase):
|
|||||||
self.assertRaises(AttributeError, setattr, track, 'bitrate', None)
|
self.assertRaises(AttributeError, setattr, track, 'bitrate', None)
|
||||||
|
|
||||||
def test_id(self):
|
def test_id(self):
|
||||||
id = 17
|
track_id = 17
|
||||||
track = Track(id=id)
|
track = Track(id=track_id)
|
||||||
self.assertEqual(track.id, id)
|
self.assertEqual(track.id, track_id)
|
||||||
self.assertRaises(AttributeError, setattr, track, 'id', None)
|
self.assertRaises(AttributeError, setattr, track, 'id', None)
|
||||||
|
|
||||||
def test_mpd_format_for_empty_track(self):
|
def test_mpd_format_for_empty_track(self):
|
||||||
|
|||||||
@ -1,21 +1,84 @@
|
|||||||
#encoding: utf-8
|
#encoding: utf-8
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
import sys
|
||||||
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
from mopidy.utils import parse_m3u, parse_mpd_tag_cache
|
from mopidy.utils import *
|
||||||
from mopidy.models import Track, Artist, Album
|
from mopidy.models import Track, Artist, Album
|
||||||
|
|
||||||
from tests import SkipTest, data_folder
|
from tests import SkipTest, data_folder
|
||||||
|
|
||||||
|
class GetOrCreateFolderTest(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.parent = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.isdir(self.parent):
|
||||||
|
shutil.rmtree(self.parent)
|
||||||
|
|
||||||
|
def test_creating_folder(self):
|
||||||
|
folder = os.path.join(self.parent, 'test')
|
||||||
|
self.assert_(not os.path.exists(folder))
|
||||||
|
self.assert_(not os.path.isdir(folder))
|
||||||
|
created = get_or_create_folder(folder)
|
||||||
|
self.assert_(os.path.exists(folder))
|
||||||
|
self.assert_(os.path.isdir(folder))
|
||||||
|
self.assertEqual(created, folder)
|
||||||
|
|
||||||
|
def test_creating_existing_folder(self):
|
||||||
|
created = get_or_create_folder(self.parent)
|
||||||
|
self.assert_(os.path.exists(self.parent))
|
||||||
|
self.assert_(os.path.isdir(self.parent))
|
||||||
|
self.assertEqual(created, self.parent)
|
||||||
|
|
||||||
|
def test_that_userfolder_is_expanded(self):
|
||||||
|
raise SkipTest # Not sure how to safely test this
|
||||||
|
|
||||||
|
|
||||||
|
class PathToFileURITest(unittest.TestCase):
|
||||||
|
def test_simple_path(self):
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
result = path_to_uri(u'C:/WINDOWS/clock.avi')
|
||||||
|
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||||
|
else:
|
||||||
|
result = path_to_uri(u'/etc/fstab')
|
||||||
|
self.assertEqual(result, 'file:///etc/fstab')
|
||||||
|
|
||||||
|
def test_folder_and_path(self):
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
result = path_to_uri(u'C:/WINDOWS/', u'clock.avi')
|
||||||
|
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||||
|
else:
|
||||||
|
result = path_to_uri(u'/etc', u'fstab')
|
||||||
|
self.assertEqual(result, u'file:///etc/fstab')
|
||||||
|
|
||||||
|
def test_space_in_path(self):
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
result = path_to_uri(u'C:/test this')
|
||||||
|
self.assertEqual(result, 'file:///C://test%20this')
|
||||||
|
else:
|
||||||
|
result = path_to_uri(u'/tmp/test this')
|
||||||
|
self.assertEqual(result, u'file:///tmp/test%20this')
|
||||||
|
|
||||||
|
def test_unicode_in_path(self):
|
||||||
|
if sys.platform == 'win32':
|
||||||
|
result = path_to_uri(u'C:/æøå')
|
||||||
|
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
|
||||||
|
else:
|
||||||
|
result = path_to_uri(u'/tmp/æøå')
|
||||||
|
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')
|
||||||
|
|
||||||
|
|
||||||
song1_path = data_folder('song1.mp3')
|
song1_path = data_folder('song1.mp3')
|
||||||
song2_path = data_folder('song2.mp3')
|
song2_path = data_folder('song2.mp3')
|
||||||
encoded_path = data_folder(u'æøå.mp3')
|
encoded_path = data_folder(u'æøå.mp3')
|
||||||
song1_uri = 'file://' + urllib.pathname2url(song1_path)
|
song1_uri = path_to_uri(song1_path)
|
||||||
song2_uri = 'file://' + urllib.pathname2url(song2_path)
|
song2_uri = path_to_uri(song2_path)
|
||||||
encoded_uri = 'file://' + urllib.pathname2url(encoded_path.encode('utf-8'))
|
encoded_uri = path_to_uri(encoded_path)
|
||||||
|
|
||||||
|
|
||||||
class M3UToUriTest(unittest.TestCase):
|
class M3UToUriTest(unittest.TestCase):
|
||||||
@ -32,27 +95,37 @@ class M3UToUriTest(unittest.TestCase):
|
|||||||
self.assertEqual([song1_uri], uris)
|
self.assertEqual([song1_uri], uris)
|
||||||
|
|
||||||
def test_file_with_absolute_files(self):
|
def test_file_with_absolute_files(self):
|
||||||
with tempfile.NamedTemporaryFile() as file:
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||||
file.write(song1_path)
|
tmp.write(song1_path)
|
||||||
file.flush()
|
try:
|
||||||
uris = parse_m3u(file.name)
|
uris = parse_m3u(tmp.name)
|
||||||
self.assertEqual([song1_uri], uris)
|
self.assertEqual([song1_uri], uris)
|
||||||
|
finally:
|
||||||
|
if os.path.exists(tmp.name):
|
||||||
|
os.remove(tmp.name)
|
||||||
|
|
||||||
def test_file_with_multiple_absolute_files(self):
|
def test_file_with_multiple_absolute_files(self):
|
||||||
with tempfile.NamedTemporaryFile() as file:
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||||
file.write(song1_path+'\n')
|
tmp.write(song1_path+'\n')
|
||||||
file.write('# comment \n')
|
tmp.write('# comment \n')
|
||||||
file.write(song2_path)
|
tmp.write(song2_path)
|
||||||
file.flush()
|
try:
|
||||||
uris = parse_m3u(file.name)
|
uris = parse_m3u(tmp.name)
|
||||||
self.assertEqual([song1_uri, song2_uri], uris)
|
self.assertEqual([song1_uri, song2_uri], uris)
|
||||||
|
finally:
|
||||||
|
if os.path.exists(tmp.name):
|
||||||
|
os.remove(tmp.name)
|
||||||
|
|
||||||
|
|
||||||
def test_file_with_uri(self):
|
def test_file_with_uri(self):
|
||||||
with tempfile.NamedTemporaryFile() as file:
|
with tempfile.NamedTemporaryFile(delete=False) as tmp:
|
||||||
file.write(song1_uri)
|
tmp.write(song1_uri)
|
||||||
file.flush()
|
try:
|
||||||
uris = parse_m3u(file.name)
|
uris = parse_m3u(tmp.name)
|
||||||
self.assertEqual([song1_uri], uris)
|
self.assertEqual([song1_uri], uris)
|
||||||
|
finally:
|
||||||
|
if os.path.exists(tmp.name):
|
||||||
|
os.remove(tmp.name)
|
||||||
|
|
||||||
def test_encoding_is_latin1(self):
|
def test_encoding_is_latin1(self):
|
||||||
uris = parse_m3u(data_folder('encoding.m3u'))
|
uris = parse_m3u(data_folder('encoding.m3u'))
|
||||||
@ -67,11 +140,12 @@ class URItoM3UTest(unittest.TestCase):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
expected_artists = [Artist(name='name')]
|
expected_artists = [Artist(name='name')]
|
||||||
expected_albums = [Album(name='albumname', artists=expected_artists, num_tracks=2)]
|
expected_albums = [Album(name='albumname', artists=expected_artists,
|
||||||
|
num_tracks=2)]
|
||||||
expected_tracks = []
|
expected_tracks = []
|
||||||
|
|
||||||
def generate_track(path):
|
def generate_track(path):
|
||||||
uri = 'file://' + urllib.pathname2url(data_folder(path))
|
uri = path_to_uri(data_folder(path))
|
||||||
track = Track(name='trackname', artists=expected_artists, track_no=1,
|
track = Track(name='trackname', artists=expected_artists, track_no=1,
|
||||||
album=expected_albums[0], length=4000, uri=uri)
|
album=expected_albums[0], length=4000, uri=uri)
|
||||||
expected_tracks.append(track)
|
expected_tracks.append(track)
|
||||||
@ -88,27 +162,19 @@ generate_track('subdir1/subsubdir/song9.mp3')
|
|||||||
|
|
||||||
class MPDTagCacheToTracksTest(unittest.TestCase):
|
class MPDTagCacheToTracksTest(unittest.TestCase):
|
||||||
def test_emtpy_cache(self):
|
def test_emtpy_cache(self):
|
||||||
tracks, artists, albums = parse_mpd_tag_cache(data_folder('empty_tag_cache'),
|
tracks = parse_mpd_tag_cache(data_folder('empty_tag_cache'),
|
||||||
data_folder(''))
|
data_folder(''))
|
||||||
self.assertEqual(set(), tracks)
|
self.assertEqual(set(), tracks)
|
||||||
self.assertEqual(set(), artists)
|
|
||||||
self.assertEqual(set(), albums)
|
|
||||||
|
|
||||||
def test_simple_cache(self):
|
def test_simple_cache(self):
|
||||||
tracks, artists, albums = parse_mpd_tag_cache(data_folder('simple_tag_cache'),
|
tracks = parse_mpd_tag_cache(data_folder('simple_tag_cache'),
|
||||||
data_folder(''))
|
data_folder(''))
|
||||||
|
|
||||||
self.assertEqual(expected_tracks[0], list(tracks)[0])
|
self.assertEqual(expected_tracks[0], list(tracks)[0])
|
||||||
self.assertEqual(set(expected_artists), artists)
|
|
||||||
self.assertEqual(set(expected_albums), albums)
|
|
||||||
|
|
||||||
def test_advanced_cache(self):
|
def test_advanced_cache(self):
|
||||||
tracks, artists, albums = parse_mpd_tag_cache(data_folder('advanced_tag_cache'),
|
tracks = parse_mpd_tag_cache(data_folder('advanced_tag_cache'),
|
||||||
data_folder(''))
|
data_folder(''))
|
||||||
|
|
||||||
self.assertEqual(set(expected_tracks), tracks)
|
self.assertEqual(set(expected_tracks), tracks)
|
||||||
self.assertEqual(set(expected_artists), artists)
|
|
||||||
self.assertEqual(set(expected_albums), albums)
|
|
||||||
|
|
||||||
def test_unicode_cache(self):
|
def test_unicode_cache(self):
|
||||||
raise SkipTest
|
raise SkipTest
|
||||||
@ -118,12 +184,7 @@ class MPDTagCacheToTracksTest(unittest.TestCase):
|
|||||||
raise SkipTest
|
raise SkipTest
|
||||||
|
|
||||||
def test_cache_with_blank_track_info(self):
|
def test_cache_with_blank_track_info(self):
|
||||||
tracks, artists, albums = parse_mpd_tag_cache(data_folder('blank_tag_cache'),
|
tracks = parse_mpd_tag_cache(data_folder('blank_tag_cache'),
|
||||||
data_folder(''))
|
data_folder(''))
|
||||||
|
uri = path_to_uri(data_folder('song1.mp3'))
|
||||||
uri = 'file://' + urllib.pathname2url(data_folder('song1.mp3'))
|
|
||||||
|
|
||||||
self.assertEqual(set([Track(uri=uri, length=4000)]), tracks)
|
self.assertEqual(set([Track(uri=uri, length=4000)]), tracks)
|
||||||
self.assertEqual(set(), artists)
|
|
||||||
self.assertEqual(set(), albums)
|
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user