diff --git a/docs/changes.rst b/docs/changes.rst index 563eac53..6cda6668 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -32,6 +32,10 @@ v0.6.0 (in development) - Add Listener API, :mod:`mopidy.listeners`, to be implemented by actors wanting to receive events from the backend. This is a formalization of the ad hoc events the Last.fm scrobbler has already been using for some time. +- Replaced all of the MPD network code that was provided by asyncore with + custom stack. This change was made to facilitate the future support of the + `idle` command, and to reduce the number of event loops being used. +- Fix metadata update in Shoutcast streaming (Fixes: :issue:`122`) v0.5.0 (2011-06-15) diff --git a/mopidy/core.py b/mopidy/core.py index ea6da28c..97b4ed5c 100644 --- a/mopidy/core.py +++ b/mopidy/core.py @@ -3,7 +3,9 @@ import optparse import os import signal import sys -import time + +import gobject +gobject.threads_init() # Extract any non-GStreamer arguments, and leave the GStreamer arguments for # processing by GStreamer. This needs to be done before GStreamer is imported, @@ -24,26 +26,25 @@ from mopidy.gstreamer import GStreamer from mopidy.utils import get_class from mopidy.utils.log import setup_logging from mopidy.utils.path import get_or_create_folder, get_or_create_file -from mopidy.utils.process import (GObjectEventThread, exit_handler, - stop_remaining_actors, stop_actors_by_class) +from mopidy.utils.process import (exit_handler, stop_remaining_actors, + stop_actors_by_class) from mopidy.utils.settings import list_settings_optparse_callback logger = logging.getLogger('mopidy.core') def main(): signal.signal(signal.SIGTERM, exit_handler) + loop = gobject.MainLoop() try: options = parse_options() setup_logging(options.verbosity_level, options.save_debug_log) check_old_folders() setup_settings(options.interactive) - setup_gobject_loop() setup_gstreamer() setup_mixer() setup_backend() setup_frontends() - while True: - time.sleep(1) + loop.run() except SettingsError as e: logger.error(e.message) except KeyboardInterrupt: @@ -51,6 +52,7 @@ def main(): except Exception as e: logger.exception(e) finally: + loop.quit() stop_frontends() stop_backend() stop_mixer() @@ -99,9 +101,6 @@ def setup_settings(interactive): logger.error(e.message) sys.exit(1) -def setup_gobject_loop(): - GObjectEventThread().start() - def setup_gstreamer(): GStreamer.start() diff --git a/mopidy/frontends/mpd/__init__.py b/mopidy/frontends/mpd/__init__.py index f37b2deb..4deb7b89 100644 --- a/mopidy/frontends/mpd/__init__.py +++ b/mopidy/frontends/mpd/__init__.py @@ -1,10 +1,11 @@ -import asyncore import logging +import sys from pykka.actor import ThreadingActor -from mopidy.frontends.mpd.server import MpdServer -from mopidy.utils.process import BaseThread +from mopidy import settings +from mopidy.frontends.mpd import dispatcher, protocol +from mopidy.utils import network, process, log logger = logging.getLogger('mopidy.frontends.mpd') @@ -24,23 +25,50 @@ class MpdFrontend(ThreadingActor): """ def __init__(self): - self._thread = None + hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME) + port = settings.MPD_SERVER_PORT + + try: + network.Server(hostname, port, protocol=MpdSession) + except IOError, e: + logger.error(u'MPD server startup failed: %s', e) + sys.exit(1) + + logger.info(u'MPD server running at [%s]:%s', hostname, port) + + def on_stop(self): + process.stop_actors_by_class(MpdSession) + + +class MpdSession(network.LineProtocol): + """ + The MPD client session. Keeps track of a single client session. Any + requests from the client is passed on to the MPD request dispatcher. + """ + + terminator = protocol.LINE_TERMINATOR + encoding = protocol.ENCODING + + def __init__(self, client): + super(MpdSession, self).__init__(client) + self.dispatcher = dispatcher.MpdDispatcher(self) def on_start(self): - self._thread = MpdThread() - self._thread.start() + logger.info(u'New MPD connection from [%s]:%s', self.host, self.port) + self.send_lines([u'OK MPD %s' % protocol.VERSION]) - def on_receive(self, message): - pass # Ignore any messages + def on_line_received(self, line): + logger.debug(u'Request from [%s]:%s to %s: %s', self.host, self.port, + self.actor_urn, line) + response = self.dispatcher.handle_request(line) + if not response: + return -class MpdThread(BaseThread): - def __init__(self): - super(MpdThread, self).__init__() - self.name = u'MpdThread' + logger.debug(u'Response to [%s]:%s from %s: %s', self.host, self.port, + self.actor_urn, log.indent(self.terminator.join(response))) + + self.send_lines(response) - def run_inside_try(self): - logger.debug(u'Starting MPD server thread') - server = MpdServer() - server.start() - asyncore.loop() + def close(self): + self.stop() diff --git a/mopidy/frontends/mpd/dispatcher.py b/mopidy/frontends/mpd/dispatcher.py index 18f994de..0f0f0299 100644 --- a/mopidy/frontends/mpd/dispatcher.py +++ b/mopidy/frontends/mpd/dispatcher.py @@ -178,7 +178,7 @@ class MpdContext(object): #: The current :class:`MpdDispatcher`. dispatcher = None - #: The current :class:`mopidy.frontends.mpd.session.MpdSession`. + #: The current :class:`mopidy.frontends.mpd.MpdSession`. session = None def __init__(self, dispatcher, session=None): diff --git a/mopidy/frontends/mpd/server.py b/mopidy/frontends/mpd/server.py deleted file mode 100644 index 62e443fb..00000000 --- a/mopidy/frontends/mpd/server.py +++ /dev/null @@ -1,38 +0,0 @@ -import asyncore -import logging -import sys - -from mopidy import settings -from mopidy.utils import network -from .session import MpdSession - -logger = logging.getLogger('mopidy.frontends.mpd.server') - -class MpdServer(asyncore.dispatcher): - """ - The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession` - for each client connection. - """ - - def start(self): - """Start MPD server.""" - try: - self.set_socket(network.create_socket()) - self.set_reuse_addr() - hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME) - port = settings.MPD_SERVER_PORT - logger.debug(u'MPD server is binding to [%s]:%s', hostname, port) - self.bind((hostname, port)) - self.listen(1) - logger.info(u'MPD server running at [%s]:%s', hostname, port) - except IOError, e: - logger.error(u'MPD server startup failed: %s' % - str(e).decode('utf-8')) - sys.exit(1) - - def handle_accept(self): - """Called by asyncore when a new client connects.""" - (client_socket, client_socket_address) = self.accept() - logger.info(u'MPD client connection from [%s]:%s', - client_socket_address[0], client_socket_address[1]) - MpdSession(self, client_socket, client_socket_address) diff --git a/mopidy/frontends/mpd/session.py b/mopidy/frontends/mpd/session.py deleted file mode 100644 index ce5d3be7..00000000 --- a/mopidy/frontends/mpd/session.py +++ /dev/null @@ -1,58 +0,0 @@ -import asynchat -import logging - -from mopidy.frontends.mpd.dispatcher import MpdDispatcher -from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION -from mopidy.utils.log import indent - -logger = logging.getLogger('mopidy.frontends.mpd.session') - -class MpdSession(asynchat.async_chat): - """ - The MPD client session. Keeps track of a single client session. Any - requests from the client is passed on to the MPD request dispatcher. - """ - - def __init__(self, server, client_socket, client_socket_address): - asynchat.async_chat.__init__(self, sock=client_socket) - self.server = server - self.client_address = client_socket_address[0] - self.client_port = client_socket_address[1] - self.input_buffer = [] - self.authenticated = False - self.set_terminator(LINE_TERMINATOR.encode(ENCODING)) - self.dispatcher = MpdDispatcher(session=self) - self.send_response([u'OK MPD %s' % VERSION]) - - def collect_incoming_data(self, data): - """Called by asynchat when new data arrives.""" - self.input_buffer.append(data) - - def found_terminator(self): - """Called by asynchat when a terminator is found in incoming data.""" - data = ''.join(self.input_buffer).strip() - self.input_buffer = [] - try: - self.send_response(self.handle_request(data)) - except UnicodeDecodeError as e: - logger.warning(u'Received invalid data: %s', e) - - def handle_request(self, request): - """Handle the request using the MPD command handlers.""" - request = request.decode(ENCODING) - logger.debug(u'Request from [%s]:%s: %s', self.client_address, - self.client_port, indent(request)) - return self.dispatcher.handle_request(request) - - def send_response(self, response): - """ - Format a response from the MPD command handlers and send it to the - client. - """ - if response: - response = LINE_TERMINATOR.join(response) - logger.debug(u'Response to [%s]:%s: %s', self.client_address, - self.client_port, indent(response)) - response = u'%s%s' % (response, LINE_TERMINATOR) - data = response.encode(ENCODING) - self.push(data) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 166c487e..edcb3084 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -43,9 +43,6 @@ class GStreamer(ThreadingActor): self._handlers = {} def on_start(self): - # **Warning:** :class:`GStreamer` requires - # :class:`mopidy.utils.process.GObjectEventThread` to be running. This - # is not enforced by :class:`GStreamer` itself. self._setup_pipeline() self._setup_outputs() self._setup_message_processor() @@ -277,10 +274,18 @@ class GStreamer(ThreadingActor): taglist = gst.TagList() artists = [a for a in (track.artists or []) if a.name] + # Default to blank data to trick shoutcast into clearing any previous + # values it might have. + taglist[gst.TAG_ARTIST] = u' ' + taglist[gst.TAG_TITLE] = u' ' + taglist[gst.TAG_ALBUM] = u' ' + if artists: taglist[gst.TAG_ARTIST] = u', '.join([a.name for a in artists]) + if track.name: taglist[gst.TAG_TITLE] = track.name + if track.album and track.album.name: taglist[gst.TAG_ALBUM] = track.album.name diff --git a/mopidy/utils/__init__.py b/mopidy/utils/__init__.py index acbb4664..9d7532a0 100644 --- a/mopidy/utils/__init__.py +++ b/mopidy/utils/__init__.py @@ -18,9 +18,11 @@ def import_module(name): return sys.modules[name] def get_class(name): + logger.debug('Loading: %s', name) + if '.' not in name: + raise ImportError("Couldn't load: %s" % name) module_name = name[:name.rindex('.')] class_name = name[name.rindex('.') + 1:] - logger.debug('Loading: %s', name) try: module = import_module(module_name) class_object = getattr(module, class_name) diff --git a/mopidy/utils/network.py b/mopidy/utils/network.py index 1dedf7d7..b7cc144d 100644 --- a/mopidy/utils/network.py +++ b/mopidy/utils/network.py @@ -1,10 +1,20 @@ +import errno +import gobject import logging import re import socket +import threading + +from pykka import ActorDeadError +from pykka.actor import ThreadingActor +from pykka.registry import ActorRegistry logger = logging.getLogger('mopidy.utils.server') -def _try_ipv6_socket(): +class ShouldRetrySocketCall(Exception): + """Indicate that attempted socket call should be retried""" + +def try_ipv6_socket(): """Determine if system really supports IPv6""" if not socket.has_ipv6: return False @@ -17,7 +27,7 @@ def _try_ipv6_socket(): return False #: Boolean value that indicates if creating an IPv6 socket will succeed. -has_ipv6 = _try_ipv6_socket() +has_ipv6 = try_ipv6_socket() def create_socket(): """Create a TCP socket with or without IPv6 depending on system support""" @@ -27,6 +37,7 @@ def create_socket(): sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) else: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return sock def format_hostname(hostname): @@ -34,3 +45,330 @@ def format_hostname(hostname): if (has_ipv6 and re.match('\d+.\d+.\d+.\d+', hostname) is not None): hostname = '::ffff:%s' % hostname return hostname + +class Server(object): + """Setup listener and register it with gobject's event loop.""" + + def __init__(self, host, port, protocol, max_connections=5, timeout=30): + self.protocol = protocol + self.max_connections = max_connections + self.timeout = timeout + self.server_socket = self.create_server_socket(host, port) + + self.register_server_socket(self.server_socket.fileno()) + + def create_server_socket(self, host, port): + sock = create_socket() + sock.setblocking(False) + sock.bind((host, port)) + sock.listen(1) + return sock + + def register_server_socket(self, fileno): + gobject.io_add_watch(fileno, gobject.IO_IN, self.handle_connection) + + def handle_connection(self, fd, flags): + try: + sock, addr = self.accept_connection() + except ShouldRetrySocketCall: + return True + + if self.maximum_connections_exceeded(): + self.reject_connection(sock, addr) + else: + self.init_connection(sock, addr) + return True + + def accept_connection(self): + try: + return self.server_socket.accept() + except socket.error as e: + if e.errno in (errno.EAGAIN, errno.EINTR): + raise ShouldRetrySocketCall + raise + + def maximum_connections_exceeded(self): + return (self.max_connections is not None and + self.number_of_connections() >= self.max_connections) + + def number_of_connections(self): + return len(ActorRegistry.get_by_class(self.protocol)) + + def reject_connection(self, sock, addr): + # FIXME provide more context in logging? + logger.warning(u'Rejected connection from [%s]:%s', addr[0], addr[1]) + try: + sock.close() + except socket.error: + pass + + def init_connection(self, sock, addr): + Connection(self.protocol, sock, addr, self.timeout) + + +class Connection(object): + # NOTE: the callback code is _not_ run in the actor's thread, but in the + # same one as the event loop. If code in the callbacks blocks, the rest of + # gobject code will likely be blocked as well... + # + # Also note that source_remove() return values are ignored on purpose, a + # false return value would only tell us that what we thought was registered + # is already gone, there is really nothing more we can do. + + def __init__(self, protocol, sock, addr, timeout): + sock.setblocking(False) + + self.host, self.port = addr[:2] # IPv6 has larger addr + + self.sock = sock + self.protocol = protocol + self.timeout = timeout + + self.send_lock = threading.Lock() + self.send_buffer = '' + + self.stopping = False + + self.recv_id = None + self.send_id = None + self.timeout_id = None + + self.actor_ref = self.protocol.start(self) + + self.enable_recv() + self.enable_timeout() + + def stop(self, reason, level=logging.DEBUG): + if self.stopping: + logger.log(level, 'Already stopping: %s' % reason) + return + else: + self.stopping = True + + logger.log(level, reason) + + try: + self.actor_ref.stop() + except ActorDeadError: + pass + + self.disable_timeout() + self.disable_recv() + self.disable_send() + + try: + self.sock.close() + except socket.error: + pass + + def send(self, data): + """Send data to client exactly as is.""" + self.send_lock.acquire(True) + self.send_buffer += data + self.send_lock.release() + self.enable_send() + + def enable_timeout(self): + """Reactivate timeout mechanism.""" + if self.timeout <= 0: + return + + self.disable_timeout() + self.timeout_id = gobject.timeout_add_seconds( + self.timeout, self.timeout_callback) + + def disable_timeout(self): + """Deactivate timeout mechanism.""" + if self.timeout_id is None: + return + gobject.source_remove(self.timeout_id) + self.timeout_id = None + + def enable_recv(self): + if self.recv_id is not None: + return + + try: + self.recv_id = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, + self.recv_callback) + except socket.error as e: + self.stop(u'Problem with connection: %s' % e) + + def disable_recv(self): + if self.recv_id is None: + return + gobject.source_remove(self.recv_id) + self.recv_id = None + + def enable_send(self): + if self.send_id is not None: + return + + try: + self.send_id = gobject.io_add_watch(self.sock.fileno(), + gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, + self.send_callback) + except socket.error as e: + self.stop(u'Problem with connection: %s' % e) + + def disable_send(self): + if self.send_id is None: + return + + gobject.source_remove(self.send_id) + self.send_id = None + + def recv_callback(self, fd, flags): + if flags & (gobject.IO_ERR | gobject.IO_HUP): + self.stop(u'Bad client flags: %s' % flags) + return True + + try: + data = self.sock.recv(4096) + except socket.error as e: + if e.errno not in (errno.EWOULDBLOCK, errno.EINTR): + self.stop(u'Unexpected client error: %s' % e) + return True + + if not data: + self.stop(u'Client most likely disconnected.') + return True + + try: + self.actor_ref.send_one_way({'received': data}) + except ActorDeadError: + self.stop(u'Actor is dead.') + + return True + + def send_callback(self, fd, flags): + if flags & (gobject.IO_ERR | gobject.IO_HUP): + self.stop(u'Bad client flags: %s' % flags) + return True + + # If with can't get the lock, simply try again next time socket is + # ready for sending. + if not self.send_lock.acquire(False): + return True + + try: + sent = self.sock.send(self.send_buffer) + self.send_buffer = self.send_buffer[sent:] + if not self.send_buffer: + self.disable_send() + except socket.error as e: + if e.errno not in (errno.EWOULDBLOCK, errno.EINTR): + self.stop(u'Unexpected client error: %s' % e) + finally: + self.send_lock.release() + + return True + + def timeout_callback(self): + self.stop(u'Client timeout out after %s seconds' % self.timeout) + return False + + +class LineProtocol(ThreadingActor): + """ + Base class for handling line based protocols. + + Takes care of receiving new data from server's client code, decoding and + then splitting data along line boundaries. + """ + + #: What terminator to use to split lines. + terminator = '\n' + + #: What encoding to expect incomming data to be in, can be :class:`None`. + encoding = 'utf-8' + + def __init__(self, connection): + self.connection = connection + self.recv_buffer = '' + + @property + def host(self): + return self.connection.host + + @property + def port(self): + return self.connection.port + + def on_line_received(self, line): + """ + Called whenever a new line is found. + + Should be implemented by subclasses. + """ + raise NotImplementedError + + def on_receive(self, message): + """Handle messages with new data from server.""" + if 'received' not in message: + return + + self.connection.disable_timeout() + self.recv_buffer += message['received'] + + for line in self.parse_lines(): + line = self.decode(line) + self.on_line_received(line) + + self.connection.enable_timeout() + + def on_stop(self): + """Ensure that cleanup when actor stops.""" + self.connection.stop(u'Actor is shutting down.') + + def parse_lines(self): + """Consume new data and yield any lines found.""" + while re.search(self.terminator, self.recv_buffer): + line, self.recv_buffer = re.split(self.terminator, + self.recv_buffer, 1) + yield line + + def encode(self, line): + """ + Handle encoding of line. + + Can be overridden by subclasses to change encoding behaviour. + """ + try: + return line.encode(self.encoding) + except UnicodeError: + logger.warning(u'Stopping actor due to encode problem, data ' + 'supplied by client was not valid %s', self.encoding) + self.stop() + + def decode(self, line): + """ + Handle decoding of line. + + Can be overridden by subclasses to change decoding behaviour. + """ + try: + return line.decode(self.encoding) + except UnicodeError: + logger.warning(u'Stopping actor due to decode problem, data ' + 'supplied by client was not valid %s', self.encoding) + self.stop() + + def join_lines(self, lines): + if not lines: + return u'' + return self.terminator.join(lines) + self.terminator + + def send_lines(self, lines): + """ + Send array of lines to client via connection. + + Join lines using the terminator that is set for this class, encode it + and send it to the client. + """ + if not lines: + return + + data = self.join_lines(lines) + self.connection.send(self.encode(data)) diff --git a/mopidy/utils/process.py b/mopidy/utils/process.py index 758f8943..80d850fe 100644 --- a/mopidy/utils/process.py +++ b/mopidy/utils/process.py @@ -3,9 +3,6 @@ import signal import thread import threading -import gobject -gobject.threads_init() - from pykka import ActorDeadError from pykka.registry import ActorRegistry @@ -68,25 +65,3 @@ class BaseThread(threading.Thread): def run_inside_try(self): raise NotImplementedError - - -class GObjectEventThread(BaseThread): - """ - A GObject event loop which is shared by all Mopidy components that uses - libraries that need a GObject event loop, like GStreamer and D-Bus. - - Should be started by Mopidy's core and used by - :mod:`mopidy.output.gstreamer`, :mod:`mopidy.frontend.mpris`, etc. - """ - - def __init__(self): - super(GObjectEventThread, self).__init__() - self.name = u'GObjectEventThread' - self.loop = None - - def run_inside_try(self): - self.loop = gobject.MainLoop().run() - - def destroy(self): - self.loop.quit() - super(GObjectEventThread, self).destroy() diff --git a/requirements/tests.txt b/requirements/tests.txt index f8cf2eb3..0bc8380f 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -1,4 +1,4 @@ coverage -mock +mock >= 0.7 nose tox diff --git a/tests/__init__.py b/tests/__init__.py index 1d4d2e3d..663b89ec 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -22,3 +22,22 @@ def path_to_data_dir(name): path = os.path.abspath(path) return os.path.join(path, name) +class IsA(object): + def __init__(self, klass): + self.klass = klass + + def __eq__(self, rhs): + try: + return isinstance(rhs, self.klass) + except TypeError: + return type(rhs) == type(self.klass) + + def __ne__(self, rhs): + return not self.__eq__(rhs) + + def __repr__(self): + return str(self.klass) + +any_int = IsA(int) +any_str = IsA(str) +any_unicode = IsA(unicode) diff --git a/tests/frontends/mpd/authentication_test.py b/tests/frontends/mpd/authentication_test.py index d795d726..fb32ea54 100644 --- a/tests/frontends/mpd/authentication_test.py +++ b/tests/frontends/mpd/authentication_test.py @@ -2,8 +2,8 @@ import mock import unittest from mopidy import settings +from mopidy.frontends.mpd import MpdSession from mopidy.frontends.mpd.dispatcher import MpdDispatcher -from mopidy.frontends.mpd.session import MpdSession class AuthenticationTest(unittest.TestCase): def setUp(self): diff --git a/tests/frontends/mpd/connection_test.py b/tests/frontends/mpd/connection_test.py index bc995a5e..82debabb 100644 --- a/tests/frontends/mpd/connection_test.py +++ b/tests/frontends/mpd/connection_test.py @@ -3,8 +3,8 @@ import unittest from mopidy import settings from mopidy.backends.dummy import DummyBackend +from mopidy.frontends.mpd import MpdSession from mopidy.frontends.mpd.dispatcher import MpdDispatcher -from mopidy.frontends.mpd.session import MpdSession from mopidy.mixers.dummy import DummyMixer class ConnectionHandlerTest(unittest.TestCase): diff --git a/tests/frontends/mpd/server_test.py b/tests/frontends/mpd/server_test.py deleted file mode 100644 index b2e27559..00000000 --- a/tests/frontends/mpd/server_test.py +++ /dev/null @@ -1,23 +0,0 @@ -import unittest - -from mopidy import settings -from mopidy.backends.dummy import DummyBackend -from mopidy.frontends.mpd import server -from mopidy.mixers.dummy import DummyMixer - -class MpdSessionTest(unittest.TestCase): - def setUp(self): - self.backend = DummyBackend.start().proxy() - self.mixer = DummyMixer.start().proxy() - self.session = server.MpdSession(None, None, (None, None)) - - def tearDown(self): - self.backend.stop().get() - self.mixer.stop().get() - settings.runtime.clear() - - def test_found_terminator_catches_decode_error(self): - # Pressing Ctrl+C in a telnet session sends a 0xff byte to the server. - self.session.input_buffer = ['\xff'] - self.session.found_terminator() - self.assertEqual(len(self.session.input_buffer), 0) diff --git a/tests/utils/init_test.py b/tests/utils/init_test.py index fb38e2ea..70dd7e36 100644 --- a/tests/utils/init_test.py +++ b/tests/utils/init_test.py @@ -4,12 +4,13 @@ from mopidy.utils import get_class class GetClassTest(unittest.TestCase): def test_loading_module_that_does_not_exist(self): - test = lambda: get_class('foo.bar.Baz') - self.assertRaises(ImportError, test) + self.assertRaises(ImportError, get_class, 'foo.bar.Baz') def test_loading_class_that_does_not_exist(self): - test = lambda: get_class('unittest.FooBarBaz') - self.assertRaises(ImportError, test) + self.assertRaises(ImportError, get_class, 'unittest.FooBarBaz') + + def test_loading_incorrect_class_path(self): + self.assertRaises(ImportError, get_class, 'foobarbaz') def test_import_error_message_contains_complete_class_path(self): try: diff --git a/tests/utils/network/__init__.py b/tests/utils/network/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/network/connection_test.py b/tests/utils/network/connection_test.py new file mode 100644 index 00000000..6e68f250 --- /dev/null +++ b/tests/utils/network/connection_test.py @@ -0,0 +1,524 @@ +import errno +import gobject +import logging +import pykka +import socket +import unittest + +from mopidy.utils import network + +from mock import patch, sentinel, Mock +from tests import any_int, any_unicode + +class ConnectionTest(unittest.TestCase): + def setUp(self): + self.mock = Mock(spec=network.Connection) + + def test_init_ensure_nonblocking_io(self): + sock = Mock(spec=socket.SocketType) + + network.Connection.__init__(self.mock, Mock(), sock, + (sentinel.host, sentinel.port), sentinel.timeout) + sock.setblocking.assert_called_once_with(False) + + def test_init_starts_actor(self): + protocol = Mock(spec=network.LineProtocol) + + network.Connection.__init__(self.mock, protocol, Mock(), + (sentinel.host, sentinel.port), sentinel.timeout) + protocol.start.assert_called_once_with(self.mock) + + def test_init_enables_recv_and_timeout(self): + network.Connection.__init__(self.mock, Mock(), Mock(), + (sentinel.host, sentinel.port), sentinel.timeout) + self.mock.enable_recv.assert_called_once_with() + self.mock.enable_timeout.assert_called_once_with() + + def test_init_stores_values_in_attributes(self): + addr = (sentinel.host, sentinel.port) + protocol = Mock(spec=network.LineProtocol) + sock = Mock(spec=socket.SocketType) + + network.Connection.__init__( + self.mock, protocol, sock, addr, sentinel.timeout) + self.assertEqual(sock, self.mock.sock) + self.assertEqual(protocol, self.mock.protocol) + self.assertEqual(sentinel.timeout, self.mock.timeout) + self.assertEqual(sentinel.host, self.mock.host) + self.assertEqual(sentinel.port, self.mock.port) + + def test_init_handles_ipv6_addr(self): + addr = (sentinel.host, sentinel.port, + sentinel.flowinfo, sentinel.scopeid) + protocol = Mock(spec=network.LineProtocol) + sock = Mock(spec=socket.SocketType) + + network.Connection.__init__( + self.mock, protocol, sock, addr, sentinel.timeout) + self.assertEqual(sentinel.host, self.mock.host) + self.assertEqual(sentinel.port, self.mock.port) + + def test_stop_disables_recv_send_and_timeout(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.mock.disable_timeout.assert_called_once_with() + self.mock.disable_recv.assert_called_once_with() + self.mock.disable_send.assert_called_once_with() + + def test_stop_closes_socket(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.mock.sock.close.assert_called_once_with() + + def test_stop_closes_socket_error(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.close.side_effect = socket.error + + network.Connection.stop(self.mock, sentinel.reason) + self.mock.sock.close.assert_called_once_with() + + def test_stop_stops_actor(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.mock.actor_ref.stop.assert_called_once_with() + + def test_stop_handles_actor_already_being_stopped(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.actor_ref.stop.side_effect = pykka.ActorDeadError() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.mock.actor_ref.stop.assert_called_once_with() + + def test_stop_sets_stopping_to_true(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.assertEqual(True, self.mock.stopping) + + def test_stop_does_not_proceed_when_already_stopping(self): + self.mock.stopping = True + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + self.assertEqual(0, self.mock.actor_ref.stop.call_count) + self.assertEqual(0, self.mock.sock.close.call_count) + + @patch.object(network.logger, 'log', new=Mock()) + def test_stop_logs_reason(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + network.logger.log.assert_called_once_with( + logging.DEBUG, sentinel.reason) + + @patch.object(network.logger, 'log', new=Mock()) + def test_stop_logs_reason_with_level(self): + self.mock.stopping = False + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason, + level=sentinel.level) + network.logger.log.assert_called_once_with( + sentinel.level, sentinel.reason) + + @patch.object(network.logger, 'log', new=Mock()) + def test_stop_logs_that_it_is_calling_itself(self): + self.mock.stopping = True + self.mock.actor_ref = Mock() + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.stop(self.mock, sentinel.reason) + network.logger.log(any_int, any_unicode) + + @patch.object(gobject, 'io_add_watch', new=Mock()) + def test_enable_recv_registers_with_gobject(self): + self.mock.recv_id = None + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.fileno.return_value = sentinel.fileno + gobject.io_add_watch.return_value = sentinel.tag + + network.Connection.enable_recv(self.mock) + gobject.io_add_watch.assert_called_once_with(sentinel.fileno, + gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP, + self.mock.recv_callback) + self.assertEqual(sentinel.tag, self.mock.recv_id) + + @patch.object(gobject, 'io_add_watch', new=Mock()) + def test_enable_recv_already_registered(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.recv_id = sentinel.tag + + network.Connection.enable_recv(self.mock) + self.assertEqual(0, gobject.io_add_watch.call_count) + + def test_enable_recv_does_not_change_tag(self): + self.mock.recv_id = sentinel.tag + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.enable_recv(self.mock) + self.assertEqual(sentinel.tag, self.mock.recv_id) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_recv_deregisters(self): + self.mock.recv_id = sentinel.tag + + network.Connection.disable_recv(self.mock) + gobject.source_remove.assert_called_once_with(sentinel.tag) + self.assertEqual(None, self.mock.recv_id) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_recv_already_deregistered(self): + self.mock.recv_id = None + + network.Connection.disable_recv(self.mock) + self.assertEqual(0, gobject.source_remove.call_count) + self.assertEqual(None, self.mock.recv_id) + + def test_enable_recv_on_closed_socket(self): + self.mock.recv_id = None + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.fileno.side_effect = socket.error(errno.EBADF, '') + + network.Connection.enable_recv(self.mock) + self.mock.stop.assert_called_once_with(any_unicode) + self.assertEqual(None, self.mock.recv_id) + + @patch.object(gobject, 'io_add_watch', new=Mock()) + def test_enable_send_registers_with_gobject(self): + self.mock.send_id = None + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.fileno.return_value = sentinel.fileno + gobject.io_add_watch.return_value = sentinel.tag + + network.Connection.enable_send(self.mock) + gobject.io_add_watch.assert_called_once_with(sentinel.fileno, + gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP, + self.mock.send_callback) + self.assertEqual(sentinel.tag, self.mock.send_id) + + @patch.object(gobject, 'io_add_watch', new=Mock()) + def test_enable_send_already_registered(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.send_id = sentinel.tag + + network.Connection.enable_send(self.mock) + self.assertEqual(0, gobject.io_add_watch.call_count) + + def test_enable_send_does_not_change_tag(self): + self.mock.send_id = sentinel.tag + self.mock.sock = Mock(spec=socket.SocketType) + + network.Connection.enable_send(self.mock) + self.assertEqual(sentinel.tag, self.mock.send_id) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_send_deregisters(self): + self.mock.send_id = sentinel.tag + + network.Connection.disable_send(self.mock) + gobject.source_remove.assert_called_once_with(sentinel.tag) + self.assertEqual(None, self.mock.send_id) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_send_already_deregistered(self): + self.mock.send_id = None + + network.Connection.disable_send(self.mock) + self.assertEqual(0, gobject.source_remove.call_count) + self.assertEqual(None, self.mock.send_id) + + def test_enable_send_on_closed_socket(self): + self.mock.send_id = None + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.fileno.side_effect = socket.error(errno.EBADF, '') + + network.Connection.enable_send(self.mock) + self.assertEqual(None, self.mock.send_id) + + @patch.object(gobject, 'timeout_add_seconds', new=Mock()) + def test_enable_timeout_clears_existing_timeouts(self): + self.mock.timeout = 10 + + network.Connection.enable_timeout(self.mock) + self.mock.disable_timeout.assert_called_once_with() + + @patch.object(gobject, 'timeout_add_seconds', new=Mock()) + def test_enable_timeout_add_gobject_timeout(self): + self.mock.timeout = 10 + gobject.timeout_add_seconds.return_value = sentinel.tag + + network.Connection.enable_timeout(self.mock) + gobject.timeout_add_seconds.assert_called_once_with(10, + self.mock.timeout_callback) + self.assertEqual(sentinel.tag, self.mock.timeout_id) + + @patch.object(gobject, 'timeout_add_seconds', new=Mock()) + def test_enable_timeout_does_not_add_timeout(self): + self.mock.timeout = 0 + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, gobject.timeout_add_seconds.call_count) + + self.mock.timeout = -1 + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, gobject.timeout_add_seconds.call_count) + + self.mock.timeout = None + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, gobject.timeout_add_seconds.call_count) + + def test_enable_timeout_does_not_call_disable_for_invalid_timeout(self): + self.mock.timeout = 0 + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, self.mock.disable_timeout.call_count) + + self.mock.timeout = -1 + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, self.mock.disable_timeout.call_count) + + self.mock.timeout = None + network.Connection.enable_timeout(self.mock) + self.assertEqual(0, self.mock.disable_timeout.call_count) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_timeout_deregisters(self): + self.mock.timeout_id = sentinel.tag + + network.Connection.disable_timeout(self.mock) + gobject.source_remove.assert_called_once_with(sentinel.tag) + self.assertEqual(None, self.mock.timeout_id) + + @patch.object(gobject, 'source_remove', new=Mock()) + def test_disable_timeout_already_deregistered(self): + self.mock.timeout_id = None + + network.Connection.disable_timeout(self.mock) + self.assertEqual(0, gobject.source_remove.call_count) + self.assertEqual(None, self.mock.timeout_id) + + def test_send_acquires_and_releases_lock(self): + self.mock.send_lock = Mock() + self.mock.send_buffer = '' + + network.Connection.send(self.mock, 'data') + self.mock.send_lock.acquire.assert_called_once_with(True) + self.mock.send_lock.release.assert_called_once_with() + + def test_send_appends_to_send_buffer(self): + self.mock.send_lock = Mock() + self.mock.send_buffer = '' + + network.Connection.send(self.mock, 'abc') + self.assertEqual('abc', self.mock.send_buffer) + + network.Connection.send(self.mock, 'def') + self.assertEqual('abcdef', self.mock.send_buffer) + + network.Connection.send(self.mock, '') + self.assertEqual('abcdef', self.mock.send_buffer) + + def test_send_calls_enable_send(self): + self.mock.send_lock = Mock() + self.mock.send_buffer = '' + + network.Connection.send(self.mock, 'data') + self.mock.enable_send.assert_called_once_with() + + def test_recv_callback_respects_io_err(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.actor_ref = Mock() + + self.assertTrue(network.Connection.recv_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_ERR)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_recv_callback_respects_io_hup(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.actor_ref = Mock() + + self.assertTrue(network.Connection.recv_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_HUP)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_recv_callback_respects_io_hup_and_io_err(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.actor_ref = Mock() + + self.assertTrue(network.Connection.recv_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_recv_callback_sends_data_to_actor(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.recv.return_value = 'data' + self.mock.actor_ref = Mock() + + self.assertTrue(network.Connection.recv_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.actor_ref.send_one_way.assert_called_once_with( + {'received': 'data'}) + + def test_recv_callback_handles_dead_actors(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.recv.return_value = 'data' + self.mock.actor_ref = Mock() + self.mock.actor_ref.send_one_way.side_effect = pykka.ActorDeadError() + + self.assertTrue(network.Connection.recv_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_recv_callback_gets_no_data(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.recv.return_value = '' + self.mock.actor_ref = Mock() + + self.assertTrue(network.Connection.recv_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_recv_callback_recoverable_error(self): + self.mock.sock = Mock(spec=socket.SocketType) + + for error in (errno.EWOULDBLOCK, errno.EINTR): + self.mock.sock.recv.side_effect = socket.error(error, '') + self.assertTrue(network.Connection.recv_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.assertEqual(0, self.mock.stop.call_count) + + def test_recv_callback_unrecoverable_error(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.recv.side_effect = socket.error + + self.assertTrue(network.Connection.recv_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_send_callback_respects_io_err(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 1 + self.mock.send_lock = Mock() + self.mock.actor_ref = Mock() + self.mock.send_buffer = '' + + self.assertTrue(network.Connection.send_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_ERR)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_send_callback_respects_io_hup(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 1 + self.mock.send_lock = Mock() + self.mock.actor_ref = Mock() + self.mock.send_buffer = '' + + self.assertTrue(network.Connection.send_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_HUP)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_send_callback_respects_io_hup_and_io_err(self): + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 1 + self.mock.send_lock = Mock() + self.mock.actor_ref = Mock() + self.mock.send_buffer = '' + + self.assertTrue(network.Connection.send_callback(self.mock, + sentinel.fd, gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_send_callback_acquires_and_releases_lock(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = True + self.mock.send_buffer = '' + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 0 + + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.send_lock.acquire.assert_called_once_with(False) + self.mock.send_lock.release.assert_called_once_with() + + def test_send_callback_fails_to_acquire_lock(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = False + self.mock.send_buffer = '' + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 0 + + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.send_lock.acquire.assert_called_once_with(False) + self.assertEqual(0, self.mock.sock.send.call_count) + + def test_send_callback_sends_all_data(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = True + self.mock.send_buffer = 'data' + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 4 + + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.disable_send.assert_called_once_with() + self.mock.sock.send.assert_called_once_with('data') + self.assertEqual('', self.mock.send_buffer) + + def test_send_callback_sends_partial_data(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = True + self.mock.send_buffer = 'data' + self.mock.sock = Mock(spec=socket.SocketType) + self.mock.sock.send.return_value = 2 + + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.sock.send.assert_called_once_with('data') + self.assertEqual('ta', self.mock.send_buffer) + + def test_send_callback_recoverable_error(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = True + self.mock.send_buffer = 'data' + self.mock.sock = Mock(spec=socket.SocketType) + + for error in (errno.EWOULDBLOCK, errno.EINTR): + self.mock.sock.send.side_effect = socket.error(error, '') + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.assertEqual(0, self.mock.stop.call_count) + + def test_send_callback_unrecoverable_error(self): + self.mock.send_lock = Mock() + self.mock.send_lock.acquire.return_value = True + self.mock.send_buffer = 'data' + self.mock.sock = Mock(spec=socket.SocketType) + + self.mock.sock.send.side_effect = socket.error + self.assertTrue(network.Connection.send_callback( + self.mock, sentinel.fd, gobject.IO_IN)) + self.mock.stop.assert_called_once_with(any_unicode) + + def test_timeout_callback(self): + self.mock.timeout = 10 + + self.assertFalse(network.Connection.timeout_callback(self.mock)) + self.mock.stop.assert_called_once_with(any_unicode) diff --git a/tests/utils/network/lineprotocol_test.py b/tests/utils/network/lineprotocol_test.py new file mode 100644 index 00000000..a87f461c --- /dev/null +++ b/tests/utils/network/lineprotocol_test.py @@ -0,0 +1,239 @@ +#encoding: utf-8 + +import unittest + +from mopidy.utils import network + +from mock import sentinel, Mock + +class LineProtocolTest(unittest.TestCase): + def setUp(self): + self.mock = Mock(spec=network.LineProtocol) + self.mock.terminator = network.LineProtocol.terminator + self.mock.encoding = network.LineProtocol.encoding + + def test_init_stores_values_in_attributes(self): + network.LineProtocol.__init__(self.mock, sentinel.connection) + self.assertEqual(sentinel.connection, self.mock.connection) + self.assertEqual('', self.mock.recv_buffer) + + def test_on_receive_no_new_lines_adds_to_recv_buffer(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = [] + + network.LineProtocol.on_receive(self.mock, {'received': 'data'}) + self.assertEqual('data', self.mock.recv_buffer) + self.mock.parse_lines.assert_called_once_with() + self.assertEqual(0, self.mock.on_line_received.call_count) + + def test_on_receive_toggles_timeout(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = [] + + network.LineProtocol.on_receive(self.mock, {'received': 'data'}) + self.mock.connection.disable_timeout.assert_called_once_with() + self.mock.connection.enable_timeout.assert_called_once_with() + + def test_on_receive_no_new_lines_calls_parse_lines(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = [] + + network.LineProtocol.on_receive(self.mock, {'received': 'data'}) + self.mock.parse_lines.assert_called_once_with() + self.assertEqual(0, self.mock.on_line_received.call_count) + + def test_on_receive_with_new_line_calls_decode(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = [sentinel.line] + + network.LineProtocol.on_receive(self.mock, {'received': 'data\n'}) + self.mock.parse_lines.assert_called_once_with() + self.mock.decode.assert_called_once_with(sentinel.line) + + def test_on_receive_with_new_line_calls_on_recieve(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = [sentinel.line] + self.mock.decode.return_value = sentinel.decoded + + network.LineProtocol.on_receive(self.mock, {'received': 'data\n'}) + self.mock.on_line_received.assert_called_once_with(sentinel.decoded) + + def test_on_receive_with_new_lines_calls_on_recieve(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.recv_buffer = '' + self.mock.parse_lines.return_value = ['line1', 'line2'] + self.mock.decode.return_value = sentinel.decoded + + network.LineProtocol.on_receive(self.mock, + {'received': 'line1\nline2\n'}) + self.assertEqual(2, self.mock.on_line_received.call_count) + + def test_parse_lines_emtpy_buffer(self): + self.mock.recv_buffer = '' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertRaises(StopIteration, lines.next) + + def test_parse_lines_no_terminator(self): + self.mock.recv_buffer = 'data' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertRaises(StopIteration, lines.next) + + def test_parse_lines_termintor(self): + self.mock.recv_buffer = 'data\n' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual('data', lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('', self.mock.recv_buffer) + + def test_parse_lines_no_data_before_terminator(self): + self.mock.recv_buffer = '\n' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual('', lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('', self.mock.recv_buffer) + + def test_parse_lines_extra_data_after_terminator(self): + self.mock.recv_buffer = 'data1\ndata2' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual('data1', lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('data2', self.mock.recv_buffer) + + def test_parse_lines_unicode(self): + self.mock.recv_buffer = u'æøå\n'.encode('utf-8') + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual(u'æøå'.encode('utf-8'), lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('', self.mock.recv_buffer) + + def test_parse_lines_multiple_lines(self): + self.mock.recv_buffer = 'abc\ndef\nghi\njkl' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual('abc', lines.next()) + self.assertEqual('def', lines.next()) + self.assertEqual('ghi', lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('jkl', self.mock.recv_buffer) + + def test_parse_lines_multiple_calls(self): + self.mock.recv_buffer = 'data1' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('data1', self.mock.recv_buffer) + + self.mock.recv_buffer += '\ndata2' + + lines = network.LineProtocol.parse_lines(self.mock) + self.assertEqual('data1', lines.next()) + self.assertRaises(StopIteration, lines.next) + self.assertEqual('data2', self.mock.recv_buffer) + + def test_send_lines_called_with_no_lines(self): + self.mock.connection = Mock(spec=network.Connection) + + network.LineProtocol.send_lines(self.mock, []) + self.assertEqual(0, self.mock.encode.call_count) + self.assertEqual(0, self.mock.connection.send.call_count) + + def test_send_lines_calls_join_lines(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.join_lines.return_value = 'lines' + + network.LineProtocol.send_lines(self.mock, sentinel.lines) + self.mock.join_lines.assert_called_once_with(sentinel.lines) + + def test_send_line_encodes_joined_lines_with_final_terminator(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.join_lines.return_value = u'lines\n' + + network.LineProtocol.send_lines(self.mock, sentinel.lines) + self.mock.encode.assert_called_once_with(u'lines\n') + + def test_send_lines_sends_encoded_string(self): + self.mock.connection = Mock(spec=network.Connection) + self.mock.join_lines.return_value = 'lines' + self.mock.encode.return_value = sentinel.data + + network.LineProtocol.send_lines(self.mock, sentinel.lines) + self.mock.connection.send.assert_called_once_with(sentinel.data) + + def test_join_lines_returns_empty_string_for_no_lines(self): + self.assertEqual(u'', network.LineProtocol.join_lines(self.mock, [])) + + def test_join_lines_returns_joined_lines(self): + self.assertEqual(u'1\n2\n', network.LineProtocol.join_lines( + self.mock, [u'1', u'2'])) + + def test_decode_calls_decode_on_string(self): + string = Mock() + + network.LineProtocol.decode(self.mock, string) + string.decode.assert_called_once_with(self.mock.encoding) + + def test_decode_plain_ascii(self): + result = network.LineProtocol.decode(self.mock, 'abc') + self.assertEqual(u'abc', result) + self.assertEqual(unicode, type(result)) + + def test_decode_utf8(self): + result = network.LineProtocol.decode( + self.mock, u'æøå'.encode('utf-8')) + self.assertEqual(u'æøå', result) + self.assertEqual(unicode, type(result)) + + def test_decode_invalid_data(self): + string = Mock() + string.decode.side_effect = UnicodeError + + network.LineProtocol.decode(self.mock, string) + self.mock.stop.assert_called_once_with() + + def test_encode_calls_encode_on_string(self): + string = Mock() + + network.LineProtocol.encode(self.mock, string) + string.encode.assert_called_once_with(self.mock.encoding) + + def test_encode_plain_ascii(self): + result = network.LineProtocol.encode(self.mock, u'abc') + self.assertEqual('abc', result) + self.assertEqual(str, type(result)) + + def test_encode_utf8(self): + result = network.LineProtocol.encode(self.mock, u'æøå') + self.assertEqual(u'æøå'.encode('utf-8'), result) + self.assertEqual(str, type(result)) + + def test_encode_invalid_data(self): + string = Mock() + string.encode.side_effect = UnicodeError + + network.LineProtocol.encode(self.mock, string) + self.mock.stop.assert_called_once_with() + + def test_host_property(self): + mock = Mock(spec=network.Connection) + mock.host = sentinel.host + + lineprotocol = network.LineProtocol(mock) + self.assertEqual(sentinel.host, lineprotocol.host) + + def test_port_property(self): + mock = Mock(spec=network.Connection) + mock.port = sentinel.port + + lineprotocol = network.LineProtocol(mock) + self.assertEqual(sentinel.port, lineprotocol.port) diff --git a/tests/utils/network/server_test.py b/tests/utils/network/server_test.py new file mode 100644 index 00000000..75b33d61 --- /dev/null +++ b/tests/utils/network/server_test.py @@ -0,0 +1,186 @@ +import errno +import gobject +import socket +import unittest + +from mopidy.utils import network + +from mock import patch, sentinel, Mock +from tests import any_int + +class ServerTest(unittest.TestCase): + def setUp(self): + self.mock = Mock(spec=network.Server) + + def test_init_calls_create_server_socket(self): + network.Server.__init__(self.mock, sentinel.host, + sentinel.port, sentinel.protocol) + self.mock.create_server_socket.assert_called_once_with( + sentinel.host, sentinel.port) + + def test_init_calls_register_server(self): + sock = Mock(spec=socket.SocketType) + sock.fileno.return_value = sentinel.fileno + self.mock.create_server_socket.return_value = sock + + network.Server.__init__(self.mock, sentinel.host, + sentinel.port, sentinel.protocol) + self.mock.register_server_socket.assert_called_once_with( + sentinel.fileno) + + def test_init_fails_on_fileno_call(self): + sock = Mock(spec=socket.SocketType) + sock.fileno.side_effect = socket.error + self.mock.create_server_socket.return_value = sock + + self.assertRaises(socket.error, network.Server.__init__, + self.mock, sentinel.host, sentinel.port, sentinel.protocol) + + def test_init_stores_values_in_attributes(self): + # This need to be a mock and no a sentinel as fileno() is called on it + sock = Mock(spec=socket.SocketType) + self.mock.create_server_socket.return_value = sock + + network.Server.__init__(self.mock, sentinel.host, sentinel.port, + sentinel.protocol, max_connections=sentinel.max_connections, + timeout=sentinel.timeout) + self.assertEqual(sentinel.protocol, self.mock.protocol) + self.assertEqual(sentinel.max_connections, self.mock.max_connections) + self.assertEqual(sentinel.timeout, self.mock.timeout) + self.assertEqual(sock, self.mock.server_socket) + + @patch.object(network, 'create_socket', spec=socket.SocketType) + def test_create_server_socket_sets_up_listener(self, create_socket): + sock = create_socket.return_value + + network.Server.create_server_socket(self.mock, + sentinel.host, sentinel.port) + sock.setblocking.assert_called_once_with(False) + sock.bind.assert_called_once_with((sentinel.host, sentinel.port)) + sock.listen.assert_called_once_with(any_int) + + @patch.object(network, 'create_socket', new=Mock()) + def test_create_server_socket_fails(self): + network.create_socket.side_effect = socket.error + self.assertRaises(socket.error, network.Server.create_server_socket, + self.mock, sentinel.host, sentinel.port) + + @patch.object(network, 'create_socket', new=Mock()) + def test_create_server_bind_fails(self): + sock = network.create_socket.return_value + sock.bind.side_effect = socket.error + + self.assertRaises(socket.error, network.Server.create_server_socket, + self.mock, sentinel.host, sentinel.port) + + @patch.object(network, 'create_socket', new=Mock()) + def test_create_server_listen_fails(self): + sock = network.create_socket.return_value + sock.listen.side_effect = socket.error + + self.assertRaises(socket.error, network.Server.create_server_socket, + self.mock, sentinel.host, sentinel.port) + + @patch.object(gobject, 'io_add_watch', new=Mock()) + def test_register_server_socket_sets_up_io_watch(self): + network.Server.register_server_socket(self.mock, sentinel.fileno) + gobject.io_add_watch.assert_called_once_with(sentinel.fileno, + gobject.IO_IN, self.mock.handle_connection) + + def test_handle_connection(self): + self.mock.accept_connection.return_value = ( + sentinel.sock, sentinel.addr) + self.mock.maximum_connections_exceeded.return_value = False + + self.assertTrue(network.Server.handle_connection( + self.mock, sentinel.fileno, gobject.IO_IN)) + self.mock.accept_connection.assert_called_once_with() + self.mock.maximum_connections_exceeded.assert_called_once_with() + self.mock.init_connection.assert_called_once_with( + sentinel.sock, sentinel.addr) + self.assertEqual(0, self.mock.reject_connection.call_count) + + def test_handle_connection_exceeded_connections(self): + self.mock.accept_connection.return_value = ( + sentinel.sock, sentinel.addr) + self.mock.maximum_connections_exceeded.return_value = True + + self.assertTrue(network.Server.handle_connection( + self.mock, sentinel.fileno, gobject.IO_IN)) + self.mock.accept_connection.assert_called_once_with() + self.mock.maximum_connections_exceeded.assert_called_once_with() + self.mock.reject_connection.assert_called_once_with( + sentinel.sock, sentinel.addr) + self.assertEqual(0, self.mock.init_connection.call_count) + + def test_accept_connection(self): + sock = Mock(spec=socket.SocketType) + sock.accept.return_value = (sentinel.sock, sentinel.addr) + self.mock.server_socket = sock + + sock, addr = network.Server.accept_connection(self.mock) + self.assertEqual(sentinel.sock, sock) + self.assertEqual(sentinel.addr, addr) + + def test_accept_connection_recoverable_error(self): + sock = Mock(spec=socket.SocketType) + self.mock.server_socket = sock + + for error in (errno.EAGAIN, errno.EINTR): + sock.accept.side_effect = socket.error(error, '') + self.assertRaises(network.ShouldRetrySocketCall, + network.Server.accept_connection, self.mock) + + # FIXME decide if this should be allowed to propegate + def test_accept_connection_unrecoverable_error(self): + sock = Mock(spec=socket.SocketType) + self.mock.server_socket = sock + sock.accept.side_effect = socket.error + self.assertRaises(socket.error, + network.Server.accept_connection, self.mock) + + def test_maximum_connections_exceeded(self): + self.mock.max_connections = 10 + + self.mock.number_of_connections.return_value = 11 + self.assertTrue(network.Server.maximum_connections_exceeded(self.mock)) + + self.mock.number_of_connections.return_value = 10 + self.assertTrue(network.Server.maximum_connections_exceeded(self.mock)) + + self.mock.number_of_connections.return_value = 9 + self.assertFalse(network.Server.maximum_connections_exceeded(self.mock)) + + @patch('pykka.registry.ActorRegistry.get_by_class') + def test_number_of_connections(self, get_by_class): + self.mock.protocol = sentinel.protocol + + get_by_class.return_value = [1, 2, 3] + self.assertEqual(3, network.Server.number_of_connections(self.mock)) + + get_by_class.return_value = [] + self.assertEqual(0, network.Server.number_of_connections(self.mock)) + + @patch.object(network, 'Connection', new=Mock()) + def test_init_connection(self): + self.mock.protocol = sentinel.protocol + self.mock.timeout = sentinel.timeout + + network.Server.init_connection(self.mock, sentinel.sock, sentinel.addr) + network.Connection.assert_called_once_with(sentinel.protocol, + sentinel.sock, sentinel.addr, sentinel.timeout) + + def test_reject_connection(self): + sock = Mock(spec=socket.SocketType) + + network.Server.reject_connection(self.mock, sock, + (sentinel.host, sentinel.port)) + sock.close.assert_called_once_with() + + def test_reject_connection_error(self): + sock = Mock(spec=socket.SocketType) + sock.close.side_effect = socket.error + + network.Server.reject_connection(self.mock, sock, + (sentinel.host, sentinel.port)) + sock.close.assert_called_once_with() diff --git a/tests/utils/network_test.py b/tests/utils/network/utils_test.py similarity index 61% rename from tests/utils/network_test.py rename to tests/utils/network/utils_test.py index 66229036..ada1de01 100644 --- a/tests/utils/network_test.py +++ b/tests/utils/network/utils_test.py @@ -1,52 +1,52 @@ -import mock import socket import unittest from mopidy.utils import network +from mock import patch, Mock from tests import SkipTest class FormatHostnameTest(unittest.TestCase): - @mock.patch('mopidy.utils.network.has_ipv6', True) + @patch('mopidy.utils.network.has_ipv6', True) def test_format_hostname_prefixes_ipv4_addresses_when_ipv6_available(self): network.has_ipv6 = True self.assertEqual(network.format_hostname('0.0.0.0'), '::ffff:0.0.0.0') self.assertEqual(network.format_hostname('1.0.0.1'), '::ffff:1.0.0.1') - @mock.patch('mopidy.utils.network.has_ipv6', False) + @patch('mopidy.utils.network.has_ipv6', False) def test_format_hostname_does_nothing_when_only_ipv4_available(self): network.has_ipv6 = False - self.assertEquals(network.format_hostname('0.0.0.0'), '0.0.0.0') + self.assertEqual(network.format_hostname('0.0.0.0'), '0.0.0.0') class TryIPv6SocketTest(unittest.TestCase): - @mock.patch('socket.has_ipv6', False) + @patch('socket.has_ipv6', False) def test_system_that_claims_no_ipv6_support(self): - self.assertFalse(network._try_ipv6_socket()) + self.assertFalse(network.try_ipv6_socket()) - @mock.patch('socket.has_ipv6', True) - @mock.patch('socket.socket') + @patch('socket.has_ipv6', True) + @patch('socket.socket') def test_system_with_broken_ipv6(self, socket_mock): socket_mock.side_effect = IOError() - self.assertFalse(network._try_ipv6_socket()) + self.assertFalse(network.try_ipv6_socket()) - @mock.patch('socket.has_ipv6', True) - @mock.patch('socket.socket') + @patch('socket.has_ipv6', True) + @patch('socket.socket') def test_with_working_ipv6(self, socket_mock): - socket_mock.return_value = mock.Mock() - self.assertTrue(network._try_ipv6_socket()) + socket_mock.return_value = Mock() + self.assertTrue(network.try_ipv6_socket()) class CreateSocketTest(unittest.TestCase): - @mock.patch('mopidy.utils.network.has_ipv6', False) - @mock.patch('socket.socket') + @patch('mopidy.utils.network.has_ipv6', False) + @patch('socket.socket') def test_ipv4_socket(self, socket_mock): network.create_socket() self.assertEqual(socket_mock.call_args[0], (socket.AF_INET, socket.SOCK_STREAM)) - @mock.patch('mopidy.utils.network.has_ipv6', True) - @mock.patch('socket.socket') + @patch('mopidy.utils.network.has_ipv6', True) + @patch('socket.socket') def test_ipv6_socket(self, socket_mock): network.create_socket() self.assertEqual(socket_mock.call_args[0],