From 4cd6f5f66cb5d0a0854438cf054739116b921057 Mon Sep 17 00:00:00 2001 From: Thomas Adamcik Date: Thu, 7 Jul 2011 21:50:03 +0200 Subject: [PATCH] Switch to lock based protection of send buffer, queue use was flawed --- mopidy/utils/network.py | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index 89d573a3..76bebc1d 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -1,8 +1,8 @@ +import gobject import logging import re import socket -import gobject -import Queue as queue +import threading from pykka.actor import ThreadingActor @@ -117,8 +117,9 @@ class LineProtocol(ThreadingActor): def __init__(self, sock, addr): self.sock = sock self.host, self.port = addr[:2] # IPv6 has larger addr + self.send_lock = threading.Lock() self.recv_buffer = '' - self.send_queue = queue.Queue() + self.send_buffer = '' def on_line_received(self, line): """ @@ -218,11 +219,12 @@ class LineProtocol(ThreadingActor): def send_raw(self, data): """Send data to client exactly as is.""" - start_sender = self.send_queue.empty() + self.send_lock.acquire(True) + should_register_sender = len(self.send_buffer) == 0 + self.send_buffer += data + self.send_lock.release() - self.send_queue.put(data) - - if start_sender: + if should_register_sender: gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send) @@ -230,19 +232,20 @@ class LineProtocol(ThreadingActor): # NOTE: This code is _not_ run in the actor's thread, but in the same # one as the event loop. If this blocks, rest of gobject code will # likely be blocked as well... + + # If with can't get the lock, simply try again next time socket is + # ready for sending. + if not self.send_lock.acquire(False): + return True + try: - data = self.send_queue.get_nowait() - sent = self.sock.send(data) - except queue.Empty: - return False # No more data to send, remove callback + sent = self.sock.send(self.send_buffer) + self.send_buffer = self.send_buffer[sent:] + return bool(self.send_buffer) except socket.error as e: if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK): - self.send_queue.put(data) return True self.actor_ref.stop() return False - - if len(data) != sent: # Retry remaining data - self.send_queue.put(data[sent:]) - - return not self.send_queue.empty() + finally: + self.send_lock.release()