Switch to async globject based loop

This commit is contained in:
Thomas Adamcik 2011-06-16 23:17:37 +02:00
parent 49f39977ec
commit 24dbba2fa9
3 changed files with 167 additions and 71 deletions

View File

@ -1,14 +1,17 @@
import asyncore
import logging import logging
import sys import sys
import gobject
from mopidy import settings from mopidy import settings
from mopidy.utils import network from mopidy.utils import network
from .session import MpdSession from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.frontends.mpd.server') logger = logging.getLogger('mopidy.frontends.mpd.server')
class MpdServer(asyncore.dispatcher): class MpdServer(object):
""" """
The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession` The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession`
for each client connection. for each client connection.
@ -17,22 +20,46 @@ class MpdServer(asyncore.dispatcher):
def start(self): def start(self):
"""Start MPD server.""" """Start MPD server."""
try: try:
self.set_socket(network.create_socket())
self.set_reuse_addr()
hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME) hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME)
port = settings.MPD_SERVER_PORT port = settings.MPD_SERVER_PORT
logger.debug(u'MPD server is binding to [%s]:%s', hostname, port) logger.debug(u'MPD server is binding to [%s]:%s', hostname, port)
self.bind((hostname, port)) network.Listener((hostname, port), handler=MpdHandler)
self.listen(1)
logger.info(u'MPD server running at [%s]:%s', hostname, port) logger.info(u'MPD server running at [%s]:%s', hostname, port)
except IOError, e: except IOError, e:
logger.error(u'MPD server startup failed: %s' % logger.error(u'MPD server startup failed: %s' %
str(e).decode('utf-8')) str(e).decode('utf-8'))
sys.exit(1) sys.exit(1)
def handle_accept(self):
"""Called by asyncore when a new client connects.""" class MpdHandler(network.BaseHandler):
(client_socket, client_socket_address) = self.accept() """
logger.info(u'MPD client connection from [%s]:%s', The MPD client session. Keeps track of a single client session. Any
client_socket_address[0], client_socket_address[1]) requests from the client is passed on to the MPD request dispatcher.
MpdSession(self, client_socket, client_socket_address) """
terminator = LINE_TERMINATOR
def __init__(self, (sock, addr)):
super(MpdHandler, self).__init__((sock, addr))
self.dispatcher = MpdDispatcher(session=self)
self.send_response([u'OK MPD %s' % VERSION])
def recv(self, line):
"""Handle the request using the MPD command handlers."""
request = line.decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.addr[0],
self.addr[1], indent(request))
self.send_response(self.dispatcher.handle_request(request))
def send_response(self, response):
"""
Format a response from the MPD command handlers and send it to the
client.
"""
if response:
response = LINE_TERMINATOR.join(response)
logger.debug(u'Response to [%s]:%s: %s', self.addr[0],
self.addr[1], indent(response))
response = u'%s%s' % (response, LINE_TERMINATOR)
data = response.encode(ENCODING)
self.send(data)

View File

@ -1,58 +0,0 @@
import asynchat
import logging
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.frontends.mpd.session')
class MpdSession(asynchat.async_chat):
"""
The MPD client session. Keeps track of a single client session. Any
requests from the client is passed on to the MPD request dispatcher.
"""
def __init__(self, server, client_socket, client_socket_address):
asynchat.async_chat.__init__(self, sock=client_socket)
self.server = server
self.client_address = client_socket_address[0]
self.client_port = client_socket_address[1]
self.input_buffer = []
self.authenticated = False
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
self.dispatcher = MpdDispatcher(session=self)
self.send_response([u'OK MPD %s' % VERSION])
def collect_incoming_data(self, data):
"""Called by asynchat when new data arrives."""
self.input_buffer.append(data)
def found_terminator(self):
"""Called by asynchat when a terminator is found in incoming data."""
data = ''.join(self.input_buffer).strip()
self.input_buffer = []
try:
self.send_response(self.handle_request(data))
except UnicodeDecodeError as e:
logger.warning(u'Received invalid data: %s', e)
def handle_request(self, request):
"""Handle the request using the MPD command handlers."""
request = request.decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.client_address,
self.client_port, indent(request))
return self.dispatcher.handle_request(request)
def send_response(self, response):
"""
Format a response from the MPD command handlers and send it to the
client.
"""
if response:
response = LINE_TERMINATOR.join(response)
logger.debug(u'Response to [%s]:%s: %s', self.client_address,
self.client_port, indent(response))
response = u'%s%s' % (response, LINE_TERMINATOR)
data = response.encode(ENCODING)
self.push(data)

