config: Move everything to mopidy.config sub-modules

This commit is contained in:
Thomas Adamcik 2013-04-10 22:47:37 +02:00
parent 5816b5e099
commit d90a977a3b
18 changed files with 651 additions and 630 deletions

View File

@ -30,6 +30,7 @@ sys.path.insert(
from mopidy import exceptions, ext
from mopidy.audio import Audio
from mopidy.config import default_config, config_schemas
from mopidy import config as config_utils # TODO: cleanup
from mopidy.core import Core
from mopidy.utils import deps, log, path, process, versioning

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import ext
from mopidy.utils import config, formatting
from mopidy import config, ext
from mopidy.utils import formatting
default_config = """

View File

@ -1,9 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import ext
from mopidy.exceptions import ExtensionError
from mopidy.utils import config, formatting
from mopidy import config, exceptions, ext
from mopidy.utils import formatting
default_config = """
@ -96,7 +95,7 @@ class Extension(ext.Extension):
try:
import spotify # noqa
except ImportError as e:
raise ExtensionError('pyspotify library not found', e)
raise exceptions.ExtensionError('pyspotify library not found', e)
def get_backend_classes(self):
from .actor import SpotifyBackend

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import ext
from mopidy.utils import config, formatting
from mopidy import config, ext
from mopidy.utils import formatting
default_config = """

View File

@ -1,6 +1,7 @@
from __future__ import unicode_literals
from mopidy.utils import config
from mopidy.config.schemas import *
from mopidy.config.values import *
default_config = """
@ -24,22 +25,22 @@ password =
"""
config_schemas = {} # TODO: use ordered dict?
config_schemas['logging'] = config.ConfigSchema()
config_schemas['logging']['console_format'] = config.String()
config_schemas['logging']['debug_format'] = config.String()
config_schemas['logging']['debug_file'] = config.Path()
config_schemas['logging'] = ConfigSchema()
config_schemas['logging']['console_format'] = String()
config_schemas['logging']['debug_format'] = String()
config_schemas['logging']['debug_file'] = Path()
config_schemas['logging.levels'] = config.LogLevelConfigSchema()
config_schemas['logging.levels'] = LogLevelConfigSchema()
config_schemas['audio'] = config.ConfigSchema()
config_schemas['audio']['mixer'] = config.String()
config_schemas['audio']['mixer_track'] = config.String(optional=True)
config_schemas['audio']['output'] = config.String()
config_schemas['audio'] = ConfigSchema()
config_schemas['audio']['mixer'] = String()
config_schemas['audio']['mixer_track'] = String(optional=True)
config_schemas['audio']['output'] = String()
config_schemas['proxy'] = config.ConfigSchema()
config_schemas['proxy']['hostname'] = config.Hostname(optional=True)
config_schemas['proxy']['username'] = config.String(optional=True)
config_schemas['proxy']['password'] = config.String(optional=True, secret=True)
config_schemas['proxy'] = ConfigSchema()
config_schemas['proxy']['hostname'] = Hostname(optional=True)
config_schemas['proxy']['username'] = String(optional=True)
config_schemas['proxy']['password'] = String(optional=True, secret=True)
# NOTE: if multiple outputs ever comes something like LogLevelConfigSchema
#config_schemas['audio.outputs'] = config.AudioOutputConfigSchema()

137
mopidy/config/schemas.py Normal file
View File

@ -0,0 +1,137 @@
from mopidy import exceptions
from mopidy.config import values
def _did_you_mean(name, choices):
"""Suggest most likely setting based on levenshtein."""
if not choices:
return None
name = name.lower()
candidates = [(_levenshtein(name, c), c) for c in choices]
candidates.sort()
if candidates[0][0] <= 3:
return candidates[0][1]
return None
def _levenshtein(a, b):
"""Calculates the Levenshtein distance between a and b."""
n, m = len(a), len(b)
if n > m:
return _levenshtein(b, a)
current = xrange(n + 1)
for i in xrange(1, m + 1):
previous, current = current, [i] + [0] * n
for j in xrange(1, n + 1):
add, delete = previous[j] + 1, current[j - 1] + 1
change = previous[j - 1]
if a[j - 1] != b[i - 1]:
change += 1
current[j] = min(add, delete, change)
return current[n]
class ConfigSchema(object):
"""Logical group of config values that correspond to a config section.
Schemas are set up by assigning config keys with config values to
instances. Once setup :meth:`convert` can be called with a list of
``(key, value)`` tuples to process. For convienience we also support
:meth:`format` method that can used for printing out the converted values.
"""
# TODO: Use collections.OrderedDict once 2.6 support is gone (#344)
def __init__(self):
self._schema = {}
self._order = []
def __setitem__(self, key, value):
if key not in self._schema:
self._order.append(key)
self._schema[key] = value
def __getitem__(self, key):
return self._schema[key]
def format(self, name, values):
# TODO: should the output be encoded utf-8 since we use that in
# serialize for strings?
lines = ['[%s]' % name]
for key in self._order:
value = values.get(key)
if value is not None:
lines.append('%s = %s' % (
key, self._schema[key].format(value)))
return '\n'.join(lines)
def convert(self, items):
errors = {}
values = {}
for key, value in items:
try:
values[key] = self._schema[key].deserialize(value)
except KeyError: # not in our schema
errors[key] = 'unknown config key.'
suggestion = _did_you_mean(key, self._schema.keys())
if suggestion:
errors[key] += ' Did you mean %s?' % suggestion
except ValueError as e: # deserialization failed
errors[key] = str(e)
for key in self._schema:
if key not in values and key not in errors:
errors[key] = 'config key not found.'
if errors:
raise exceptions.ConfigError(errors)
return values
class ExtensionConfigSchema(ConfigSchema):
"""Sub-classed :class:`ConfigSchema` for use in extensions.
Ensures that ``enabled`` config value is present.
"""
def __init__(self):
super(ExtensionConfigSchema, self).__init__()
self['enabled'] = values.Boolean()
class LogLevelConfigSchema(object):
"""Special cased schema for handling a config section with loglevels.
Expects the config keys to be logger names and the values to be log levels
as understood by the :class:`LogLevel` config value. Does not sub-class
:class:`ConfigSchema`, but implements the same interface.
"""
def __init__(self):
self._config_value = values.LogLevel()
def format(self, name, values):
lines = ['[%s]' % name]
for key, value in sorted(values.items()):
if value is not None:
lines.append('%s = %s' % (
key, self._config_value.format(value)))
return '\n'.join(lines)
def convert(self, items):
errors = {}
values = {}
for key, value in items:
try:
if value.strip():
values[key] = self._config_value.deserialize(value)
except ValueError as e: # deserialization failed
errors[key] = str(e)
if errors:
raise exceptions.ConfigError(errors)
return values

View File

@ -0,0 +1,28 @@
from __future__ import unicode_literals
# TODO: add validate regexp?
def validate_required(value, required):
"""Required validation, normally called in config value's validate() on the
raw string, _not_ the converted value."""
if required and not value.strip():
raise ValueError('must be set.')
def validate_choice(value, choices):
"""Choice validation, normally called in config value's validate()."""
if choices is not None and value not in choices:
names = ', '.join(repr(c) for c in choices)
raise ValueError('must be one of %s, not %s.' % (names, value))
def validate_minimum(value, minimum):
"""Minimum validation, normally called in config value's validate()."""
if minimum is not None and value < minimum:
raise ValueError('%r must be larger than %r.' % (value, minimum))
def validate_maximum(value, maximum):
"""Maximum validation, normally called in config value's validate()."""
if maximum is not None and value > maximum:
raise ValueError('%r must be smaller than %r.' % (value, maximum))

212
mopidy/config/values.py Normal file
View File

@ -0,0 +1,212 @@
from __future__ import unicode_literals
import logging
import re
import socket
from mopidy.utils import path
from mopidy.config import validators
class ConfigValue(object):
"""Represents a config key's value and how to handle it.
Normally you will only be interacting with sub-classes for config values
that encode either deserialization behavior and/or validation.
Each config value should be used for the following actions:
1. Deserializing from a raw string and validating, raising ValueError on
failure.
2. Serializing a value back to a string that can be stored in a config.
3. Formatting a value to a printable form (useful for masking secrets).
:class:`None` values should not be deserialized, serialized or formatted,
the code interacting with the config should simply skip None config values.
"""
#: Collection of valid choices for converted value. Must be combined with
#: :function:`validate_choices` in :method:`validate` do any thing.
choices = None
#: Minimum of converted value. Must be combined with
#: :function:`validate_minimum` in :method:`validate` do any thing.
minimum = None
#: Maximum of converted value. Must be combined with
#: :function:`validate_maximum` in :method:`validate` do any thing.
maximum = None
#: Indicate if this field is required.
optional = None
#: Indicate if we should mask the when printing for human consumption.
secret = None
def __init__(self, **kwargs):
self.choices = kwargs.get('choices')
self.minimum = kwargs.get('minimum')
self.maximum = kwargs.get('maximum')
self.optional = kwargs.get('optional')
self.secret = kwargs.get('secret')
def deserialize(self, value):
"""Cast raw string to appropriate type."""
return value
def serialize(self, value):
"""Convert value back to string for saving."""
return str(value)
def format(self, value):
"""Format value for display."""
if self.secret and value is not None:
return '********'
return self.serialize(value)
class String(ConfigValue):
"""String values.
Supports: optional, choices and secret.
"""
def deserialize(self, value):
value = value.strip()
validators.validate_required(value, not self.optional)
validators.validate_choice(value, self.choices)
if not value:
return None
return value
def serialize(self, value):
return value.encode('utf-8').encode('string-escape')
class Integer(ConfigValue):
"""Integer values.
Supports: choices, minimum, maximum and secret.
"""
def deserialize(self, value):
value = int(value)
validators.validate_choice(value, self.choices)
validators.validate_minimum(value, self.minimum)
validators.validate_maximum(value, self.maximum)
return value
class Boolean(ConfigValue):
"""Boolean values.
Supports: secret.
"""
true_values = ('1', 'yes', 'true', 'on')
false_values = ('0', 'no', 'false', 'off')
def deserialize(self, value):
if value.lower() in self.true_values:
return True
elif value.lower() in self.false_values:
return False
raise ValueError('invalid value for boolean: %r' % value)
def serialize(self, value):
if value:
return 'true'
else:
return 'false'
class List(ConfigValue):
"""List values split by comma or newline.
Supports: optional and secret.
"""
def deserialize(self, value):
validators.validate_required(value, not self.optional)
if '\n' in value:
values = re.split(r'\s*\n\s*', value.strip())
else:
values = re.split(r'\s*,\s*', value.strip())
return tuple([v for v in values if v])
def serialize(self, value):
return '\n ' + '\n '.join(v.encode('utf-8') for v in value)
class LogLevel(ConfigValue):
"""Log level values.
Supports: secret.
"""
levels = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG,
}
def deserialize(self, value):
validators.validate_choice(value.lower(), self.levels.keys())
return self.levels.get(value.lower())
def serialize(self, value):
return dict((v, k) for k, v in self.levels.items()).get(value)
class Hostname(ConfigValue):
"""Hostname values.
Supports: optional and secret.
"""
def deserialize(self, value):
validators.validate_required(value, not self.optional)
if not value.strip():
return None
try:
socket.getaddrinfo(value, None)
except socket.error:
raise ValueError('must be a resolveable hostname or valid IP')
return value
class Port(Integer):
"""Port values limited to 1-65535.
Supports: choices and secret.
"""
# TODO: consider probing if port is free or not?
def __init__(self, **kwargs):
super(Port, self).__init__(**kwargs)
self.minimum = 1
self.maximum = 2 ** 16 - 1
class ExpandedPath(bytes):
def __new__(self, value):
expanded = path.expand_path(value)
return super(ExpandedPath, self).__new__(self, expanded)
def __init__(self, value):
self.original = value
class Path(ConfigValue):
"""File system path that will be expanded with mopidy.utils.path.expand_path
Supports: optional, choices and secret.
"""
def deserialize(self, value):
value = value.strip()
validators.validate_required(value, not self.optional)
validators.validate_choice(value, self.choices)
if not value:
return None
return ExpandedPath(value)
def serialize(self, value):
if isinstance(value, ExpandedPath):
return value.original
return value

