Switch to lock based protection of send buffer, queue use was flawed
This commit is contained in:
parent
a0f6ba7dc4
commit
4cd6f5f66c
@ -1,8 +1,8 @@
|
||||
import gobject
|
||||
import logging
|
||||
import re
|
||||
import socket
|
||||
import gobject
|
||||
import Queue as queue
|
||||
import threading
|
||||
|
||||
from pykka.actor import ThreadingActor
|
||||
|
||||
@ -117,8 +117,9 @@ class LineProtocol(ThreadingActor):
|
||||
def __init__(self, sock, addr):
|
||||
self.sock = sock
|
||||
self.host, self.port = addr[:2] # IPv6 has larger addr
|
||||
self.send_lock = threading.Lock()
|
||||
self.recv_buffer = ''
|
||||
self.send_queue = queue.Queue()
|
||||
self.send_buffer = ''
|
||||
|
||||
def on_line_received(self, line):
|
||||
"""
|
||||
@ -218,11 +219,12 @@ class LineProtocol(ThreadingActor):
|
||||
|
||||
def send_raw(self, data):
|
||||
"""Send data to client exactly as is."""
|
||||
start_sender = self.send_queue.empty()
|
||||
self.send_lock.acquire(True)
|
||||
should_register_sender = len(self.send_buffer) == 0
|
||||
self.send_buffer += data
|
||||
self.send_lock.release()
|
||||
|
||||
self.send_queue.put(data)
|
||||
|
||||
if start_sender:
|
||||
if should_register_sender:
|
||||
gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT |
|
||||
gobject.IO_ERR | gobject.IO_HUP, self._send)
|
||||
|
||||
@ -230,19 +232,20 @@ class LineProtocol(ThreadingActor):
|
||||
# NOTE: This code is _not_ run in the actor's thread, but in the same
|
||||
# one as the event loop. If this blocks, rest of gobject code will
|
||||
# likely be blocked as well...
|
||||
|
||||
# If with can't get the lock, simply try again next time socket is
|
||||
# ready for sending.
|
||||
if not self.send_lock.acquire(False):
|
||||
return True
|
||||
|
||||
try:
|
||||
data = self.send_queue.get_nowait()
|
||||
sent = self.sock.send(data)
|
||||
except queue.Empty:
|
||||
return False # No more data to send, remove callback
|
||||
sent = self.sock.send(self.send_buffer)
|
||||
self.send_buffer = self.send_buffer[sent:]
|
||||
return bool(self.send_buffer)
|
||||
except socket.error as e:
|
||||
if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
|
||||
self.send_queue.put(data)
|
||||
return True
|
||||
self.actor_ref.stop()
|
||||
return False
|
||||
|
||||
if len(data) != sent: # Retry remaining data
|
||||
self.send_queue.put(data[sent:])
|
||||
|
||||
return not self.send_queue.empty()
|
||||
finally:
|
||||
self.send_lock.release()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user