diff --git a/mopidy/exceptions.py b/mopidy/exceptions.py index 32a2bd9a..d02a288a 100644 --- a/mopidy/exceptions.py +++ b/mopidy/exceptions.py @@ -46,3 +46,7 @@ class ScannerError(MopidyException): class AudioException(MopidyException): pass + + +class ValidationError(ValueError): + pass diff --git a/mopidy/utils/validation.py b/mopidy/utils/validation.py new file mode 100644 index 00000000..a0306564 --- /dev/null +++ b/mopidy/utils/validation.py @@ -0,0 +1,102 @@ +from __future__ import absolute_import, unicode_literals + +import collections +import urlparse + +from mopidy import compat, exceptions + +PLAYBACK_STATES = {'paused', 'stopped', 'playing'} + +QUERY_FIELDS = { + 'uri', 'track_name', 'album', 'artist', 'albumartist', 'composer', + 'performer', 'track_no', 'genre', 'date', 'comment', 'any', 'tlid', 'name'} + +DISTINCT_FIELDS = { + 'artist', 'albumartist', 'album', 'composer', 'performer', 'date', 'genre'} + + +# TODO: _check_iterable(check, msg, **kwargs) + [check(a) for a in arg]? +def _check_iterable(arg, msg, **kwargs): + """Ensure we have an iterable which is not a string.""" + if isinstance(arg, compat.string_types): + raise exceptions.ValidationError(msg.format(arg=arg, **kwargs)) + elif not isinstance(arg, collections.Iterable): + raise exceptions.ValidationError(msg.format(arg=arg, **kwargs)) + + +def check_choice(arg, choices, msg='Expected one of {choices}, not {arg!r}'): + if arg not in choices: + raise exceptions.ValidationError(msg.format( + arg=arg, choices=tuple(choices))) + + +def check_boolean(arg, msg='Expected a boolean, not {arg!r}'): + check_instance(arg, bool, msg=msg) + + +def check_instance(arg, cls, msg='Expected a {name} instance, not {arg!r}'): + if not isinstance(arg, cls): + raise exceptions.ValidationError( + msg.format(arg=arg, name=cls.__name__)) + + +def check_instances(arg, cls, msg='Expected a list of {name}, not {arg!r}'): + _check_iterable(arg, msg, name=cls.__name__) + if not all(isinstance(instance, cls) for instance in arg): + raise exceptions.ValidationError( + msg.format(arg=arg, name=cls.__name__)) + + +def check_integer(arg, min=None, max=None): + if not isinstance(arg, (int, long)): + raise exceptions.ValidationError('Expected an integer, not %r' % arg) + elif min is not None and arg < min: + raise exceptions.ValidationError( + 'Expected number larger or equal to %d, not %r' % (min, arg)) + elif max is not None and arg > max: + raise exceptions.ValidationError( + 'Expected number smaller or equal to %d, not %r' % (max, arg)) + + +def check_query(arg, list_values=True): + # TODO: normalize name -> track_name + # TODO: normalize value -> [value] + # TODO: normalize blank -> [] or just remove field? + # TODO: normalize int -> str or remove int support? + # TODO: remove list_values? + # TODO: don't allow for instance tlid field in all queries? + + if not isinstance(arg, collections.Mapping): + raise exceptions.ValidationError( + 'Expected a query dictionary, not {arg!r}'.format(arg=arg)) + + for key, value in arg.items(): + check_choice(key, QUERY_FIELDS, msg='Expected query field to be one ' + 'of {choices}, not {arg!r}') + if list_values: + msg = 'Expected "{key}" values to be list of strings, not {arg!r}' + _check_iterable(value, msg, key=key) + [_check_query_value(key, v, msg) for v in value] + else: + _check_query_value(key, value, 'Expected "{key}" value to be a ' + 'string, not {arg!r}') + + +def _check_query_value(key, arg, msg): + if isinstance(arg, compat.string_types): + if not arg.strip(): + raise exceptions.ValidationError(msg.format(arg=arg, key=key)) + elif not isinstance(arg, (int, long)): + raise exceptions.ValidationError(msg.format(arg=arg, key=key)) + + +def check_uri(arg, msg='Expected a valid URI, not {arg!r}'): + if not isinstance(arg, compat.string_types): + raise exceptions.ValidationError(msg.format(arg=arg)) + elif urlparse.urlparse(arg).scheme == '': + raise exceptions.ValidationError(msg.format(arg=arg)) + + +def check_uris(arg, msg='Expected a list of URIs, not {arg!r}'): + _check_iterable(arg, msg) + [check_uri(a, msg) for a in arg] diff --git a/tests/utils/test_validation.py b/tests/utils/test_validation.py new file mode 100644 index 00000000..d55a918e --- /dev/null +++ b/tests/utils/test_validation.py @@ -0,0 +1,163 @@ +from __future__ import absolute_import, unicode_literals + +from pytest import raises + +from mopidy import compat, exceptions +from mopidy.utils import validation + + +def test_check_boolean_with_valid_values(): + for value in (True, False): + validation.check_boolean(value) + + +def test_check_boolean_with_truthy_values(): + for value in 1, 0, None, '', list(), tuple(): + with raises(exceptions.ValidationError): + validation.check_boolean(value) + + +def test_check_boolean_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_boolean(1234) + assert 'Expected a boolean, not 1234' == str(excinfo.value) + + +def test_check_choice_with_valid_values(): + for value, choices in (2, (1, 2, 3)), ('abc', ('abc', 'def')): + validation.check_choice(value, choices) + + +def test_check_choice_with_invalid_values(): + for value, choices in (5, (1, 2, 3)), ('xyz', ('abc', 'def')): + with raises(exceptions.ValidationError): + validation.check_choice(value, choices) + + +def test_check_choice_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_choice(5, (1, 2, 3)) + assert 'Expected one of (1, 2, 3), not 5' == str(excinfo.value) + + +def test_check_instance_with_valid_choices(): + for value, cls in ((True, bool), ('a', compat.text_type), (123, int)): + validation.check_instance(value, cls) + + +def test_check_instance_with_invalid_values(): + for value, cls in (1, str), ('abc', int): + with raises(exceptions.ValidationError): + validation.check_instance(value, cls) + + +def test_check_instance_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_instance(1, dict) + assert 'Expected a dict instance, not 1' == str(excinfo.value) + + +def test_check_instances_with_valid_values(): + validation.check_instances([], int) + validation.check_instances([1, 2], int) + validation.check_instances((1, 2), int) + + +def test_check_instances_with_invalid_values(): + with raises(exceptions.ValidationError): + validation.check_instances('abc', compat.string_types) + with raises(exceptions.ValidationError): + validation.check_instances(['abc', 123], compat.string_types) + with raises(exceptions.ValidationError): + validation.check_instances(None, compat.string_types) + with raises(exceptions.ValidationError): + validation.check_instances([None], compat.string_types) + + +def test_check_instances_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_instances([1], compat.string_types) + assert 'Expected a list of basestring, not [1]' == str(excinfo.value) + + +def test_check_query_valid_values(): + for value in {}, {'any': []}, {'any': ['abc']}: + validation.check_query(value) + + +def test_check_query_random_iterables(): + for value in None, tuple(), list(), 'abc': + with raises(exceptions.ValidationError): + validation.check_query(value) + + +def test_check_mapping_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_query([]) + assert 'Expected a query dictionary, not []' == str(excinfo.value) + + +def test_check_query_invalid_fields(): + for value in 'wrong', 'bar', 'foo': + with raises(exceptions.ValidationError): + validation.check_query({value: []}) + + +def test_check_field_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_query({'wrong': ['abc']}) + assert 'Expected query field to be one of ' in str(excinfo.value) + + +def test_check_query_invalid_values(): + for value in '', None, 'foo', 123, [''], [None]: + with raises(exceptions.ValidationError): + validation.check_query({'any': value}) + + +def test_check_values_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_query({'any': 'abc'}) + assert 'Expected "any" values to be list of strings' in str(excinfo.value) + + +def test_check_uri_with_valid_values(): + for value in 'foobar:', 'http://example.com', 'git+http://example.com': + validation.check_uri(value) + + +def test_check_uri_with_invalid_values(): + # Note that tuple catches a potential bug with using "'foo' % arg" for + # formatting. + for value in ('foobar', 'htt p://example.com', None, 1234, tuple()): + with raises(exceptions.ValidationError): + validation.check_uri(value) + + +def test_check_uri_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_uri('testing') + assert "Expected a valid URI, not u'testing'" == str(excinfo.value) + + +def test_check_uris_with_valid_values(): + validation.check_uris([]) + validation.check_uris(['foobar:']) + validation.check_uris(('foobar:',)) + + +def test_check_uris_with_invalid_values(): + with raises(exceptions.ValidationError): + validation.check_uris('foobar:') + with raises(exceptions.ValidationError): + validation.check_uris(None) + with raises(exceptions.ValidationError): + validation.check_uris([None]) + with raises(exceptions.ValidationError): + validation.check_uris(['foobar:', 'foobar']) + + +def test_check_uris_error_message(): + with raises(exceptions.ValidationError) as excinfo: + validation.check_uris('testing') + assert "Expected a list of URIs, not u'testing'" == str(excinfo.value)