From 775ec649763f77bd869d203e5de7f9cfe1697e5b Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Fri, 21 Jan 2011 02:15:27 +0100 Subject: [PATCH] Test and improve password handling in MpdSession --- mopidy/frontends/mpd/session.py | 32 ++++++++++++------ tests/frontends/mpd/server_test.py | 53 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 10 deletions(-) diff --git a/mopidy/frontends/mpd/session.py b/mopidy/frontends/mpd/session.py index 3ad80c92..b67e7f13 100644 --- a/mopidy/frontends/mpd/session.py +++ b/mopidy/frontends/mpd/session.py @@ -49,8 +49,10 @@ class MpdSession(asynchat.async_chat): def handle_request(self, request): """Handle request by sending it to the MPD frontend.""" if not self.authenticated: - self.authenticated = self.check_password(request) - return + (self.authenticated, response) = self.check_password(request) + if response is not None: + self.send_response(response) + return my_end, other_end = multiprocessing.Pipe() self.core_queue.put({ 'to': 'frontend', @@ -76,13 +78,23 @@ class MpdSession(asynchat.async_chat): self.push(data) def check_password(self, request): + """ + Takes any request and tries to authenticate the client using it. + + :rtype: a two-tuple containing (is_authenticated, response_message). If + the response_message is :class:`None`, normal processing should + continue, even though the client may not be authenticated. + """ if not settings.MPD_SERVER_PASSWORD: - return True - if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD: - self.send_response('OK') - return True + return (True, None) command = request.split(' ')[0] - self.send_response( - "ACK [4@0] {%s} " % command + - "you don't have permission for \"%s\"" % command) - return False + if command == 'password': + if request == 'password "%s"' % settings.MPD_SERVER_PASSWORD: + return (True, u'OK') + else: + return (False, u'ACK [3@0] {password} incorrect password') + if command in ('close', 'commands', 'notcommands', 'ping'): + return (False, None) + else: + return (False, "ACK [4@0] {%s} " % command + + "you don't have permission for \"%s\"" % command) diff --git a/tests/frontends/mpd/server_test.py b/tests/frontends/mpd/server_test.py index 9d006eb3..9d877ed5 100644 --- a/tests/frontends/mpd/server_test.py +++ b/tests/frontends/mpd/server_test.py @@ -1,5 +1,6 @@ import unittest +from mopidy import settings from mopidy.frontends.mpd import server class MpdServerTest(unittest.TestCase): @@ -21,8 +22,60 @@ class MpdSessionTest(unittest.TestCase): def setUp(self): self.session = server.MpdSession(None, None, (None, None), None) + def tearDown(self): + settings.runtime.clear() + def test_found_terminator_catches_decode_error(self): # Pressing Ctrl+C in a telnet session sends a 0xff byte to the server. self.session.input_buffer = ['\xff'] self.session.found_terminator() self.assertEqual(len(self.session.input_buffer), 0) + + def test_authentication_with_valid_password_is_accepted(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'password "topsecret"') + self.assertTrue(authed) + self.assertEqual(u'OK', response) + + def test_authentication_with_invalid_password_is_not_accepted(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'password "secret"') + self.assertFalse(authed) + self.assertEqual(u'ACK [3@0] {password} incorrect password', response) + + def test_authentication_with_anything_when_password_check_turned_off(self): + settings.MPD_SERVER_PASSWORD = False + authed, response = self.session.check_password(u'any request at all') + self.assertTrue(authed) + self.assertEqual(None, response) + + def test_anything_when_not_authenticated_should_fail(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'any request at all') + self.assertFalse(authed) + self.assertEqual( + u'ACK [4@0] {any} you don\'t have permission for "any"', response) + + def test_close_is_allowed_without_authentication(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'close') + self.assertFalse(authed) + self.assertEqual(None, response) + + def test_commands_is_allowed_without_authentication(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'commands') + self.assertFalse(authed) + self.assertEqual(None, response) + + def test_notcommands_is_allowed_without_authentication(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'notcommands') + self.assertFalse(authed) + self.assertEqual(None, response) + + def test_ping_is_allowed_without_authentication(self): + settings.MPD_SERVER_PASSWORD = u'topsecret' + authed, response = self.session.check_password(u'ping') + self.assertFalse(authed) + self.assertEqual(None, response)