Get rid of custom async client code in favour of blocking IOChannel in ThreadingActors

This commit is contained in:
Thomas Adamcik 2011-06-17 00:01:55 +02:00
parent b8ae79038f
commit 9df16e0716
3 changed files with 31 additions and 128 deletions

View File

@ -44,4 +44,3 @@ class MpdThread(BaseThread):
logger.debug(u'Starting MPD server thread')
server = MpdServer()
server.start()
asyncore.loop()

View File

@ -3,6 +3,8 @@ import sys
import gobject
from pykka.actor import ThreadingActor
from mopidy import settings
from mopidy.utils import network
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
@ -23,7 +25,7 @@ class MpdServer(object):
hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME)
port = settings.MPD_SERVER_PORT
logger.debug(u'MPD server is binding to [%s]:%s', hostname, port)
network.Listener((hostname, port), handler=MpdHandler)
network.Listener((hostname, port), session=MpdSession)
logger.info(u'MPD server running at [%s]:%s', hostname, port)
except IOError, e:
logger.error(u'MPD server startup failed: %s' %
@ -31,25 +33,33 @@ class MpdServer(object):
sys.exit(1)
class MpdHandler(network.BaseHandler):
class MpdSession(ThreadingActor):
"""
The MPD client session. Keeps track of a single client session. Any
requests from the client is passed on to the MPD request dispatcher.
"""
terminator = LINE_TERMINATOR
def __init__(self, (sock, addr)):
super(MpdHandler, self).__init__((sock, addr))
def __init__(self, sock, addr):
self.sock = sock # Prevent premature GC
self.addr = addr
self.channel = gobject.IOChannel(sock.fileno())
self.dispatcher = MpdDispatcher(session=self)
self.send_response([u'OK MPD %s' % VERSION])
def recv(self, line):
"""Handle the request using the MPD command handlers."""
request = line.decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.addr[0],
self.addr[1], indent(request))
self.send_response(self.dispatcher.handle_request(request))
def on_start(self):
try:
self.send_response([u'OK MPD %s' % VERSION])
self.request_loop()
except gobject.GError, e:
self.stop()
def request_loop(self):
while True:
logger.debug('Trying to readline')
request = self.channel.readline()[:-1].decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.addr[0],
self.addr[1], indent(request))
response = self.dispatcher.handle_request(request)
self.send_response(response)
def send_response(self, response):
"""
@ -62,4 +72,5 @@ class MpdHandler(network.BaseHandler):
self.addr[1], indent(response))
response = u'%s%s' % (response, LINE_TERMINATOR)
data = response.encode(ENCODING)
self.send(data)
self.channel.write(data)
self.channel.flush()

View File

@ -37,128 +37,21 @@ def format_hostname(hostname):
hostname = '::ffff:%s' % hostname
return hostname
class BaseHandler(object):
"""Buffered lined based client, subclass for use."""
#: Line terminator to use in parse_line, can be overridden by subclasses.
terminator = '\n'
def __init__(self, (sock, addr)):
logger.debug('Established connection from %s', addr)
self.sock, self.addr = sock, addr
self.receiver = None
self.sender = None
self.recv_buffer = ''
self.send_buffer = ''
self.sock.setblocking(0)
self.add_recv_watch()
def add_recv_watch(self):
"""Register recv and error handling of socket."""
if self.receiver is None:
self.receiver = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN
| gobject.IO_ERR | gobject.IO_HUP, self.handle)
def clear_recv_watch(self):
if self.receiver is not None:
gobject.source_remove(self.receiver)
self.receiver = None
def add_send_watch(self):
"""Register send handling if it has not already been done."""
if self.sender is None:
self.sender = gobject.io_add_watch(self.sock.fileno(),
gobject.IO_OUT, self.handle)
def clear_send_watch(self):
"""Remove send watcher if it is set."""
if self.sender is not None:
gobject.source_remove(self.sender)
self.sender = None
def handle(self, fd, flags):
"""Dispatch based on current flags."""
if flags & (gobject.IO_ERR | gobject.IO_HUP):
return self.close()
if flags & gobject.IO_IN:
return self.io_in()
if flags & gobject.IO_OUT:
return self.io_out()
logger.error('Unknown flag: %s', flags)
return False
def io_in(self):
"""Record any incoming data to buffer and parse lines."""
data = self.sock.recv(1024)
self.recv_buffer += data # XXX limit buffer size?
if data:
return self.parse_lines()
else:
return self.close()
def io_out(self):
"""Send as much of outgoing buffer as possible."""
if self.send_buffer:
sent = self.sock.send(self.send_buffer)
self.send_buffer = self.send_buffer[sent:]
if not self.send_buffer:
self.clear_send_watch()
return True
def close(self):
"""Close connection."""
logger.debug('Closing connection from %s', self.addr)
self.clear_send_watch()
self.sock.close()
return False
def release(self):
"""Forget about socket so that other loop can take over FD.
Note that other code will still need to keep a ref to the socket in
order to prevent GC cleanup closing it.
"""
self.clear_recv_watch()
self.clear_send_watch()
return self.sock
def send(self, data):
"""Add raw data to send to outbound buffer."""
self.add_send_watch()
self.send_buffer += data # XXX limit buffer size?
def recv(self, line):
"""Recv one and one line of request. Must be sub-classed."""
raise NotImplementedError
def parse_lines(self):
"""Parse lines by splitting at terminator."""
while self.terminator in self.recv_buffer:
line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1)
self.recv(line)
return True
class EchoHandler(BaseHandler):
"""Basic handler used for debuging of Listener and Handler code itself."""
def recv(self, line):
print repr(line)
self.send(line)
class Listener(object):
"""Setup listener and register it with gobject loop."""
def __init__(self, addr, handler=EchoHandler):
self.handler = handler
def __init__(self, addr, session):
self.session = session
self.sock = create_socket()
self.sock.setblocking(0)
self.sock.bind(addr)
self.sock.listen(5)
gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle)
logger.debug('Listening on %s using %s', addr, self.handler)
logger.debug('Listening on %s using %s', addr, self.session)
def handle(self, fd, flags):
self.handler(self.sock.accept())
sock, addr = self.sock.accept()
logger.debug('Got connection from %s', addr)
self.session.start(sock, addr)
return True