Add command_list_ok_begin support, ack exceptions

This commit is contained in:
Stein Magnus Jodal 2009-12-26 02:44:32 +01:00
parent 8bd6d5092e
commit 814eb98b83
4 changed files with 222 additions and 129 deletions

6
mopidy/exceptions.py Normal file
View File

@ -0,0 +1,6 @@
class MpdAckError(Exception):
pass
class MpdNotImplemented(MpdAckError):
def __init__(self, *args):
super(MpdNotImplemented, self).__init__(u'Not implemented', *args)

View File

@ -4,6 +4,7 @@ import sys
from mopidy import settings
from mopidy.backends.spotify import SpotifyBackend
from mopidy.exceptions import MpdAckError, MpdNotImplemented
logger = logging.getLogger('handler')
@ -18,26 +19,40 @@ def register(pattern):
return func
return decorator
def flatten(the_list):
result = []
for element in the_list:
if isinstance(element, list):
result.extend(flatten(element))
else:
result.append(element)
return result
class MpdHandler(object):
def __init__(self, session=None, backend=None):
self.session = session
self.backend = backend
self.command_list = False
self.buffer = False
def handle_request(self, request):
def handle_request(self, request, add_ok=True):
if self.command_list is not False and request != u'command_list_end':
self.command_list.append(request)
return None
for pattern in _request_handlers:
matches = re.match(pattern, request)
if matches is not None:
groups = matches.groupdict()
response = _request_handlers[pattern](self, **groups)
if self.buffer:
self.response_buffer.append(response)
result = _request_handlers[pattern](self, **groups)
if self.command_list is not False:
return None
else:
return response
response = []
if result is not None:
response.append(result)
if add_ok:
response.append(u'OK')
return flatten(response)
logger.warning(u'Unhandled request: %s', request)
return False
raise MpdAckError(u'Unhandled request: %s' % request)
@register(r'^add "(?P<uri>[^"]*)"$')
def _add(self, uri):
@ -62,13 +77,26 @@ class MpdHandler(object):
@register(r'^command_list_begin$')
def _command_list_begin(self):
self.response_buffer = []
self.buffer = True
self.command_list = []
self.command_list_ok = False
@register(r'^command_list_ok_begin$')
def _command_list_ok_begin(self):
self.command_list = []
self.command_list_ok = True
@register(r'^command_list_end$')
def _command_list_end(self):
self.buffer = False
return self.response_buffer
(command_list, self.command_list) = (self.command_list, False)
(command_list_ok, self.command_list_ok) = (self.command_list_ok, False)
result = []
for command in command_list:
response = self.handle_request(command, add_ok=False)
if response is not None:
result.append(response)
if command_list_ok:
response.append(u'list_OK')
return result
@register(r'^consume "(?P<state>[01])"$')
def _consume(self, state):
@ -121,10 +149,9 @@ class MpdHandler(object):
def _kill(self):
self.session.do_kill()
@register(r'^list (?P<type>(artist|album))( (?P<artist>.*))*$')
@register(r'^list (?P<type>artist)$')
@register(r'^list (?P<type>album)( (?P<artist>.*))*$')
def _list(self, type, artist=None):
if type == u'artist' and artist is not None:
return False
pass # TODO
@register(r'^listall "(?P<uri>[^"]+)"')

View File

@ -2,6 +2,7 @@ import asynchat
import logging
from mopidy import get_mpd_version, settings
from mopidy.exceptions import MpdAckError
from mopidy.handler import MpdHandler
logger = logging.getLogger(u'session')
@ -36,10 +37,11 @@ class MpdSession(asynchat.async_chat):
self.handle_request(input)
def handle_request(self, input):
response = self.handler.handle_request(input)
self.handle_response(response)
if not self.handler.buffer:
self.send_response(u'OK')
try:
response = self.handler.handle_request(input)
self.handle_response(response)
except MpdAckError, e:
self.send_response(u'ACK %s' % e)
def handle_response(self, response):
if isinstance(response, list):

View File

