diff --git a/mopidy/frontends/mpd/__init__.py b/mopidy/frontends/mpd/__init__.py index 175aa0ee..b6088a41 100644 --- a/mopidy/frontends/mpd/__init__.py +++ b/mopidy/frontends/mpd/__init__.py @@ -44,4 +44,3 @@ class MpdThread(BaseThread): logger.debug(u'Starting MPD server thread') server = MpdServer() server.start() - asyncore.loop() diff --git a/mopidy/frontends/mpd/server.py b/mopidy/frontends/mpd/server.py index 1e5f5d4a..12e6a92a 100644 --- a/mopidy/frontends/mpd/server.py +++ b/mopidy/frontends/mpd/server.py @@ -3,6 +3,8 @@ import sys import gobject +from pykka.actor import ThreadingActor + from mopidy import settings from mopidy.utils import network from mopidy.frontends.mpd.dispatcher import MpdDispatcher @@ -23,7 +25,7 @@ class MpdServer(object): hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME) port = settings.MPD_SERVER_PORT logger.debug(u'MPD server is binding to [%s]:%s', hostname, port) - network.Listener((hostname, port), handler=MpdHandler) + network.Listener((hostname, port), session=MpdSession) logger.info(u'MPD server running at [%s]:%s', hostname, port) except IOError, e: logger.error(u'MPD server startup failed: %s' % @@ -31,25 +33,33 @@ class MpdServer(object): sys.exit(1) -class MpdHandler(network.BaseHandler): +class MpdSession(ThreadingActor): """ The MPD client session. Keeps track of a single client session. Any requests from the client is passed on to the MPD request dispatcher. """ - terminator = LINE_TERMINATOR - - def __init__(self, (sock, addr)): - super(MpdHandler, self).__init__((sock, addr)) + def __init__(self, sock, addr): + self.sock = sock # Prevent premature GC + self.addr = addr + self.channel = gobject.IOChannel(sock.fileno()) self.dispatcher = MpdDispatcher(session=self) - self.send_response([u'OK MPD %s' % VERSION]) - def recv(self, line): - """Handle the request using the MPD command handlers.""" - request = line.decode(ENCODING) - logger.debug(u'Request from [%s]:%s: %s', self.addr[0], - self.addr[1], indent(request)) - self.send_response(self.dispatcher.handle_request(request)) + def on_start(self): + try: + self.send_response([u'OK MPD %s' % VERSION]) + self.request_loop() + except gobject.GError, e: + self.stop() + + def request_loop(self): + while True: + logger.debug('Trying to readline') + request = self.channel.readline()[:-1].decode(ENCODING) + logger.debug(u'Request from [%s]:%s: %s', self.addr[0], + self.addr[1], indent(request)) + response = self.dispatcher.handle_request(request) + self.send_response(response) def send_response(self, response): """ @@ -62,4 +72,5 @@ class MpdHandler(network.BaseHandler): self.addr[1], indent(response)) response = u'%s%s' % (response, LINE_TERMINATOR) data = response.encode(ENCODING) - self.send(data) + self.channel.write(data) + self.channel.flush() diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index a5680466..df4f9292 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -37,128 +37,21 @@ def format_hostname(hostname): hostname = '::ffff:%s' % hostname return hostname -class BaseHandler(object): - """Buffered lined based client, subclass for use.""" - - #: Line terminator to use in parse_line, can be overridden by subclasses. - terminator = '\n' - - def __init__(self, (sock, addr)): - logger.debug('Established connection from %s', addr) - - self.sock, self.addr = sock, addr - self.receiver = None - self.sender = None - self.recv_buffer = '' - self.send_buffer = '' - - self.sock.setblocking(0) - self.add_recv_watch() - - def add_recv_watch(self): - """Register recv and error handling of socket.""" - if self.receiver is None: - self.receiver = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN - | gobject.IO_ERR | gobject.IO_HUP, self.handle) - - def clear_recv_watch(self): - if self.receiver is not None: - gobject.source_remove(self.receiver) - self.receiver = None - - def add_send_watch(self): - """Register send handling if it has not already been done.""" - if self.sender is None: - self.sender = gobject.io_add_watch(self.sock.fileno(), - gobject.IO_OUT, self.handle) - - def clear_send_watch(self): - """Remove send watcher if it is set.""" - if self.sender is not None: - gobject.source_remove(self.sender) - self.sender = None - - def handle(self, fd, flags): - """Dispatch based on current flags.""" - if flags & (gobject.IO_ERR | gobject.IO_HUP): - return self.close() - if flags & gobject.IO_IN: - return self.io_in() - if flags & gobject.IO_OUT: - return self.io_out() - logger.error('Unknown flag: %s', flags) - return False - - def io_in(self): - """Record any incoming data to buffer and parse lines.""" - data = self.sock.recv(1024) - self.recv_buffer += data # XXX limit buffer size? - if data: - return self.parse_lines() - else: - return self.close() - - def io_out(self): - """Send as much of outgoing buffer as possible.""" - if self.send_buffer: - sent = self.sock.send(self.send_buffer) - self.send_buffer = self.send_buffer[sent:] - if not self.send_buffer: - self.clear_send_watch() - return True - - def close(self): - """Close connection.""" - logger.debug('Closing connection from %s', self.addr) - self.clear_send_watch() - self.sock.close() - return False - - def release(self): - """Forget about socket so that other loop can take over FD. - - Note that other code will still need to keep a ref to the socket in - order to prevent GC cleanup closing it. - """ - self.clear_recv_watch() - self.clear_send_watch() - return self.sock - - def send(self, data): - """Add raw data to send to outbound buffer.""" - self.add_send_watch() - self.send_buffer += data # XXX limit buffer size? - - def recv(self, line): - """Recv one and one line of request. Must be sub-classed.""" - raise NotImplementedError - - def parse_lines(self): - """Parse lines by splitting at terminator.""" - while self.terminator in self.recv_buffer: - line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1) - self.recv(line) - return True - -class EchoHandler(BaseHandler): - """Basic handler used for debuging of Listener and Handler code itself.""" - def recv(self, line): - print repr(line) - self.send(line) - class Listener(object): """Setup listener and register it with gobject loop.""" - def __init__(self, addr, handler=EchoHandler): - self.handler = handler + def __init__(self, addr, session): + self.session = session self.sock = create_socket() self.sock.setblocking(0) self.sock.bind(addr) self.sock.listen(5) gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle) - logger.debug('Listening on %s using %s', addr, self.handler) + logger.debug('Listening on %s using %s', addr, self.session) def handle(self, fd, flags): - self.handler(self.sock.accept()) + sock, addr = self.sock.accept() + logger.debug('Got connection from %s', addr) + self.session.start(sock, addr) return True