View File

@ -4,7 +4,7 @@ import logging
import pkg_resources
from mopidy import exceptions
from mopidy.utils import config as config_utils
from mopidy import config as config_utils
logger = logging.getLogger('mopidy.ext')

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import exceptions, ext
from mopidy.utils import config, formatting
from mopidy import config, exceptions, ext
from mopidy.utils import formatting
default_config = """

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import ext
from mopidy.utils import config, formatting
from mopidy import config, ext
from mopidy.utils import formatting
default_config = """

View File

@ -3,8 +3,8 @@ from __future__ import unicode_literals
import os
import mopidy
from mopidy import exceptions, ext
from mopidy.utils import formatting, config
from mopidy import config, exceptions, ext
from mopidy.utils import formatting
default_config = """

View File

@ -1,8 +1,8 @@
from __future__ import unicode_literals
import mopidy
from mopidy import exceptions, ext
from mopidy.utils import config, formatting
from mopidy import config, exceptions, ext
from mopidy.utils import formatting
default_config = """

View File

@ -1,371 +0,0 @@
from __future__ import unicode_literals
import logging
import re
import socket
from mopidy import exceptions
from mopidy.utils import path
def validate_required(value, required):
"""Required validation, normally called in config value's validate() on the
raw string, _not_ the converted value."""
if required and not value.strip():
raise ValueError('must be set.')
def validate_choice(value, choices):
"""Choice validation, normally called in config value's validate()."""
if choices is not None and value not in choices:
names = ', '.join(repr(c) for c in choices)
raise ValueError('must be one of %s, not %s.' % (names, value))
def validate_minimum(value, minimum):
"""Minimum validation, normally called in config value's validate()."""
if minimum is not None and value < minimum:
raise ValueError('%r must be larger than %r.' % (value, minimum))
def validate_maximum(value, maximum):
"""Maximum validation, normally called in config value's validate()."""
if maximum is not None and value > maximum:
raise ValueError('%r must be smaller than %r.' % (value, maximum))
# TODO: move this and levenshtein to a more appropriate class.
def did_you_mean(name, choices):
"""Suggest most likely setting based on levenshtein."""
if not choices:
return None
name = name.lower()
candidates = [(levenshtein(name, c), c) for c in choices]
candidates.sort()
if candidates[0][0] <= 3:
return candidates[0][1]
return None
def levenshtein(a, b):
"""Calculates the Levenshtein distance between a and b."""
n, m = len(a), len(b)
if n > m:
return levenshtein(b, a)
current = xrange(n + 1)
for i in xrange(1, m + 1):
previous, current = current, [i] + [0] * n
for j in xrange(1, n + 1):
add, delete = previous[j] + 1, current[j - 1] + 1
change = previous[j - 1]
if a[j - 1] != b[i - 1]:
change += 1
current[j] = min(add, delete, change)
return current[n]
class ConfigValue(object):
"""Represents a config key's value and how to handle it.
Normally you will only be interacting with sub-classes for config values
that encode either deserialization behavior and/or validation.
Each config value should be used for the following actions:
1. Deserializing from a raw string and validating, raising ValueError on
failure.
2. Serializing a value back to a string that can be stored in a config.
3. Formatting a value to a printable form (useful for masking secrets).
:class:`None` values should not be deserialized, serialized or formatted,
the code interacting with the config should simply skip None config values.
"""
#: Collection of valid choices for converted value. Must be combined with
#: :function:`validate_choices` in :method:`validate` do any thing.
choices = None
#: Minimum of converted value. Must be combined with
#: :function:`validate_minimum` in :method:`validate` do any thing.
minimum = None
#: Maximum of converted value. Must be combined with
#: :function:`validate_maximum` in :method:`validate` do any thing.
maximum = None
#: Indicate if this field is required.
optional = None
#: Indicate if we should mask the when printing for human consumption.
secret = None
def __init__(self, **kwargs):
self.choices = kwargs.get('choices')
self.minimum = kwargs.get('minimum')
self.maximum = kwargs.get('maximum')
self.optional = kwargs.get('optional')
self.secret = kwargs.get('secret')
def deserialize(self, value):
"""Cast raw string to appropriate type."""
return value
def serialize(self, value):
"""Convert value back to string for saving."""
return str(value)
def format(self, value):
"""Format value for display."""
if self.secret and value is not None:
return '********'
return self.serialize(value)
class String(ConfigValue):
"""String values.
Supports: optional, choices and secret.
"""
def deserialize(self, value):
value = value.strip()
validate_required(value, not self.optional)
validate_choice(value, self.choices)
if not value:
return None
return value
def serialize(self, value):
return value.encode('utf-8').encode('string-escape')
class Integer(ConfigValue):
"""Integer values.
Supports: choices, minimum, maximum and secret.
"""
def deserialize(self, value):
value = int(value)
validate_choice(value, self.choices)
validate_minimum(value, self.minimum)
validate_maximum(value, self.maximum)
return value
class Boolean(ConfigValue):
"""Boolean values.
Supports: secret.
"""
true_values = ('1', 'yes', 'true', 'on')
false_values = ('0', 'no', 'false', 'off')
def deserialize(self, value):
if value.lower() in self.true_values:
return True
elif value.lower() in self.false_values:
return False
raise ValueError('invalid value for boolean: %r' % value)
def serialize(self, value):
if value:
return 'true'
else:
return 'false'
class List(ConfigValue):
"""List values split by comma or newline.
Supports: optional and secret.
"""
def deserialize(self, value):
validate_required(value, not self.optional)
if '\n' in value:
values = re.split(r'\s*\n\s*', value.strip())
else:
values = re.split(r'\s*,\s*', value.strip())
return tuple([v for v in values if v])
def serialize(self, value):
return '\n ' + '\n '.join(v.encode('utf-8') for v in value)
class LogLevel(ConfigValue):
"""Log level values.
Supports: secret.
"""
levels = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG,
}
def deserialize(self, value):
validate_choice(value.lower(), self.levels.keys())
return self.levels.get(value.lower())
def serialize(self, value):
return dict((v, k) for k, v in self.levels.items()).get(value)
class Hostname(ConfigValue):
"""Hostname values.
Supports: optional and secret.
"""
def deserialize(self, value):
validate_required(value, not self.optional)
if not value.strip():
return None
try:
socket.getaddrinfo(value, None)
except socket.error:
raise ValueError('must be a resolveable hostname or valid IP')
return value
class Port(Integer):
"""Port values limited to 1-65535.
Supports: choices and secret.
"""
# TODO: consider probing if port is free or not?
def __init__(self, **kwargs):
super(Port, self).__init__(**kwargs)
self.minimum = 1
self.maximum = 2 ** 16 - 1
class ExpandedPath(bytes):
def __new__(self, value):
expanded = path.expand_path(value)
return super(ExpandedPath, self).__new__(self, expanded)
def __init__(self, value):
self.original = value
class Path(ConfigValue):
"""File system path that will be expanded with mopidy.utils.path.expand_path
Supports: optional, choices and secret.
"""
def deserialize(self, value):
value = value.strip()
validate_required(value, not self.optional)
validate_choice(value, self.choices)
if not value:
return None
return ExpandedPath(value)
def serialize(self, value):
if isinstance(value, ExpandedPath):
return value.original
return value
class ConfigSchema(object):
"""Logical group of config values that correspond to a config section.
Schemas are set up by assigning config keys with config values to
instances. Once setup :meth:`convert` can be called with a list of
``(key, value)`` tuples to process. For convienience we also support
:meth:`format` method that can used for printing out the converted values.
"""
# TODO: Use collections.OrderedDict once 2.6 support is gone (#344)
def __init__(self):
self._schema = {}
self._order = []
def __setitem__(self, key, value):
if key not in self._schema:
self._order.append(key)
self._schema[key] = value
def __getitem__(self, key):
return self._schema[key]
def format(self, name, values):
# TODO: should the output be encoded utf-8 since we use that in
# serialize for strings?
lines = ['[%s]' % name]
for key in self._order:
value = values.get(key)
if value is not None:
lines.append('%s = %s' % (
key, self._schema[key].format(value)))
return '\n'.join(lines)
def convert(self, items):
errors = {}
values = {}
for key, value in items:
try:
values[key] = self._schema[key].deserialize(value)
except KeyError: # not in our schema
errors[key] = 'unknown config key.'
suggestion = did_you_mean(key, self._schema.keys())
if suggestion:
errors[key] += ' Did you mean %s?' % suggestion
except ValueError as e: # deserialization failed
errors[key] = str(e)
for key in self._schema:
if key not in values and key not in errors:
errors[key] = 'config key not found.'
if errors:
raise exceptions.ConfigError(errors)
return values
class ExtensionConfigSchema(ConfigSchema):
"""Sub-classed :class:`ConfigSchema` for use in extensions.
Ensures that ``enabled`` config value is present.
"""
def __init__(self):
super(ExtensionConfigSchema, self).__init__()
self['enabled'] = Boolean()
class LogLevelConfigSchema(object):
"""Special cased schema for handling a config section with loglevels.
Expects the config keys to be logger names and the values to be log levels
as understood by the :class:`LogLevel` config value. Does not sub-class
:class:`ConfigSchema`, but implements the same interface.
"""
def __init__(self):
self._config_value = LogLevel()
def format(self, name, values):
lines = ['[%s]' % name]
for key, value in sorted(values.items()):
if value is not None:
lines.append('%s = %s' % (
key, self._config_value.format(value)))
return '\n'.join(lines)
def convert(self, items):
errors = {}
values = {}
for key, value in items:
try:
if value.strip():
values[key] = self._config_value.deserialize(value)
except ValueError as e: # deserialization failed
errors[key] = str(e)
if errors:
raise exceptions.ConfigError(errors)
return values

