From 54f09b0157ea9747b8b9bc8b623fa47748b46231 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Wed, 22 Jun 2011 02:49:02 +0200 Subject: [PATCH] Rewrite of client part of listener - takes into account that we will be implementing idle --- mopidy/frontends/mpd/__init__.py | 50 +++---------- mopidy/utils/network.py | 116 +++++++++++++++++++++++++++---- 2 files changed, 114 insertions(+), 52 deletions(-) diff --git a/mopidy/frontends/mpd/__init__.py b/mopidy/frontends/mpd/__init__.py index d0ca761e..3b6b5db1 100644 --- a/mopidy/frontends/mpd/__init__.py +++ b/mopidy/frontends/mpd/__init__.py @@ -1,4 +1,3 @@ -import gobject import logging import sys @@ -9,7 +8,6 @@ from mopidy import settings from mopidy.utils import network from mopidy.frontends.mpd.dispatcher import MpdDispatcher from mopidy.frontends.mpd.protocol import ENCODING, VERSION, LINE_TERMINATOR -from mopidy.utils.log import indent logger = logging.getLogger('mopidy.frontends.mpd') @@ -34,7 +32,7 @@ class MpdFrontend(ThreadingActor, BaseFrontend): port = settings.MPD_SERVER_PORT try: - network.Listener(hostname, port, session=MpdSession) + network.Listener(hostname, port, MpdSession) except IOError, e: logger.error(u'MPD server startup failed: %s', e) sys.exit(1) @@ -42,49 +40,21 @@ class MpdFrontend(ThreadingActor, BaseFrontend): logger.info(u'MPD server running at [%s]:%s', hostname, port) -class MpdSession(ThreadingActor): +class MpdSession(network.LineProtocol): """ The MPD client session. Keeps track of a single client session. Any requests from the client is passed on to the MPD request dispatcher. """ + terminator = LINE_TERMINATOR + encoding = ENCODING + def __init__(self, sock, addr): - self.sock = sock # Prevent premature GC of socket closing it - self.addr = addr - self.channel = gobject.IOChannel(sock.fileno()) - self.dispatcher = MpdDispatcher() + super(MpdSession, self).__init__(sock, addr) + self.dispatcher = MpdDispatcher(self) def on_start(self): - try: - self.send_response([u'OK MPD %s' % VERSION]) - self.request_loop() - except gobject.GError: - self.stop() + self.send_lines([u'OK MPD %s' % VERSION]) - def close(self): - self.channel.close() - - def request_loop(self): - while True: - data = self.channel.readline() - if not data: - return self.close() - request = data.rstrip(LINE_TERMINATOR).decode(ENCODING) - logger.debug(u'Request from [%s]:%s: %s', self.addr[0], - self.addr[1], indent(request)) - response = self.dispatcher.handle_request(request) - self.send_response(response) - - def send_response(self, response): - """ - Format a response from the MPD command handlers and send it to the - client. - """ - if response: - response = LINE_TERMINATOR.join(response) - logger.debug(u'Response to [%s]:%s: %s', self.addr[0], - self.addr[1], indent(response)) - response = u'%s%s' % (response, LINE_TERMINATOR) - data = response.encode(ENCODING) - self.channel.write(data) - self.channel.flush() + def on_line_recieved(self, line): + self.send_lines(self.dispatcher.handle_request(line)) diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index d1536afb..3b597e36 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -3,6 +3,10 @@ import re import socket import gobject +from pykka.actor import ThreadingActor + +from mopidy.utils.log import indent + logger = logging.getLogger('mopidy.utils.server') def _try_ipv6_socket(): @@ -39,19 +43,107 @@ def format_hostname(hostname): class Listener(object): """Setup listener and register it with gobject loop.""" - def __init__(self, host, port, session): - self.session = session - self.sock = create_socket() - self.sock.setblocking(0) - self.sock.bind((host, port)) - self.sock.listen(5) - gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle) - logger.debug('Listening on [%s]:%s using %s', host, port, self.session) + def __init__(self, host, port, protcol): + self.protcol = protcol + self.listener = create_socket() + self.listener.setblocking(False) + self.listener.bind((host, port)) + self.listener.listen(1) + + gobject.io_add_watch( + self.listener.fileno(), gobject.IO_IN, self.handle_accept) + logger.debug('Listening on [%s]:%s using %s as protcol handler', + host, port, self.protcol.__name__) + + def handle_accept(self, fd, flags): + sock, addr = self.listener.accept() + sock.setblocking(False) + + actor_ref = self.protcol.start(sock, addr) + gobject.io_add_watch(sock.fileno(), gobject.IO_IN | gobject.IO_ERR | + gobject.IO_HUP, self.handle_client, sock, actor_ref) - def handle(self, fd, flags): - sock, addr = self.sock.accept() - logger.debug('Got connection from %s', addr) - self.session.start(sock, addr) return True + def handle_client(self, fd, flags, sock, actor_ref): + if flags & (gobject.IO_ERR | gobject.IO_HUP): + data = '' + else: + data = sock.recv(1024) + + if not data: + actor_ref.stop() + return False + + actor_ref.send_one_way({'recvieved': data}) + return True + + +class LineProtocol(ThreadingActor): + terminator = '\n' + encoding = 'utf-8' + + def __init__(self, sock, addr): + self.sock = sock + self.host, self.port = addr + self.recv_buffer = '' + + def on_line_recieved(self, line): + raise NotImplemented + + def on_receive(self, message): + if 'recvieved' not in message: + return + + for line in self.parse_lines(message['recvieved']): + line = self.encode(line) + self.log_request(line) + self.on_line_recieved(line) + + def on_stop(self): + try: + self.sock.close() + except socket.error as e: + pass + + def parse_lines(self, new_data=None): + if new_data: + self.recv_buffer += new_data + while self.terminator in self.recv_buffer: + line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1) + yield line + + def log_request(self, request): + logger.debug(u'Request from [%s]:%s: %s', + self.host, self.port, indent(request)) + + def log_response(self, response): + logger.debug(u'Response to [%s]:%s: %s', + self.host, self.port, indent(response)) + + def encode(self, line): + if self.encoding: + return line.encode(self.encoding) + return line + + def decode(self, line): + if self.encoding: + return line.decode(self.encoding) + return line + + def send_lines(self, lines): + if not lines: + return + + data = self.terminator.join(lines) + self.log_response(data) + self.send_raw(self.encode(data + self.terminator)) + + def send_raw(self, data): + try: + sent = self.sock.send(data) + assert len(data) == sent, 'All data was not sent' # FIXME + except socket.error as e: # FIXME + logger.debug('send() failed with: %s', e) + self.stop()