diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index ed9c7161..d949eb40 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -77,38 +77,7 @@ class Server(object): pass return True - sock.setblocking(False) - - actor_ref = self.protocol.start(sock, addr) - gobject.io_add_watch(sock.fileno(), gobject.IO_IN | gobject.IO_ERR | - gobject.IO_HUP, self.handle_client, sock, actor_ref) - - return True - - def handle_client(self, fd, flags, sock, actor_ref): - """ - Read client data when possible. - - Returns false when reading failed in order to deregister with the event - loop. - """ - if flags & (gobject.IO_ERR | gobject.IO_HUP): - actor_ref.stop() - return False - - try: - data = sock.recv(4096) - except socket.error as e: - if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): - return True - actor_ref.stop() - return False - - if not data: - actor_ref.stop() - return False - - actor_ref.send_one_way({'received': data}) + self.protocol.start(sock, addr) return True @@ -131,14 +100,21 @@ class LineProtocol(ThreadingActor): timeout = 30 def __init__(self, sock, addr): + sock.setblocking(False) + self.sock = sock self.host, self.port = addr[:2] # IPv6 has larger addr self.send_lock = threading.Lock() self.recv_buffer = '' self.send_buffer = '' self.terminator_re = re.compile(self.terminator) + self.send_id = None + self.recv_id = None self.timeout_id = None + self.sock.setblocking(False) + + self.enable_recv() self.enable_timeout() def on_line_received(self, line): @@ -167,6 +143,8 @@ class LineProtocol(ThreadingActor): def on_stop(self): """Ensure that cleanup when actor stops.""" self.disable_timeout() + self.disable_recv() + self.disable_send() try: self.sock.close() except socket.error: @@ -174,8 +152,33 @@ class LineProtocol(ThreadingActor): def disable_timeout(self): """Deactivate timeout mechanism.""" - if self.timeout_id: + if self.timeout_id is not None: gobject.source_remove(self.timeout_id) + self.timeout_id = None + + def disable_recv(self): + """Deactivate recv mechanism.""" + if self.recv_id is not None: + gobject.source_remove(self.recv_id) + self.recv_id = None + + def disable_send(self): + """Deactivate send mechanism.""" + if self.send_id: + gobject.source_remove(self.send_id) + self.send_id = None + + def enable_recv(self): + """Reactivate recv mechanism.""" + if self.recv_id is None: + self.recv_id = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN | + gobject.IO_ERR | gobject.IO_HUP, self._recv) + + def enable_send(self): + """Reactivate send mechanism.""" + if self.send_id is None: + self.send_id = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send) def enable_timeout(self): """Reactivate timeout mechanism.""" @@ -274,19 +277,37 @@ class LineProtocol(ThreadingActor): def send_raw(self, data): """Send data to client exactly as is.""" self.send_lock.acquire(True) - should_register_sender = len(self.send_buffer) == 0 self.send_buffer += data self.send_lock.release() + self.enable_send() - if should_register_sender: - gobject.io_add_watch(self.sock.fileno(), gobject.IO_OUT | - gobject.IO_ERR | gobject.IO_HUP, self._send) - - def _send(self, fd, flags): + def _recv(self, fd, flags): # NOTE: This code is _not_ run in the actor's thread, but in the same # one as the event loop. If this blocks, rest of gobject code will # likely be blocked as well... + if flags & (gobject.IO_ERR | gobject.IO_HUP): + actor_ref.stop() + return False + + try: + data = self.sock.recv(4096) + except socket.error as e: + if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + return True + self.actor_ref.stop() + return False + + if not data: + self.actor_ref.stop() + return False + + self.actor_ref.send_one_way({'received': data}) + return True + + def _send(self, fd, flags): + # NOTE: This code is _not_ run in the actor's thread... + # If with can't get the lock, simply try again next time socket is # ready for sending. if not self.send_lock.acquire(False):