diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index 542baaa0..89d573a3 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -2,6 +2,7 @@ import logging import re import socket import gobject +import Queue as queue from pykka.actor import ThreadingActor @@ -115,8 +116,9 @@ class LineProtocol(ThreadingActor): def __init__(self, sock, addr): self.sock = sock - self.host, self.port = addr[:2] + self.host, self.port = addr[:2] # IPv6 has larger addr self.recv_buffer = '' + self.send_queue = queue.Queue() def on_line_received(self, line): """ @@ -216,26 +218,31 @@ class LineProtocol(ThreadingActor): def send_raw(self, data): """Send data to client exactly as is.""" + start_sender = self.send_queue.empty() + + self.send_queue.put(data) + + if start_sender: + gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT | + gobject.IO_ERR | gobject.IO_HUP, self._send) + + def _send(self, fd, flags): + # NOTE: This code is _not_ run in the actor's thread, but in the same + # one as the event loop. If this blocks, rest of gobject code will + # likely be blocked as well... try: + data = self.send_queue.get_nowait() sent = self.sock.send(data) - # FIXME we are assuming that sock send will not fail as the OS send - # buffer is big enough compared to our need. This can of course - # fail and will be caught and handled fairly poorly with the - # following assert. - # - # Safer, and more complex way of handling this would be to ensure - # that data can be send by putting a data sender in the event loop - # and appending to its buffer. Once the buffer is empty the sender - # must be removed from the loop. This option is doable, but adds - # extra complexity. - # - # The other simpler option would be to try and recall raw_send with - # remaining data. Probably with a decrementing retry counter to - # prevent an inf. loop. - assert len(data) == sent, u'All data was not sent' + except queue.Empty: + return False # No more data to send, remove callback except socket.error as e: - # FIXME should this be handled in a better manner, for instance - # retry? For instance would block errors and interrupted system call - # errors would warrant a retry. - logger.debug(u'send() failed with: %s', e) - self.stop() + if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK): + self.send_queue.put(data) + return True + self.actor_ref.stop() + return False + + if len(data) != sent: # Retry remaining data + self.send_queue.put(data[sent:]) + + return not self.send_queue.empty()