Merge pull request #276 from jodal/feature/scanner-encoding-support

Scanner encoding support
This commit is contained in:
Thomas Adamcik 2012-12-12 15:11:10 -08:00
commit 8ee4b431ac
6 changed files with 117 additions and 45 deletions

View File

@ -29,6 +29,10 @@ v0.10.0 (in development)
:option:`-v`/:option:`--verbose` options to control the amount of logging
output when scanning.
- The scanner can now handle files with other encodings than UTF-8. Rebuild
your tag cache with ``mopidy-scan`` to include tracks that may have been
ignored previously.
**HTTP frontend**
- Added new optional HTTP frontend which exposes Mopidy's core API through

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
import logging
import urllib
from mopidy.models import Track, Artist, Album
from mopidy.utils.encoding import locale_decode
@ -139,6 +140,7 @@ def _convert_mpd_data(data, tracks, music_dir):
path = data['file'][1:]
else:
path = data['file']
path = urllib.unquote(path)
if artist_kwargs:
artist = Artist(**artist_kwargs)

View File

@ -2,6 +2,7 @@ from __future__ import unicode_literals
import os
import re
import urllib
from mopidy import settings
from mopidy.frontends.mpd import protocol
@ -153,40 +154,56 @@ def tracks_to_tag_cache_format(tracks):
def _add_to_tag_cache(result, folders, files):
music_folder = settings.LOCAL_MUSIC_PATH
base_path = settings.LOCAL_MUSIC_PATH.encode('utf-8')
for path, entry in folders.items():
name = os.path.split(path)[1]
mtime = get_mtime(os.path.join(music_folder, path))
result.append(('directory', path))
result.append(('mtime', mtime))
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
name = os.path.split(text_path)[1]
result.append(('directory', text_path))
result.append(('mtime', get_mtime(os.path.join(base_path, path))))
result.append(('begin', name))
_add_to_tag_cache(result, *entry)
result.append(('end', name))
result.append(('songList begin',))
for track in files:
track_result = dict(track_to_mpd_format(track))
track_result['mtime'] = get_mtime(uri_to_path(track_result['file']))
track_result['file'] = track_result['file']
track_result['key'] = os.path.basename(track_result['file'])
path = uri_to_path(track_result['file'])
try:
text_path = path.decode('utf-8')
except UnicodeDecodeError:
text_path = urllib.quote(path).decode('utf-8')
relative_path = os.path.relpath(path, base_path)
relative_uri = urllib.quote(relative_path)
track_result['file'] = relative_uri
track_result['mtime'] = get_mtime(path)
track_result['key'] = os.path.basename(text_path)
track_result = order_mpd_track_info(track_result.items())
result.extend(track_result)
result.append(('songList end',))
def tracks_to_directory_tree(tracks):
directories = ({}, [])
for track in tracks:
path = ''
path = b''
current = directories
local_folder = settings.LOCAL_MUSIC_PATH
track_path = uri_to_path(track.uri)
track_path = re.sub('^' + re.escape(local_folder), '', track_path)
track_dir = os.path.dirname(track_path)
absolute_track_dir_path = os.path.dirname(uri_to_path(track.uri))
relative_track_dir_path = re.sub(
'^' + re.escape(settings.LOCAL_MUSIC_PATH), b'',
absolute_track_dir_path)
for part in split_path(track_dir):
for part in split_path(relative_track_dir_path):
path = os.path.join(path, part)
if path not in current[0]:
current[0][path] = ({}, [])

View File

@ -51,19 +51,40 @@ def get_or_create_file(filename):
def path_to_uri(*paths):
"""
Convert OS specific path to file:// URI.
Accepts either unicode strings or bytestrings. The encoding of any
bytestring will be maintained so that :func:`uri_to_path` can return the
same bytestring.
Returns a file:// URI as an unicode string.
"""
path = os.path.join(*paths)
path = path.encode('utf-8')
if isinstance(path, unicode):
path = path.encode('utf-8')
if sys.platform == 'win32':
return 'file:' + urllib.pathname2url(path)
return 'file://' + urllib.pathname2url(path)
return 'file:' + urllib.quote(path)
return 'file://' + urllib.quote(path)
def uri_to_path(uri):
"""
Convert the file:// to a OS specific path.
Returns a bytestring, since the file path can contain chars with other
encoding than UTF-8.
If we had returned these paths as unicode strings, you wouldn't be able to
look up the matching dir or file on your file system because the exact path
would be lost by ignoring its encoding.
"""
if isinstance(uri, unicode):
uri = uri.encode('utf-8')
if sys.platform == 'win32':
path = urllib.url2pathname(re.sub('^file:', '', uri))
return urllib.unquote(re.sub(b'^file:', b'', uri))
else:
path = urllib.url2pathname(re.sub('^file://', '', uri))
return path.encode('latin1').decode('utf-8') # Undo double encoding
return urllib.unquote(re.sub(b'^file://', b'', uri))
def split_path(path):
@ -72,7 +93,7 @@ def split_path(path):
path, part = os.path.split(path)
if part:
parts.insert(0, part)
if not path or path == '/':
if not path or path == b'/':
break
return parts
@ -85,30 +106,32 @@ def expand_path(path):
def find_files(path):
"""
Finds all files within a path.
Directories and files with names starting with ``.`` is ignored.
:returns: yields the full path to files as bytestrings
"""
if isinstance(path, unicode):
path = path.encode('utf-8')
if os.path.isfile(path):
if not isinstance(path, unicode):
path = path.decode('utf-8')
if not os.path.basename(path).startswith('.'):
if not os.path.basename(path).startswith(b'.'):
yield path
else:
for dirpath, dirnames, filenames in os.walk(path):
# Filter out hidden folders by modifying dirnames in place.
for dirname in dirnames:
if dirname.startswith('.'):
if dirname.startswith(b'.'):
# Skip hidden folders by modifying dirnames inplace
dirnames.remove(dirname)
for filename in filenames:
# Skip hidden files.
if filename.startswith('.'):
if filename.startswith(b'.'):
# Skip hidden files
continue
filename = os.path.join(dirpath, filename)
if not isinstance(filename, unicode):
try:
filename = filename.decode('utf-8')
except UnicodeDecodeError:
filename = filename.decode('latin1')
yield filename
yield os.path.join(dirpath, filename)
def check_file_path_is_inside_base_dir(file_path, base_path):

