Add timeout support to LineProtocol

This commit is contained in:
Thomas Adamcik 2011-07-08 00:28:01 +02:00
parent 63244b9af8
commit 7f77fe38d5

View File

@ -126,6 +126,10 @@ class LineProtocol(ThreadingActor):
#: What encoding to expect incomming data to be in, can be :class:`None`.
encoding = 'utf-8'
#: How long to wait before disconnecting client due to inactivity in
#: milliseconds.
timeout = 30000
def __init__(self, sock, addr):
self.sock = sock
self.host, self.port = addr[:2] # IPv6 has larger addr
@ -133,6 +137,9 @@ class LineProtocol(ThreadingActor):
self.recv_buffer = ''
self.send_buffer = ''
self.terminator_re = re.compile(self.terminator)
self.timeout_id = None
self.enable_timeout()
def on_line_received(self, line):
"""
@ -147,6 +154,7 @@ class LineProtocol(ThreadingActor):
if 'received' not in message:
return
self.disable_timeout()
self.log_raw_data(message['received'])
for line in self.parse_lines(message['received']):
@ -154,13 +162,26 @@ class LineProtocol(ThreadingActor):
self.log_request(line)
self.on_line_received(line)
self.enable_timeout()
def on_stop(self):
"""Ensure that socket is closed when actor stops."""
"""Ensure that cleanup when actor stops."""
self.disable_timeout()
try:
self.sock.close()
except socket.error:
pass
def disable_timeout(self):
"""Deactivate timeout mechanism."""
if self.timeout_id:
gobject.source_remove(self.timeout_id)
def enable_timeout(self):
"""Reactivate timeout mechanism."""
self.disable_timeout()
self.timeout_id = gobject.timeout_add(self.timeout, self._timeout)
def parse_lines(self, new_data=None):
"""Consume new data and yield any lines found."""
if new_data:
@ -206,6 +227,15 @@ class LineProtocol(ThreadingActor):
logger.warning(u'Problem with connection to [%s]:%s in %s: %s',
self.host, self.port, self.actor_urn, error)
def log_timeout(self):
"""
Log timeout for debug purposes.
Can be overridden by subclasses to change logging behaviour.
"""
logger.debug(u'Closing connection to [%s]:%s in %s due to timeout.',
self.host, self.port, self.actor_urn)
def encode(self, line):
"""
Handle encoding of line.
@ -273,3 +303,9 @@ class LineProtocol(ThreadingActor):
return False
finally:
self.send_lock.release()
def _timeout(self):
# NOTE: This code is _not_ run in the actor's thread...
self.log_timeout()
self.actor_ref.stop()
return False