Rewrite of client part of listener - takes into account that we will be implementing idle

This commit is contained in:
Thomas Adamcik 2011-06-22 02:49:02 +02:00
parent 74aa96b300
commit 54f09b0157
2 changed files with 114 additions and 52 deletions

View File

@ -1,4 +1,3 @@
import gobject
import logging import logging
import sys import sys
@ -9,7 +8,6 @@ from mopidy import settings
from mopidy.utils import network from mopidy.utils import network
from mopidy.frontends.mpd.dispatcher import MpdDispatcher from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.protocol import ENCODING, VERSION, LINE_TERMINATOR from mopidy.frontends.mpd.protocol import ENCODING, VERSION, LINE_TERMINATOR
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.frontends.mpd') logger = logging.getLogger('mopidy.frontends.mpd')
@ -34,7 +32,7 @@ class MpdFrontend(ThreadingActor, BaseFrontend):
port = settings.MPD_SERVER_PORT port = settings.MPD_SERVER_PORT
try: try:
network.Listener(hostname, port, session=MpdSession) network.Listener(hostname, port, MpdSession)
except IOError, e: except IOError, e:
logger.error(u'MPD server startup failed: %s', e) logger.error(u'MPD server startup failed: %s', e)
sys.exit(1) sys.exit(1)
@ -42,49 +40,21 @@ class MpdFrontend(ThreadingActor, BaseFrontend):
logger.info(u'MPD server running at [%s]:%s', hostname, port) logger.info(u'MPD server running at [%s]:%s', hostname, port)
class MpdSession(ThreadingActor): class MpdSession(network.LineProtocol):
""" """
The MPD client session. Keeps track of a single client session. Any The MPD client session. Keeps track of a single client session. Any
requests from the client is passed on to the MPD request dispatcher. requests from the client is passed on to the MPD request dispatcher.
""" """
terminator = LINE_TERMINATOR
encoding = ENCODING
def __init__(self, sock, addr): def __init__(self, sock, addr):
self.sock = sock # Prevent premature GC of socket closing it super(MpdSession, self).__init__(sock, addr)
self.addr = addr self.dispatcher = MpdDispatcher(self)
self.channel = gobject.IOChannel(sock.fileno())
self.dispatcher = MpdDispatcher()
def on_start(self): def on_start(self):
try: self.send_lines([u'OK MPD %s' % VERSION])
self.send_response([u'OK MPD %s' % VERSION])
self.request_loop()
except gobject.GError:
self.stop()
def close(self): def on_line_recieved(self, line):
self.channel.close() self.send_lines(self.dispatcher.handle_request(line))
def request_loop(self):
while True:
data = self.channel.readline()
if not data:
return self.close()
request = data.rstrip(LINE_TERMINATOR).decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.addr[0],
self.addr[1], indent(request))
response = self.dispatcher.handle_request(request)
self.send_response(response)
def send_response(self, response):
"""
Format a response from the MPD command handlers and send it to the
client.
"""
if response:
response = LINE_TERMINATOR.join(response)
logger.debug(u'Response to [%s]:%s: %s', self.addr[0],
self.addr[1], indent(response))
response = u'%s%s' % (response, LINE_TERMINATOR)
data = response.encode(ENCODING)
self.channel.write(data)
self.channel.flush()

View File

@ -3,6 +3,10 @@ import re
import socket import socket
import gobject import gobject
from pykka.actor import ThreadingActor
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.utils.server') logger = logging.getLogger('mopidy.utils.server')
def _try_ipv6_socket(): def _try_ipv6_socket():
@ -39,19 +43,107 @@ def format_hostname(hostname):
class Listener(object): class Listener(object):
"""Setup listener and register it with gobject loop.""" """Setup listener and register it with gobject loop."""
def __init__(self, host, port, session):
self.session = session
self.sock = create_socket()
self.sock.setblocking(0)
self.sock.bind((host, port))
self.sock.listen(5)
gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN, self.handle) def __init__(self, host, port, protcol):
logger.debug('Listening on [%s]:%s using %s', host, port, self.session) self.protcol = protcol
self.listener = create_socket()
self.listener.setblocking(False)
self.listener.bind((host, port))
self.listener.listen(1)
gobject.io_add_watch(
self.listener.fileno(), gobject.IO_IN, self.handle_accept)
logger.debug('Listening on [%s]:%s using %s as protcol handler',
host, port, self.protcol.__name__)
def handle_accept(self, fd, flags):
sock, addr = self.listener.accept()
sock.setblocking(False)
actor_ref = self.protcol.start(sock, addr)
gobject.io_add_watch(sock.fileno(), gobject.IO_IN | gobject.IO_ERR |
gobject.IO_HUP, self.handle_client, sock, actor_ref)
def handle(self, fd, flags):
sock, addr = self.sock.accept()
logger.debug('Got connection from %s', addr)
self.session.start(sock, addr)
return True return True
def handle_client(self, fd, flags, sock, actor_ref):
if flags & (gobject.IO_ERR | gobject.IO_HUP):
data = ''
else:
data = sock.recv(1024)
if not data:
actor_ref.stop()
return False
actor_ref.send_one_way({'recvieved': data})
return True
class LineProtocol(ThreadingActor):
terminator = '\n'
encoding = 'utf-8'
def __init__(self, sock, addr):
self.sock = sock
self.host, self.port = addr
self.recv_buffer = ''
def on_line_recieved(self, line):
raise NotImplemented
def on_receive(self, message):
if 'recvieved' not in message:
return
for line in self.parse_lines(message['recvieved']):
line = self.encode(line)
self.log_request(line)
self.on_line_recieved(line)
def on_stop(self):
try:
self.sock.close()
except socket.error as e:
pass
def parse_lines(self, new_data=None):
if new_data:
self.recv_buffer += new_data
while self.terminator in self.recv_buffer:
line, self.recv_buffer = self.recv_buffer.split(self.terminator, 1)
yield line
def log_request(self, request):
logger.debug(u'Request from [%s]:%s: %s',
self.host, self.port, indent(request))
def log_response(self, response):
logger.debug(u'Response to [%s]:%s: %s',
self.host, self.port, indent(response))
def encode(self, line):
if self.encoding:
return line.encode(self.encoding)
return line
def decode(self, line):
if self.encoding:
return line.decode(self.encoding)
return line
def send_lines(self, lines):
if not lines:
return
data = self.terminator.join(lines)
self.log_response(data)
self.send_raw(self.encode(data + self.terminator))
def send_raw(self, data):
try:
sent = self.sock.send(data)
assert len(data) == sent, 'All data was not sent' # FIXME
except socket.error as e: # FIXME
logger.debug('send() failed with: %s', e)
self.stop()