View File

@ -0,0 +1,127 @@
from __future__ import unicode_literals
import logging
import mock
from mopidy import exceptions
from mopidy.config import schemas
from tests import unittest
class ConfigSchemaTest(unittest.TestCase):
def setUp(self):
self.schema = schemas.ConfigSchema()
self.schema['foo'] = mock.Mock()
self.schema['bar'] = mock.Mock()
self.schema['baz'] = mock.Mock()
self.values = {'bar': '123', 'foo': '456', 'baz': '678'}
def test_format(self):
self.schema['foo'].format.return_value = 'qwe'
self.schema['bar'].format.return_value = 'asd'
self.schema['baz'].format.return_value = 'zxc'
expected = ['[qwerty]', 'foo = qwe', 'bar = asd', 'baz = zxc']
result = self.schema.format('qwerty', self.values)
self.assertEqual('\n'.join(expected), result)
def test_format_unkwown_value(self):
self.schema['foo'].format.return_value = 'qwe'
self.schema['bar'].format.return_value = 'asd'
self.schema['baz'].format.return_value = 'zxc'
self.values['unknown'] = 'rty'
result = self.schema.format('qwerty', self.values)
self.assertNotIn('unknown = rty', result)
def test_convert(self):
self.schema.convert(self.values.items())
def test_convert_with_missing_value(self):
del self.values['foo']
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('not found', cm.exception['foo'])
def test_convert_with_extra_value(self):
self.values['extra'] = '123'
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('unknown', cm.exception['extra'])
def test_convert_with_deserialization_error(self):
self.schema['foo'].deserialize.side_effect = ValueError('failure')
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('failure', cm.exception['foo'])
def test_convert_with_multiple_deserialization_errors(self):
self.schema['foo'].deserialize.side_effect = ValueError('failure')
self.schema['bar'].deserialize.side_effect = ValueError('other')
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('failure', cm.exception['foo'])
self.assertIn('other', cm.exception['bar'])
def test_convert_deserialization_unknown_and_missing_errors(self):
self.values['extra'] = '123'
self.schema['bar'].deserialize.side_effect = ValueError('failure')
del self.values['baz']
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('unknown', cm.exception['extra'])
self.assertNotIn('foo', cm.exception)
self.assertIn('failure', cm.exception['bar'])
self.assertIn('not found', cm.exception['baz'])
class ExtensionConfigSchemaTest(unittest.TestCase):
def test_schema_includes_enabled(self):
schema = schemas.ExtensionConfigSchema()
self.assertIsInstance(schema['enabled'], values.Boolean)
class LogLevelConfigSchemaTest(unittest.TestCase):
def test_conversion(self):
schema = schemas.LogLevelConfigSchema()
result = schema.convert([('foo.bar', 'DEBUG'), ('baz', 'INFO')])
self.assertEqual(logging.DEBUG, result['foo.bar'])
self.assertEqual(logging.INFO, result['baz'])
def test_format(self):
schema = schemas.LogLevelConfigSchema()
expected = ['[levels]', 'baz = info', 'foo.bar = debug']
result = schema.format('levels', {'foo.bar': logging.DEBUG, 'baz': logging.INFO})
self.assertEqual('\n'.join(expected), result)
class DidYouMeanTest(unittest.TestCase):
def testSuggestoins(self):
choices = ('enabled', 'username', 'password', 'bitrate', 'timeout')
suggestion = schemas._did_you_mean('bitrate', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = schemas._did_you_mean('bitrote', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = schemas._did_you_mean('Bitrot', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = schemas._did_you_mean('BTROT', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = schemas._did_you_mean('btro', choices)
self.assertEqual(suggestion, None)

View File

@ -0,0 +1,66 @@
from __future__ import unicode_literals
from mopidy.config import validators
from tests import unittest
class ValidateChoiceTest(unittest.TestCase):
def test_no_choices_passes(self):
validators.validate_choice('foo', None)
def test_valid_value_passes(self):
validators.validate_choice('foo', ['foo', 'bar', 'baz'])
validators.validate_choice(1, [1, 2, 3])
def test_empty_choices_fails(self):
self.assertRaises(ValueError,validators.validate_choice, 'foo', [])
def test_invalid_value_fails(self):
words = ['foo', 'bar', 'baz']
self.assertRaises(ValueError, validators.validate_choice, 'foobar', words)
self.assertRaises(ValueError, validators.validate_choice, 5, [1, 2, 3])
class ValidateMinimumTest(unittest.TestCase):
def test_no_minimum_passes(self):
validators.validate_minimum(10, None)
def test_valid_value_passes(self):
validators.validate_minimum(10, 5)
def test_to_small_value_fails(self):
self.assertRaises(ValueError, validators.validate_minimum, 10, 20)
def test_to_small_value_fails_with_zero_as_minimum(self):
self.assertRaises(ValueError, validators.validate_minimum, -1, 0)
class ValidateMaximumTest(unittest.TestCase):
def test_no_maximum_passes(self):
validators.validate_maximum(5, None)
def test_valid_value_passes(self):
validators.validate_maximum(5, 10)
def test_to_large_value_fails(self):
self.assertRaises(ValueError, validators.validate_maximum, 10, 5)
def test_to_large_value_fails_with_zero_as_maximum(self):
self.assertRaises(ValueError, validators.validate_maximum, 5, 0)
class ValidateRequiredTest(unittest.TestCase):
def test_passes_when_false(self):
validators.validate_required('foo', False)
validators.validate_required('', False)
validators.validate_required(' ', False)
def test_passes_when_required_and_set(self):
validators.validate_required('foo', True)
validators.validate_required(' foo ', True)
def test_blocks_when_required_and_emtpy(self):
self.assertRaises(ValueError, validators.validate_required, '', True)
self.assertRaises(ValueError, validators.validate_required, ' ', True)

View File

@ -5,74 +5,14 @@ import mock
import socket
from mopidy import exceptions
from mopidy.utils import config
from mopidy.config import values
from tests import unittest
class ValidateChoiceTest(unittest.TestCase):
def test_no_choices_passes(self):
config.validate_choice('foo', None)
def test_valid_value_passes(self):
config.validate_choice('foo', ['foo', 'bar', 'baz'])
config.validate_choice(1, [1, 2, 3])
def test_empty_choices_fails(self):
self.assertRaises(ValueError, config.validate_choice, 'foo', [])
def test_invalid_value_fails(self):
words = ['foo', 'bar', 'baz']
self.assertRaises(ValueError, config.validate_choice, 'foobar', words)
self.assertRaises(ValueError, config.validate_choice, 5, [1, 2, 3])
class ValidateMinimumTest(unittest.TestCase):
def test_no_minimum_passes(self):
config.validate_minimum(10, None)
def test_valid_value_passes(self):
config.validate_minimum(10, 5)
def test_to_small_value_fails(self):
self.assertRaises(ValueError, config.validate_minimum, 10, 20)
def test_to_small_value_fails_with_zero_as_minimum(self):
self.assertRaises(ValueError, config.validate_minimum, -1, 0)
class ValidateMaximumTest(unittest.TestCase):
def test_no_maximum_passes(self):
config.validate_maximum(5, None)
def test_valid_value_passes(self):
config.validate_maximum(5, 10)
def test_to_large_value_fails(self):
self.assertRaises(ValueError, config.validate_maximum, 10, 5)
def test_to_large_value_fails_with_zero_as_maximum(self):
self.assertRaises(ValueError, config.validate_maximum, 5, 0)
class ValidateRequiredTest(unittest.TestCase):
def test_passes_when_false(self):
config.validate_required('foo', False)
config.validate_required('', False)
config.validate_required(' ', False)
def test_passes_when_required_and_set(self):
config.validate_required('foo', True)
config.validate_required(' foo ', True)
def test_blocks_when_required_and_emtpy(self):
self.assertRaises(ValueError, config.validate_required, '', True)
self.assertRaises(ValueError, config.validate_required, ' ', True)
class ConfigValueTest(unittest.TestCase):
def test_init(self):
value = config.ConfigValue()
value = values.ConfigValue()
self.assertIsNone(value.choices)
self.assertIsNone(value.maximum)
self.assertIsNone(value.minimum)
@ -82,7 +22,7 @@ class ConfigValueTest(unittest.TestCase):
def test_init_with_params(self):
kwargs = {'choices': ['foo'], 'minimum': 0, 'maximum': 10,
'secret': True, 'optional': True}
value = config.ConfigValue(**kwargs)
value = values.ConfigValue(**kwargs)
self.assertEqual(['foo'], value.choices)
self.assertEqual(0, value.minimum)
self.assertEqual(10, value.maximum)
@ -90,90 +30,90 @@ class ConfigValueTest(unittest.TestCase):
self.assertEqual(True, value.secret)
def test_deserialize_passes_through(self):
value = config.ConfigValue()
value = values.ConfigValue()
obj = object()
self.assertEqual(obj, value.deserialize(obj))
def test_serialize_conversion_to_string(self):
value = config.ConfigValue()
value = values.ConfigValue()
self.assertIsInstance(value.serialize(object()), basestring)
def test_format_uses_serialize(self):
value = config.ConfigValue()
value = values.ConfigValue()
obj = object()
self.assertEqual(value.serialize(obj), value.format(obj))
def test_format_masks_secrets(self):
value = config.ConfigValue(secret=True)
value = values.ConfigValue(secret=True)
self.assertEqual('********', value.format(object()))
class StringTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
value = config.String()
value = values.String()
self.assertEqual('foo', value.deserialize(' foo '))
def test_deserialize_enforces_choices(self):
value = config.String(choices=['foo', 'bar', 'baz'])
value = values.String(choices=['foo', 'bar', 'baz'])
self.assertEqual('foo', value.deserialize('foo'))
self.assertRaises(ValueError, value.deserialize, 'foobar')
def test_deserialize_enforces_required(self):
value = config.String()
value = values.String()
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
def test_deserialize_respects_optional(self):
value = config.String(optional=True)
value = values.String(optional=True)
self.assertIsNone(value.deserialize(''))
self.assertIsNone(value.deserialize(' '))
def test_serialize_string_escapes(self):
value = config.String()
value = values.String()
self.assertEqual(r'\r\n\t', value.serialize('\r\n\t'))
def test_format_masks_secrets(self):
value = config.String(secret=True)
value = values.String(secret=True)
self.assertEqual('********', value.format('s3cret'))
class IntegerTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
value = config.Integer()
value = values.Integer()
self.assertEqual(123, value.deserialize('123'))
self.assertEqual(0, value.deserialize('0'))
self.assertEqual(-10, value.deserialize('-10'))
def test_deserialize_conversion_failure(self):
value = config.Integer()
value = values.Integer()
self.assertRaises(ValueError, value.deserialize, 'asd')
self.assertRaises(ValueError, value.deserialize, '3.14')
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
def test_deserialize_enforces_choices(self):
value = config.Integer(choices=[1, 2, 3])
value = values.Integer(choices=[1, 2, 3])
self.assertEqual(3, value.deserialize('3'))
self.assertRaises(ValueError, value.deserialize, '5')
def test_deserialize_enforces_minimum(self):
value = config.Integer(minimum=10)
value = values.Integer(minimum=10)
self.assertEqual(15, value.deserialize('15'))
self.assertRaises(ValueError, value.deserialize, '5')
def test_deserialize_enforces_maximum(self):
value = config.Integer(maximum=10)
value = values.Integer(maximum=10)
self.assertEqual(5, value.deserialize('5'))
self.assertRaises(ValueError, value.deserialize, '15')
def test_format_masks_secrets(self):
value = config.Integer(secret=True)
value = values.Integer(secret=True)
self.assertEqual('********', value.format('1337'))
class BooleanTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
value = config.Boolean()
value = values.Boolean()
for true in ('1', 'yes', 'true', 'on'):
self.assertIs(value.deserialize(true), True)
self.assertIs(value.deserialize(true.upper()), True)
@ -184,24 +124,24 @@ class BooleanTest(unittest.TestCase):
self.assertIs(value.deserialize(false.capitalize()), False)
def test_deserialize_conversion_failure(self):
value = config.Boolean()
value = values.Boolean()
self.assertRaises(ValueError, value.deserialize, 'nope')
self.assertRaises(ValueError, value.deserialize, 'sure')
self.assertRaises(ValueError, value.deserialize, '')
def test_serialize(self):
value = config.Boolean()
value = values.Boolean()
self.assertEqual('true', value.serialize(True))
self.assertEqual('false', value.serialize(False))
def test_format_masks_secrets(self):
value = config.Boolean(secret=True)
value = values.Boolean(secret=True)
self.assertEqual('********', value.format('true'))
class ListTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
value = config.List()
value = values.List()
expected = ('foo', 'bar', 'baz')
self.assertEqual(expected, value.deserialize('foo, bar ,baz '))
@ -210,17 +150,17 @@ class ListTest(unittest.TestCase):
self.assertEqual(expected, value.deserialize(' foo,bar\nbar\nbaz'))
def test_deserialize_enforces_required(self):
value = config.List()
value = values.List()
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
def test_deserialize_respects_optional(self):
value = config.List(optional=True)
value = values.List(optional=True)
self.assertEqual(tuple(), value.deserialize(''))
self.assertEqual(tuple(), value.deserialize(' '))
def test_serialize(self):
value = config.List()
value = values.List()
result = value.serialize(('foo', 'bar', 'baz'))
self.assertRegexpMatches(result, r'foo\n\s*bar\n\s*baz')
@ -233,21 +173,21 @@ class BooleanTest(unittest.TestCase):
'debug': logging.DEBUG}
def test_deserialize_conversion_success(self):
value = config.LogLevel()
value = values.LogLevel()
for name, level in self.levels.items():
self.assertEqual(level, value.deserialize(name))
self.assertEqual(level, value.deserialize(name.upper()))
self.assertEqual(level, value.deserialize(name.capitalize()))
def test_deserialize_conversion_failure(self):
value = config.LogLevel()
value = values.LogLevel()
self.assertRaises(ValueError, value.deserialize, 'nope')
self.assertRaises(ValueError, value.deserialize, 'sure')
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
def test_serialize(self):
value = config.LogLevel()
value = values.LogLevel()
for name, level in self.levels.items():
self.assertEqual(name, value.serialize(level))
self.assertIsNone(value.serialize(1337))
@ -256,26 +196,26 @@ class BooleanTest(unittest.TestCase):
class HostnameTest(unittest.TestCase):
@mock.patch('socket.getaddrinfo')
def test_deserialize_conversion_success(self, getaddrinfo_mock):
value = config.Hostname()
value = values.Hostname()
value.deserialize('example.com')
getaddrinfo_mock.assert_called_once_with('example.com', None)
@mock.patch('socket.getaddrinfo')
def test_deserialize_conversion_failure(self, getaddrinfo_mock):
value = config.Hostname()
value = values.Hostname()
getaddrinfo_mock.side_effect = socket.error
self.assertRaises(ValueError, value.deserialize, 'example.com')
@mock.patch('socket.getaddrinfo')
def test_deserialize_enforces_required(self, getaddrinfo_mock):
value = config.Hostname()
value = values.Hostname()
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
self.assertEqual(0, getaddrinfo_mock.call_count)
@mock.patch('socket.getaddrinfo')
def test_deserialize_respects_optional(self, getaddrinfo_mock):
value = config.Hostname(optional=True)
value = values.Hostname(optional=True)
self.assertIsNone(value.deserialize(''))
self.assertIsNone(value.deserialize(' '))
self.assertEqual(0, getaddrinfo_mock.call_count)
@ -283,14 +223,14 @@ class HostnameTest(unittest.TestCase):
class PortTest(unittest.TestCase):
def test_valid_ports(self):
value = config.Port()
value = values.Port()
self.assertEqual(1, value.deserialize('1'))
self.assertEqual(80, value.deserialize('80'))
self.assertEqual(6600, value.deserialize('6600'))
self.assertEqual(65535, value.deserialize('65535'))
def test_invalid_ports(self):
value = config.Port()
value = values.Port()
self.assertRaises(ValueError, value.deserialize, '65536')
self.assertRaises(ValueError, value.deserialize, '100000')
self.assertRaises(ValueError, value.deserialize, '0')
@ -300,166 +240,48 @@ class PortTest(unittest.TestCase):
class ExpandedPathTest(unittest.TestCase):
def test_is_bytes(self):
self.assertIsInstance(config.ExpandedPath('/tmp'), bytes)
self.assertIsInstance(values.ExpandedPath('/tmp'), bytes)
@mock.patch('mopidy.utils.path.expand_path')
def test_defaults_to_expanded(self, expand_path_mock):
expand_path_mock.return_value = 'expanded_path'
self.assertEqual('expanded_path', config.ExpandedPath('~'))
self.assertEqual('expanded_path', values.ExpandedPath('~'))
@mock.patch('mopidy.utils.path.expand_path')
def test_orginal_stores_unexpanded(self, expand_path_mock):
self.assertEqual('~', config.ExpandedPath('~').original)
self.assertEqual('~', values.ExpandedPath('~').original)
class PathTest(unittest.TestCase):
def test_deserialize_conversion_success(self):
result = config.Path().deserialize('/foo')
result = values.Path().deserialize('/foo')
self.assertEqual('/foo', result)
self.assertIsInstance(result, config.ExpandedPath)
self.assertIsInstance(result, values.ExpandedPath)
self.assertIsInstance(result, bytes)
def test_deserialize_enforces_choices(self):
value = config.Path(choices=['/foo', '/bar', '/baz'])
value = values.Path(choices=['/foo', '/bar', '/baz'])
self.assertEqual('/foo', value.deserialize('/foo'))
self.assertRaises(ValueError, value.deserialize, '/foobar')
def test_deserialize_enforces_required(self):
value = config.Path()
value = values.Path()
self.assertRaises(ValueError, value.deserialize, '')
self.assertRaises(ValueError, value.deserialize, ' ')
def test_deserialize_respects_optional(self):
value = config.Path(optional=True)
value = values.Path(optional=True)
self.assertIsNone(value.deserialize(''))
self.assertIsNone(value.deserialize(' '))
@mock.patch('mopidy.utils.path.expand_path')
def test_serialize_uses_original(self, expand_path_mock):
expand_path_mock.return_value = 'expanded_path'
path = config.ExpandedPath('original_path')
value = config.Path()
path = values.ExpandedPath('original_path')
value = values.Path()
self.assertEqual('expanded_path', path)
self.assertEqual('original_path', value.serialize(path))
def test_serialize_plain_string(self):
value = config.Path()
value = values.Path()
self.assertEqual('path', value.serialize('path'))
class ConfigSchemaTest(unittest.TestCase):
def setUp(self):
self.schema = config.ConfigSchema()
self.schema['foo'] = mock.Mock()
self.schema['bar'] = mock.Mock()
self.schema['baz'] = mock.Mock()
self.values = {'bar': '123', 'foo': '456', 'baz': '678'}
def test_format(self):
self.schema['foo'].format.return_value = 'qwe'
self.schema['bar'].format.return_value = 'asd'
self.schema['baz'].format.return_value = 'zxc'
expected = ['[qwerty]', 'foo = qwe', 'bar = asd', 'baz = zxc']
result = self.schema.format('qwerty', self.values)
self.assertEqual('\n'.join(expected), result)
def test_format_unkwown_value(self):
self.schema['foo'].format.return_value = 'qwe'
self.schema['bar'].format.return_value = 'asd'
self.schema['baz'].format.return_value = 'zxc'
self.values['unknown'] = 'rty'
result = self.schema.format('qwerty', self.values)
self.assertNotIn('unknown = rty', result)
def test_convert(self):
self.schema.convert(self.values.items())
def test_convert_with_missing_value(self):
del self.values['foo']
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('not found', cm.exception['foo'])
def test_convert_with_extra_value(self):
self.values['extra'] = '123'
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('unknown', cm.exception['extra'])
def test_convert_with_deserialization_error(self):
self.schema['foo'].deserialize.side_effect = ValueError('failure')
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('failure', cm.exception['foo'])
def test_convert_with_multiple_deserialization_errors(self):
self.schema['foo'].deserialize.side_effect = ValueError('failure')
self.schema['bar'].deserialize.side_effect = ValueError('other')
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('failure', cm.exception['foo'])
self.assertIn('other', cm.exception['bar'])
def test_convert_deserialization_unknown_and_missing_errors(self):
self.values['extra'] = '123'
self.schema['bar'].deserialize.side_effect = ValueError('failure')
del self.values['baz']
with self.assertRaises(exceptions.ConfigError) as cm:
self.schema.convert(self.values.items())
self.assertIn('unknown', cm.exception['extra'])
self.assertNotIn('foo', cm.exception)
self.assertIn('failure', cm.exception['bar'])
self.assertIn('not found', cm.exception['baz'])
class ExtensionConfigSchemaTest(unittest.TestCase):
def test_schema_includes_enabled(self):
schema = config.ExtensionConfigSchema()
self.assertIsInstance(schema['enabled'], config.Boolean)
class LogLevelConfigSchemaTest(unittest.TestCase):
def test_conversion(self):
schema = config.LogLevelConfigSchema()
result = schema.convert([('foo.bar', 'DEBUG'), ('baz', 'INFO')])
self.assertEqual(logging.DEBUG, result['foo.bar'])
self.assertEqual(logging.INFO, result['baz'])
def test_format(self):
schema = config.LogLevelConfigSchema()
expected = ['[levels]', 'baz = info', 'foo.bar = debug']
result = schema.format('levels', {'foo.bar': logging.DEBUG, 'baz': logging.INFO})
self.assertEqual('\n'.join(expected), result)
class DidYouMeanTest(unittest.TestCase):
def testSuggestoins(self):
choices = ('enabled', 'username', 'password', 'bitrate', 'timeout')
suggestion = config.did_you_mean('bitrate', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = config.did_you_mean('bitrote', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = config.did_you_mean('Bitrot', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = config.did_you_mean('BTROT', choices)
self.assertEqual(suggestion, 'bitrate')
suggestion = config.did_you_mean('btro', choices)
self.assertEqual(suggestion, None)

View File

@ -1,14 +1,13 @@
from __future__ import unicode_literals
from mopidy.ext import Extension
from mopidy.utils import config
from mopidy import config, ext
from tests import unittest
class ExtensionTest(unittest.TestCase):
def setUp(self):
self.ext = Extension()
self.ext = ext.Extension()
def test_dist_name_is_none(self):
self.assertIsNone(self.ext.dist_name)