@ -2,6 +2,7 @@ import unittest
from mopidy import handler
from mopidy.backends.dummy import DummyBackend
from mopidy.exceptions import MpdAckError
class RequestHandlerTest(unittest.TestCase):
def setUp(self):
@ -16,32 +17,58 @@ class RequestHandlerTest(unittest.TestCase):
except ValueError:
pass
def test_handling_unknown_request_returns_none(self):
result = self.h.handle_request('an unhandled request')
self.assertFalse(result)
def test_handling_unknown_request_raises_exception(self):
try:
result = self.h.handle_request('an unhandled request')
self.fail(u'An unknown request should raise an exception')
except MpdAckError:
pass
def test_handling_known_request(self):
expected = 'magic'
handler._request_handlers['known request'] = lambda x: expected
result = self.h.handle_request('known request')
self.assertEquals(expected, result)
self.assert_(u'OK' in result)
self.assert_(expected in result)
class CommandListsTest(unittest.TestCase):
def setUp(self):
self.h = handler.MpdHandler(backend=DummyBackend())
def test_commandlist_begin(self):
self.h.handle_request(u'command_list_begin')
expected = 'magic'
handler._request_handlers['known request'] = lambda x: expected
result = self.h.handle_request('known request')
def test_command_list_begin(self):
result = self.h.handle_request(u'command_list_begin')
self.assert_(result is None)
def test_commandlist_end(self):
expected = 'magic'
self.test_commandlist_begin()
def test_command_list_end(self):
self.h.handle_request(u'command_list_begin')
result = self.h.handle_request(u'command_list_end')
self.assert_(expected, result)
self.assert_(u'OK' in result)
def test_command_list_with_ping(self):
self.h.handle_request(u'command_list_begin')
self.assertEquals([], self.h.command_list)
self.assertEquals(False, self.h.command_list_ok)
self.h.handle_request(u'ping')
self.assert_(u'ping' in self.h.command_list)
result = self.h.handle_request(u'command_list_end')
self.assert_(u'OK' in result)
self.assertEquals(False, self.h.command_list)
def test_command_list_ok_begin(self):
result = self.h.handle_request(u'command_list_ok_begin')
self.assert_(result is None)
def test_command_list_ok_with_ping(self):
self.h.handle_request(u'command_list_ok_begin')
self.assertEquals([], self.h.command_list)
self.assertEquals(True, self.h.command_list_ok)
self.h.handle_request(u'ping')
self.assert_(u'ping' in self.h.command_list)
result = self.h.handle_request(u'command_list_end')
self.assert_(u'list_OK' in result)
self.assert_(u'OK' in result)
self.assertEquals(False, self.h.command_list)
self.assertEquals(False, self.h.command_list_ok)
class StatusHandlerTest(unittest.TestCase):
@ -50,22 +77,24 @@ class StatusHandlerTest(unittest.TestCase):
def test_clearerror(self):
result = self.h.handle_request(u'clearerror')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_currentsong(self):
result = self.h.handle_request(u'currentsong')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_idle_without_subsystems(self):
result = self.h.handle_request(u'idle')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_idle_with_subsystems(self):
result = self.h.handle_request(u'idle database playlist')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_stats(self):
result = self.h.handle_request(u'stats')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('artists' in result)
self.assert_(int(result['artists']) >= 0)
self.assert_('albums' in result)
@ -83,6 +112,8 @@ class StatusHandlerTest(unittest.TestCase):
def test_status(self):
result = self.h.handle_request(u'status')
self.assert_(u'OK' in result)
result = dict(result)
self.assert_('volume' in result)
self.assert_(int(result['volume']) in xrange(0, 101))
self.assert_('repeat' in result)
@ -109,82 +140,84 @@ class PlaybackOptionsHandlerTest(unittest.TestCase):
def test_consume_off(self):
result = self.h.handle_request(u'consume "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_consume_on(self):
result = self.h.handle_request(u'consume "1"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_crossfade(self):
result = self.h.handle_request(u'crossfade "10"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_random_off(self):
result = self.h.handle_request(u'random "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_random_on(self):
result = self.h.handle_request(u'random "1"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_repeat_off(self):
result = self.h.handle_request(u'repeat "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_repeat_on(self):
result = self.h.handle_request(u'repeat "1"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_setvol_below_min(self):
result = self.h.handle_request(u'setvol "-10"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_setvol_min(self):
result = self.h.handle_request(u'setvol "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_setvol_middle(self):
result = self.h.handle_request(u'setvol "50"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_setvol_max(self):
result = self.h.handle_request(u'setvol "100"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_setvol_above_max(self):
result = self.h.handle_request(u'setvol "110"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_single_off(self):
result = self.h.handle_request(u'single "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_single_on(self):
result = self.h.handle_request(u'single "1"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_replay_gain_mode_off(self):
result = self.h.handle_request(u'replay_gain_mode off')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_replay_gain_mode_track(self):
result = self.h.handle_request(u'replay_gain_mode track')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_replay_gain_mode_album(self):
result = self.h.handle_request(u'replay_gain_mode album')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_replay_gain_status_default(self):
expected = u'off'
result = self.h.handle_request(u'replay_gain_status')
self.assertEquals(expected, result)
self.assert_(u'OK' in result)
self.assert_(expected in result)
def test_replay_gain_status_off(self):
expected = u'off'
self.h._replay_gain_mode(expected)
result = self.h.handle_request(u'replay_gain_status')
self.assertEquals(expected, result)
self.assert_(u'OK' in result)
self.assert_(expected in result)
#def test_replay_gain_status_track(self):
# expected = u'track'
@ -206,43 +239,43 @@ class PlaybackControlHandlerTest(unittest.TestCase):
def test_next(self):
result = self.h.handle_request(u'next')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_pause_off(self):
result = self.h.handle_request(u'pause "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
self.assertEquals(self.b.PLAY, self.b.state)
def test_pause_on(self):
result = self.h.handle_request(u'pause "1"')
self.assert_(result is None)
self.assert_(u'OK' in result)
self.assertEquals(self.b.PAUSE, self.b.state)
def test_play(self):
result = self.h.handle_request(u'play "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
self.assertEquals(self.b.PLAY, self.b.state)
def test_playid(self):
result = self.h.handle_request(u'playid "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
self.assertEquals(self.b.PLAY, self.b.state)
def test_previous(self):
result = self.h.handle_request(u'previous')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_seek(self):
result = self.h.handle_request(u'seek 0 30')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_seekid(self):
result = self.h.handle_request(u'seekid 0 30')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_stop(self):
result = self.h.handle_request(u'stop')
self.assert_(result is None)
self.assert_(u'OK' in result)
self.assertEquals(self.b.STOP, self.b.state)
@ -252,51 +285,57 @@ class CurrentPlaylistHandlerTest(unittest.TestCase):
def test_add(self):
result = self.h.handle_request(u'add "file:///dev/urandom"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_addid_without_songpos(self):
result = self.h.handle_request(u'addid "file:///dev/urandom"')
self.assert_('id' in result)
self.assert_(u'OK' in result)
result = result[0]
self.assert_(u'id' in result)
self.assert_(u'id' in result)
def test_addid_with_songpos(self):
result = self.h.handle_request(u'addid "file:///dev/urandom" 0')
self.assert_('id' in result)
self.assert_(u'OK' in result)
result = result[0]
self.assert_(u'id' in result)
def test_clear(self):
result = self.h.handle_request(u'clear')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_delete_songpos(self):
result = self.h.handle_request(u'delete 5')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_delete_open_range(self):
result = self.h.handle_request(u'delete 10:')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_delete_closed_range(self):
result = self.h.handle_request(u'delete 10:20')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_deleteid(self):
result = self.h.handle_request(u'deleteid 0')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_move_songpos(self):
result = self.h.handle_request(u'move 5 0')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_move_open_range(self):
result = self.h.handle_request(u'move 10: 0')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_move_closed_range(self):
result = self.h.handle_request(u'move 10:20 0')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_moveid(self):
result = self.h.handle_request(u'moveid 0 10')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlist_returns_same_as_playlistinfo(self):
playlist_result = self.h.handle_request(u'playlist')
@ -305,63 +344,63 @@ class CurrentPlaylistHandlerTest(unittest.TestCase):
def test_playlistfind(self):
result = self.h.handle_request(u'playlistfind tag needle')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistid_without_songid(self):
result = self.h.handle_request(u'playlistid')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistid_with_songid(self):
result = self.h.handle_request(u'playlistid "10"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistinfo_without_songpos_or_range(self):
result = self.h.handle_request(u'playlistinfo')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistinfo_with_songpos(self):
result = self.h.handle_request(u'playlistinfo "5"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistinfo_with_open_range(self):
result = self.h.handle_request(u'playlistinfo "10:"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistinfo_with_closed_range(self):
result = self.h.handle_request(u'playlistinfo "10:20"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistsearch(self):
result = self.h.handle_request(u'playlistsearch tag needle')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_plchanges(self):
result = self.h.handle_request(u'plchanges "0"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_plchangesposid(self):
result = self.h.handle_request(u'plchangesposid 0')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_shuffle_without_range(self):
result = self.h.handle_request(u'shuffle')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_shuffle_with_open_range(self):
result = self.h.handle_request(u'shuffle 10:')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_shuffle_with_closed_range(self):
result = self.h.handle_request(u'shuffle 10:20')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_swap(self):
result = self.h.handle_request(u'swap 10 20')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_swapid(self):
result = self.h.handle_request(u'swapid 10 20')
self.assert_(result is None)
self.assert_(u'OK' in result)
class StoredPlaylistsHandlerTest(unittest.TestCase):
@ -370,48 +409,48 @@ class StoredPlaylistsHandlerTest(unittest.TestCase):
def test_listplaylist(self):
result = self.h.handle_request(u'listplaylist name')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_listplaylistinfo(self):
result = self.h.handle_request(u'listplaylistinfo name')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_listplaylists(self):
result = self.h.handle_request(u'listplaylists')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_load(self):
result = self.h.handle_request(u'load "name"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistadd(self):
result = self.h.handle_request(
u'playlistadd name "file:///dev/urandom"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistclear(self):
result = self.h.handle_request(u'playlistclear name')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistdelete(self):
result = self.h.handle_request(u'playlistdelete name 5')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_playlistmove(self):
result = self.h.handle_request(u'playlistmove name 5a 10')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_rename(self):
result = self.h.handle_request(u'rename name new_name')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_rm(self):
result = self.h.handle_request(u'rm name')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_save(self):
result = self.h.handle_request(u'save name')
self.assert_(result is None)
self.assert_(u'OK' in result)
class MusicDatabaseHandlerTest(unittest.TestCase):
@ -420,51 +459,57 @@ class MusicDatabaseHandlerTest(unittest.TestCase):
def test_count(self):
result = self.h.handle_request(u'count tag needle')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_find_album(self):
result = self.h.handle_request(u'find album what')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_find_artist(self):
result = self.h.handle_request(u'find artist what')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_find_title(self):
result = self.h.handle_request(u'find title what')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_find_else_should_fail(self):
result = self.h.handle_request(u'find somethingelse what')
self.assert_(result is False)
try:
result = self.h.handle_request(u'find somethingelse what')
self.fail('Find with unknown type should fail')
except MpdAckError:
pass
def test_findadd(self):
result = self.h.handle_request(u'findadd album what')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_list_artist(self):
result = self.h.handle_request(u'list artist')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_list_artist_with_artist_should_fail(self):
result = self.h.handle_request(u'list artist anartist')
self.assert_(result is False)
try:
result = self.h.handle_request(u'list artist anartist')
self.fail(u'Listing artists filtered by an artist should fail')
except MpdAckError:
pass
def test_list_album_without_artist(self):
result = self.h.handle_request(u'list album')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_list_album_with_artist(self):
result = self.h.handle_request(u'list album anartist')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_listall(self):
result = self.h.handle_request(u'listall "file:///dev/urandom"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_listallinfo(self):
result = self.h.handle_request(u'listallinfo "file:///dev/urandom"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_lsinfo_without_path_returns_same_as_listplaylists(self):
lsinfo_result = self.h.handle_request(u'lsinfo')
@ -473,7 +518,7 @@ class MusicDatabaseHandlerTest(unittest.TestCase):
def test_lsinfo_with_path(self):
result = self.h.handle_request(u'lsinfo ""')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_lsinfo_for_root_returns_same_as_listplaylists(self):
lsinfo_result = self.h.handle_request(u'lsinfo "/"')
@ -482,41 +527,52 @@ class MusicDatabaseHandlerTest(unittest.TestCase):
def test_search_album(self):
result = self.h.handle_request(u'search "album" "analbum"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_search_artist(self):
result = self.h.handle_request(u'search "artist" "anartist"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_search_filename(self):
result = self.h.handle_request(u'search "filename" "afilename"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_search_title(self):
result = self.h.handle_request(u'search "title" "atitle"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_search_else_should_fail(self):
result = self.h.handle_request(u'search "sometype" "something"')
self.assert_(result is False)
try:
result = self.h.handle_request(u'search "sometype" "something"')
self.fail(u'Search with unknown type should fail')
except MpdAckError:
pass
def test_update_without_uri(self):
result = self.h.handle_request(u'update')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('updating_db' in result)
self.assert_(result['updating_db'] >= 0)
def test_update_with_uri(self):
result = self.h.handle_request(u'update "file:///dev/urandom"')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('updating_db' in result)
self.assert_(result['updating_db'] >= 0)
def test_rescan_without_uri(self):
result = self.h.handle_request(u'rescan')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('updating_db' in result)
self.assert_(result['updating_db'] >= 0)
def test_rescan_with_uri(self):
result = self.h.handle_request(u'rescan "file:///dev/urandom"')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('updating_db' in result)
self.assert_(result['updating_db'] >= 0)
@ -543,23 +599,23 @@ class ConnectionHandlerTest(unittest.TestCase):
def test_close(self):
result = self.h.handle_request(u'close')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_empty_request(self):
result = self.h.handle_request(u'')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_kill(self):
result = self.h.handle_request(u'kill')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_password(self):
result = self.h.handle_request(u'password "secret"')
self.assert_(result is None)
self.assert_(u'OK' in result)
def test_ping(self):
result = self.h.handle_request(u'ping')
self.assert_(result is None)
self.assert_(u'OK' in result)
class AudioOutputHandlerTest(unittest.TestCase):
def setUp(self):
@ -574,6 +630,8 @@ class ReflectionHandlerTest(unittest.TestCase):
def test_urlhandlers(self):
result = self.h.handle_request(u'urlhandlers')
self.assert_(u'OK' in result)
result = result[0]
self.assert_('dummy:' in result)
pass # TODO