Test and improve password handling in MpdSession

This commit is contained in:
Stein Magnus Jodal 2011-01-21 02:15:27 +01:00
parent 00897e9024
commit 775ec64976
2 changed files with 75 additions and 10 deletions

View File

@ -49,8 +49,10 @@ class MpdSession(asynchat.async_chat):
def handle_request(self, request):
"""Handle request by sending it to the MPD frontend."""
if not self.authenticated:
self.authenticated = self.check_password(request)
return
(self.authenticated, response) = self.check_password(request)
if response is not None:
self.send_response(response)
return
my_end, other_end = multiprocessing.Pipe()
self.core_queue.put({
'to': 'frontend',
@ -76,13 +78,23 @@ class MpdSession(asynchat.async_chat):
self.push(data)
def check_password(self, request):
"""
Takes any request and tries to authenticate the client using it.
:rtype: a two-tuple containing (is_authenticated, response_message). If
the response_message is :class:`None`, normal processing should
continue, even though the client may not be authenticated.
"""
if not settings.MPD_SERVER_PASSWORD:
return True
if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD:
self.send_response('OK')
return True
return (True, None)
command = request.split(' ')[0]
self.send_response(
"ACK [4@0] {%s} " % command +
"you don't have permission for \"%s\"" % command)
return False
if command == 'password':
if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD:
return (True, u'OK')
else:
return (False, u'ACK [3@0] {password} incorrect password')
if command in ('close', 'commands', 'notcommands', 'ping'):
return (False, None)
else:
return (False, "ACK [4@0] {%s} " % command +
"you don't have permission for \"%s\"" % command)

View File

@ -1,5 +1,6 @@
import unittest
from mopidy import settings
from mopidy.frontends.mpd import server
class MpdServerTest(unittest.TestCase):
@ -21,8 +22,60 @@ class MpdSessionTest(unittest.TestCase):
def setUp(self):
self.session = server.MpdSession(None, None, (None, None), None)
def tearDown(self):
settings.runtime.clear()
def test_found_terminator_catches_decode_error(self):
# Pressing Ctrl+C in a telnet session sends a 0xff byte to the server.
self.session.input_buffer = ['\xff']
self.session.found_terminator()
self.assertEqual(len(self.session.input_buffer), 0)
def test_authentication_with_valid_password_is_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'password "topsecret"')
self.assertTrue(authed)
self.assertEqual(u'OK', response)
def test_authentication_with_invalid_password_is_not_accepted(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'password "secret"')
self.assertFalse(authed)
self.assertEqual(u'ACK [3@0] {password} incorrect password', response)
def test_authentication_with_anything_when_password_check_turned_off(self):
settings.MPD_SERVER_PASSWORD = False
authed, response = self.session.check_password(u'any request at all')
self.assertTrue(authed)
self.assertEqual(None, response)
def test_anything_when_not_authenticated_should_fail(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'any request at all')
self.assertFalse(authed)
self.assertEqual(
u'ACK [4@0] {any} you don\'t have permission for "any"', response)
def test_close_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'close')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_commands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'commands')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_notcommands_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'notcommands')
self.assertFalse(authed)
self.assertEqual(None, response)
def test_ping_is_allowed_without_authentication(self):
settings.MPD_SERVER_PASSWORD = u'topsecret'
authed, response = self.session.check_password(u'ping')
self.assertFalse(authed)
self.assertEqual(None, response)