View File

@ -4,7 +4,7 @@ import datetime
import os
from mopidy import settings
from mopidy.utils.path import mtime
from mopidy.utils.path import mtime, uri_to_path
from mopidy.frontends.mpd import translator, protocol
from mopidy.models import Album, Artist, TlTrack, Playlist, Track
@ -131,7 +131,9 @@ class TracksToTagCacheFormatTest(unittest.TestCase):
mtime.undo_fake()
def translate(self, track):
base_path = settings.LOCAL_MUSIC_PATH.encode('utf-8')
result = dict(translator.track_to_mpd_format(track))
result['file'] = uri_to_path(result['file'])[len(base_path) + 1:]
result['key'] = os.path.basename(result['file'])
result['mtime'] = mtime('')
return translator.order_mpd_track_info(result.items())

View File

@ -90,31 +90,55 @@ class PathToFileURITest(unittest.TestCase):
result = path.path_to_uri('/tmp/æøå')
self.assertEqual(result, 'file:///tmp/%C3%A6%C3%B8%C3%A5')
def test_utf8_in_path(self):
if sys.platform == 'win32':
result = path.path_to_uri('C:/æøå'.encode('utf-8'))
self.assertEqual(result, 'file:///C://%C3%A6%C3%B8%C3%A5')
else:
result = path.path_to_uri('/tmp/æøå'.encode('utf-8'))
self.assertEqual(result, 'file:///tmp/%C3%A6%C3%B8%C3%A5')
def test_latin1_in_path(self):
if sys.platform == 'win32':
result = path.path_to_uri('C:/æøå'.encode('latin-1'))
self.assertEqual(result, 'file:///C://%E6%F8%E5')
else:
result = path.path_to_uri('/tmp/æøå'.encode('latin-1'))
self.assertEqual(result, 'file:///tmp/%E6%F8%E5')
class UriToPathTest(unittest.TestCase):
def test_simple_uri(self):
if sys.platform == 'win32':
result = path.uri_to_path('file:///C://WINDOWS/clock.avi')
self.assertEqual(result, 'C:/WINDOWS/clock.avi')
self.assertEqual(result, 'C:/WINDOWS/clock.avi'.encode('utf-8'))
else:
result = path.uri_to_path('file:///etc/fstab')
self.assertEqual(result, '/etc/fstab')
self.assertEqual(result, '/etc/fstab'.encode('utf-8'))
def test_space_in_uri(self):
if sys.platform == 'win32':
result = path.uri_to_path('file:///C://test%20this')
self.assertEqual(result, 'C:/test this')
self.assertEqual(result, 'C:/test this'.encode('utf-8'))
else:
result = path.uri_to_path('file:///tmp/test%20this')
self.assertEqual(result, '/tmp/test this')
self.assertEqual(result, '/tmp/test this'.encode('utf-8'))
def test_unicode_in_uri(self):
if sys.platform == 'win32':
result = path.uri_to_path('file:///C://%C3%A6%C3%B8%C3%A5')
self.assertEqual(result, 'C:/æøå')
self.assertEqual(result, 'C:/æøå'.encode('utf-8'))
else:
result = path.uri_to_path('file:///tmp/%C3%A6%C3%B8%C3%A5')
self.assertEqual(result, '/tmp/æøå')
self.assertEqual(result, '/tmp/æøå'.encode('utf-8'))
def test_latin1_in_uri(self):
if sys.platform == 'win32':
result = path.uri_to_path('file:///C://%E6%F8%E5')
self.assertEqual(result, 'C:/æøå'.encode('latin-1'))
else:
result = path.uri_to_path('file:///tmp/%E6%F8%E5')
self.assertEqual(result, '/tmp/æøå'.encode('latin-1'))
class SplitPathTest(unittest.TestCase):
@ -177,11 +201,11 @@ class FindFilesTest(unittest.TestCase):
self.assertEqual(len(files), 1)
self.assert_(files[0], path_to_data_dir('blank.mp3'))
def test_names_are_unicode(self):
is_unicode = lambda f: isinstance(f, unicode)
def test_names_are_bytestrings(self):
is_bytes = lambda f: isinstance(f, bytes)
for name in self.find(''):
self.assert_(
is_unicode(name), '%s is not unicode object' % repr(name))
is_bytes(name), '%s is not bytes object' % repr(name))
def test_ignores_hidden_folders(self):
self.assertEqual(self.find('.hidden'), [])