diff --git a/mopidy/frontends/mpd/server.py b/mopidy/frontends/mpd/server.py index 62e443fb..1e5f5d4a 100644 --- a/mopidy/frontends/mpd/server.py +++ b/mopidy/frontends/mpd/server.py @@ -1,14 +1,17 @@ -import asyncore import logging import sys +import gobject + from mopidy import settings from mopidy.utils import network -from .session import MpdSession +from mopidy.frontends.mpd.dispatcher import MpdDispatcher +from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION +from mopidy.utils.log import indent logger = logging.getLogger('mopidy.frontends.mpd.server') -class MpdServer(asyncore.dispatcher): +class MpdServer(object): """ The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession` for each client connection. @@ -17,22 +20,46 @@ class MpdServer(asyncore.dispatcher): def start(self): """Start MPD server.""" try: - self.set_socket(network.create_socket()) - self.set_reuse_addr() hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME) port = settings.MPD_SERVER_PORT logger.debug(u'MPD server is binding to [%s]:%s', hostname, port) - self.bind((hostname, port)) - self.listen(1) + network.Listener((hostname, port), handler=MpdHandler) logger.info(u'MPD server running at [%s]:%s', hostname, port) except IOError, e: logger.error(u'MPD server startup failed: %s' % str(e).decode('utf-8')) sys.exit(1) - def handle_accept(self): - """Called by asyncore when a new client connects.""" - (client_socket, client_socket_address) = self.accept() - logger.info(u'MPD client connection from [%s]:%s', - client_socket_address[0], client_socket_address[1]) - MpdSession(self, client_socket, client_socket_address) + +class MpdHandler(network.BaseHandler): + """ + The MPD client session. Keeps track of a single client session. Any + requests from the client is passed on to the MPD request dispatcher. + """ + + terminator = LINE_TERMINATOR + + def __init__(self, (sock, addr)): + super(MpdHandler, self).__init__((sock, addr)) + self.dispatcher = MpdDispatcher(session=self) + self.send_response([u'OK MPD %s' % VERSION]) + + def recv(self, line): + """Handle the request using the MPD command handlers.""" + request = line.decode(ENCODING) + logger.debug(u'Request from [%s]:%s: %s', self.addr[0], + self.addr[1], indent(request)) + self.send_response(self.dispatcher.handle_request(request)) + + def send_response(self, response): + """ + Format a response from the MPD command handlers and send it to the + client. + """ + if response: + response = LINE_TERMINATOR.join(response) + logger.debug(u'Response to [%s]:%s: %s', self.addr[0], + self.addr[1], indent(response)) + response = u'%s%s' % (response, LINE_TERMINATOR) + data = response.encode(ENCODING) + self.send(data) diff --git a/mopidy/frontends/mpd/session.py b/mopidy/frontends/mpd/session.py deleted file mode 100644 index ce5d3be7..00000000 --- a/mopidy/frontends/mpd/session.py +++ /dev/null @@ -1,58 +0,0 @@ -import asynchat -import logging - -from mopidy.frontends.mpd.dispatcher import MpdDispatcher -from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION -from mopidy.utils.log import indent - -logger = logging.getLogger('mopidy.frontends.mpd.session') - -class MpdSession(asynchat.async_chat): - """ - The MPD client session. Keeps track of a single client session. Any - requests from the client is passed on to the MPD request dispatcher. - """ - - def __init__(self, server, client_socket, client_socket_address): - asynchat.async_chat.__init__(self, sock=client_socket) - self.server = server - self.client_address = client_socket_address[0] - self.client_port = client_socket_address[1] - self.input_buffer = [] - self.authenticated = False - self.set_terminator(LINE_TERMINATOR.encode(ENCODING)) - self.dispatcher = MpdDispatcher(session=self) - self.send_response([u'OK MPD %s' % VERSION]) - - def collect_incoming_data(self, data): - """Called by asynchat when new data arrives.""" - self.input_buffer.append(data) - - def found_terminator(self): - """Called by asynchat when a terminator is found in incoming data.""" - data = ''.join(self.input_buffer).strip() - self.input_buffer = [] - try: - self.send_response(self.handle_request(data)) - except UnicodeDecodeError as e: - logger.warning(u'Received invalid data: %s', e) - - def handle_request(self, request): - """Handle the request using the MPD command handlers.""" - request = request.decode(ENCODING) - logger.debug(u'Request from [%s]:%s: %s', self.client_address, - self.client_port, indent(request)) - return self.dispatcher.handle_request(request) - - def send_response(self, response): - """ - Format a response from the MPD command handlers and send it to the - client. - """ - if response: - response = LINE_TERMINATOR.join(response) - logger.debug(u'Response to [%s]:%s: %s', self.client_address, - self.client_port, indent(response)) - response = u'%s%s' % (response, LINE_TERMINATOR) - data = response.encode(ENCODING) - self.push(data) diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index 80a51c77..a5680466 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -1,6 +1,7 @@ import logging import re import socket +import gobject logger = logging.getLogger('mopidy.utils.server') @@ -35,3 +36,129 @@ def format_hostname(hostname): if (has_ipv6 and re.match('\d+.\d+.\d+.\d+', hostname) is not None): hostname = '::ffff:%s' % hostname return hostname + +class BaseHandler(object): + """Buffered lined based client, subclass for use.""" + + #: Line terminator to use in parse_line, can be overridden by subclasses. + terminator = '\n' + + def __init__(self, (sock, addr)): + logger.debug('Established connection from %s', addr) + + self.sock, self.addr = sock, addr + self.receiver = None + self.sender = None + self.recv_buffer = '' + self.send_buffer = '' + + self.sock.setblocking(0) + self.add_recv_watch() + + def add_recv_watch(self): + """Register recv and error handling of socket.""" + if self.receiver is None: + self.receiver = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN + | gobject.IO_ERR | gobject.IO_HUP, self.handle) + + def clear_recv_watch(self): + if self.receiver is not None: + gobject.source_remove(self.receiver) + self.receiver = None + + def add_send_watch(self): + """Register send handling if it has not already been done.""" + if self.sender is None: + self.sender = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_OUT, self.handle) + + def clear_send_watch(self): + """Remove send watcher if it is set.""" + if self.sender is not None: + gobject.source_remove(self.sender) + self.sender = None + + def handle(self, fd, flags): + """Dispatch based on current flags.""" + if flags & (gobject.IO_ERR | gobject.IO_HUP): + return self.close() + if flags & gobject.IO_IN: + return self.io_in() + if flags & gobject.IO_OUT: + return self.io_out() + logger.error('Unknown flag: %s', flags) + return False + + def io_in(self): + """Record any incoming data to buffer and parse lines.""" + data = self.sock.recv(1024) + self.recv_buffer += data # XXX limit buffer size? + if data: + return self.parse_lines() + else: + return self.close() + + def io_out(self): + """Send as much of outgoing buffer as possible.""" + if self.send_buffer: + sent = self.sock.send(self.send_buffer) + self.send_buffer = self.send_buffer[sent:] + if not self.send_buffer: + self.clear_send_watch() + return True + + def close(self): + """Close connection.""" + logger.debug('Closing connection from %s', self.addr) + self.clear_send_watch() + self.sock.close() + return False + + def release(self): + """Forget about socket so that other loop can take over FD. + + Note that other code will still need to keep a ref to the socket in + order to prevent GC cleanup closing it. + """ + self.clear_recv_watch() + self.clear_send_watch() + return self.sock + + def send(self, data): + """Add raw data to send to outbound buffer.""" + self.add_send_watch() + self.send_buffer += data # XXX limit buffer size? + + def recv(self, line): + """Recv one and one line of request. Must be sub-classed.""" + raise NotImplementedError + + def parse_lines(self): + """Parse lines by splitting at terminator.""" + while self.terminator in self.recv_buffer: + line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1) + self.recv(line) + return True + +class EchoHandler(BaseHandler): + """Basic handler used for debuging of Listener and Handler code itself.""" + def recv(self, line): + print repr(line) + self.send(line) + +class Listener(object): + """Setup listener and register it with gobject loop.""" + def __init__(self, addr, handler=EchoHandler): + self.handler = handler + self.sock = create_socket() + self.sock.setblocking(0) + self.sock.bind(addr) + self.sock.listen(5) + + gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle) + logger.debug('Listening on %s using %s', addr, self.handler) + + def handle(self, fd, flags): + self.handler(self.sock.accept()) + return True +