Merge pull request #379 from adamcik/feature/config
Add mopidy.utils.config with new ConfigValue and ConfigSchema classes.
This commit is contained in:
commit
b91c456c90
25
mopidy/config.py
Normal file
25
mopidy/config.py
Normal file
@ -0,0 +1,25 @@
|
||||
from mopidy.utils import config
|
||||
|
||||
schemas = {} # TODO: use ordered dict?
|
||||
schemas['logging'] = config.ConfigSchema()
|
||||
schemas['logging']['config_file'] = config.String()
|
||||
schemas['logging']['console_format'] = config.String()
|
||||
schemas['logging']['debug_format'] = config.String()
|
||||
schemas['logging']['debug_file'] = config.String()
|
||||
schemas['logging']['debug_thread'] = config.Boolean()
|
||||
|
||||
schemas['logging.levels'] = config.LogLevelConfigSchema()
|
||||
|
||||
schemas['audio'] = config.ConfigSchema()
|
||||
schemas['audio']['mixer'] = config.String()
|
||||
schemas['audio']['mixer_track'] = config.String()
|
||||
schemas['audio']['output'] = config.String()
|
||||
|
||||
# NOTE: if multiple outputs ever comes something like LogLevelConfigSchema
|
||||
#schemas['audio.outputs'] = config.AudioOutputConfigSchema()
|
||||
|
||||
|
||||
def register_schema(name, schema):
|
||||
if name in schemas:
|
||||
raise Exception
|
||||
schemas[name] = schema
|
||||
@ -20,6 +20,24 @@ class SettingsError(MopidyException):
|
||||
pass
|
||||
|
||||
|
||||
class ConfigError(MopidyException):
|
||||
def __init__(self, errors):
|
||||
self._errors = errors
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._errors[key]
|
||||
|
||||
def __iter__(self):
|
||||
return self._errors.iterkeys()
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
lines = []
|
||||
for key, msg in self._errors.items():
|
||||
lines.append('%s: %s' % (key, msg))
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
class OptionalDependencyError(MopidyException):
|
||||
pass
|
||||
|
||||
|
||||
262
mopidy/utils/config.py
Normal file
262
mopidy/utils/config.py
Normal file
@ -0,0 +1,262 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
|
||||
from mopidy import exceptions
|
||||
|
||||
|
||||
def validate_choice(value, choices):
|
||||
"""Choice validation, normally called in config value's validate()."""
|
||||
if choices is not None and value not in choices :
|
||||
names = ', '.join(repr(c) for c in choices)
|
||||
raise ValueError('must be one of %s, not %s.' % (names, value))
|
||||
|
||||
|
||||
def validate_minimum(value, minimum):
|
||||
"""Minimum validation, normally called in config value's validate()."""
|
||||
if minimum is not None and value < minimum:
|
||||
raise ValueError('%r must be larger than %r.' % (value, minimum))
|
||||
|
||||
|
||||
def validate_maximum(value, maximum):
|
||||
"""Maximum validation, normally called in config value's validate()."""
|
||||
if maximum is not None and value > maximum:
|
||||
raise ValueError('%r must be smaller than %r.' % (value, maximum))
|
||||
|
||||
|
||||
class ConfigValue(object):
|
||||
"""Represents a config key's value and how to handle it.
|
||||
|
||||
Normally you will only be interacting with sub-classes for config values
|
||||
that encode either deserialization behavior and/or validation.
|
||||
|
||||
Each config value should be used for the following actions:
|
||||
|
||||
1. Deserializing from a raw string and validating, raising ValueError on
|
||||
failure.
|
||||
2. Serializing a value back to a string that can be stored in a config.
|
||||
3. Formatting a value to a printable form (useful for masking secrets).
|
||||
|
||||
:class:`None` values should not be deserialized, serialized or formatted,
|
||||
the code interacting with the config should simply skip None config values.
|
||||
"""
|
||||
|
||||
#: Collection of valid choices for converted value. Must be combined with
|
||||
#: :function:`validate_choices` in :method:`validate` do any thing.
|
||||
choices = None
|
||||
|
||||
#: Minimum of converted value. Must be combined with
|
||||
#: :function:`validate_minimum` in :method:`validate` do any thing.
|
||||
minimum = None
|
||||
|
||||
#: Maximum of converted value. Must be combined with
|
||||
#: :function:`validate_maximum` in :method:`validate` do any thing.
|
||||
maximum = None
|
||||
|
||||
#: Indicate if we should mask the when printing for human consumption.
|
||||
secret = None
|
||||
|
||||
def __init__(self, choices=None, minimum=None, maximum=None, secret=None):
|
||||
self.choices = choices
|
||||
self.minimum = minimum
|
||||
self.maximum = maximum
|
||||
self.secret = secret
|
||||
|
||||
def deserialize(self, value):
|
||||
"""Cast raw string to appropriate type."""
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
"""Convert value back to string for saving."""
|
||||
if value is None:
|
||||
return ''
|
||||
return str(value)
|
||||
|
||||
def format(self, value):
|
||||
"""Format value for display."""
|
||||
if self.secret and value is not None:
|
||||
return '********'
|
||||
return self.serialize(value)
|
||||
|
||||
|
||||
class String(ConfigValue):
|
||||
def deserialize(self, value):
|
||||
value = value.strip()
|
||||
validate_choice(value, self.choices)
|
||||
return value
|
||||
|
||||
def serialize(self, value):
|
||||
return value.encode('utf-8')
|
||||
|
||||
|
||||
class Integer(ConfigValue):
|
||||
def deserialize(self, value):
|
||||
value = int(value.strip())
|
||||
validate_choice(value, self.choices)
|
||||
validate_minimum(value, self.minimum)
|
||||
validate_maximum(value, self.maximum)
|
||||
return value
|
||||
|
||||
|
||||
class Boolean(ConfigValue):
|
||||
true_values = ('1', 'yes', 'true', 'on')
|
||||
false_values = ('0', 'no', 'false', 'off')
|
||||
|
||||
def deserialize(self, value):
|
||||
if value.lower() in self.true_values:
|
||||
return True
|
||||
elif value.lower() in self.false_values:
|
||||
return False
|
||||
|
||||
raise ValueError('invalid value for boolean: %r' % value)
|
||||
|
||||
def serialize(self, value):
|
||||
if value:
|
||||
return 'true'
|
||||
else:
|
||||
return 'false'
|
||||
|
||||
|
||||
class List(ConfigValue):
|
||||
def deserialize(self, value):
|
||||
if '\n' in value:
|
||||
return re.split(r'\s*\n\s*', value.strip())
|
||||
else:
|
||||
return re.split(r'\s*,\s*', value.strip())
|
||||
|
||||
def serialize(self, value):
|
||||
return '\n '.join(v.encode('utf-8') for v in value)
|
||||
|
||||
|
||||
class LogLevel(ConfigValue):
|
||||
levels = {'critical' : logging.CRITICAL,
|
||||
'error' : logging.ERROR,
|
||||
'warning' : logging.WARNING,
|
||||
'info' : logging.INFO,
|
||||
'debug' : logging.DEBUG}
|
||||
|
||||
def deserialize(self, value):
|
||||
if value.lower() not in self.levels:
|
||||
raise ValueError('%r must be one of %s.' % (value, ', '.join(self.levels)))
|
||||
return self.levels.get(value.lower())
|
||||
|
||||
def serialize(self, value):
|
||||
return dict((v, k) for k, v in self.levels.items()).get(value)
|
||||
|
||||
|
||||
class Hostname(ConfigValue):
|
||||
def deserialize(self, value):
|
||||
try:
|
||||
socket.getaddrinfo(value, None)
|
||||
except socket.error:
|
||||
raise ValueError('must be a resolveable hostname or valid IP')
|
||||
return value
|
||||
|
||||
|
||||
class Port(Integer):
|
||||
def __init__(self, **kwargs):
|
||||
super(Port, self).__init__(**kwargs)
|
||||
self.minimum = 1
|
||||
self.maximum = 2**16 - 1
|
||||
|
||||
|
||||
class ConfigSchema(object):
|
||||
"""Logical group of config values that correspond to a config section.
|
||||
|
||||
Schemas are set up by assigning config keys with config values to instances.
|
||||
Once setup :meth:`convert` can be called with a list of `(key, value)` tuples to
|
||||
process. For convienience we also support :meth:`format` method that can used
|
||||
for printing out the converted values.
|
||||
"""
|
||||
# TODO: Use collections.OrderedDict once 2.6 support is gone (#344)
|
||||
def __init__(self):
|
||||
self._schema = {}
|
||||
self._order = []
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key not in self._schema:
|
||||
self._order.append(key)
|
||||
self._schema[key] = value
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._schema[key]
|
||||
|
||||
def format(self, name, values):
|
||||
lines = ['[%s]' % name]
|
||||
for key in self._order:
|
||||
value = values.get(key)
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (key, self._schema[key].format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
if value.strip():
|
||||
values[key] = self._schema[key].deserialize(value)
|
||||
else: # treat blank entries as none
|
||||
values[key] = None
|
||||
except KeyError: # not in our schema
|
||||
errors[key] = 'unknown config key.'
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
for key in self._schema:
|
||||
if key not in values and key not in errors:
|
||||
errors[key] = 'config key not found.'
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
|
||||
|
||||
class ExtensionConfigSchema(ConfigSchema):
|
||||
"""Sub-classed :class:`ConfigSchema` for use in extensions.
|
||||
|
||||
Ensures that `enabled` config value is present and that section name is
|
||||
prefixed with ext.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(ExtensionConfigSchema, self).__init__()
|
||||
self['enabled'] = Boolean()
|
||||
|
||||
def format(self, name, values):
|
||||
return super(ExtensionConfigSchema, self).format('ext.%s' % name, values)
|
||||
|
||||
|
||||
class LogLevelConfigSchema(object):
|
||||
"""Special cased schema for handling a config section with loglevels.
|
||||
|
||||
Expects the config keys to be logger names and the values to be log levels
|
||||
as understood by the :class:`LogLevel` config value. Does not sub-class
|
||||
:class:`ConfigSchema`, but implements the same interface.
|
||||
"""
|
||||
def __init__(self):
|
||||
self._config_value = LogLevel()
|
||||
|
||||
def format(self, name, values):
|
||||
lines = ['[%s]' % name]
|
||||
for key, value in sorted(values.items()):
|
||||
if value is not None:
|
||||
lines.append('%s = %s' % (key, self._config_value.format(value)))
|
||||
return '\n'.join(lines)
|
||||
|
||||
def convert(self, items):
|
||||
errors = {}
|
||||
values = {}
|
||||
|
||||
for key, value in items:
|
||||
try:
|
||||
if value.strip():
|
||||
values[key] = self._config_value.deserialize(value)
|
||||
except ValueError as e: # deserialization failed
|
||||
errors[key] = str(e)
|
||||
|
||||
if errors:
|
||||
raise exceptions.ConfigError(errors)
|
||||
return values
|
||||
@ -23,3 +23,13 @@ class ExceptionsTest(unittest.TestCase):
|
||||
def test_extension_error_is_a_mopidy_exception(self):
|
||||
self.assert_(issubclass(
|
||||
exceptions.ExtensionError, exceptions.MopidyException))
|
||||
|
||||
def test_config_error_is_a_mopidy_exception(self):
|
||||
self.assert_(issubclass(
|
||||
exceptions.ConfigError, exceptions.MopidyException))
|
||||
|
||||
def test_config_error_provides_getitem(self):
|
||||
exception = exceptions.ConfigError({'field1': 'msg1', 'field2': 'msg2'})
|
||||
self.assertEqual('msg1', exception['field1'])
|
||||
self.assertEqual('msg2', exception['field2'])
|
||||
self.assertItemsEqual(['field1', 'field2'], exception)
|
||||
|
||||
348
tests/utils/config_test.py
Normal file
348
tests/utils/config_test.py
Normal file
@ -0,0 +1,348 @@
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
import mock
|
||||
import socket
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.utils import config
|
||||
|
||||
from tests import unittest
|
||||
|
||||
|
||||
class ValidateChoiceTest(unittest.TestCase):
|
||||
def test_no_choices_passes(self):
|
||||
config.validate_choice('foo', None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_choice('foo', ['foo', 'bar', 'baz'])
|
||||
config.validate_choice(1, [1, 2, 3])
|
||||
|
||||
def test_empty_choices_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_choice, 'foo', [])
|
||||
|
||||
def test_invalid_value_fails(self):
|
||||
words = ['foo', 'bar', 'baz']
|
||||
self.assertRaises(ValueError, config.validate_choice, 'foobar', words)
|
||||
self.assertRaises(ValueError, config.validate_choice, 5, [1, 2, 3])
|
||||
|
||||
|
||||
class ValidateMinimumTest(unittest.TestCase):
|
||||
def test_no_minimum_passes(self):
|
||||
config.validate_minimum(10, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_minimum(10, 5)
|
||||
|
||||
def test_to_small_value_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_minimum, 10, 20)
|
||||
|
||||
def test_to_small_value_fails_with_zero_as_minimum(self):
|
||||
self.assertRaises(ValueError, config.validate_minimum, -1, 0)
|
||||
|
||||
|
||||
class ValidateMaximumTest(unittest.TestCase):
|
||||
def test_no_maximum_passes(self):
|
||||
config.validate_maximum(5, None)
|
||||
|
||||
def test_valid_value_passes(self):
|
||||
config.validate_maximum(5, 10)
|
||||
|
||||
def test_to_large_value_fails(self):
|
||||
self.assertRaises(ValueError, config.validate_maximum, 10, 5)
|
||||
|
||||
def test_to_large_value_fails_with_zero_as_maximum(self):
|
||||
self.assertRaises(ValueError, config.validate_maximum, 5, 0)
|
||||
|
||||
|
||||
class ConfigValueTest(unittest.TestCase):
|
||||
def test_init(self):
|
||||
value = config.ConfigValue()
|
||||
self.assertIsNone(value.choices)
|
||||
self.assertIsNone(value.minimum)
|
||||
self.assertIsNone(value.maximum)
|
||||
self.assertIsNone(value.secret)
|
||||
|
||||
def test_init_with_params(self):
|
||||
value = config.ConfigValue(
|
||||
choices=['foo'], minimum=0, maximum=10, secret=True)
|
||||
self.assertEqual(['foo'], value.choices)
|
||||
self.assertEqual(0, value.minimum)
|
||||
self.assertEqual(10, value.maximum)
|
||||
self.assertEqual(True, value.secret)
|
||||
|
||||
def test_deserialize_passes_through(self):
|
||||
value = config.ConfigValue()
|
||||
obj = object()
|
||||
self.assertEqual(obj, value.deserialize(obj))
|
||||
|
||||
def test_serialize_converts_to_string(self):
|
||||
value = config.ConfigValue()
|
||||
self.assertIsInstance(value.serialize(object()), basestring)
|
||||
|
||||
def test_format_uses_serialize(self):
|
||||
value = config.ConfigValue()
|
||||
obj = object()
|
||||
self.assertEqual(value.serialize(obj), value.format(obj))
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.ConfigValue(secret=True)
|
||||
self.assertEqual('********', value.format(object()))
|
||||
|
||||
|
||||
class StringTest(unittest.TestCase):
|
||||
def test_deserialize_strips_whitespace(self):
|
||||
value = config.String()
|
||||
self.assertEqual('foo', value.deserialize(' foo '))
|
||||
|
||||
def test_deserialize_enforces_choices(self):
|
||||
value = config.String(choices=['foo', 'bar', 'baz'])
|
||||
self.assertEqual('foo', value.deserialize('foo'))
|
||||
self.assertRaises(ValueError, value.deserialize, 'foobar')
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.String(secret=True)
|
||||
self.assertEqual('********', value.format('s3cret'))
|
||||
|
||||
|
||||
class IntegerTest(unittest.TestCase):
|
||||
def test_deserialize_converts_to_int(self):
|
||||
value = config.Integer()
|
||||
self.assertEqual(123, value.deserialize('123'))
|
||||
self.assertEqual(0, value.deserialize('0'))
|
||||
self.assertEqual(-10, value.deserialize('-10'))
|
||||
|
||||
def test_deserialize_fails_on_bad_data(self):
|
||||
value = config.Integer()
|
||||
self.assertRaises(ValueError, value.deserialize, 'asd')
|
||||
self.assertRaises(ValueError, value.deserialize, '3.14')
|
||||
|
||||
def test_deserialize_enforces_choices(self):
|
||||
value = config.Integer(choices=[1, 2, 3])
|
||||
self.assertEqual(3, value.deserialize('3'))
|
||||
self.assertRaises(ValueError, value.deserialize, '5')
|
||||
|
||||
def test_deserialize_enforces_minimum(self):
|
||||
value = config.Integer(minimum=10)
|
||||
self.assertEqual(15, value.deserialize('15'))
|
||||
self.assertRaises(ValueError, value.deserialize, '5')
|
||||
|
||||
def test_deserialize_enforces_maximum(self):
|
||||
value = config.Integer(maximum=10)
|
||||
self.assertEqual(5, value.deserialize('5'))
|
||||
self.assertRaises(ValueError, value.deserialize, '15')
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.Integer(secret=True)
|
||||
self.assertEqual('********', value.format('1337'))
|
||||
|
||||
|
||||
class BooleanTest(unittest.TestCase):
|
||||
def test_deserialize_converts_to_bool(self):
|
||||
value = config.Boolean()
|
||||
for true in ('1', 'yes', 'true', 'on'):
|
||||
self.assertIs(value.deserialize(true), True)
|
||||
self.assertIs(value.deserialize(true.upper()), True)
|
||||
self.assertIs(value.deserialize(true.capitalize()), True)
|
||||
for false in ('0', 'no', 'false', 'off'):
|
||||
self.assertIs(value.deserialize(false), False)
|
||||
self.assertIs(value.deserialize(false.upper()), False)
|
||||
self.assertIs(value.deserialize(false.capitalize()), False)
|
||||
|
||||
def test_deserialize_fails_on_bad_data(self):
|
||||
value = config.Boolean()
|
||||
self.assertRaises(ValueError, value.deserialize, 'nope')
|
||||
self.assertRaises(ValueError, value.deserialize, 'sure')
|
||||
|
||||
def test_serialize_normalises_strings(self):
|
||||
value = config.Boolean()
|
||||
self.assertEqual('true', value.serialize(True))
|
||||
self.assertEqual('false', value.serialize(False))
|
||||
|
||||
def test_format_masks_secrets(self):
|
||||
value = config.Boolean(secret=True)
|
||||
self.assertEqual('********', value.format('true'))
|
||||
|
||||
|
||||
class ListTest(unittest.TestCase):
|
||||
def test_deserialize_splits_commas(self):
|
||||
value = config.List()
|
||||
self.assertEqual(['foo', 'bar', 'baz'],
|
||||
value.deserialize('foo, bar,baz'))
|
||||
|
||||
def test_deserialize_splits_newlines(self):
|
||||
value = config.List()
|
||||
self.assertEqual(['foo,bar', 'bar', 'baz'],
|
||||
value.deserialize('foo,bar\nbar\nbaz'))
|
||||
|
||||
def test_serialize_joins_by_newlines(self):
|
||||
value = config.List()
|
||||
self.assertRegexpMatches(value.serialize(['foo', 'bar', 'baz']),
|
||||
r'foo\n\s*bar\n\s*baz')
|
||||
|
||||
|
||||
class BooleanTest(unittest.TestCase):
|
||||
levels = {'critical': logging.CRITICAL,
|
||||
'error': logging.ERROR,
|
||||
'warning': logging.WARNING,
|
||||
'info': logging.INFO,
|
||||
'debug': logging.DEBUG}
|
||||
|
||||
def test_deserialize_converts_to_numeric_loglevel(self):
|
||||
value = config.LogLevel()
|
||||
for name, level in self.levels.items():
|
||||
self.assertEqual(level, value.deserialize(name))
|
||||
self.assertEqual(level, value.deserialize(name.upper()))
|
||||
self.assertEqual(level, value.deserialize(name.capitalize()))
|
||||
|
||||
def test_deserialize_fails_on_bad_data(self):
|
||||
value = config.LogLevel()
|
||||
self.assertRaises(ValueError, value.deserialize, 'nope')
|
||||
self.assertRaises(ValueError, value.deserialize, 'sure')
|
||||
|
||||
def test_serialize_converts_to_string(self):
|
||||
value = config.LogLevel()
|
||||
for name, level in self.levels.items():
|
||||
self.assertEqual(name, value.serialize(level))
|
||||
|
||||
def test_serialize_unknown_level(self):
|
||||
value = config.LogLevel()
|
||||
self.assertIsNone(value.serialize(1337))
|
||||
|
||||
|
||||
class HostnameTest(unittest.TestCase):
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_checks_addrinfo(self, getaddrinfo_mock):
|
||||
value = config.Hostname()
|
||||
value.deserialize('example.com')
|
||||
getaddrinfo_mock.assert_called_once_with('example.com', None)
|
||||
|
||||
@mock.patch('socket.getaddrinfo')
|
||||
def test_deserialize_handles_failures(self, getaddrinfo_mock):
|
||||
value = config.Hostname()
|
||||
getaddrinfo_mock.side_effect = socket.error
|
||||
self.assertRaises(ValueError, value.deserialize, 'example.com')
|
||||
|
||||
|
||||
class PortTest(unittest.TestCase):
|
||||
def test_valid_ports(self):
|
||||
value = config.Port()
|
||||
self.assertEqual(1, value.deserialize('1'))
|
||||
self.assertEqual(80, value.deserialize('80'))
|
||||
self.assertEqual(6600, value.deserialize('6600'))
|
||||
self.assertEqual(65535, value.deserialize('65535'))
|
||||
|
||||
def test_invalid_ports(self):
|
||||
value = config.Port()
|
||||
self.assertRaises(ValueError, value.deserialize, '65536')
|
||||
self.assertRaises(ValueError, value.deserialize, '100000')
|
||||
self.assertRaises(ValueError, value.deserialize, '0')
|
||||
self.assertRaises(ValueError, value.deserialize, '-1')
|
||||
|
||||
|
||||
class ConfigSchemaTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.schema = config.ConfigSchema()
|
||||
self.schema['foo'] = mock.Mock()
|
||||
self.schema['bar'] = mock.Mock()
|
||||
self.schema['baz'] = mock.Mock()
|
||||
self.values = {'bar': '123', 'foo': '456', 'baz': '678'}
|
||||
|
||||
def test_format(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
|
||||
expected = ['[qwerty]', 'foo = qwe', 'bar = asd', 'baz = zxc']
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
def test_format_unkwown_value(self):
|
||||
self.schema['foo'].format.return_value = 'qwe'
|
||||
self.schema['bar'].format.return_value = 'asd'
|
||||
self.schema['baz'].format.return_value = 'zxc'
|
||||
self.values['unknown'] = 'rty'
|
||||
|
||||
result = self.schema.format('qwerty', self.values)
|
||||
self.assertNotIn('unknown = rty', result)
|
||||
|
||||
def test_convert(self):
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
def test_convert_with_missing_value(self):
|
||||
del self.values['foo']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('not found', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_extra_value(self):
|
||||
self.values['extra'] = '123'
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
|
||||
def test_convert_with_blank_value(self):
|
||||
self.values['foo'] = ''
|
||||
result = self.schema.convert(self.values.items())
|
||||
self.assertIsNone(result['foo'])
|
||||
|
||||
def test_convert_with_deserialization_error(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
|
||||
def test_convert_with_multiple_deserialization_errors(self):
|
||||
self.schema['foo'].deserialize.side_effect = ValueError('failure')
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('other')
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('failure', cm.exception['foo'])
|
||||
self.assertIn('other', cm.exception['bar'])
|
||||
|
||||
def test_convert_deserialization_unknown_and_missing_errors(self):
|
||||
self.values['extra'] = '123'
|
||||
self.schema['bar'].deserialize.side_effect = ValueError('failure')
|
||||
del self.values['baz']
|
||||
|
||||
with self.assertRaises(exceptions.ConfigError) as cm:
|
||||
self.schema.convert(self.values.items())
|
||||
|
||||
self.assertIn('unknown', cm.exception['extra'])
|
||||
self.assertNotIn('foo', cm.exception)
|
||||
self.assertIn('failure', cm.exception['bar'])
|
||||
self.assertIn('not found', cm.exception['baz'])
|
||||
|
||||
|
||||
class ExtensionConfigSchemaTest(unittest.TestCase):
|
||||
def test_schema_includes_enabled(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
self.assertIsInstance(schema['enabled'], config.Boolean)
|
||||
|
||||
def test_section_name_is_prefixed(self):
|
||||
schema = config.ExtensionConfigSchema()
|
||||
self.assertEqual('[ext.foo]', schema.format('foo', {}))
|
||||
|
||||
|
||||
class LogLevelConfigSchemaTest(unittest.TestCase):
|
||||
def test_conversion(self):
|
||||
schema = config.LogLevelConfigSchema()
|
||||
result = schema.convert([('foo.bar', 'DEBUG'), ('baz', 'INFO')])
|
||||
|
||||
self.assertEqual(logging.DEBUG, result['foo.bar'])
|
||||
self.assertEqual(logging.INFO, result['baz'])
|
||||
|
||||
def test_format(self):
|
||||
schema = config.LogLevelConfigSchema()
|
||||
expected = ['[levels]', 'baz = info', 'foo.bar = debug']
|
||||
result = schema.format('levels', {'foo.bar': logging.DEBUG, 'baz': logging.INFO})
|
||||
self.assertEqual('\n'.join(expected), result)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user