diff --git a/mopidy/frontends/mpd/__init__.py b/mopidy/frontends/mpd/__init__.py index 8dbbf3db..e0ad28fc 100644 --- a/mopidy/frontends/mpd/__init__.py +++ b/mopidy/frontends/mpd/__init__.py @@ -49,8 +49,8 @@ class MpdSession(network.LineProtocol): terminator = protocol.LINE_TERMINATOR encoding = protocol.ENCODING - def __init__(self, sock, addr): - super(MpdSession, self).__init__(sock, addr) + def __init__(self, client): + super(MpdSession, self).__init__(client) self.dispatcher = dispatcher.MpdDispatcher(self) def on_start(self): diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index d949eb40..15ddb98e 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -47,9 +47,11 @@ def format_hostname(hostname): class Server(object): """Setup listener and register it with gobject's event loop.""" - def __init__(self, host, port, protocol, max_connections=15): + def __init__(self, host, port, protocol, max_connections=5, timeout=30): self.protocol = protocol self.max_connections = max_connections + self.timeout = timeout + self.listener = create_socket() self.listener.setblocking(False) self.listener.bind((host, port)) @@ -57,6 +59,7 @@ class Server(object): gobject.io_add_watch( self.listener.fileno(), gobject.IO_IN, self.handle_accept) + logger.debug(u'Listening on [%s]:%s using %s as protocol handler', host, port, self.protocol.__name__) @@ -77,10 +80,135 @@ class Server(object): pass return True - self.protocol.start(sock, addr) + client = Client(self.protocol, sock, addr, self.timeout) + client.start() + return True +class Client(object): + def __init__(self, protocol, sock, addr, timeout): + sock.setblocking(False) + + self._sock = sock + self.host, self.port = addr[:2] # IPv6 has larger addr + self._protocol = protocol + self._timeout_time = timeout + + self._send_lock = threading.Lock() + self._send_buffer = '' + + self._actor_ref = None + + self._recv_id = None + self._send_id = None + self._timeout_id = None + + def start(self): + self._actor_ref = self._protocol.start(self) + self._enable_recv() + self.enable_timeout() + + def stop(self): + self._actor_ref.stop() + self.disable_timeout() + self._disable_recv() + self._disable_send() + try: + self._sock.close() + except socket.error: + pass + return False + + def send(self, data): + """Send data to client exactly as is.""" + self._send_lock.acquire(True) + self._send_buffer += data + self._send_lock.release() + self._enable_send() + + def enable_timeout(self): + """Reactivate timeout mechanism.""" + self.disable_timeout() + if self._timeout_time > 0: + self._timeout_id = gobject.timeout_add_seconds( + self._timeout_time, self._timeout) + + def disable_timeout(self): + """Deactivate timeout mechanism.""" + if self._timeout_id is not None: + gobject.source_remove(self._timeout_id) + self._timeout_id = None + + def _enable_recv(self): + if self._recv_id is None: + self._recv_id = gobject.io_add_watch(self._sock.fileno(), + gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, self._recv) + + def _disable_recv(self): + if self._recv_id is not None: + gobject.source_remove(self._recv_id) + self._recv_id = None + + def _enable_send(self): + if self._send_id is None: + self._send_id = gobject.io_add_watch(self._sock.fileno(), + gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send) + + def _disable_send(self): + if self._send_id: + gobject.source_remove(self._send_id) + self._send_id = None + + def _recv(self, fd, flags): + # NOTE: This code is _not_ run in the actor's thread, but in the same + # one as the event loop. If this blocks, rest of gobject code will + # likely be blocked as well... + + if flags & (gobject.IO_ERR | gobject.IO_HUP): + return self.stop() + + try: + data = self._sock.recv(4096) + except socket.error as e: + if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): + return True + return self.stop() + + if not data: + return self.stop() + + self._actor_ref.send_one_way({'received': data}) + return True + + def _send(self, fd, flags): + # NOTE: This code is _not_ run in the actor's thread... + + # If with can't get the lock, simply try again next time socket is + # ready for sending. + if not self._send_lock.acquire(False): + return True + + try: + sent = self._sock.send(self._send_buffer) + self._send_buffer = self._send_buffer[sent:] + if not self._send_buffer: + self._disable_send() + except socket.error as e: + if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK): + #self.log_error(e) # FIXME log error + return self.stop() + finally: + self._send_lock.release() + + return True + + def _timeout(self): + # NOTE: This code is _not_ run in the actor's thread... + #self.log_timeout() # FIXME log this + return self.stop() + + class LineProtocol(ThreadingActor): """ Base class for handling line based protocols. @@ -95,27 +223,11 @@ class LineProtocol(ThreadingActor): #: What encoding to expect incomming data to be in, can be :class:`None`. encoding = 'utf-8' - #: How long to wait before disconnecting client due to inactivity in - #: seconds. - timeout = 30 + def __init__(self, client): + self.client = client - def __init__(self, sock, addr): - sock.setblocking(False) - - self.sock = sock - self.host, self.port = addr[:2] # IPv6 has larger addr - self.send_lock = threading.Lock() self.recv_buffer = '' - self.send_buffer = '' self.terminator_re = re.compile(self.terminator) - self.send_id = None - self.recv_id = None - self.timeout_id = None - - self.sock.setblocking(False) - - self.enable_recv() - self.enable_timeout() def on_line_received(self, line): """ @@ -130,7 +242,7 @@ class LineProtocol(ThreadingActor): if 'received' not in message: return - self.disable_timeout() + self.client.disable_timeout() self.log_raw_data(message['received']) for line in self.parse_lines(message['received']): @@ -138,53 +250,11 @@ class LineProtocol(ThreadingActor): self.log_request(line) self.on_line_received(line) - self.enable_timeout() + self.client.enable_timeout() def on_stop(self): """Ensure that cleanup when actor stops.""" - self.disable_timeout() - self.disable_recv() - self.disable_send() - try: - self.sock.close() - except socket.error: - pass - - def disable_timeout(self): - """Deactivate timeout mechanism.""" - if self.timeout_id is not None: - gobject.source_remove(self.timeout_id) - self.timeout_id = None - - def disable_recv(self): - """Deactivate recv mechanism.""" - if self.recv_id is not None: - gobject.source_remove(self.recv_id) - self.recv_id = None - - def disable_send(self): - """Deactivate send mechanism.""" - if self.send_id: - gobject.source_remove(self.send_id) - self.send_id = None - - def enable_recv(self): - """Reactivate recv mechanism.""" - if self.recv_id is None: - self.recv_id = gobject.io_add_watch(self.sock.fileno(), gobject.IO_IN | - gobject.IO_ERR | gobject.IO_HUP, self._recv) - - def enable_send(self): - """Reactivate send mechanism.""" - if self.send_id is None: - self.send_id = gobject.io_add_watch(self.sock.fileno(), - gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, self._send) - - def enable_timeout(self): - """Reactivate timeout mechanism.""" - self.disable_timeout() - self.timeout_id = gobject.timeout_add_seconds(self.timeout, - self._timeout) + self.client.stop() def parse_lines(self, new_data=None): """Consume new data and yield any lines found.""" @@ -201,8 +271,8 @@ class LineProtocol(ThreadingActor): Can be overridden by subclasses to change logging behaviour. """ - logger.debug(u'Got %s from event loop in %s', - repr(data), self.actor_urn) + logger.debug(u'Got %s from event loop in %s', repr(data), + self.actor_urn) def log_request(self, request): """ @@ -210,8 +280,8 @@ class LineProtocol(ThreadingActor): Can be overridden by subclasses to change logging behaviour. """ - logger.debug(u'Request from [%s]:%s to %s: %s', - self.host, self.port, self.actor_urn, indent(request)) + logger.debug(u'Request from %s to %s: %s', self.client, self.actor_urn, + indent(request)) def log_response(self, response): """ @@ -219,8 +289,8 @@ class LineProtocol(ThreadingActor): Can be overridden by subclasses to change logging behaviour. """ - logger.debug(u'Response to [%s]:%s from %s: %s', - self.host, self.port, self.actor_urn, indent(response)) + logger.debug(u'Response to %s from %s: %s', self.client, + self.actor_urn, indent(response)) def log_error(self, error): """ @@ -228,8 +298,8 @@ class LineProtocol(ThreadingActor): Can be overridden by subclasses to change logging behaviour. """ - logger.warning(u'Problem with connection to [%s]:%s in %s: %s', - self.host, self.port, self.actor_urn, error) + logger.warning(u'Problem with connection to %s in %s: %s', + self.client, self.actor_urn, error) def log_timeout(self): """ @@ -237,8 +307,8 @@ class LineProtocol(ThreadingActor): Can be overridden by subclasses to change logging behaviour. """ - logger.debug(u'Closing connection to [%s]:%s in %s due to timeout.', - self.host, self.port, self.actor_urn) + logger.debug(u'Closing connection to %s in %s due to timeout.', + self.client, self.actor_urn) def encode(self, line): """ @@ -272,62 +342,4 @@ class LineProtocol(ThreadingActor): data = self.terminator.join(lines) self.log_response(data) - self.send_raw(self.encode(data + self.terminator)) - - def send_raw(self, data): - """Send data to client exactly as is.""" - self.send_lock.acquire(True) - self.send_buffer += data - self.send_lock.release() - self.enable_send() - - def _recv(self, fd, flags): - # NOTE: This code is _not_ run in the actor's thread, but in the same - # one as the event loop. If this blocks, rest of gobject code will - # likely be blocked as well... - - if flags & (gobject.IO_ERR | gobject.IO_HUP): - actor_ref.stop() - return False - - try: - data = self.sock.recv(4096) - except socket.error as e: - if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN): - return True - self.actor_ref.stop() - return False - - if not data: - self.actor_ref.stop() - return False - - self.actor_ref.send_one_way({'received': data}) - return True - - def _send(self, fd, flags): - # NOTE: This code is _not_ run in the actor's thread... - - # If with can't get the lock, simply try again next time socket is - # ready for sending. - if not self.send_lock.acquire(False): - return True - - try: - sent = self.sock.send(self.send_buffer) - self.send_buffer = self.send_buffer[sent:] - return bool(self.send_buffer) - except socket.error as e: - if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK): - return True - self.log_error(e) - self.actor_ref.stop() - return False - finally: - self.send_lock.release() - - def _timeout(self): - # NOTE: This code is _not_ run in the actor's thread... - self.log_timeout() - self.actor_ref.stop() - return False + self.client.send(self.encode(data + self.terminator))