diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index 9280e772..6d6c9472 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -109,88 +109,89 @@ class Server(object): Connection(self.protocol, sock, addr, self.timeout) class Connection(object): + # NOTE: the callback code is _not_ run in the actor's thread, but in the + # same one as the event loop. If code in the callbacks blocks, the rest of + # gobject code will likely be blocked as well... + def __init__(self, protocol, sock, addr, timeout): sock.setblocking(False) self.host, self.port = addr[:2] # IPv6 has larger addr - self._sock = sock - self._protocol = protocol - self._timeout_time = timeout + self.sock = sock + self.protocol = protocol + self.timeout = timeout - self._send_lock = threading.Lock() - self._send_buffer = '' + self.send_lock = threading.Lock() + self.send_buffer = '' - self._recv_id = None - self._send_id = None - self._timeout_id = None + self.recv_id = None + self.send_id = None + self.timeout_id = None - self._actor_ref = self._protocol.start(self) + self.actor_ref = self.protocol.start(self) - self._enable_recv() + self.enable_recv() self.enable_timeout() def stop(self): - self._actor_ref.stop() + self.actor_ref.stop() self.disable_timeout() - self._disable_recv() - self._disable_send() + self.disable_recv() + self.disable_send() try: - self._sock.close() + self.sock.close() except socket.error: pass return False def send(self, data): """Send data to client exactly as is.""" - self._send_lock.acquire(True) - self._send_buffer += data - self._send_lock.release() - self._enable_send() + self.send_lock.acquire(True) + self.send_buffer += data + self.send_lock.release() + self.enable_send() def enable_timeout(self): """Reactivate timeout mechanism.""" self.disable_timeout() - if self._timeout_time > 0: - self._timeout_id = gobject.timeout_add_seconds( - self._timeout_time, self._timeout) + if self.timeout > 0: + self.timeout_id = gobject.timeout_add_seconds( + self.timeout, self.timeout_callback) def disable_timeout(self): """Deactivate timeout mechanism.""" - if self._timeout_id is not None: - gobject.source_remove(self._timeout_id) - self._timeout_id = None + if self.timeout_id is not None: + gobject.source_remove(self.timeout_id) + self.timeout_id = None - def _enable_recv(self): - if self._recv_id is None: - self._recv_id = gobject.io_add_watch(self._sock.fileno(), - gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, self._recv) + def enable_recv(self): + if self.recv_id is None: + self.recv_id = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, self.recv_callback) - def _disable_recv(self): - if self._recv_id is not None: - gobject.source_remove(self._recv_id) - self._recv_id = None + def disable_recv(self): + if self.recv_id is not None: + gobject.source_remove(self.recv_id) + self.recv_id = None - def _enable_send(self): - if self._send_id is None: - self._send_id = gobject.io_add_watch(self._sock.fileno(), - gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send) + def enable_send(self): + if self.send_id is None: + self.send_id = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, + self.send_callback) - def _disable_send(self): - if self._send_id: - gobject.source_remove(self._send_id) - self._send_id = None - - def _recv(self, fd, flags): - # NOTE: This code is _not_ run in the actor's thread, but in the same - # one as the event loop. If this blocks, rest of gobject code will - # likely be blocked as well... + def disable_send(self): + if self.send_id is not None: + gobject.source_remove(self.send_id) + self.send_id = None + def recv_callback(self, fd, flags): if flags & (gobject.IO_ERR | gobject.IO_HUP): return self.stop() try: - data = self._sock.recv(4096) + data = self.sock.recv(4096) except socket.error as e: if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): return True @@ -199,33 +200,30 @@ class Connection(object): if not data: return self.stop() - self._actor_ref.send_one_way({'received': data}) + self.actor_ref.send_one_way({'received': data}) return True - def _send(self, fd, flags): - # NOTE: This code is _not_ run in the actor's thread... - + def send_callback(self, fd, flags): # If with can't get the lock, simply try again next time socket is # ready for sending. - if not self._send_lock.acquire(False): + if not self.send_lock.acquire(False): return True try: - sent = self._sock.send(self._send_buffer) - self._send_buffer = self._send_buffer[sent:] - if not self._send_buffer: - self._disable_send() + sent = self.sock.send(self.send_buffer) + self.send_buffer = self.send_buffer[sent:] + if not self.send_buffer: + self.disable_send() except socket.error as e: if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): #self.log_error(e) # FIXME log error return self.stop() finally: - self._send_lock.release() + self.send_lock.release() return True - def _timeout(self): - # NOTE: This code is _not_ run in the actor's thread... + def timeout_callback(self): #self.log_timeout() # FIXME log this return self.stop()