Switch to thread safe send queue and use event loop to send data

This commit is contained in:
Thomas Adamcik 2011-07-06 00:23:54 +02:00
parent 52087bd5b4
commit a0f6ba7dc4

View File

@ -2,6 +2,7 @@ import logging
import re
import socket
import gobject
import Queue as queue
from pykka.actor import ThreadingActor
@ -115,8 +116,9 @@ class LineProtocol(ThreadingActor):
def __init__(self, sock, addr):
self.sock = sock
self.host, self.port = addr[:2]
self.host, self.port = addr[:2] # IPv6 has larger addr
self.recv_buffer = ''
self.send_queue = queue.Queue()
def on_line_received(self, line):
"""
@ -216,26 +218,31 @@ class LineProtocol(ThreadingActor):
def send_raw(self, data):
"""Send data to client exactly as is."""
start_sender = self.send_queue.empty()
self.send_queue.put(data)
if start_sender:
gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT |
gobject.IO_ERR | gobject.IO_HUP, self._send)
def _send(self, fd, flags):
# NOTE: This code is _not_ run in the actor's thread, but in the same
# one as the event loop. If this blocks, rest of gobject code will
# likely be blocked as well...
try:
data = self.send_queue.get_nowait()
sent = self.sock.send(data)
# FIXME we are assuming that sock send will not fail as the OS send
# buffer is big enough compared to our need. This can of course
# fail and will be caught and handled fairly poorly with the
# following assert.
#
# Safer, and more complex way of handling this would be to ensure
# that data can be send by putting a data sender in the event loop
# and appending to its buffer. Once the buffer is empty the sender
# must be removed from the loop. This option is doable, but adds
# extra complexity.
#
# The other simpler option would be to try and recall raw_send with
# remaining data. Probably with a decrementing retry counter to
# prevent an inf. loop.
assert len(data) == sent, u'All data was not sent'
except queue.Empty:
return False # No more data to send, remove callback
except socket.error as e:
# FIXME should this be handled in a better manner, for instance
# retry? For instance would block errors and interrupted system call
# errors would warrant a retry.
logger.debug(u'send() failed with: %s', e)
self.stop()
if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
self.send_queue.put(data)
return True
self.actor_ref.stop()
return False
if len(data) != sent: # Retry remaining data
self.send_queue.put(data[sent:])
return not self.send_queue.empty()