View File

@ -1,6 +1,7 @@
import logging import logging
import re import re
import socket import socket
import gobject
logger = logging.getLogger('mopidy.utils.server') logger = logging.getLogger('mopidy.utils.server')
@ -35,3 +36,129 @@ def format_hostname(hostname):
if (has_ipv6 and re.match('\d+.\d+.\d+.\d+', hostname) is not None): if (has_ipv6 and re.match('\d+.\d+.\d+.\d+', hostname) is not None):
hostname = '::ffff:%s' % hostname hostname = '::ffff:%s' % hostname
return hostname return hostname
class BaseHandler(object):
"""Buffered lined based client, subclass for use."""
#: Line terminator to use in parse_line, can be overridden by subclasses.
terminator = '\n'
def __init__(self, (sock, addr)):
logger.debug('Established connection from %s', addr)
self.sock, self.addr = sock, addr
self.receiver = None
self.sender = None
self.recv_buffer = ''
self.send_buffer = ''
self.sock.setblocking(0)
self.add_recv_watch()
def add_recv_watch(self):
"""Register recv and error handling of socket."""
if self.receiver is None:
self.receiver = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN
| gobject.IO_ERR | gobject.IO_HUP, self.handle)
def clear_recv_watch(self):
if self.receiver is not None:
gobject.source_remove(self.receiver)
self.receiver = None
def add_send_watch(self):
"""Register send handling if it has not already been done."""
if self.sender is None:
self.sender = gobject.io_add_watch(self.sock.fileno(),
gobject.IO_OUT, self.handle)
def clear_send_watch(self):
"""Remove send watcher if it is set."""
if self.sender is not None:
gobject.source_remove(self.sender)
self.sender = None
def handle(self, fd, flags):
"""Dispatch based on current flags."""
if flags & (gobject.IO_ERR | gobject.IO_HUP):
return self.close()
if flags & gobject.IO_IN:
return self.io_in()
if flags & gobject.IO_OUT:
return self.io_out()
logger.error('Unknown flag: %s', flags)
return False
def io_in(self):
"""Record any incoming data to buffer and parse lines."""
data = self.sock.recv(1024)
self.recv_buffer += data # XXX limit buffer size?
if data:
return self.parse_lines()
else:
return self.close()
def io_out(self):
"""Send as much of outgoing buffer as possible."""
if self.send_buffer:
sent = self.sock.send(self.send_buffer)
self.send_buffer = self.send_buffer[sent:]
if not self.send_buffer:
self.clear_send_watch()
return True
def close(self):
"""Close connection."""
logger.debug('Closing connection from %s', self.addr)
self.clear_send_watch()
self.sock.close()
return False
def release(self):
"""Forget about socket so that other loop can take over FD.
Note that other code will still need to keep a ref to the socket in
order to prevent GC cleanup closing it.
"""
self.clear_recv_watch()
self.clear_send_watch()
return self.sock
def send(self, data):
"""Add raw data to send to outbound buffer."""
self.add_send_watch()
self.send_buffer += data # XXX limit buffer size?
def recv(self, line):
"""Recv one and one line of request. Must be sub-classed."""
raise NotImplementedError
def parse_lines(self):
"""Parse lines by splitting at terminator."""
while self.terminator in self.recv_buffer:
line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1)
self.recv(line)
return True
class EchoHandler(BaseHandler):
"""Basic handler used for debuging of Listener and Handler code itself."""
def recv(self, line):
print repr(line)
self.send(line)
class Listener(object):
"""Setup listener and register it with gobject loop."""
def __init__(self, addr, handler=EchoHandler):
self.handler = handler
self.sock = create_socket()
self.sock.setblocking(0)
self.sock.bind(addr)
self.sock.listen(5)
gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle)
logger.debug('Listening on %s using %s', addr, self.handler)
def handle(self, fd, flags):
self.handler(self.sock.accept())
return True