Move MPD authentication check from MpdSession to MpdDispatcher

This commit is contained in:
Stein Magnus Jodal 2011-06-04 00:42:51 +02:00
parent 1db84dccca
commit a68bdae751
4 changed files with 93 additions and 77 deletions

View File

@ -4,6 +4,7 @@ import re
from pykka import ActorDeadError
from pykka.registry import ActorRegistry
from mopidy import settings
from mopidy.backends.base import Backend
from mopidy.frontends.mpd.exceptions import (MpdAckError, MpdArgError,
MpdUnknownCommand, MpdSystemError)
@ -28,12 +29,18 @@ class MpdDispatcher(object):
"""
def __init__(self, session=None):
self.authenticated = False
self.command_list = False
self.command_list_ok = False
self.context = MpdContext(self, session=session)
def handle_request(self, request, current_command_list_index=None):
"""Dispatch incoming requests to the correct handler."""
if not self.authenticated:
(self.authenticated, result) = self._check_password(request)
if result:
return result
if self._is_receiving_command_list(request):
self.command_list.append(request)
return None
@ -55,6 +62,29 @@ class MpdDispatcher(object):
return self._format_response(result)
def _check_password(self, request):
"""
Takes any request and tries to authenticate the client using it.
:rtype: a two-tuple containing (is_authenticated, response_message). If
the response_message is :class:`None`, normal processing should
continue, even though the client may not be authenticated.
"""
if settings.MPD_SERVER_PASSWORD is None:
return (True, None)
command = request.split(' ')[0]
if command == 'password':
if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD:
return (True, [u'OK'])
else:
return (False, [u'ACK [3@0] {password} incorrect password'])
if command in ('close', 'commands', 'notcommands', 'ping'):
return (False, None)
else:
return (False,
[u'ACK [4@0] {%(c)s} you don\'t have permission for "%(c)s"' %
{'c': command}])
def _is_receiving_command_list(self, request):
return (self.command_list is not False
and request != u'command_list_end')

View File

@ -46,11 +46,6 @@ class MpdSession(asynchat.async_chat):
def handle_request(self, request):
"""Handle request using the MPD command handlers."""
if not self.authenticated:
(self.authenticated, response) = self.check_password(request)
if response is not None:
self.send_response(response)
return
response = self.dispatcher.handle_request(request)
if response is not None:
self.handle_response(response)
@ -66,26 +61,3 @@ class MpdSession(asynchat.async_chat):
output = u'%s%s' % (output, LINE_TERMINATOR)
data = output.encode(ENCODING)
self.push(data)
def check_password(self, request):
"""
Takes any request and tries to authenticate the client using it.
:rtype: a two-tuple containing (is_authenticated, response_message). If
the response_message is :class:`None`, normal processing should
continue, even though the client may not be authenticated.
"""
if settings.MPD_SERVER_PASSWORD is None:
return (True, None)
command = request.split(' ')[0]
if command == 'password':
if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD:
return (True, u'OK')
else:
return (False, u'ACK [3@0] {password} incorrect password')
if command in ('close', 'commands', 'notcommands', 'ping'):
return (False, None)
else:
return (False,
u'ACK [4@0] {%(c)s} you don\'t have permission for "%(c)s"' %
{'c': command})

View File

@ -0,0 +1,63 @@
import mock
import unittest
from mopidy import settings
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.session import MpdSession
class AuthenticationTest(unittest.TestCase):
def setUp(self):
self.session = mock.Mock(spec=MpdSession)
self.dispatcher = MpdDispatcher(session=self.session)
def tearDown(self):
settings.runtime.clear()
def test_authentication_with_valid_password_is_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'password "topsecret"')
self.assertTrue(self.dispatcher.authenticated)
self.assert_(u'OK' in response)
def test_authentication_with_invalid_password_is_not_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'password "secret"')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(u'ACK [3@0] {password} incorrect password' in response)
def test_authentication_with_anything_when_password_check_turned_off(self):
settings.MPD_SERVER_PASSWORD = None
response = self.dispatcher.handle_request(u'any request at all')
self.assertTrue(self.dispatcher.authenticated)
self.assert_('ACK [5@0] {} unknown command "any"' in response)
def test_anything_when_not_authenticated_should_fail(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'any request at all')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(
u'ACK [4@0] {any} you don\'t have permission for "any"' in response)
def test_close_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'close')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(u'OK' in response)
def test_commands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'commands')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(u'OK' in response)
def test_notcommands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'notcommands')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(u'OK' in response)
def test_ping_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
response = self.dispatcher.handle_request(u'ping')
self.assertFalse(self.dispatcher.authenticated)
self.assert_(u'OK' in response)

View File

@ -44,52 +44,3 @@ class MpdSessionTest(unittest.TestCase):
self.session.input_buffer = ['\xff']
self.session.found_terminator()
self.assertEqual(len(self.session.input_buffer), 0)
def test_authentication_with_valid_password_is_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'password "topsecret"')
self.assertTrue(authed)
self.assertEqual(u'OK', response)
def test_authentication_with_invalid_password_is_not_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'password "secret"')
self.assertFalse(authed)
self.assertEqual(u'ACK [3@0] {password} incorrect password', response)
def test_authentication_with_anything_when_password_check_turned_off(self):
settings.MPD_SERVER_PASSWORD = None
authed, response = self.session.check_password(u'any request at all')
self.assertTrue(authed)
self.assertEqual(None, response)
def test_anything_when_not_authenticated_should_fail(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'any request at all')
self.assertFalse(authed)
self.assertEqual(
u'ACK [4@0] {any} you don\'t have permission for "any"', response)
def test_close_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'close')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_commands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'commands')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_notcommands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'notcommands')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_ping_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'ping')
self.assertFalse(authed)
self.assertEqual(None, response)