Split mopidy.utils into smaller pieces
This commit is contained in:
parent
664c731f77
commit
e4edd70c6d
@ -14,10 +14,10 @@ import glob
|
||||
import shutil
|
||||
import threading
|
||||
|
||||
from mopidy import settings
|
||||
from mopidy.backends.base import *
|
||||
from mopidy.models import Playlist, Track, Album
|
||||
from mopidy import settings
|
||||
from mopidy.utils import parse_m3u, parse_mpd_tag_cache
|
||||
from .translator import parse_m3u, parse_mpd_tag_cache
|
||||
|
||||
logger = logging.getLogger(u'mopidy.backends.local')
|
||||
|
||||
|
||||
@ -3,57 +3,10 @@ import os
|
||||
import sys
|
||||
import urllib
|
||||
|
||||
logger = logging.getLogger('mopidy.utils')
|
||||
logger = logging.getLogger('mopidy.backends.local.translator')
|
||||
|
||||
from mopidy.models import Track, Artist, Album
|
||||
|
||||
def flatten(the_list):
|
||||
result = []
|
||||
for element in the_list:
|
||||
if isinstance(element, list):
|
||||
result.extend(flatten(element))
|
||||
else:
|
||||
result.append(element)
|
||||
return result
|
||||
|
||||
def import_module(name):
|
||||
__import__(name)
|
||||
return sys.modules[name]
|
||||
|
||||
def get_class(name):
|
||||
module_name = name[:name.rindex('.')]
|
||||
class_name = name[name.rindex('.') + 1:]
|
||||
logger.debug('Loading: %s', name)
|
||||
try:
|
||||
module = import_module(module_name)
|
||||
class_object = getattr(module, class_name)
|
||||
except (ImportError, AttributeError):
|
||||
raise ImportError("Couldn't load: %s" % name)
|
||||
return class_object
|
||||
|
||||
def get_or_create_folder(folder):
|
||||
folder = os.path.expanduser(folder)
|
||||
if not os.path.isdir(folder):
|
||||
logger.info(u'Creating %s', folder)
|
||||
os.mkdir(folder, 0755)
|
||||
return folder
|
||||
|
||||
def path_to_uri(*paths):
|
||||
path = os.path.join(*paths)
|
||||
#path = os.path.expanduser(path) # FIXME
|
||||
path = path.encode('utf-8')
|
||||
if sys.platform == 'win32':
|
||||
return 'file:' + urllib.pathname2url(path)
|
||||
return 'file://' + urllib.pathname2url(path)
|
||||
|
||||
def indent(string, places=4, linebreak='\n'):
|
||||
lines = string.split(linebreak)
|
||||
if len(lines) == 1:
|
||||
return string
|
||||
result = u''
|
||||
for line in lines:
|
||||
result += linebreak + ' ' * places + line
|
||||
return result
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
def parse_m3u(file_path):
|
||||
"""
|
||||
38
mopidy/utils/__init__.py
Normal file
38
mopidy/utils/__init__.py
Normal file
@ -0,0 +1,38 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
logger = logging.getLogger('mopidy.utils')
|
||||
|
||||
def flatten(the_list):
|
||||
result = []
|
||||
for element in the_list:
|
||||
if isinstance(element, list):
|
||||
result.extend(flatten(element))
|
||||
else:
|
||||
result.append(element)
|
||||
return result
|
||||
|
||||
def import_module(name):
|
||||
__import__(name)
|
||||
return sys.modules[name]
|
||||
|
||||
def get_class(name):
|
||||
module_name = name[:name.rindex('.')]
|
||||
class_name = name[name.rindex('.') + 1:]
|
||||
logger.debug('Loading: %s', name)
|
||||
try:
|
||||
module = import_module(module_name)
|
||||
class_object = getattr(module, class_name)
|
||||
except (ImportError, AttributeError):
|
||||
raise ImportError("Couldn't load: %s" % name)
|
||||
return class_object
|
||||
|
||||
def indent(string, places=4, linebreak='\n'):
|
||||
lines = string.split(linebreak)
|
||||
if len(lines) == 1:
|
||||
return string
|
||||
result = u''
|
||||
for line in lines:
|
||||
result += linebreak + ' ' * places + line
|
||||
return result
|
||||
21
mopidy/utils/path.py
Normal file
21
mopidy/utils/path.py
Normal file
@ -0,0 +1,21 @@
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import urllib
|
||||
|
||||
logger = logging.getLogger('mopidy.utils.path')
|
||||
|
||||
def get_or_create_folder(folder):
|
||||
folder = os.path.expanduser(folder)
|
||||
if not os.path.isdir(folder):
|
||||
logger.info(u'Creating %s', folder)
|
||||
os.mkdir(folder, 0755)
|
||||
return folder
|
||||
|
||||
def path_to_uri(*paths):
|
||||
path = os.path.join(*paths)
|
||||
#path = os.path.expanduser(path) # FIXME Waiting for test case?
|
||||
path = path.encode('utf-8')
|
||||
if sys.platform == 'win32':
|
||||
return 'file:' + urllib.pathname2url(path)
|
||||
return 'file://' + urllib.pathname2url(path)
|
||||
0
tests/backends/local/__init__.py
Normal file
0
tests/backends/local/__init__.py
Normal file
@ -11,7 +11,7 @@ from mopidy import settings
|
||||
from mopidy.backends.local import LocalBackend
|
||||
from mopidy.mixers.dummy import DummyMixer
|
||||
from mopidy.models import Playlist, Track
|
||||
from mopidy.utils import path_to_uri
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
from tests.backends.base import *
|
||||
from tests import SkipTest, data_folder
|
||||
@ -1,96 +1,15 @@
|
||||
#encoding: utf-8
|
||||
# encoding: utf-8
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from mopidy.utils import *
|
||||
from mopidy.utils.path import path_to_uri
|
||||
from mopidy.backends.local.translator import parse_m3u, parse_mpd_tag_cache
|
||||
from mopidy.models import Track, Artist, Album
|
||||
|
||||
from tests import SkipTest, data_folder
|
||||
|
||||
class GetClassTest(unittest.TestCase):
|
||||
def test_loading_module_that_does_not_exist(self):
|
||||
test = lambda: get_class('foo.bar.Baz')
|
||||
self.assertRaises(ImportError, test)
|
||||
|
||||
def test_loading_class_that_does_not_exist(self):
|
||||
test = lambda: get_class('unittest.FooBarBaz')
|
||||
self.assertRaises(ImportError, test)
|
||||
|
||||
def test_import_error_message_contains_complete_class_path(self):
|
||||
try:
|
||||
get_class('foo.bar.Baz')
|
||||
except ImportError as e:
|
||||
self.assert_('foo.bar.Baz' in str(e))
|
||||
|
||||
def test_loading_existing_class(self):
|
||||
cls = get_class('unittest.TestCase')
|
||||
self.assertEqual(cls.__name__, 'TestCase')
|
||||
|
||||
class GetOrCreateFolderTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.parent = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.isdir(self.parent):
|
||||
shutil.rmtree(self.parent)
|
||||
|
||||
def test_creating_folder(self):
|
||||
folder = os.path.join(self.parent, 'test')
|
||||
self.assert_(not os.path.exists(folder))
|
||||
self.assert_(not os.path.isdir(folder))
|
||||
created = get_or_create_folder(folder)
|
||||
self.assert_(os.path.exists(folder))
|
||||
self.assert_(os.path.isdir(folder))
|
||||
self.assertEqual(created, folder)
|
||||
|
||||
def test_creating_existing_folder(self):
|
||||
created = get_or_create_folder(self.parent)
|
||||
self.assert_(os.path.exists(self.parent))
|
||||
self.assert_(os.path.isdir(self.parent))
|
||||
self.assertEqual(created, self.parent)
|
||||
|
||||
def test_that_userfolder_is_expanded(self):
|
||||
raise SkipTest # Not sure how to safely test this
|
||||
|
||||
|
||||
class PathToFileURITest(unittest.TestCase):
|
||||
def test_simple_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/WINDOWS/clock.avi')
|
||||
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||
else:
|
||||
result = path_to_uri(u'/etc/fstab')
|
||||
self.assertEqual(result, 'file:///etc/fstab')
|
||||
|
||||
def test_folder_and_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/WINDOWS/', u'clock.avi')
|
||||
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||
else:
|
||||
result = path_to_uri(u'/etc', u'fstab')
|
||||
self.assertEqual(result, u'file:///etc/fstab')
|
||||
|
||||
def test_space_in_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/test this')
|
||||
self.assertEqual(result, 'file:///C://test%20this')
|
||||
else:
|
||||
result = path_to_uri(u'/tmp/test this')
|
||||
self.assertEqual(result, u'file:///tmp/test%20this')
|
||||
|
||||
def test_unicode_in_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/æøå')
|
||||
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
|
||||
else:
|
||||
result = path_to_uri(u'/tmp/æøå')
|
||||
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')
|
||||
|
||||
|
||||
song1_path = data_folder('song1.mp3')
|
||||
song2_path = data_folder('song2.mp3')
|
||||
encoded_path = data_folder(u'æøå.mp3')
|
||||
@ -98,7 +17,6 @@ song1_uri = path_to_uri(song1_path)
|
||||
song2_uri = path_to_uri(song2_path)
|
||||
encoded_uri = path_to_uri(encoded_path)
|
||||
|
||||
|
||||
class M3UToUriTest(unittest.TestCase):
|
||||
def test_empty_file(self):
|
||||
uris = parse_m3u(data_folder('empty.m3u'))
|
||||
@ -1,15 +1,15 @@
|
||||
import multiprocessing
|
||||
import unittest
|
||||
|
||||
from mopidy.utils import path_to_uri
|
||||
from mopidy.process import pickle_connection
|
||||
from mopidy.outputs.gstreamer import GStreamerOutput
|
||||
from mopidy.process import pickle_connection
|
||||
from mopidy.utils.path import path_to_uri
|
||||
|
||||
from tests import data_folder, SkipTest
|
||||
|
||||
class GStreamerOutputTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.song_uri = path_to_uri(data_folder('song1.wav'))
|
||||
self.song_uri = path_to_uri(data_folder('song1.wav'))
|
||||
self.output_queue = multiprocessing.Queue()
|
||||
self.core_queue = multiprocessing.Queue()
|
||||
self.output = GStreamerOutput(self.core_queue, self.output_queue)
|
||||
|
||||
0
tests/utils/__init__.py
Normal file
0
tests/utils/__init__.py
Normal file
22
tests/utils/init_test.py
Normal file
22
tests/utils/init_test.py
Normal file
@ -0,0 +1,22 @@
|
||||
import unittest
|
||||
|
||||
from mopidy.utils import get_class
|
||||
|
||||
class GetClassTest(unittest.TestCase):
|
||||
def test_loading_module_that_does_not_exist(self):
|
||||
test = lambda: get_class('foo.bar.Baz')
|
||||
self.assertRaises(ImportError, test)
|
||||
|
||||
def test_loading_class_that_does_not_exist(self):
|
||||
test = lambda: get_class('unittest.FooBarBaz')
|
||||
self.assertRaises(ImportError, test)
|
||||
|
||||
def test_import_error_message_contains_complete_class_path(self):
|
||||
try:
|
||||
get_class('foo.bar.Baz')
|
||||
except ImportError as e:
|
||||
self.assert_('foo.bar.Baz' in str(e))
|
||||
|
||||
def test_loading_existing_class(self):
|
||||
cls = get_class('unittest.TestCase')
|
||||
self.assertEqual(cls.__name__, 'TestCase')
|
||||
71
tests/utils/path_test.py
Normal file
71
tests/utils/path_test.py
Normal file
@ -0,0 +1,71 @@
|
||||
# encoding: utf-8
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
from mopidy.utils.path import get_or_create_folder, path_to_uri
|
||||
|
||||
from tests import SkipTest
|
||||
|
||||
class GetOrCreateFolderTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.parent = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.isdir(self.parent):
|
||||
shutil.rmtree(self.parent)
|
||||
|
||||
def test_creating_folder(self):
|
||||
folder = os.path.join(self.parent, 'test')
|
||||
self.assert_(not os.path.exists(folder))
|
||||
self.assert_(not os.path.isdir(folder))
|
||||
created = get_or_create_folder(folder)
|
||||
self.assert_(os.path.exists(folder))
|
||||
self.assert_(os.path.isdir(folder))
|
||||
self.assertEqual(created, folder)
|
||||
|
||||
def test_creating_existing_folder(self):
|
||||
created = get_or_create_folder(self.parent)
|
||||
self.assert_(os.path.exists(self.parent))
|
||||
self.assert_(os.path.isdir(self.parent))
|
||||
self.assertEqual(created, self.parent)
|
||||
|
||||
def test_that_userfolder_is_expanded(self):
|
||||
raise SkipTest # Not sure how to safely test this
|
||||
|
||||
|
||||
class PathToFileURITest(unittest.TestCase):
|
||||
def test_simple_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/WINDOWS/clock.avi')
|
||||
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||
else:
|
||||
result = path_to_uri(u'/etc/fstab')
|
||||
self.assertEqual(result, 'file:///etc/fstab')
|
||||
|
||||
def test_folder_and_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/WINDOWS/', u'clock.avi')
|
||||
self.assertEqual(result, 'file:///C://WINDOWS/clock.avi')
|
||||
else:
|
||||
result = path_to_uri(u'/etc', u'fstab')
|
||||
self.assertEqual(result, u'file:///etc/fstab')
|
||||
|
||||
def test_space_in_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/test this')
|
||||
self.assertEqual(result, 'file:///C://test%20this')
|
||||
else:
|
||||
result = path_to_uri(u'/tmp/test this')
|
||||
self.assertEqual(result, u'file:///tmp/test%20this')
|
||||
|
||||
def test_unicode_in_path(self):
|
||||
if sys.platform == 'win32':
|
||||
result = path_to_uri(u'C:/æøå')
|
||||
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
|
||||
else:
|
||||
result = path_to_uri(u'/tmp/æøå')
|
||||
self.assertEqual(result, u'file:///tmp/%C3%A6%C3%B8%C3%A5')
|
||||
Loading…
Reference in New Issue
Block a user