Merge pull request #406 from adamcik/feature/reshuffle-config
Move config bits an pieces into mopidy.config sub-modules.
This commit is contained in:
commit
4e317681b0
@ -1,12 +1,9 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import codecs
|
||||
import ConfigParser as configparser
|
||||
import logging
|
||||
import optparse
|
||||
import os
|
||||
import signal
|
||||
import StringIO
|
||||
import sys
|
||||
|
||||
import gobject
|
||||
@ -27,13 +24,12 @@ sys.path.insert(
|
||||
0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
|
||||
|
||||
|
||||
from mopidy import exceptions, ext
|
||||
from mopidy import ext
|
||||
from mopidy.audio import Audio
|
||||
from mopidy.config import default_config, config_schemas
|
||||
from mopidy import config as config_lib
|
||||
from mopidy.core import Core
|
||||
from mopidy.utils import deps, log, path, process, versioning
|
||||
|
||||
|
||||
logger = logging.getLogger('mopidy.main')
|
||||
|
||||
|
||||
@ -50,16 +46,19 @@ def main():
|
||||
|
||||
try:
|
||||
create_file_structures()
|
||||
logging_config = load_config(config_files, config_overrides)
|
||||
logging_config = config_lib.load(config_files, config_overrides)
|
||||
log.setup_logging(
|
||||
logging_config, options.verbosity_level, options.save_debug_log)
|
||||
extensions = ext.load_extensions()
|
||||
raw_config = load_config(config_files, config_overrides, extensions)
|
||||
raw_config = config_lib.load(config_files, config_overrides, extensions)
|
||||
extensions = ext.filter_enabled_extensions(raw_config, extensions)
|
||||
config = validate_config(raw_config, config_schemas, extensions)
|
||||
config = config_lib.validate(
|
||||
raw_config, config_lib.config_schemas, extensions)
|
||||
log.setup_log_levels(config)
|
||||
check_old_locations()
|
||||
|
||||
# TODO: wrap config in RO proxy.
|
||||
|
||||
# Anything that wants to exit after this point must use
|
||||
# mopidy.utils.process.exit_process as actors have been started.
|
||||
audio = setup_audio(config)
|
||||
@ -82,9 +81,7 @@ def main():
|
||||
|
||||
def check_config_override(option, opt, override):
|
||||
try:
|
||||
section, remainder = override.split('/', 1)
|
||||
key, value = remainder.split('=', 1)
|
||||
return (section, key, value)
|
||||
return config_lib.parse_override(override)
|
||||
except ValueError:
|
||||
raise optparse.OptionValueError(
|
||||
'option %s: must have the format section/key=value' % opt)
|
||||
@ -139,12 +136,14 @@ def show_config_callback(option, opt, value, parser):
|
||||
overrides = getattr(parser.values, 'overrides', [])
|
||||
|
||||
extensions = ext.load_extensions()
|
||||
raw_config = load_config(files, overrides, extensions)
|
||||
raw_config = config_lib.load(files, overrides, extensions)
|
||||
enabled_extensions = ext.filter_enabled_extensions(raw_config, extensions)
|
||||
config = validate_config(raw_config, config_schemas, enabled_extensions)
|
||||
config = config_lib.validate(
|
||||
raw_config, config_lib.config_schemas, enabled_extensions)
|
||||
|
||||
# TODO: create mopidy.config.format?
|
||||
output = []
|
||||
for section_name, schema in config_schemas.items():
|
||||
for section_name, schema in config_lib.config_schemas.items():
|
||||
options = config.get(section_name, {})
|
||||
if not options:
|
||||
continue
|
||||
@ -180,73 +179,6 @@ def check_old_locations():
|
||||
'for further instructions.', old_settings_file)
|
||||
|
||||
|
||||
def load_config(files, overrides, extensions=None):
|
||||
parser = configparser.RawConfigParser()
|
||||
|
||||
files = [path.expand_path(f) for f in files]
|
||||
sources = ['builtin-defaults'] + files + ['command-line']
|
||||
logger.info('Loading config from: %s', ', '.join(sources))
|
||||
|
||||
# Read default core config
|
||||
parser.readfp(StringIO.StringIO(default_config))
|
||||
|
||||
# Read default extension config
|
||||
for extension in extensions or []:
|
||||
parser.readfp(StringIO.StringIO(extension.get_default_config()))
|
||||
|
||||
# Load config from a series of config files
|
||||
for filename in files:
|
||||
# TODO: if this is the initial load of logging config we might not have
|
||||
# a logger at this point, we might want to handle this better.
|
||||
try:
|
||||
filehandle = codecs.open(filename, encoding='utf-8')
|
||||
parser.readfp(filehandle)
|
||||
except IOError:
|
||||
logger.debug('Config file %s not found; skipping', filename)
|
||||
continue
|
||||
except UnicodeDecodeError:
|
||||
logger.error('Config file %s is not UTF-8 encoded', filename)
|
||||
sys.exit(1)
|
||||
|
||||
raw_config = {}
|
||||
for section in parser.sections():
|
||||
raw_config[section] = dict(parser.items(section))
|
||||
|
||||
for section, key, value in overrides or []:
|
||||
raw_config.setdefault(section, {})[key] = value
|
||||
|
||||
return raw_config
|
||||
|
||||
|
||||
def validate_config(raw_config, schemas, extensions=None):
|
||||
# Collect config schemas to validate against
|
||||
sections_and_schemas = schemas.items()
|
||||
for extension in extensions or []:
|
||||
sections_and_schemas.append(
|
||||
(extension.ext_name, extension.get_config_schema()))
|
||||
|
||||
# Get validated config
|
||||
config = {}
|
||||
errors = {}
|
||||
for section_name, schema in sections_and_schemas:
|
||||
if section_name not in raw_config:
|
||||
errors[section_name] = {section_name: 'section not found'}
|
||||
try:
|
||||
items = raw_config[section_name].items()
|
||||
config[section_name] = schema.convert(items)
|
||||
except exceptions.ConfigError as error:
|
||||
errors[section_name] = error
|
||||
|
||||
if errors:
|
||||
for section_name, error in errors.items():
|
||||
logger.error('[%s] config errors:', section_name)
|
||||
for key in error:
|
||||
logger.error('%s %s', key, error[key])
|
||||
sys.exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def create_file_structures():
|
||||
path.get_or_create_dir('$XDG_DATA_DIR/mopidy')
|
||||
path.get_or_create_file('$XDG_CONFIG_DIR/mopidy/mopidy.conf')
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['media_dir'] = config.Path()
|
||||
schema['playlists_dir'] = config.Path()
|
||||
schema['tag_cache_file'] = config.Path()
|
||||
|
||||
@ -3,9 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import ext
|
||||
from mopidy.exceptions import ExtensionError
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, exceptions, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -16,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['username'] = config.String()
|
||||
schema['password'] = config.String(secret=True)
|
||||
schema['bitrate'] = config.Integer(choices=(96, 160, 320))
|
||||
@ -31,7 +29,7 @@ class Extension(ext.Extension):
|
||||
try:
|
||||
import spotify # noqa
|
||||
except ImportError as e:
|
||||
raise ExtensionError('pyspotify library not found', e)
|
||||
raise exceptions.ExtensionError('pyspotify library not found', e)
|
||||
|
||||
def get_backend_classes(self):
|
||||
from .actor import SpotifyBackend
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['protocols'] = config.List()
|
||||
return schema
|
||||
|
||||
|
||||
@ -1,37 +0,0 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os
|
||||
|
||||
from mopidy.utils import config
|
||||
|
||||
|
||||
default_config_file = os.path.join(os.path.dirname(__file__), 'default.conf')
|
||||
default_config = open(default_config_file).read()
|
||||
|
||||
|
||||
config_schemas = {} # TODO: use ordered dict?
|
||||
config_schemas['logging'] = config.ConfigSchema()
|
||||
config_schemas['logging']['console_format'] = config.String()
|
||||
config_schemas['logging']['debug_format'] = config.String()
|
||||
config_schemas['logging']['debug_file'] = config.Path()
|
||||
|
||||
config_schemas['logging.levels'] = config.LogLevelConfigSchema()
|
||||
|
||||
config_schemas['audio'] = config.ConfigSchema()
|
||||
config_schemas['audio']['mixer'] = config.String()
|
||||
config_schemas['audio']['mixer_track'] = config.String(optional=True)
|
||||
config_schemas['audio']['output'] = config.String()
|
||||
|
||||
config_schemas['proxy'] = config.ConfigSchema()
|
||||
config_schemas['proxy']['hostname'] = config.Hostname(optional=True)
|
||||
config_schemas['proxy']['username'] = config.String(optional=True)
|
||||
config_schemas['proxy']['password'] = config.String(optional=True, secret=True)
|
||||
|
||||
# NOTE: if multiple outputs ever comes something like LogLevelConfigSchema
|
||||
#config_schemas['audio.outputs'] = config.AudioOutputConfigSchema()
|
||||
|
||||
|
||||
def register_schema(name, schema):
|
||||
if name in config_schemas:
|
||||
raise Exception
|
||||
config_schemas[name] = schema
|
||||
127
mopidy/config/__init__.py
Normal file
127
mopidy/config/__init__.py
Normal file
@ -0,0 +1,127 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import codecs
|
||||
import ConfigParser as configparser
|
||||
import io
|
||||
import logging
|
||||
import os.path
|
||||
import sys
|
||||
|
||||
from mopidy.config.schemas import *
|
||||
from mopidy.config.types import *
|
||||
from mopidy.utils import path
|
||||
|
||||
logger = logging.getLogger('mopdiy.config')
|
||||
|
||||
config_schemas = {} # TODO: use ordered dict or list?
|
||||
config_schemas['logging'] = ConfigSchema()
|
||||
config_schemas['logging']['console_format'] = String()
|
||||
config_schemas['logging']['debug_format'] = String()
|
||||
config_schemas['logging']['debug_file'] = Path()
|
||||
|
||||
config_schemas['logging.levels'] = LogLevelConfigSchema()
|
||||
|
||||
config_schemas['audio'] = ConfigSchema()
|
||||
config_schemas['audio']['mixer'] = String()
|
||||
config_schemas['audio']['mixer_track'] = String(optional=True)
|
||||
config_schemas['audio']['output'] = String()
|
||||
|
||||
config_schemas['proxy'] = ConfigSchema()
|
||||
config_schemas['proxy']['hostname'] = Hostname(optional=True)
|
||||
config_schemas['proxy']['username'] = String(optional=True)
|
||||
config_schemas['proxy']['password'] = String(optional=True, secret=True)
|
||||
|
||||
# NOTE: if multiple outputs ever comes something like LogLevelConfigSchema
|
||||
#config_schemas['audio.outputs'] = config.AudioOutputConfigSchema()
|
||||
|
||||
|
||||
def read(config_file):
|
||||
"""Helper to load defaults in same way across core and extensions."""
|
||||
with io.open(config_file, 'rb') as filehandle:
|
||||
return filehandle.read()
|
||||
|
||||
|
||||
def load(files, overrides, extensions=None):
|
||||
config_dir = os.path.dirname(__file__)
|
||||
defaults = [read(os.path.join(config_dir, 'default.conf'))]
|
||||
if extensions:
|
||||
defaults.extend(e.get_default_config() for e in extensions)
|
||||
return _load(files, defaults, overrides)
|
||||
|
||||
|
||||
# TODO: replace load() with this version of API.
|
||||
def _load(files, defaults, overrides):
|
||||
parser = configparser.RawConfigParser()
|
||||
|
||||
files = [path.expand_path(f) for f in files]
|
||||
sources = ['builtin-defaults'] + files + ['command-line']
|
||||
logger.info('Loading config from: %s', ', '.join(sources))
|
||||
for default in defaults: # TODO: remove decoding
|
||||
parser.readfp(io.StringIO(default.decode('utf-8')))
|
||||
|
||||
# Load config from a series of config files
|
||||
for filename in files:
|
||||
# TODO: if this is the initial load of logging config we might not have
|
||||
# a logger at this point, we might want to handle this better.
|
||||
try:
|
||||
with codecs.open(filename, encoding='utf-8') as filehandle:
|
||||
parser.readfp(filehandle)
|
||||
except IOError:
|
||||
logger.debug('Config file %s not found; skipping', filename)
|
||||
continue
|
||||
except UnicodeDecodeError:
|
||||
logger.error('Config file %s is not UTF-8 encoded', filename)
|
||||
sys.exit(1)
|
||||
|
||||
raw_config = {}
|
||||
for section in parser.sections():
|
||||
raw_config[section] = dict(parser.items(section))
|
||||
|
||||
for section, key, value in overrides or []:
|
||||
raw_config.setdefault(section, {})[key] = value
|
||||
|
||||
return raw_config
|
||||
|
||||
|
||||
def validate(raw_config, schemas, extensions=None):
|
||||
# Collect config schemas to validate against
|
||||
sections_and_schemas = schemas.items()
|
||||
for extension in extensions or []:
|
||||
sections_and_schemas.append(
|
||||
(extension.ext_name, extension.get_config_schema()))
|
||||
|
||||
config, errors = _validate(raw_config, sections_and_schemas)
|
||||
|
||||
if errors:
|
||||
# TODO: raise error instead.
|
||||
#raise exceptions.ConfigError(errors)
|
||||
for error in errors:
|
||||
logger.error(error)
|
||||
sys.exit(1)
|
||||
|
||||
return config
|
||||
|
||||
|
||||
# TODO: replace validate() with this version of API.
|
||||
def _validate(raw_config, schemas):
|
||||
# Get validated config
|
||||
config = {}
|
||||
errors = []
|
||||
for name, schema in schemas:
|
||||
try:
|
||||
items = raw_config[name].items()
|
||||
config[name] = schema.convert(items)
|
||||
except KeyError:
|
||||
errors.append('%s: section not found.' % name)
|
||||
except exceptions.ConfigError as error:
|
||||
for key in error:
|
||||
errors.append('%s/%s: %s' % (name, key, error[key]))
|
||||
# TODO: raise errors instead of return
|
||||
return config, errors
|
||||
|
||||
|
||||
def parse_override(override):
|
||||
"""Parse section/key=value override."""
|
||||
section, remainder = override.split('/', 1)
|
||||
key, value = remainder.split('=', 1)
|
||||
return (section.strip(), key.strip(), value.strip())
|
||||
136
mopidy/config/schemas.py
Normal file
136
mopidy/config/schemas.py
Normal file
@ -0,0 +1,136 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.config import types
|
||||
|
||||
|
||||
def _did_you_mean(name, choices):
|
||||
"""Suggest most likely setting based on levenshtein."""
|
||||
if not choices:
|
||||
return None
|
||||
|
||||
name = name.lower()
|
||||
candidates = [(_levenshtein(name, c), c) for c in choices]
|
||||
candidates.sort()
|
||||
|
||||
if candidates[0][0] <= 3:
|
||||
return candidates[0][1]
|
||||
return None
|
||||
|
||||
|
||||
def _levenshtein(a, b):
|
||||
"""Calculates the Levenshtein distance between a and b."""
|
||||
n, m = len(a), len(b)
|
||||
if n > m:
|
||||
return _levenshtein(b, a)
|
||||
|
||||
current = xrange(n + 1)
|
||||
for i in xrange(1, m + 1):
|
||||
previous, current = current, [i] + [0] * n
|
||||
for j in xrange(1, n + 1):
|
||||
add, delete = previous[j] + 1, current[j - 1] + 1
|
||||
change = previous[j - 1]
|
||||
if a[j - 1] != b[i - 1]:
|
||||
change += 1
|
||||
current[j] = min(add, delete, change)
|
||||
return current[n]
|
||||
|
||||
|
||||
class ConfigSchema(object):
|
||||
"""Logical group of config values that correspond to a config section.
|
||||
|
||||
Schemas are set up by assigning config keys with config values to
|
||||
instances. Once setup :meth:`convert` can be called with a list of
|
||||
``(key, value)`` tuples to process. For convienience we also support
|
||||
:meth:`format` method that can used for printing out the converted values.
|
||||
"""
|
||||
# TODO: Use collections.OrderedDict once 2.6 support is gone (#344)
|
||||
def __init__(self):
|
||||
self._schema = {}
|
||||
self._order = []
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key not in self._schema:
|
||||
self._order.append(key)
|
||||
self._schema[key] = value
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._schema[key]
|
||||
|
||||
def format(self, name, values):
|
||||
# TODO: should the output be encoded utf-8 since we use that in
|
||||
# serialize for strings?
|
||||
lines = ['[%s]' % name]
|
||||
for key in self._order:
|
||||
value = values.get(key)
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (
|
||||
key, self._schema[key].format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
values[key] = self._schema[key].deserialize(value)
|
||||
except KeyError: # not in our schema
|
||||
errors[key] = 'unknown config key.'
|
||||
suggestion = _did_you_mean(key, self._schema.keys())
|
||||
if suggestion:
|
||||
errors[key] += ' Did you mean %s?' % suggestion
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
for key in self._schema:
|
||||
if key not in values and key not in errors:
|
||||
errors[key] = 'config key not found.'
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
|
||||
|
||||
class ExtensionConfigSchema(ConfigSchema):
|
||||
"""Sub-classed :class:`ConfigSchema` for use in extensions.
|
||||
|
||||
Ensures that ``enabled`` config value is present.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(ExtensionConfigSchema, self).__init__()
|
||||
self['enabled'] = types.Boolean()
|
||||
|
||||
|
||||
class LogLevelConfigSchema(object):
|
||||
"""Special cased schema for handling a config section with loglevels.
|
||||
|
||||
Expects the config keys to be logger names and the values to be log levels
|
||||
as understood by the :class:`LogLevel` config value. Does not sub-class
|
||||
:class:`ConfigSchema`, but implements the same interface.
|
||||
"""
|
||||
def __init__(self):
|
||||
self._config_value = types.LogLevel()
|
||||
|
||||
def format(self, name, values):
|
||||
lines = ['[%s]' % name]
|
||||
for key, value in sorted(values.items()):
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (
|
||||
key, self._config_value.format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
if value.strip():
|
||||
values[key] = self._config_value.deserialize(value)
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
212
mopidy/config/types.py
Normal file
212
mopidy/config/types.py
Normal file
@ -0,0 +1,212 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
|
||||
from mopidy.utils import path
|
||||
from mopidy.config import validators
|
||||
|
||||
|
||||
class ConfigValue(object):
|
||||
"""Represents a config key's value and how to handle it.
|
||||
|
||||
Normally you will only be interacting with sub-classes for config values
|
||||
that encode either deserialization behavior and/or validation.
|
||||
|
||||
Each config value should be used for the following actions:
|
||||
|
||||
1. Deserializing from a raw string and validating, raising ValueError on
|
||||
failure.
|
||||
2. Serializing a value back to a string that can be stored in a config.
|
||||
3. Formatting a value to a printable form (useful for masking secrets).
|
||||
|
||||
:class:`None` values should not be deserialized, serialized or formatted,
|
||||
the code interacting with the config should simply skip None config values.
|
||||
"""
|
||||
|
||||
#: Collection of valid choices for converted value. Must be combined with
|
||||
#: :function:`validate_choices` in :method:`validate` do any thing.
|
||||
choices = None
|
||||
|
||||
#: Minimum of converted value. Must be combined with
|
||||
#: :function:`validate_minimum` in :method:`validate` do any thing.
|
||||
minimum = None
|
||||
|
||||
#: Maximum of converted value. Must be combined with
|
||||
#: :function:`validate_maximum` in :method:`validate` do any thing.
|
||||
maximum = None
|
||||
|
||||
#: Indicate if this field is required.
|
||||
optional = None
|
||||
|
||||
#: Indicate if we should mask the when printing for human consumption.
|
||||
secret = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.choices = kwargs.get('choices')
|
||||
self.minimum = kwargs.get('minimum')
|
||||
self.maximum = kwargs.get('maximum')
|
||||
self.optional = kwargs.get('optional')
|
||||
self.secret = kwargs.get('secret')
|
||||
|
||||
def deserialize(self, value):
|
||||
"""Cast raw string to appropriate type."""
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
"""Convert value back to string for saving."""
|
||||
return str(value)
|
||||
|
||||
def format(self, value):
|
||||
"""Format value for display."""
|
||||
if self.secret and value is not None:
|
||||
return '********'
|
||||
return self.serialize(value)
|
||||
|
||||
|
||||
class String(ConfigValue):
|
||||
"""String values.
|
||||
|
||||
Supports: optional, choices and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = value.strip()
|
||||
validators.validate_required(value, not self.optional)
|
||||
validators.validate_choice(value, self.choices)
|
||||
if not value:
|
||||
return None
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
return value.encode('utf-8').encode('string-escape')
|
||||
|
||||
|
||||
class Integer(ConfigValue):
|
||||
"""Integer values.
|
||||
|
||||
Supports: choices, minimum, maximum and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = int(value)
|
||||
validators.validate_choice(value, self.choices)
|
||||
validators.validate_minimum(value, self.minimum)
|
||||
validators.validate_maximum(value, self.maximum)
|
||||
return value
|
||||
|
||||
|
||||
class Boolean(ConfigValue):
|
||||
"""Boolean values.
|
||||
|
||||
Supports: secret.
|
||||
"""
|
||||
true_values = ('1', 'yes', 'true', 'on')
|
||||
false_values = ('0', 'no', 'false', 'off')
|
||||
|
||||
def deserialize(self, value):
|
||||
if value.lower() in self.true_values:
|
||||
return True
|
||||
elif value.lower() in self.false_values:
|
||||
return False
|
||||
|
||||
raise ValueError('invalid value for boolean: %r' % value)
|
||||
|
||||
def serialize(self, value):
|
||||
if value:
|
||||
return 'true'
|
||||
else:
|
||||
return 'false'
|
||||
|
||||
|
||||
class List(ConfigValue):
|
||||
"""List values split by comma or newline.
|
||||
|
||||
Supports: optional and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
validators.validate_required(value, not self.optional)
|
||||
if '\n' in value:
|
||||
values = re.split(r'\s*\n\s*', value.strip())
|
||||
else:
|
||||
values = re.split(r'\s*,\s*', value.strip())
|
||||
return tuple([v for v in values if v])
|
||||
|
||||
def serialize(self, value):
|
||||
return '\n ' + '\n '.join(v.encode('utf-8') for v in value)
|
||||
|
||||
|
||||
class LogLevel(ConfigValue):
|
||||
"""Log level values.
|
||||
|
||||
Supports: secret.
|
||||
"""
|
||||
levels = {
|
||||
'critical': logging.CRITICAL,
|
||||
'error': logging.ERROR,
|
||||
'warning': logging.WARNING,
|
||||
'info': logging.INFO,
|
||||
'debug': logging.DEBUG,
|
||||
}
|
||||
|
||||
def deserialize(self, value):
|
||||
validators.validate_choice(value.lower(), self.levels.keys())
|
||||
return self.levels.get(value.lower())
|
||||
|
||||
def serialize(self, value):
|
||||
return dict((v, k) for k, v in self.levels.items()).get(value)
|
||||
|
||||
|
||||
class Hostname(ConfigValue):
|
||||
"""Hostname values.
|
||||
|
||||
Supports: optional and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
validators.validate_required(value, not self.optional)
|
||||
if not value.strip():
|
||||
return None
|
||||
try:
|
||||
socket.getaddrinfo(value, None)
|
||||
except socket.error:
|
||||
raise ValueError('must be a resolveable hostname or valid IP')
|
||||
return value
|
||||
|
||||
|
||||
class Port(Integer):
|
||||
"""Port values limited to 1-65535.
|
||||
|
||||
Supports: choices and secret.
|
||||
"""
|
||||
# TODO: consider probing if port is free or not?
|
||||
def __init__(self, **kwargs):
|
||||
super(Port, self).__init__(**kwargs)
|
||||
self.minimum = 1
|
||||
self.maximum = 2 ** 16 - 1
|
||||
|
||||
|
||||
class ExpandedPath(bytes):
|
||||
def __new__(self, value):
|
||||
expanded = path.expand_path(value)
|
||||
return super(ExpandedPath, self).__new__(self, expanded)
|
||||
|
||||
def __init__(self, value):
|
||||
self.original = value
|
||||
|
||||
|
||||
class Path(ConfigValue):
|
||||
"""File system path that will be expanded.
|
||||
|
||||
Supports: optional, choices and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = value.strip()
|
||||
validators.validate_required(value, not self.optional)
|
||||
validators.validate_choice(value, self.choices)
|
||||
if not value:
|
||||
return None
|
||||
return ExpandedPath(value)
|
||||
|
||||
def serialize(self, value):
|
||||
if isinstance(value, ExpandedPath):
|
||||
return value.original
|
||||
return value
|
||||
29
mopidy/config/validators.py
Normal file
29
mopidy/config/validators.py
Normal file
@ -0,0 +1,29 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
# TODO: add validate regexp?
|
||||
|
||||
|
||||
def validate_required(value, required):
|
||||
"""Required validation, normally called in config value's validate() on the
|
||||
raw string, _not_ the converted value."""
|
||||
if required and not value.strip():
|
||||
raise ValueError('must be set.')
|
||||
|
||||
|
||||
def validate_choice(value, choices):
|
||||
"""Choice validation, normally called in config value's validate()."""
|
||||
if choices is not None and value not in choices:
|
||||
names = ', '.join(repr(c) for c in choices)
|
||||
raise ValueError('must be one of %s, not %s.' % (names, value))
|
||||
|
||||
|
||||
def validate_minimum(value, minimum):
|
||||
"""Minimum validation, normally called in config value's validate()."""
|
||||
if minimum is not None and value < minimum:
|
||||
raise ValueError('%r must be larger than %r.' % (value, minimum))
|
||||
|
||||
|
||||
def validate_maximum(value, maximum):
|
||||
"""Maximum validation, normally called in config value's validate()."""
|
||||
if maximum is not None and value > maximum:
|
||||
raise ValueError('%r must be smaller than %r.' % (value, maximum))
|
||||
@ -4,7 +4,7 @@ import logging
|
||||
import pkg_resources
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.utils import config as config_utils
|
||||
from mopidy import config as config_lib
|
||||
|
||||
|
||||
logger = logging.getLogger('mopidy.ext')
|
||||
@ -24,7 +24,7 @@ class Extension(object):
|
||||
|
||||
def get_config_schema(self):
|
||||
"""TODO"""
|
||||
return config_utils.ExtensionConfigSchema()
|
||||
return config_lib.ExtensionConfigSchema()
|
||||
|
||||
def validate_environment(self):
|
||||
"""TODO"""
|
||||
@ -83,7 +83,7 @@ def load_extensions():
|
||||
|
||||
|
||||
def filter_enabled_extensions(raw_config, extensions):
|
||||
boolean = config_utils.Boolean()
|
||||
boolean = config_lib.Boolean()
|
||||
enabled_extensions = []
|
||||
enabled_names = []
|
||||
disabled_names = []
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import exceptions, ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, exceptions, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['hostname'] = config.Hostname()
|
||||
schema['port'] = config.Port()
|
||||
schema['static_dir'] = config.Path(optional=True)
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['hostname'] = config.Hostname()
|
||||
schema['port'] = config.Port()
|
||||
schema['password'] = config.String(optional=True, secret=True)
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import exceptions, ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, exceptions, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['desktop_file'] = config.Path()
|
||||
return schema
|
||||
|
||||
|
||||
@ -3,8 +3,7 @@ from __future__ import unicode_literals
|
||||
import os
|
||||
|
||||
import mopidy
|
||||
from mopidy import exceptions, ext
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, exceptions, ext
|
||||
|
||||
|
||||
class Extension(ext.Extension):
|
||||
@ -15,10 +14,10 @@ class Extension(ext.Extension):
|
||||
|
||||
def get_default_config(self):
|
||||
conf_file = os.path.join(os.path.dirname(__file__), 'ext.conf')
|
||||
return open(conf_file).read()
|
||||
return config.read(conf_file)
|
||||
|
||||
def get_config_schema(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
schema = super(Extension, self).get_config_schema()
|
||||
schema['username'] = config.String()
|
||||
schema['password'] = config.String(secret=True)
|
||||
return schema
|
||||
|
||||
@ -1,372 +0,0 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.utils import path
|
||||
|
||||
|
||||
def validate_required(value, required):
|
||||
"""Required validation, normally called in config value's validate() on the
|
||||
raw string, _not_ the converted value."""
|
||||
if required and not value.strip():
|
||||
raise ValueError('must be set.')
|
||||
|
||||
|
||||
def validate_choice(value, choices):
|
||||
"""Choice validation, normally called in config value's validate()."""
|
||||
if choices is not None and value not in choices:
|
||||
names = ', '.join(repr(c) for c in choices)
|
||||
raise ValueError('must be one of %s, not %s.' % (names, value))
|
||||
|
||||
|
||||
def validate_minimum(value, minimum):
|
||||
"""Minimum validation, normally called in config value's validate()."""
|
||||
if minimum is not None and value < minimum:
|
||||
raise ValueError('%r must be larger than %r.' % (value, minimum))
|
||||
|
||||
|
||||
def validate_maximum(value, maximum):
|
||||
"""Maximum validation, normally called in config value's validate()."""
|
||||
if maximum is not None and value > maximum:
|
||||
raise ValueError('%r must be smaller than %r.' % (value, maximum))
|
||||
|
||||
|
||||
# TODO: move this and levenshtein to a more appropriate class.
|
||||
def did_you_mean(name, choices):
|
||||
"""Suggest most likely setting based on levenshtein."""
|
||||
if not choices:
|
||||
return None
|
||||
|
||||
name = name.lower()
|
||||
candidates = [(levenshtein(name, c), c) for c in choices]
|
||||
candidates.sort()
|
||||
|
||||
if candidates[0][0] <= 3:
|
||||
return candidates[0][1]
|
||||
return None
|
||||
|
||||
|
||||
def levenshtein(a, b):
|
||||
"""Calculates the Levenshtein distance between a and b."""
|
||||
n, m = len(a), len(b)
|
||||
if n > m:
|
||||
return levenshtein(b, a)
|
||||
|
||||
current = xrange(n + 1)
|
||||
for i in xrange(1, m + 1):
|
||||
previous, current = current, [i] + [0] * n
|
||||
for j in xrange(1, n + 1):
|
||||
add, delete = previous[j] + 1, current[j - 1] + 1
|
||||
change = previous[j - 1]
|
||||
if a[j - 1] != b[i - 1]:
|
||||
change += 1
|
||||
current[j] = min(add, delete, change)
|
||||
return current[n]
|
||||
|
||||
|
||||
class ConfigValue(object):
|
||||
"""Represents a config key's value and how to handle it.
|
||||
|
||||
Normally you will only be interacting with sub-classes for config values
|
||||
that encode either deserialization behavior and/or validation.
|
||||
|
||||
Each config value should be used for the following actions:
|
||||
|
||||
1. Deserializing from a raw string and validating, raising ValueError on
|
||||
failure.
|
||||
2. Serializing a value back to a string that can be stored in a config.
|
||||
3. Formatting a value to a printable form (useful for masking secrets).
|
||||
|
||||
:class:`None` values should not be deserialized, serialized or formatted,
|
||||
the code interacting with the config should simply skip None config values.
|
||||
"""
|
||||
|
||||
#: Collection of valid choices for converted value. Must be combined with
|
||||
#: :function:`validate_choices` in :method:`validate` do any thing.
|
||||
choices = None
|
||||
|
||||
#: Minimum of converted value. Must be combined with
|
||||
#: :function:`validate_minimum` in :method:`validate` do any thing.
|
||||
minimum = None
|
||||
|
||||
#: Maximum of converted value. Must be combined with
|
||||
#: :function:`validate_maximum` in :method:`validate` do any thing.
|
||||
maximum = None
|
||||
|
||||
#: Indicate if this field is required.
|
||||
optional = None
|
||||
|
||||
#: Indicate if we should mask the when printing for human consumption.
|
||||
secret = None
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.choices = kwargs.get('choices')
|
||||
self.minimum = kwargs.get('minimum')
|
||||
self.maximum = kwargs.get('maximum')
|
||||
self.optional = kwargs.get('optional')
|
||||
self.secret = kwargs.get('secret')
|
||||
|
||||
def deserialize(self, value):
|
||||
"""Cast raw string to appropriate type."""
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
"""Convert value back to string for saving."""
|
||||
return str(value)
|
||||
|
||||
def format(self, value):
|
||||
"""Format value for display."""
|
||||
if self.secret and value is not None:
|
||||
return '********'
|
||||
return self.serialize(value)
|
||||
|
||||
|
||||
class String(ConfigValue):
|
||||
"""String values.
|
||||
|
||||
Supports: optional, choices and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = value.strip()
|
||||
validate_required(value, not self.optional)
|
||||
validate_choice(value, self.choices)
|
||||
if not value:
|
||||
return None
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
return value.encode('utf-8').encode('string-escape')
|
||||
|
||||
|
||||
class Integer(ConfigValue):
|
||||
"""Integer values.
|
||||
|
||||
Supports: choices, minimum, maximum and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = int(value)
|
||||
validate_choice(value, self.choices)
|
||||
validate_minimum(value, self.minimum)
|
||||
validate_maximum(value, self.maximum)
|
||||
return value
|
||||
|
||||
|
||||
class Boolean(ConfigValue):
|
||||
"""Boolean values.
|
||||
|
||||
Supports: secret.
|
||||
"""
|
||||
true_values = ('1', 'yes', 'true', 'on')
|
||||
false_values = ('0', 'no', 'false', 'off')
|
||||
|
||||
def deserialize(self, value):
|
||||
if value.lower() in self.true_values:
|
||||
return True
|
||||
elif value.lower() in self.false_values:
|
||||
return False
|
||||
|
||||
raise ValueError('invalid value for boolean: %r' % value)
|
||||
|
||||
def serialize(self, value):
|
||||
if value:
|
||||
return 'true'
|
||||
else:
|
||||
return 'false'
|
||||
|
||||
|
||||
class List(ConfigValue):
|
||||
"""List values split by comma or newline.
|
||||
|
||||
Supports: optional and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
validate_required(value, not self.optional)
|
||||
if '\n' in value:
|
||||
values = re.split(r'\s*\n\s*', value.strip())
|
||||
else:
|
||||
values = re.split(r'\s*,\s*', value.strip())
|
||||
return tuple([v for v in values if v])
|
||||
|
||||
def serialize(self, value):
|
||||
return '\n ' + '\n '.join(v.encode('utf-8') for v in value)
|
||||
|
||||
|
||||
class LogLevel(ConfigValue):
|
||||
"""Log level values.
|
||||
|
||||
Supports: secret.
|
||||
"""
|
||||
levels = {
|
||||
'critical': logging.CRITICAL,
|
||||
'error': logging.ERROR,
|
||||
'warning': logging.WARNING,
|
||||
'info': logging.INFO,
|
||||
'debug': logging.DEBUG,
|
||||
}
|
||||
|
||||
def deserialize(self, value):
|
||||
validate_choice(value.lower(), self.levels.keys())
|
||||
return self.levels.get(value.lower())
|
||||
|
||||
def serialize(self, value):
|
||||
return dict((v, k) for k, v in self.levels.items()).get(value)
|
||||
|
||||
|
||||
class Hostname(ConfigValue):
|
||||
"""Hostname values.
|
||||
|
||||
Supports: optional and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
validate_required(value, not self.optional)
|
||||
if not value.strip():
|
||||
return None
|
||||
try:
|
||||
socket.getaddrinfo(value, None)
|
||||
except socket.error:
|
||||
raise ValueError('must be a resolveable hostname or valid IP')
|
||||
return value
|
||||
|
||||
|
||||
class Port(Integer):
|
||||
"""Port values limited to 1-65535.
|
||||
|
||||
Supports: choices and secret.
|
||||
"""
|
||||
# TODO: consider probing if port is free or not?
|
||||
def __init__(self, **kwargs):
|
||||
super(Port, self).__init__(**kwargs)
|
||||
self.minimum = 1
|
||||
self.maximum = 2 ** 16 - 1
|
||||
|
||||
|
||||
class ExpandedPath(bytes):
|
||||
def __new__(self, value):
|
||||
expanded = path.expand_path(value)
|
||||
return super(ExpandedPath, self).__new__(self, expanded)
|
||||
|
||||
def __init__(self, value):
|
||||
self.original = value
|
||||
|
||||
|
||||
class Path(ConfigValue):
|
||||
"""File system path that will be expanded with
|
||||
mopidy.utils.path.expand_path
|
||||
|
||||
Supports: optional, choices and secret.
|
||||
"""
|
||||
def deserialize(self, value):
|
||||
value = value.strip()
|
||||
validate_required(value, not self.optional)
|
||||
validate_choice(value, self.choices)
|
||||
if not value:
|
||||
return None
|
||||
return ExpandedPath(value)
|
||||
|
||||
def serialize(self, value):
|
||||
if isinstance(value, ExpandedPath):
|
||||
return value.original
|
||||
return value
|
||||
|
||||
|
||||
class ConfigSchema(object):
|
||||
"""Logical group of config values that correspond to a config section.
|
||||
|
||||
Schemas are set up by assigning config keys with config values to
|
||||
instances. Once setup :meth:`convert` can be called with a list of ``(key,
|
||||
value)`` tuples to process. For convienience we also support :meth:`format`
|
||||
method that can used for printing out the converted values.
|
||||
"""
|
||||
# TODO: Use collections.OrderedDict once 2.6 support is gone (#344)
|
||||
def __init__(self):
|
||||
self._schema = {}
|
||||
self._order = []
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key not in self._schema:
|
||||
self._order.append(key)
|
||||
self._schema[key] = value
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._schema[key]
|
||||
|
||||
def format(self, name, values):
|
||||
# TODO: should the output be encoded utf-8 since we use that in
|
||||
# serialize for strings?
|
||||
lines = ['[%s]' % name]
|
||||
for key in self._order:
|
||||
value = values.get(key)
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (
|
||||
key, self._schema[key].format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
values[key] = self._schema[key].deserialize(value)
|
||||
except KeyError: # not in our schema
|
||||
errors[key] = 'unknown config key.'
|
||||
suggestion = did_you_mean(key, self._schema.keys())
|
||||
if suggestion:
|
||||
errors[key] += ' Did you mean %s?' % suggestion
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
for key in self._schema:
|
||||
if key not in values and key not in errors:
|
||||
errors[key] = 'config key not found.'
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
|
||||
|
||||
class ExtensionConfigSchema(ConfigSchema):
|
||||
"""Sub-classed :class:`ConfigSchema` for use in extensions.
|
||||
|
||||
Ensures that ``enabled`` config value is present.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(ExtensionConfigSchema, self).__init__()
|
||||
self['enabled'] = Boolean()
|
||||
|
||||
|
||||
class LogLevelConfigSchema(object):
|
||||
"""Special cased schema for handling a config section with loglevels.
|
||||
|
||||
Expects the config keys to be logger names and the values to be log levels
|
||||
as understood by the :class:`LogLevel` config value. Does not sub-class
|
||||
:class:`ConfigSchema`, but implements the same interface.
|
||||
"""
|
||||
def __init__(self):
|
||||
self._config_value = LogLevel()
|
||||
|
||||
def format(self, name, values):
|
||||
lines = ['[%s]' % name]
|
||||
for key, value in sorted(values.items()):
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (
|
||||
key, self._config_value.format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
if value.strip():
|
||||
values[key] = self._config_value.deserialize(value)
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
106
tests/config/config_test.py
Normal file
106
tests/config/config_test.py
Normal file
@ -0,0 +1,106 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import mock
|
||||
|
||||
from mopidy import config, exceptions
|
||||
|
||||
from tests import unittest, path_to_data_dir
|
||||
|
||||
|
||||
class LoadConfigTest(unittest.TestCase):
|
||||
def test_load_nothing(self):
|
||||
self.assertEqual({}, config._load([], [], []))
|
||||
|
||||
def test_load_single_default(self):
|
||||
default = '[foo]\nbar = baz'
|
||||
expected = {'foo': {'bar': 'baz'}}
|
||||
result = config._load([], [default], [])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_load_defaults(self):
|
||||
default1 = '[foo]\nbar = baz'
|
||||
default2 = '[foo2]\n'
|
||||
expected = {'foo': {'bar': 'baz'}, 'foo2': {}}
|
||||
result = config._load([], [default1, default2], [])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_load_single_override(self):
|
||||
override = ('foo', 'bar', 'baz')
|
||||
expected = {'foo': {'bar': 'baz'}}
|
||||
result = config._load([], [], [override])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_load_overrides(self):
|
||||
override1 = ('foo', 'bar', 'baz')
|
||||
override2 = ('foo2', 'bar', 'baz')
|
||||
expected = {'foo': {'bar': 'baz'}, 'foo2': {'bar': 'baz'}}
|
||||
result = config._load([], [], [override1, override2])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_load_single_file(self):
|
||||
file1 = path_to_data_dir('file1.conf')
|
||||
expected = {'foo': {'bar': 'baz'}}
|
||||
result = config._load([file1], [], [])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
def test_load_files(self):
|
||||
file1 = path_to_data_dir('file1.conf')
|
||||
file2 = path_to_data_dir('file2.conf')
|
||||
expected = {'foo': {'bar': 'baz'}, 'foo2': {'bar': 'baz'}}
|
||||
result = config._load([file1, file2], [], [])
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
|
||||
class ValidateTest(unittest.TestCase):
|
||||
def test_empty_config_no_schemas(self):
|
||||
conf, errors = config._validate({}, [])
|
||||
self.assertEqual({}, conf)
|
||||
self.assertEqual([], errors)
|
||||
|
||||
def test_config_no_schemas(self):
|
||||
raw_config = {'foo': {'bar': 'baz'}}
|
||||
conf, errors = config._validate(raw_config, [])
|
||||
self.assertEqual({}, conf)
|
||||
self.assertEqual([], errors)
|
||||
|
||||
def test_empty_config_single_schema(self):
|
||||
conf, errors = config._validate({}, [('foo', mock.Mock())])
|
||||
self.assertEqual({}, conf)
|
||||
self.assertEqual(['foo: section not found.'], errors)
|
||||
|
||||
def test_config_single_schema(self):
|
||||
raw_config = {'foo': {'bar': 'baz'}}
|
||||
schema = mock.Mock()
|
||||
schema.convert.return_value = {'baz': 'bar'}
|
||||
conf, errors = config._validate(raw_config, [('foo', schema)])
|
||||
self.assertEqual({'foo': {'baz': 'bar'}}, conf)
|
||||
self.assertEqual([], errors)
|
||||
|
||||
def test_config_single_schema_config_error(self):
|
||||
raw_config = {'foo': {'bar': 'baz'}}
|
||||
schema = mock.Mock()
|
||||
schema.convert.side_effect = exceptions.ConfigError({'bar': 'bad'})
|
||||
conf, errors = config._validate(raw_config, [('foo', schema)])
|
||||
self.assertEqual(['foo/bar: bad'], errors)
|
||||
self.assertEqual({}, conf)
|
||||
|
||||
# TODO: add more tests
|
||||
|
||||
|
||||
class ParseOverrideTest(unittest.TestCase):
|
||||
def test_valid_override(self):
|
||||
expected = ('section', 'key', 'value')
|
||||
self.assertEqual(expected, config.parse_override('section/key=value'))
|
||||
self.assertEqual(expected, config.parse_override('section/key=value '))
|
||||
self.assertEqual(expected, config.parse_override('section/key =value'))
|
||||
self.assertEqual(expected, config.parse_override('section /key=value'))
|
||||
|
||||
def test_empty_override(self):
|
||||
expected = ('section', 'key', '')
|
||||
self.assertEqual(expected, config.parse_override('section/key='))
|
||||
self.assertEqual(expected, config.parse_override('section/key= '))
|
||||
|
||||
def test_invalid_override(self):
|
||||
self.assertRaises(ValueError, config.parse_override, 'section/key')
|
||||
self.assertRaises(ValueError, config.parse_override, 'section=')
|
||||
self.assertRaises(ValueError, config.parse_override, 'section')
|
||||
128
tests/config/schemas_test.py
Normal file
128
tests/config/schemas_test.py
Normal file
@ -0,0 +1,128 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import mock
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.config import schemas, types
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ConfigSchemaTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.schema = schemas.ConfigSchema()
|
||||
self.schema['foo'] = mock.Mock()
|
||||
self.schema['bar'] = mock.Mock()
|
||||
self.schema['baz'] = mock.Mock()
|
||||
self.values = {'bar': '123', 'foo': '456', 'baz': '678'}
|
||||
|
||||
def test_format(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
|
||||
expected = ['[qwerty]', 'foo = qwe', 'bar = asd', 'baz = zxc']
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
def test_format_unkwown_value(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
self.values['unknown'] = 'rty'
|
||||
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertNotIn('unknown = rty', result)
|
||||
|
||||
def test_convert(self):
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
def test_convert_with_missing_value(self):
|
||||
del self.values['foo']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('not found', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_extra_value(self):
|
||||
self.values['extra'] = '123'
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
|
||||
def test_convert_with_deserialization_error(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_multiple_deserialization_errors(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('other')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
self.assertIn('other', cm.exception['bar'])
|
||||
|
||||
def test_convert_deserialization_unknown_and_missing_errors(self):
|
||||
self.values['extra'] = '123'
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('failure')
|
||||
del self.values['baz']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
self.assertNotIn('foo', cm.exception)
|
||||
self.assertIn('failure', cm.exception['bar'])
|
||||
self.assertIn('not found', cm.exception['baz'])
|
||||
|
||||
|
||||
class ExtensionConfigSchemaTest(unittest.TestCase):
|
||||
def test_schema_includes_enabled(self):
|
||||
schema = schemas.ExtensionConfigSchema()
|
||||
self.assertIsInstance(schema['enabled'], types.Boolean)
|
||||
|
||||
|
||||
class LogLevelConfigSchemaTest(unittest.TestCase):
|
||||
def test_conversion(self):
|
||||
schema = schemas.LogLevelConfigSchema()
|
||||
result = schema.convert([('foo.bar', 'DEBUG'), ('baz', 'INFO')])
|
||||
|
||||
self.assertEqual(logging.DEBUG, result['foo.bar'])
|
||||
self.assertEqual(logging.INFO, result['baz'])
|
||||
|
||||
def test_format(self):
|
||||
schema = schemas.LogLevelConfigSchema()
|
||||
values = {'foo.bar': logging.DEBUG, 'baz': logging.INFO}
|
||||
expected = ['[levels]', 'baz = info', 'foo.bar = debug']
|
||||
result = schema.format('levels', values)
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
|
||||
class DidYouMeanTest(unittest.TestCase):
|
||||
def testSuggestoins(self):
|
||||
choices = ('enabled', 'username', 'password', 'bitrate', 'timeout')
|
||||
|
||||
suggestion = schemas._did_you_mean('bitrate', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = schemas._did_you_mean('bitrote', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = schemas._did_you_mean('Bitrot', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = schemas._did_you_mean('BTROT', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = schemas._did_you_mean('btro', choices)
|
||||
self.assertEqual(suggestion, None)
|
||||
@ -4,75 +4,14 @@ import logging
|
||||
import mock
|
||||
import socket
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.utils import config
|
||||
from mopidy.config import types
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ValidateChoiceTest(unittest.TestCase):
|
||||
def test_no_choices_passes(self):
|
||||
config.validate_choice('foo', None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_choice('foo', ['foo', 'bar', 'baz'])
|
||||
config.validate_choice(1, [1, 2, 3])
|
||||
|
||||
def test_empty_choices_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_choice, 'foo', [])
|
||||
|
||||
def test_invalid_value_fails(self):
|
||||
words = ['foo', 'bar', 'baz']
|
||||
self.assertRaises(ValueError, config.validate_choice, 'foobar', words)
|
||||
self.assertRaises(ValueError, config.validate_choice, 5, [1, 2, 3])
|
||||
|
||||
|
||||
class ValidateMinimumTest(unittest.TestCase):
|
||||
def test_no_minimum_passes(self):
|
||||
config.validate_minimum(10, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_minimum(10, 5)
|
||||
|
||||
def test_to_small_value_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_minimum, 10, 20)
|
||||
|
||||
def test_to_small_value_fails_with_zero_as_minimum(self):
|
||||
self.assertRaises(ValueError, config.validate_minimum, -1, 0)
|
||||
|
||||
|
||||
class ValidateMaximumTest(unittest.TestCase):
|
||||
def test_no_maximum_passes(self):
|
||||
config.validate_maximum(5, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_maximum(5, 10)
|
||||
|
||||
def test_to_large_value_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_maximum, 10, 5)
|
||||
|
||||
def test_to_large_value_fails_with_zero_as_maximum(self):
|
||||
self.assertRaises(ValueError, config.validate_maximum, 5, 0)
|
||||
|
||||
|
||||
class ValidateRequiredTest(unittest.TestCase):
|
||||
def test_passes_when_false(self):
|
||||
config.validate_required('foo', False)
|
||||
config.validate_required('', False)
|
||||
config.validate_required(' ', False)
|
||||
|
||||
def test_passes_when_required_and_set(self):
|
||||
config.validate_required('foo', True)
|
||||
config.validate_required(' foo ', True)
|
||||
|
||||
def test_blocks_when_required_and_emtpy(self):
|
||||
self.assertRaises(ValueError, config.validate_required, '', True)
|
||||
self.assertRaises(ValueError, config.validate_required, ' ', True)
|
||||
|
||||
|
||||
class ConfigValueTest(unittest.TestCase):
|
||||
def test_init(self):
|
||||
value = config.ConfigValue()
|
||||
value = types.ConfigValue()
|
||||
self.assertIsNone(value.choices)
|
||||
self.assertIsNone(value.maximum)
|
||||
self.assertIsNone(value.minimum)
|
||||
@ -82,7 +21,7 @@ class ConfigValueTest(unittest.TestCase):
|
||||
def test_init_with_params(self):
|
||||
kwargs = {'choices': ['foo'], 'minimum': 0, 'maximum': 10,
|
||||
'secret': True, 'optional': True}
|
||||
value = config.ConfigValue(**kwargs)
|
||||
value = types.ConfigValue(**kwargs)
|
||||
self.assertEqual(['foo'], value.choices)
|
||||
self.assertEqual(0, value.minimum)
|
||||
self.assertEqual(10, value.maximum)
|
||||
@ -90,90 +29,90 @@ class ConfigValueTest(unittest.TestCase):
|
||||
self.assertEqual(True, value.secret)
|
||||
|
||||
def test_deserialize_passes_through(self):
|
||||
value = config.ConfigValue()
|
||||
value = types.ConfigValue()
|
||||
obj = object()
|
||||
self.assertEqual(obj, value.deserialize(obj))
|
||||
|
||||
def test_serialize_conversion_to_string(self):
|
||||
value = config.ConfigValue()
|
||||
value = types.ConfigValue()
|
||||
self.assertIsInstance(value.serialize(object()), basestring)
|
||||
|
||||
def test_format_uses_serialize(self):
|
||||
value = config.ConfigValue()
|
||||
value = types.ConfigValue()
|
||||
obj = object()
|
||||
self.assertEqual(value.serialize(obj), value.format(obj))
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.ConfigValue(secret=True)
|
||||
value = types.ConfigValue(secret=True)
|
||||
self.assertEqual('********', value.format(object()))
|
||||
|
||||
|
||||
class StringTest(unittest.TestCase):
|
||||
def test_deserialize_conversion_success(self):
|
||||
value = config.String()
|
||||
value = types.String()
|
||||
self.assertEqual('foo', value.deserialize(' foo '))
|
||||
|
||||
def test_deserialize_enforces_choices(self):
|
||||
value = config.String(choices=['foo', 'bar', 'baz'])
|
||||
value = types.String(choices=['foo', 'bar', 'baz'])
|
||||
self.assertEqual('foo', value.deserialize('foo'))
|
||||
self.assertRaises(ValueError, value.deserialize, 'foobar')
|
||||
|
||||
def test_deserialize_enforces_required(self):
|
||||
value = config.String()
|
||||
value = types.String()
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
|
||||
def test_deserialize_respects_optional(self):
|
||||
value = config.String(optional=True)
|
||||
value = types.String(optional=True)
|
||||
self.assertIsNone(value.deserialize(''))
|
||||
self.assertIsNone(value.deserialize(' '))
|
||||
|
||||
def test_serialize_string_escapes(self):
|
||||
value = config.String()
|
||||
value = types.String()
|
||||
self.assertEqual(r'\r\n\t', value.serialize('\r\n\t'))
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.String(secret=True)
|
||||
value = types.String(secret=True)
|
||||
self.assertEqual('********', value.format('s3cret'))
|
||||
|
||||
|
||||
class IntegerTest(unittest.TestCase):
|
||||
def test_deserialize_conversion_success(self):
|
||||
value = config.Integer()
|
||||
value = types.Integer()
|
||||
self.assertEqual(123, value.deserialize('123'))
|
||||
self.assertEqual(0, value.deserialize('0'))
|
||||
self.assertEqual(-10, value.deserialize('-10'))
|
||||
|
||||
def test_deserialize_conversion_failure(self):
|
||||
value = config.Integer()
|
||||
value = types.Integer()
|
||||
self.assertRaises(ValueError, value.deserialize, 'asd')
|
||||
self.assertRaises(ValueError, value.deserialize, '3.14')
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
|
||||
def test_deserialize_enforces_choices(self):
|
||||
value = config.Integer(choices=[1, 2, 3])
|
||||
value = types.Integer(choices=[1, 2, 3])
|
||||
self.assertEqual(3, value.deserialize('3'))
|
||||
self.assertRaises(ValueError, value.deserialize, '5')
|
||||
|
||||
def test_deserialize_enforces_minimum(self):
|
||||
value = config.Integer(minimum=10)
|
||||
value = types.Integer(minimum=10)
|
||||
self.assertEqual(15, value.deserialize('15'))
|
||||
self.assertRaises(ValueError, value.deserialize, '5')
|
||||
|
||||
def test_deserialize_enforces_maximum(self):
|
||||
value = config.Integer(maximum=10)
|
||||
value = types.Integer(maximum=10)
|
||||
self.assertEqual(5, value.deserialize('5'))
|
||||
self.assertRaises(ValueError, value.deserialize, '15')
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.Integer(secret=True)
|
||||
value = types.Integer(secret=True)
|
||||
self.assertEqual('********', value.format('1337'))
|
||||
|
||||
|
||||
class BooleanTest(unittest.TestCase):
|
||||
def test_deserialize_conversion_success(self):
|
||||
value = config.Boolean()
|
||||
value = types.Boolean()
|
||||
for true in ('1', 'yes', 'true', 'on'):
|
||||
self.assertIs(value.deserialize(true), True)
|
||||
self.assertIs(value.deserialize(true.upper()), True)
|
||||
@ -184,24 +123,24 @@ class BooleanTest(unittest.TestCase):
|
||||
self.assertIs(value.deserialize(false.capitalize()), False)
|
||||
|
||||
def test_deserialize_conversion_failure(self):
|
||||
value = config.Boolean()
|
||||
value = types.Boolean()
|
||||
self.assertRaises(ValueError, value.deserialize, 'nope')
|
||||
self.assertRaises(ValueError, value.deserialize, 'sure')
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
|
||||
def test_serialize(self):
|
||||
value = config.Boolean()
|
||||
value = types.Boolean()
|
||||
self.assertEqual('true', value.serialize(True))
|
||||
self.assertEqual('false', value.serialize(False))
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.Boolean(secret=True)
|
||||
value = types.Boolean(secret=True)
|
||||
self.assertEqual('********', value.format('true'))
|
||||
|
||||
|
||||
class ListTest(unittest.TestCase):
|
||||
def test_deserialize_conversion_success(self):
|
||||
value = config.List()
|
||||
value = types.List()
|
||||
|
||||
expected = ('foo', 'bar', 'baz')
|
||||
self.assertEqual(expected, value.deserialize('foo, bar ,baz '))
|
||||
@ -210,22 +149,22 @@ class ListTest(unittest.TestCase):
|
||||
self.assertEqual(expected, value.deserialize(' foo,bar\nbar\nbaz'))
|
||||
|
||||
def test_deserialize_enforces_required(self):
|
||||
value = config.List()
|
||||
value = types.List()
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
|
||||
def test_deserialize_respects_optional(self):
|
||||
value = config.List(optional=True)
|
||||
value = types.List(optional=True)
|
||||
self.assertEqual(tuple(), value.deserialize(''))
|
||||
self.assertEqual(tuple(), value.deserialize(' '))
|
||||
|
||||
def test_serialize(self):
|
||||
value = config.List()
|
||||
value = types.List()
|
||||
result = value.serialize(('foo', 'bar', 'baz'))
|
||||
self.assertRegexpMatches(result, r'foo\n\s*bar\n\s*baz')
|
||||
|
||||
|
||||
class BooleanTest(unittest.TestCase):
|
||||
class LogLevelTest(unittest.TestCase):
|
||||
levels = {'critical': logging.CRITICAL,
|
||||
'error': logging.ERROR,
|
||||
'warning': logging.WARNING,
|
||||
@ -233,21 +172,21 @@ class BooleanTest(unittest.TestCase):
|
||||
'debug': logging.DEBUG}
|
||||
|
||||
def test_deserialize_conversion_success(self):
|
||||
value = config.LogLevel()
|
||||
value = types.LogLevel()
|
||||
for name, level in self.levels.items():
|
||||
self.assertEqual(level, value.deserialize(name))
|
||||
self.assertEqual(level, value.deserialize(name.upper()))
|
||||
self.assertEqual(level, value.deserialize(name.capitalize()))
|
||||
|
||||
def test_deserialize_conversion_failure(self):
|
||||
value = config.LogLevel()
|
||||
value = types.LogLevel()
|
||||
self.assertRaises(ValueError, value.deserialize, 'nope')
|
||||
self.assertRaises(ValueError, value.deserialize, 'sure')
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
|
||||
def test_serialize(self):
|
||||
value = config.LogLevel()
|
||||
value = types.LogLevel()
|
||||
for name, level in self.levels.items():
|
||||
self.assertEqual(name, value.serialize(level))
|
||||
self.assertIsNone(value.serialize(1337))
|
||||
@ -256,26 +195,26 @@ class BooleanTest(unittest.TestCase):
|
||||
class HostnameTest(unittest.TestCase):
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_conversion_success(self, getaddrinfo_mock):
|
||||
value = config.Hostname()
|
||||
value = types.Hostname()
|
||||
value.deserialize('example.com')
|
||||
getaddrinfo_mock.assert_called_once_with('example.com', None)
|
||||
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_conversion_failure(self, getaddrinfo_mock):
|
||||
value = config.Hostname()
|
||||
value = types.Hostname()
|
||||
getaddrinfo_mock.side_effect = socket.error
|
||||
self.assertRaises(ValueError, value.deserialize, 'example.com')
|
||||
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_enforces_required(self, getaddrinfo_mock):
|
||||
value = config.Hostname()
|
||||
value = types.Hostname()
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
self.assertEqual(0, getaddrinfo_mock.call_count)
|
||||
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_respects_optional(self, getaddrinfo_mock):
|
||||
value = config.Hostname(optional=True)
|
||||
value = types.Hostname(optional=True)
|
||||
self.assertIsNone(value.deserialize(''))
|
||||
self.assertIsNone(value.deserialize(' '))
|
||||
self.assertEqual(0, getaddrinfo_mock.call_count)
|
||||
@ -283,14 +222,14 @@ class HostnameTest(unittest.TestCase):
|
||||
|
||||
class PortTest(unittest.TestCase):
|
||||
def test_valid_ports(self):
|
||||
value = config.Port()
|
||||
value = types.Port()
|
||||
self.assertEqual(1, value.deserialize('1'))
|
||||
self.assertEqual(80, value.deserialize('80'))
|
||||
self.assertEqual(6600, value.deserialize('6600'))
|
||||
self.assertEqual(65535, value.deserialize('65535'))
|
||||
|
||||
def test_invalid_ports(self):
|
||||
value = config.Port()
|
||||
value = types.Port()
|
||||
self.assertRaises(ValueError, value.deserialize, '65536')
|
||||
self.assertRaises(ValueError, value.deserialize, '100000')
|
||||
self.assertRaises(ValueError, value.deserialize, '0')
|
||||
@ -300,167 +239,48 @@ class PortTest(unittest.TestCase):
|
||||
|
||||
class ExpandedPathTest(unittest.TestCase):
|
||||
def test_is_bytes(self):
|
||||
self.assertIsInstance(config.ExpandedPath('/tmp'), bytes)
|
||||
self.assertIsInstance(types.ExpandedPath('/tmp'), bytes)
|
||||
|
||||
@mock.patch('mopidy.utils.path.expand_path')
|
||||
def test_defaults_to_expanded(self, expand_path_mock):
|
||||
expand_path_mock.return_value = 'expanded_path'
|
||||
self.assertEqual('expanded_path', config.ExpandedPath('~'))
|
||||
self.assertEqual('expanded_path', types.ExpandedPath('~'))
|
||||
|
||||
@mock.patch('mopidy.utils.path.expand_path')
|
||||
def test_orginal_stores_unexpanded(self, expand_path_mock):
|
||||
self.assertEqual('~', config.ExpandedPath('~').original)
|
||||
self.assertEqual('~', types.ExpandedPath('~').original)
|
||||
|
||||
|
||||
class PathTest(unittest.TestCase):
|
||||
def test_deserialize_conversion_success(self):
|
||||
result = config.Path().deserialize('/foo')
|
||||
result = types.Path().deserialize('/foo')
|
||||
self.assertEqual('/foo', result)
|
||||
self.assertIsInstance(result, config.ExpandedPath)
|
||||
self.assertIsInstance(result, types.ExpandedPath)
|
||||
self.assertIsInstance(result, bytes)
|
||||
|
||||
def test_deserialize_enforces_choices(self):
|
||||
value = config.Path(choices=['/foo', '/bar', '/baz'])
|
||||
value = types.Path(choices=['/foo', '/bar', '/baz'])
|
||||
self.assertEqual('/foo', value.deserialize('/foo'))
|
||||
self.assertRaises(ValueError, value.deserialize, '/foobar')
|
||||
|
||||
def test_deserialize_enforces_required(self):
|
||||
value = config.Path()
|
||||
value = types.Path()
|
||||
self.assertRaises(ValueError, value.deserialize, '')
|
||||
self.assertRaises(ValueError, value.deserialize, ' ')
|
||||
|
||||
def test_deserialize_respects_optional(self):
|
||||
value = config.Path(optional=True)
|
||||
value = types.Path(optional=True)
|
||||
self.assertIsNone(value.deserialize(''))
|
||||
self.assertIsNone(value.deserialize(' '))
|
||||
|
||||
@mock.patch('mopidy.utils.path.expand_path')
|
||||
def test_serialize_uses_original(self, expand_path_mock):
|
||||
expand_path_mock.return_value = 'expanded_path'
|
||||
path = config.ExpandedPath('original_path')
|
||||
value = config.Path()
|
||||
path = types.ExpandedPath('original_path')
|
||||
value = types.Path()
|
||||
self.assertEqual('expanded_path', path)
|
||||
self.assertEqual('original_path', value.serialize(path))
|
||||
|
||||
def test_serialize_plain_string(self):
|
||||
value = config.Path()
|
||||
value = types.Path()
|
||||
self.assertEqual('path', value.serialize('path'))
|
||||
|
||||
|
||||
class ConfigSchemaTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.schema = config.ConfigSchema()
|
||||
self.schema['foo'] = mock.Mock()
|
||||
self.schema['bar'] = mock.Mock()
|
||||
self.schema['baz'] = mock.Mock()
|
||||
self.values = {'bar': '123', 'foo': '456', 'baz': '678'}
|
||||
|
||||
def test_format(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
|
||||
expected = ['[qwerty]', 'foo = qwe', 'bar = asd', 'baz = zxc']
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
def test_format_unkwown_value(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
self.values['unknown'] = 'rty'
|
||||
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertNotIn('unknown = rty', result)
|
||||
|
||||
def test_convert(self):
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
def test_convert_with_missing_value(self):
|
||||
del self.values['foo']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('not found', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_extra_value(self):
|
||||
self.values['extra'] = '123'
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
|
||||
def test_convert_with_deserialization_error(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_multiple_deserialization_errors(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('other')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
self.assertIn('other', cm.exception['bar'])
|
||||
|
||||
def test_convert_deserialization_unknown_and_missing_errors(self):
|
||||
self.values['extra'] = '123'
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('failure')
|
||||
del self.values['baz']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
self.assertNotIn('foo', cm.exception)
|
||||
self.assertIn('failure', cm.exception['bar'])
|
||||
self.assertIn('not found', cm.exception['baz'])
|
||||
|
||||
|
||||
class ExtensionConfigSchemaTest(unittest.TestCase):
|
||||
def test_schema_includes_enabled(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
self.assertIsInstance(schema['enabled'], config.Boolean)
|
||||
|
||||
|
||||
class LogLevelConfigSchemaTest(unittest.TestCase):
|
||||
def test_conversion(self):
|
||||
schema = config.LogLevelConfigSchema()
|
||||
result = schema.convert([('foo.bar', 'DEBUG'), ('baz', 'INFO')])
|
||||
|
||||
self.assertEqual(logging.DEBUG, result['foo.bar'])
|
||||
self.assertEqual(logging.INFO, result['baz'])
|
||||
|
||||
def test_format(self):
|
||||
schema = config.LogLevelConfigSchema()
|
||||
expected = ['[levels]', 'baz = info', 'foo.bar = debug']
|
||||
result = schema.format(
|
||||
'levels', {'foo.bar': logging.DEBUG, 'baz': logging.INFO})
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
|
||||
class DidYouMeanTest(unittest.TestCase):
|
||||
def testSuggestoins(self):
|
||||
choices = ('enabled', 'username', 'password', 'bitrate', 'timeout')
|
||||
|
||||
suggestion = config.did_you_mean('bitrate', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = config.did_you_mean('bitrote', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = config.did_you_mean('Bitrot', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = config.did_you_mean('BTROT', choices)
|
||||
self.assertEqual(suggestion, 'bitrate')
|
||||
|
||||
suggestion = config.did_you_mean('btro', choices)
|
||||
self.assertEqual(suggestion, None)
|
||||
67
tests/config/validator_tests.py
Normal file
67
tests/config/validator_tests.py
Normal file
@ -0,0 +1,67 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from mopidy.config import validators
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ValidateChoiceTest(unittest.TestCase):
|
||||
def test_no_choices_passes(self):
|
||||
validators.validate_choice('foo', None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
validators.validate_choice('foo', ['foo', 'bar', 'baz'])
|
||||
validators.validate_choice(1, [1, 2, 3])
|
||||
|
||||
def test_empty_choices_fails(self):
|
||||
self.assertRaises(ValueError, validators.validate_choice, 'foo', [])
|
||||
|
||||
def test_invalid_value_fails(self):
|
||||
words = ['foo', 'bar', 'baz']
|
||||
self.assertRaises(
|
||||
ValueError, validators.validate_choice, 'foobar', words)
|
||||
self.assertRaises(
|
||||
ValueError, validators.validate_choice, 5, [1, 2, 3])
|
||||
|
||||
|
||||
class ValidateMinimumTest(unittest.TestCase):
|
||||
def test_no_minimum_passes(self):
|
||||
validators.validate_minimum(10, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
validators.validate_minimum(10, 5)
|
||||
|
||||
def test_to_small_value_fails(self):
|
||||
self.assertRaises(ValueError, validators.validate_minimum, 10, 20)
|
||||
|
||||
def test_to_small_value_fails_with_zero_as_minimum(self):
|
||||
self.assertRaises(ValueError, validators.validate_minimum, -1, 0)
|
||||
|
||||
|
||||
class ValidateMaximumTest(unittest.TestCase):
|
||||
def test_no_maximum_passes(self):
|
||||
validators.validate_maximum(5, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
validators.validate_maximum(5, 10)
|
||||
|
||||
def test_to_large_value_fails(self):
|
||||
self.assertRaises(ValueError, validators.validate_maximum, 10, 5)
|
||||
|
||||
def test_to_large_value_fails_with_zero_as_maximum(self):
|
||||
self.assertRaises(ValueError, validators.validate_maximum, 5, 0)
|
||||
|
||||
|
||||
class ValidateRequiredTest(unittest.TestCase):
|
||||
def test_passes_when_false(self):
|
||||
validators.validate_required('foo', False)
|
||||
validators.validate_required('', False)
|
||||
validators.validate_required(' ', False)
|
||||
|
||||
def test_passes_when_required_and_set(self):
|
||||
validators.validate_required('foo', True)
|
||||
validators.validate_required(' foo ', True)
|
||||
|
||||
def test_blocks_when_required_and_emtpy(self):
|
||||
self.assertRaises(ValueError, validators.validate_required, '', True)
|
||||
self.assertRaises(ValueError, validators.validate_required, ' ', True)
|
||||
2
tests/data/file1.conf
Normal file
2
tests/data/file1.conf
Normal file
@ -0,0 +1,2 @@
|
||||
[foo]
|
||||
bar = baz
|
||||
2
tests/data/file2.conf
Normal file
2
tests/data/file2.conf
Normal file
@ -0,0 +1,2 @@
|
||||
[foo2]
|
||||
bar = baz
|
||||
@ -1,14 +1,13 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from mopidy.ext import Extension
|
||||
from mopidy.utils import config
|
||||
from mopidy import config, ext
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ExtensionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ext = Extension()
|
||||
self.ext = ext.Extension()
|
||||
|
||||
def test_dist_name_is_none(self):
|
||||
self.assertIsNone(self.ext.dist_name)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user