Add configuration option in [file] section of mopidy.conf Add documentation for feature with the valid options. Signed-off-by: caysho@internode.on.net
169 lines
5.9 KiB
Python
169 lines
5.9 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import logging
|
|
import operator
|
|
import os
|
|
import sys
|
|
import urllib2
|
|
|
|
from mopidy import backend, exceptions, models
|
|
from mopidy.audio import scan, tags
|
|
from mopidy.internal import path
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
FS_ENCODING = sys.getfilesystemencoding()
|
|
|
|
|
|
class FileLibraryProvider(backend.LibraryProvider):
|
|
"""Library for browsing local files."""
|
|
|
|
# TODO: get_images that can pull from metadata and/or .folder.png etc?
|
|
# TODO: handle playlists?
|
|
|
|
@property
|
|
def root_directory(self):
|
|
if not self._media_dirs:
|
|
return None
|
|
elif len(self._media_dirs) == 1:
|
|
uri = path.path_to_uri(self._media_dirs[0]['path'])
|
|
else:
|
|
uri = 'file:root'
|
|
return models.Ref.directory(name='Files', uri=uri)
|
|
|
|
def __init__(self, backend, config):
|
|
super(FileLibraryProvider, self).__init__(backend)
|
|
self._media_dirs = list(self._get_media_dirs(config))
|
|
self._show_dotfiles = config['file']['show_dotfiles']
|
|
self._excluded_file_extensions = tuple(
|
|
bytes(file_ext.lower())
|
|
for file_ext in config['file']['excluded_file_extensions'])
|
|
self._follow_symlinks = config['file']['follow_symlinks']
|
|
|
|
self._scanner = scan.Scanner(
|
|
timeout=config['file']['metadata_timeout'])
|
|
|
|
self._sort_order = config['file']['sort_order']
|
|
logger.info('File browing sort order is: %s', self._sort_order)
|
|
|
|
self._sort_files_first = (self._sort_order == 'FilesFirst')
|
|
self._sort_directories_first = (self._sort_order ==
|
|
'DirectoriesFirst')
|
|
self._sort_mixed = (self._sort_order == 'Mixed')
|
|
|
|
def browse(self, uri):
|
|
logger.debug('Browsing files at: %s', uri)
|
|
result = []
|
|
result_directories = []
|
|
result_files = []
|
|
local_path = path.uri_to_path(uri)
|
|
|
|
if local_path == 'root':
|
|
return list(self._get_media_dirs_refs())
|
|
|
|
if not self._is_in_basedir(os.path.realpath(local_path)):
|
|
logger.warning(
|
|
'Rejected attempt to browse path (%s) outside dirs defined '
|
|
'in file/media_dirs config.', uri)
|
|
return []
|
|
|
|
for dir_entry in os.listdir(local_path):
|
|
child_path = os.path.join(local_path, dir_entry)
|
|
uri = path.path_to_uri(child_path)
|
|
|
|
if not self._show_dotfiles and dir_entry.startswith(b'.'):
|
|
continue
|
|
|
|
if (self._excluded_file_extensions and
|
|
dir_entry.endswith(self._excluded_file_extensions)):
|
|
continue
|
|
|
|
if os.path.islink(child_path) and not self._follow_symlinks:
|
|
logger.debug('Ignoring symlink: %s', uri)
|
|
continue
|
|
|
|
if not self._is_in_basedir(os.path.realpath(child_path)):
|
|
logger.debug('Ignoring symlink to outside base dir: %s', uri)
|
|
continue
|
|
|
|
name = dir_entry.decode(FS_ENCODING, 'replace')
|
|
if os.path.isdir(child_path):
|
|
result_directories.append(models.Ref.directory(name=name,
|
|
uri=uri))
|
|
elif os.path.isfile(child_path):
|
|
result_files.append(models.Ref.track(name=name, uri=uri))
|
|
|
|
if not self._sort_mixed:
|
|
result_directories.sort(key=operator.attrgetter('name'))
|
|
result_files.sort(key=operator.attrgetter('name'))
|
|
|
|
if self._sort_mixed or self._sort_directories_first:
|
|
result = result_directories + result_files
|
|
|
|
if self._sort_files_first:
|
|
result = result_files + result_directories
|
|
|
|
if self._sort_mixed:
|
|
result.sort(key=operator.attrgetter('name'))
|
|
|
|
return result
|
|
|
|
def lookup(self, uri):
|
|
logger.debug('Looking up file URI: %s', uri)
|
|
local_path = path.uri_to_path(uri)
|
|
|
|
try:
|
|
result = self._scanner.scan(uri)
|
|
track = tags.convert_tags_to_track(result.tags).copy(
|
|
uri=uri, length=result.duration)
|
|
except exceptions.ScannerError as e:
|
|
logger.warning('Failed looking up %s: %s', uri, e)
|
|
track = models.Track(uri=uri)
|
|
|
|
if not track.name:
|
|
filename = os.path.basename(local_path)
|
|
name = urllib2.unquote(filename).decode(FS_ENCODING, 'replace')
|
|
track = track.copy(name=name)
|
|
|
|
return [track]
|
|
|
|
def _get_media_dirs(self, config):
|
|
for entry in config['file']['media_dirs']:
|
|
media_dir = {}
|
|
media_dir_split = entry.split('|', 1)
|
|
local_path = path.expand_path(
|
|
media_dir_split[0].encode(FS_ENCODING))
|
|
|
|
if not local_path:
|
|
logger.debug(
|
|
'Failed expanding path (%s) from file/media_dirs config '
|
|
'value.',
|
|
media_dir_split[0])
|
|
continue
|
|
elif not os.path.isdir(local_path):
|
|
logger.warning(
|
|
'%s is not a directory. Please create the directory or '
|
|
'update the file/media_dirs config value.', local_path)
|
|
continue
|
|
|
|
media_dir['path'] = local_path
|
|
if len(media_dir_split) == 2:
|
|
media_dir['name'] = media_dir_split[1]
|
|
else:
|
|
# TODO Mpd client should accept / in dir name
|
|
media_dir['name'] = media_dir_split[0].replace(os.sep, '+')
|
|
|
|
yield media_dir
|
|
|
|
def _get_media_dirs_refs(self):
|
|
for media_dir in self._media_dirs:
|
|
yield models.Ref.directory(
|
|
name=media_dir['name'],
|
|
uri=path.path_to_uri(media_dir['path']))
|
|
|
|
def _is_in_basedir(self, local_path):
|
|
return any(
|
|
path.is_path_inside_base_dir(
|
|
local_path, media_dir['path'].encode('utf-8'))
|
|
for media_dir in self._media_dirs)
|