Move recv code to LineProtocol and add source removal

Fixes problem where timed out sockets where not being removed from event loop
causing excess CPU usage.
This commit is contained in:
Thomas Adamcik 2011-07-09 14:28:35 +02:00
parent cdb68d61f5
commit 22ebb1bc29

View File

@ -77,38 +77,7 @@ class Server(object):
pass
return True
sock.setblocking(False)
actor_ref = self.protocol.start(sock, addr)
gobject.io_add_watch(sock.fileno(), gobject.IO_IN | gobject.IO_ERR |
gobject.IO_HUP, self.handle_client, sock, actor_ref)
return True
def handle_client(self, fd, flags, sock, actor_ref):
"""
Read client data when possible.
Returns false when reading failed in order to deregister with the event
loop.
"""
if flags & (gobject.IO_ERR | gobject.IO_HUP):
actor_ref.stop()
return False
try:
data = sock.recv(4096)
except socket.error as e:
if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
return True
actor_ref.stop()
return False
if not data:
actor_ref.stop()
return False
actor_ref.send_one_way({'received': data})
self.protocol.start(sock, addr)
return True
@ -131,14 +100,21 @@ class LineProtocol(ThreadingActor):
timeout = 30
def __init__(self, sock, addr):
sock.setblocking(False)
self.sock = sock
self.host, self.port = addr[:2] # IPv6 has larger addr
self.send_lock = threading.Lock()
self.recv_buffer = ''
self.send_buffer = ''
self.terminator_re = re.compile(self.terminator)
self.send_id = None
self.recv_id = None
self.timeout_id = None
self.sock.setblocking(False)
self.enable_recv()
self.enable_timeout()
def on_line_received(self, line):
@ -167,6 +143,8 @@ class LineProtocol(ThreadingActor):
def on_stop(self):
"""Ensure that cleanup when actor stops."""
self.disable_timeout()
self.disable_recv()
self.disable_send()
try:
self.sock.close()
except socket.error:
@ -174,8 +152,33 @@ class LineProtocol(ThreadingActor):
def disable_timeout(self):
"""Deactivate timeout mechanism."""
if self.timeout_id:
if self.timeout_id is not None:
gobject.source_remove(self.timeout_id)
self.timeout_id = None
def disable_recv(self):
"""Deactivate recv mechanism."""
if self.recv_id is not None:
gobject.source_remove(self.recv_id)
self.recv_id = None
def disable_send(self):
"""Deactivate send mechanism."""
if self.send_id:
gobject.source_remove(self.send_id)
self.send_id = None
def enable_recv(self):
"""Reactivate recv mechanism."""
if self.recv_id is None:
self.recv_id = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN |
gobject.IO_ERR | gobject.IO_HUP, self._recv)
def enable_send(self):
"""Reactivate send mechanism."""
if self.send_id is None:
self.send_id = gobject.io_add_watch(self.sock.fileno(),
gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send)
def enable_timeout(self):
"""Reactivate timeout mechanism."""
@ -274,19 +277,37 @@ class LineProtocol(ThreadingActor):
def send_raw(self, data):
"""Send data to client exactly as is."""
self.send_lock.acquire(True)
should_register_sender = len(self.send_buffer) == 0
self.send_buffer += data
self.send_lock.release()
self.enable_send()
if should_register_sender:
gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT |
gobject.IO_ERR | gobject.IO_HUP, self._send)
def _send(self, fd, flags):
def _recv(self, fd, flags):
# NOTE: This code is _not_ run in the actor's thread, but in the same
# one as the event loop. If this blocks, rest of gobject code will
# likely be blocked as well...
if flags & (gobject.IO_ERR | gobject.IO_HUP):
actor_ref.stop()
return False
try:
data = self.sock.recv(4096)
except socket.error as e:
if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
return True
self.actor_ref.stop()
return False
if not data:
self.actor_ref.stop()
return False
self.actor_ref.send_one_way({'received': data})
return True
def _send(self, fd, flags):
# NOTE: This code is _not_ run in the actor's thread...
# If with can't get the lock, simply try again next time socket is
# ready for sending.
if not self.send_lock.acquire(False):