Merge branch 'develop' into feature/xdg-folders

This commit is contained in:
Thomas Adamcik 2011-07-17 23:58:42 +02:00
commit 728fbe7fe3
21 changed files with 1402 additions and 201 deletions

View File

@ -32,6 +32,10 @@ v0.6.0 (in development)
- Add Listener API, :mod:`mopidy.listeners`, to be implemented by actors
wanting to receive events from the backend. This is a formalization of the
ad hoc events the Last.fm scrobbler has already been using for some time.
- Replaced all of the MPD network code that was provided by asyncore with
custom stack. This change was made to facilitate the future support of the
`idle` command, and to reduce the number of event loops being used.
- Fix metadata update in Shoutcast streaming (Fixes: :issue:`122`)
v0.5.0 (2011-06-15)

View File

@ -3,7 +3,9 @@ import optparse
import os
import signal
import sys
import time
import gobject
gobject.threads_init()
# Extract any non-GStreamer arguments, and leave the GStreamer arguments for
# processing by GStreamer. This needs to be done before GStreamer is imported,
@ -24,26 +26,25 @@ from mopidy.gstreamer import GStreamer
from mopidy.utils import get_class
from mopidy.utils.log import setup_logging
from mopidy.utils.path import get_or_create_folder, get_or_create_file
from mopidy.utils.process import (GObjectEventThread, exit_handler,
stop_remaining_actors, stop_actors_by_class)
from mopidy.utils.process import (exit_handler, stop_remaining_actors,
stop_actors_by_class)
from mopidy.utils.settings import list_settings_optparse_callback
logger = logging.getLogger('mopidy.core')
def main():
signal.signal(signal.SIGTERM, exit_handler)
loop = gobject.MainLoop()
try:
options = parse_options()
setup_logging(options.verbosity_level, options.save_debug_log)
check_old_folders()
setup_settings(options.interactive)
setup_gobject_loop()
setup_gstreamer()
setup_mixer()
setup_backend()
setup_frontends()
while True:
time.sleep(1)
loop.run()
except SettingsError as e:
logger.error(e.message)
except KeyboardInterrupt:
@ -51,6 +52,7 @@ def main():
except Exception as e:
logger.exception(e)
finally:
loop.quit()
stop_frontends()
stop_backend()
stop_mixer()
@ -99,9 +101,6 @@ def setup_settings(interactive):
logger.error(e.message)
sys.exit(1)
def setup_gobject_loop():
GObjectEventThread().start()
def setup_gstreamer():
GStreamer.start()

View File

@ -1,10 +1,11 @@
import asyncore
import logging
import sys
from pykka.actor import ThreadingActor
from mopidy.frontends.mpd.server import MpdServer
from mopidy.utils.process import BaseThread
from mopidy import settings
from mopidy.frontends.mpd import dispatcher, protocol
from mopidy.utils import network, process, log
logger = logging.getLogger('mopidy.frontends.mpd')
@ -24,23 +25,50 @@ class MpdFrontend(ThreadingActor):
"""
def __init__(self):
self._thread = None
hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME)
port = settings.MPD_SERVER_PORT
try:
network.Server(hostname, port, protocol=MpdSession)
except IOError, e:
logger.error(u'MPD server startup failed: %s', e)
sys.exit(1)
logger.info(u'MPD server running at [%s]:%s', hostname, port)
def on_stop(self):
process.stop_actors_by_class(MpdSession)
class MpdSession(network.LineProtocol):
"""
The MPD client session. Keeps track of a single client session. Any
requests from the client is passed on to the MPD request dispatcher.
"""
terminator = protocol.LINE_TERMINATOR
encoding = protocol.ENCODING
def __init__(self, client):
super(MpdSession, self).__init__(client)
self.dispatcher = dispatcher.MpdDispatcher(self)
def on_start(self):
self._thread = MpdThread()
self._thread.start()
logger.info(u'New MPD connection from [%s]:%s', self.host, self.port)
self.send_lines([u'OK MPD %s' % protocol.VERSION])
def on_receive(self, message):
pass # Ignore any messages
def on_line_received(self, line):
logger.debug(u'Request from [%s]:%s to %s: %s', self.host, self.port,
self.actor_urn, line)
response = self.dispatcher.handle_request(line)
if not response:
return
class MpdThread(BaseThread):
def __init__(self):
super(MpdThread, self).__init__()
self.name = u'MpdThread'
logger.debug(u'Response to [%s]:%s from %s: %s', self.host, self.port,
self.actor_urn, log.indent(self.terminator.join(response)))
self.send_lines(response)
def run_inside_try(self):
logger.debug(u'Starting MPD server thread')
server = MpdServer()
server.start()
asyncore.loop()
def close(self):
self.stop()

View File

@ -178,7 +178,7 @@ class MpdContext(object):
#: The current :class:`MpdDispatcher`.
dispatcher = None
#: The current :class:`mopidy.frontends.mpd.session.MpdSession`.
#: The current :class:`mopidy.frontends.mpd.MpdSession`.
session = None
def __init__(self, dispatcher, session=None):

View File

@ -1,38 +0,0 @@
import asyncore
import logging
import sys
from mopidy import settings
from mopidy.utils import network
from .session import MpdSession
logger = logging.getLogger('mopidy.frontends.mpd.server')
class MpdServer(asyncore.dispatcher):
"""
The MPD server. Creates a :class:`mopidy.frontends.mpd.session.MpdSession`
for each client connection.
"""
def start(self):
"""Start MPD server."""
try:
self.set_socket(network.create_socket())
self.set_reuse_addr()
hostname = network.format_hostname(settings.MPD_SERVER_HOSTNAME)
port = settings.MPD_SERVER_PORT
logger.debug(u'MPD server is binding to [%s]:%s', hostname, port)
self.bind((hostname, port))
self.listen(1)
logger.info(u'MPD server running at [%s]:%s', hostname, port)
except IOError, e:
logger.error(u'MPD server startup failed: %s' %
str(e).decode('utf-8'))
sys.exit(1)
def handle_accept(self):
"""Called by asyncore when a new client connects."""
(client_socket, client_socket_address) = self.accept()
logger.info(u'MPD client connection from [%s]:%s',
client_socket_address[0], client_socket_address[1])
MpdSession(self, client_socket, client_socket_address)

View File

@ -1,58 +0,0 @@
import asynchat
import logging
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR, VERSION
from mopidy.utils.log import indent
logger = logging.getLogger('mopidy.frontends.mpd.session')
class MpdSession(asynchat.async_chat):
"""
The MPD client session. Keeps track of a single client session. Any
requests from the client is passed on to the MPD request dispatcher.
"""
def __init__(self, server, client_socket, client_socket_address):
asynchat.async_chat.__init__(self, sock=client_socket)
self.server = server
self.client_address = client_socket_address[0]
self.client_port = client_socket_address[1]
self.input_buffer = []
self.authenticated = False
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
self.dispatcher = MpdDispatcher(session=self)
self.send_response([u'OK MPD %s' % VERSION])
def collect_incoming_data(self, data):
"""Called by asynchat when new data arrives."""
self.input_buffer.append(data)
def found_terminator(self):
"""Called by asynchat when a terminator is found in incoming data."""
data = ''.join(self.input_buffer).strip()
self.input_buffer = []
try:
self.send_response(self.handle_request(data))
except UnicodeDecodeError as e:
logger.warning(u'Received invalid data: %s', e)
def handle_request(self, request):
"""Handle the request using the MPD command handlers."""
request = request.decode(ENCODING)
logger.debug(u'Request from [%s]:%s: %s', self.client_address,
self.client_port, indent(request))
return self.dispatcher.handle_request(request)
def send_response(self, response):
"""
Format a response from the MPD command handlers and send it to the
client.
"""
if response:
response = LINE_TERMINATOR.join(response)
logger.debug(u'Response to [%s]:%s: %s', self.client_address,
self.client_port, indent(response))
response = u'%s%s' % (response, LINE_TERMINATOR)
data = response.encode(ENCODING)
self.push(data)

View File

@ -43,9 +43,6 @@ class GStreamer(ThreadingActor):
self._handlers = {}
def on_start(self):
# **Warning:** :class:`GStreamer` requires
# :class:`mopidy.utils.process.GObjectEventThread` to be running. This
# is not enforced by :class:`GStreamer` itself.
self._setup_pipeline()
self._setup_outputs()
self._setup_message_processor()
@ -277,10 +274,18 @@ class GStreamer(ThreadingActor):
taglist = gst.TagList()
artists = [a for a in (track.artists or []) if a.name]
# Default to blank data to trick shoutcast into clearing any previous
# values it might have.
taglist[gst.TAG_ARTIST] = u' '
taglist[gst.TAG_TITLE] = u' '
taglist[gst.TAG_ALBUM] = u' '
if artists:
taglist[gst.TAG_ARTIST] = u', '.join([a.name for a in artists])
if track.name:
taglist[gst.TAG_TITLE] = track.name
if track.album and track.album.name:
taglist[gst.TAG_ALBUM] = track.album.name

View File

@ -18,9 +18,11 @@ def import_module(name):
return sys.modules[name]
def get_class(name):
logger.debug('Loading: %s', name)
if '.' not in name:
raise ImportError("Couldn't load: %s" % name)
module_name = name[:name.rindex('.')]
class_name = name[name.rindex('.') + 1:]
logger.debug('Loading: %s', name)
try:
module = import_module(module_name)
class_object = getattr(module, class_name)

View File

@ -1,10 +1,20 @@
import errno
import gobject
import logging
import re
import socket
import threading
from pykka import ActorDeadError
from pykka.actor import ThreadingActor
from pykka.registry import ActorRegistry
logger = logging.getLogger('mopidy.utils.server')
def _try_ipv6_socket():
class ShouldRetrySocketCall(Exception):
"""Indicate that attempted socket call should be retried"""
def try_ipv6_socket():
"""Determine if system really supports IPv6"""
if not socket.has_ipv6:
return False
@ -17,7 +27,7 @@ def _try_ipv6_socket():
return False
#: Boolean value that indicates if creating an IPv6 socket will succeed.
has_ipv6 = _try_ipv6_socket()
has_ipv6 = try_ipv6_socket()
def create_socket():
"""Create a TCP socket with or without IPv6 depending on system support"""
@ -27,6 +37,7 @@ def create_socket():
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return sock
def format_hostname(hostname):
@ -34,3 +45,330 @@ def format_hostname(hostname):
if (has_ipv6 and re.match('\d+.\d+.\d+.\d+', hostname) is not None):
hostname = '::ffff:%s' % hostname
return hostname
class Server(object):
"""Setup listener and register it with gobject's event loop."""
def __init__(self, host, port, protocol, max_connections=5, timeout=30):
self.protocol = protocol
self.max_connections = max_connections
self.timeout = timeout
self.server_socket = self.create_server_socket(host, port)
self.register_server_socket(self.server_socket.fileno())
def create_server_socket(self, host, port):
sock = create_socket()
sock.setblocking(False)
sock.bind((host, port))
sock.listen(1)
return sock
def register_server_socket(self, fileno):
gobject.io_add_watch(fileno, gobject.IO_IN, self.handle_connection)
def handle_connection(self, fd, flags):
try:
sock, addr = self.accept_connection()
except ShouldRetrySocketCall:
return True
if self.maximum_connections_exceeded():
self.reject_connection(sock, addr)
else:
self.init_connection(sock, addr)
return True
def accept_connection(self):
try:
return self.server_socket.accept()
except socket.error as e:
if e.errno in (errno.EAGAIN, errno.EINTR):
raise ShouldRetrySocketCall
raise
def maximum_connections_exceeded(self):
return (self.max_connections is not None and
self.number_of_connections() >= self.max_connections)
def number_of_connections(self):
return len(ActorRegistry.get_by_class(self.protocol))
def reject_connection(self, sock, addr):
# FIXME provide more context in logging?
logger.warning(u'Rejected connection from [%s]:%s', addr[0], addr[1])
try:
sock.close()
except socket.error:
pass
def init_connection(self, sock, addr):
Connection(self.protocol, sock, addr, self.timeout)
class Connection(object):
# NOTE: the callback code is _not_ run in the actor's thread, but in the
# same one as the event loop. If code in the callbacks blocks, the rest of
# gobject code will likely be blocked as well...
#
# Also note that source_remove() return values are ignored on purpose, a
# false return value would only tell us that what we thought was registered
# is already gone, there is really nothing more we can do.
def __init__(self, protocol, sock, addr, timeout):
sock.setblocking(False)
self.host, self.port = addr[:2] # IPv6 has larger addr
self.sock = sock
self.protocol = protocol
self.timeout = timeout
self.send_lock = threading.Lock()
self.send_buffer = ''
self.stopping = False
self.recv_id = None
self.send_id = None
self.timeout_id = None
self.actor_ref = self.protocol.start(self)
self.enable_recv()
self.enable_timeout()
def stop(self, reason, level=logging.DEBUG):
if self.stopping:
logger.log(level, 'Already stopping: %s' % reason)
return
else:
self.stopping = True
logger.log(level, reason)
try:
self.actor_ref.stop()
except ActorDeadError:
pass
self.disable_timeout()
self.disable_recv()
self.disable_send()
try:
self.sock.close()
except socket.error:
pass
def send(self, data):
"""Send data to client exactly as is."""
self.send_lock.acquire(True)
self.send_buffer += data
self.send_lock.release()
self.enable_send()
def enable_timeout(self):
"""Reactivate timeout mechanism."""
if self.timeout <= 0:
return
self.disable_timeout()
self.timeout_id = gobject.timeout_add_seconds(
self.timeout, self.timeout_callback)
def disable_timeout(self):
"""Deactivate timeout mechanism."""
if self.timeout_id is None:
return
gobject.source_remove(self.timeout_id)
self.timeout_id = None
def enable_recv(self):
if self.recv_id is not None:
return
try:
self.recv_id = gobject.io_add_watch(self.sock.fileno(),
gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP,
self.recv_callback)
except socket.error as e:
self.stop(u'Problem with connection: %s' % e)
def disable_recv(self):
if self.recv_id is None:
return
gobject.source_remove(self.recv_id)
self.recv_id = None
def enable_send(self):
if self.send_id is not None:
return
try:
self.send_id = gobject.io_add_watch(self.sock.fileno(),
gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP,
self.send_callback)
except socket.error as e:
self.stop(u'Problem with connection: %s' % e)
def disable_send(self):
if self.send_id is None:
return
gobject.source_remove(self.send_id)
self.send_id = None
def recv_callback(self, fd, flags):
if flags & (gobject.IO_ERR | gobject.IO_HUP):
self.stop(u'Bad client flags: %s' % flags)
return True
try:
data = self.sock.recv(4096)
except socket.error as e:
if e.errno not in (errno.EWOULDBLOCK, errno.EINTR):
self.stop(u'Unexpected client error: %s' % e)
return True
if not data:
self.stop(u'Client most likely disconnected.')
return True
try:
self.actor_ref.send_one_way({'received': data})
except ActorDeadError:
self.stop(u'Actor is dead.')
return True
def send_callback(self, fd, flags):
if flags & (gobject.IO_ERR | gobject.IO_HUP):
self.stop(u'Bad client flags: %s' % flags)
return True
# If with can't get the lock, simply try again next time socket is
# ready for sending.
if not self.send_lock.acquire(False):
return True
try:
sent = self.sock.send(self.send_buffer)
self.send_buffer = self.send_buffer[sent:]
if not self.send_buffer:
self.disable_send()
except socket.error as e:
if e.errno not in (errno.EWOULDBLOCK, errno.EINTR):
self.stop(u'Unexpected client error: %s' % e)
finally:
self.send_lock.release()
return True
def timeout_callback(self):
self.stop(u'Client timeout out after %s seconds' % self.timeout)
return False
class LineProtocol(ThreadingActor):
"""
Base class for handling line based protocols.
Takes care of receiving new data from server's client code, decoding and
then splitting data along line boundaries.
"""
#: What terminator to use to split lines.
terminator = '\n'
#: What encoding to expect incomming data to be in, can be :class:`None`.
encoding = 'utf-8'
def __init__(self, connection):
self.connection = connection
self.recv_buffer = ''
@property
def host(self):
return self.connection.host
@property
def port(self):
return self.connection.port
def on_line_received(self, line):
"""
Called whenever a new line is found.
Should be implemented by subclasses.
"""
raise NotImplementedError
def on_receive(self, message):
"""Handle messages with new data from server."""
if 'received' not in message:
return
self.connection.disable_timeout()
self.recv_buffer += message['received']
for line in self.parse_lines():
line = self.decode(line)
self.on_line_received(line)
self.connection.enable_timeout()
def on_stop(self):
"""Ensure that cleanup when actor stops."""
self.connection.stop(u'Actor is shutting down.')
def parse_lines(self):
"""Consume new data and yield any lines found."""
while re.search(self.terminator, self.recv_buffer):
line, self.recv_buffer = re.split(self.terminator,
self.recv_buffer, 1)
yield line
def encode(self, line):
"""
Handle encoding of line.
Can be overridden by subclasses to change encoding behaviour.
"""
try:
return line.encode(self.encoding)
except UnicodeError:
logger.warning(u'Stopping actor due to encode problem, data '
'supplied by client was not valid %s', self.encoding)
self.stop()
def decode(self, line):
"""
Handle decoding of line.
Can be overridden by subclasses to change decoding behaviour.
"""
try:
return line.decode(self.encoding)
except UnicodeError:
logger.warning(u'Stopping actor due to decode problem, data '
'supplied by client was not valid %s', self.encoding)
self.stop()
def join_lines(self, lines):
if not lines:
return u''
return self.terminator.join(lines) + self.terminator
def send_lines(self, lines):
"""
Send array of lines to client via connection.
Join lines using the terminator that is set for this class, encode it
and send it to the client.
"""
if not lines:
return
data = self.join_lines(lines)
self.connection.send(self.encode(data))

View File

@ -3,9 +3,6 @@ import signal
import thread
import threading
import gobject
gobject.threads_init()
from pykka import ActorDeadError
from pykka.registry import ActorRegistry
@ -68,25 +65,3 @@ class BaseThread(threading.Thread):
def run_inside_try(self):
raise NotImplementedError
class GObjectEventThread(BaseThread):
"""
A GObject event loop which is shared by all Mopidy components that uses
libraries that need a GObject event loop, like GStreamer and D-Bus.
Should be started by Mopidy's core and used by
:mod:`mopidy.output.gstreamer`, :mod:`mopidy.frontend.mpris`, etc.
"""
def __init__(self):
super(GObjectEventThread, self).__init__()
self.name = u'GObjectEventThread'
self.loop = None
def run_inside_try(self):
self.loop = gobject.MainLoop().run()
def destroy(self):
self.loop.quit()
super(GObjectEventThread, self).destroy()

View File

@ -1,4 +1,4 @@
coverage
mock
mock >= 0.7
nose
tox

View File

@ -22,3 +22,22 @@ def path_to_data_dir(name):
path = os.path.abspath(path)
return os.path.join(path, name)
class IsA(object):
def __init__(self, klass):
self.klass = klass
def __eq__(self, rhs):
try:
return isinstance(rhs, self.klass)
except TypeError:
return type(rhs) == type(self.klass)
def __ne__(self, rhs):
return not self.__eq__(rhs)
def __repr__(self):
return str(self.klass)
any_int = IsA(int)
any_str = IsA(str)
any_unicode = IsA(unicode)

View File

@ -2,8 +2,8 @@ import mock
import unittest
from mopidy import settings
from mopidy.frontends.mpd import MpdSession
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.session import MpdSession
class AuthenticationTest(unittest.TestCase):
def setUp(self):

View File

@ -3,8 +3,8 @@ import unittest
from mopidy import settings
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import MpdSession
from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.session import MpdSession
from mopidy.mixers.dummy import DummyMixer
class ConnectionHandlerTest(unittest.TestCase):

View File

@ -1,23 +0,0 @@
import unittest
from mopidy import settings
from mopidy.backends.dummy import DummyBackend
from mopidy.frontends.mpd import server
from mopidy.mixers.dummy import DummyMixer
class MpdSessionTest(unittest.TestCase):
def setUp(self):
self.backend = DummyBackend.start().proxy()
self.mixer = DummyMixer.start().proxy()
self.session = server.MpdSession(None, None, (None, None))
def tearDown(self):
self.backend.stop().get()
self.mixer.stop().get()
settings.runtime.clear()
def test_found_terminator_catches_decode_error(self):
# Pressing Ctrl+C in a telnet session sends a 0xff byte to the server.
self.session.input_buffer = ['\xff']
self.session.found_terminator()
self.assertEqual(len(self.session.input_buffer), 0)

View File

@ -4,12 +4,13 @@ from mopidy.utils import get_class
class GetClassTest(unittest.TestCase):
def test_loading_module_that_does_not_exist(self):
test = lambda: get_class('foo.bar.Baz')
self.assertRaises(ImportError, test)
self.assertRaises(ImportError, get_class, 'foo.bar.Baz')
def test_loading_class_that_does_not_exist(self):
test = lambda: get_class('unittest.FooBarBaz')
self.assertRaises(ImportError, test)
self.assertRaises(ImportError, get_class, 'unittest.FooBarBaz')
def test_loading_incorrect_class_path(self):
self.assertRaises(ImportError, get_class, 'foobarbaz')
def test_import_error_message_contains_complete_class_path(self):
try:

View File

View File

@ -0,0 +1,524 @@
import errno
import gobject
import logging
import pykka
import socket
import unittest
from mopidy.utils import network
from mock import patch, sentinel, Mock
from tests import any_int, any_unicode
class ConnectionTest(unittest.TestCase):
def setUp(self):
self.mock = Mock(spec=network.Connection)
def test_init_ensure_nonblocking_io(self):
sock = Mock(spec=socket.SocketType)
network.Connection.__init__(self.mock, Mock(), sock,
(sentinel.host, sentinel.port), sentinel.timeout)
sock.setblocking.assert_called_once_with(False)
def test_init_starts_actor(self):
protocol = Mock(spec=network.LineProtocol)
network.Connection.__init__(self.mock, protocol, Mock(),
(sentinel.host, sentinel.port), sentinel.timeout)
protocol.start.assert_called_once_with(self.mock)
def test_init_enables_recv_and_timeout(self):
network.Connection.__init__(self.mock, Mock(), Mock(),
(sentinel.host, sentinel.port), sentinel.timeout)
self.mock.enable_recv.assert_called_once_with()
self.mock.enable_timeout.assert_called_once_with()
def test_init_stores_values_in_attributes(self):
addr = (sentinel.host, sentinel.port)
protocol = Mock(spec=network.LineProtocol)
sock = Mock(spec=socket.SocketType)
network.Connection.__init__(
self.mock, protocol, sock, addr, sentinel.timeout)
self.assertEqual(sock, self.mock.sock)
self.assertEqual(protocol, self.mock.protocol)
self.assertEqual(sentinel.timeout, self.mock.timeout)
self.assertEqual(sentinel.host, self.mock.host)
self.assertEqual(sentinel.port, self.mock.port)
def test_init_handles_ipv6_addr(self):
addr = (sentinel.host, sentinel.port,
sentinel.flowinfo, sentinel.scopeid)
protocol = Mock(spec=network.LineProtocol)
sock = Mock(spec=socket.SocketType)
network.Connection.__init__(
self.mock, protocol, sock, addr, sentinel.timeout)
self.assertEqual(sentinel.host, self.mock.host)
self.assertEqual(sentinel.port, self.mock.port)
def test_stop_disables_recv_send_and_timeout(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.mock.disable_timeout.assert_called_once_with()
self.mock.disable_recv.assert_called_once_with()
self.mock.disable_send.assert_called_once_with()
def test_stop_closes_socket(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.mock.sock.close.assert_called_once_with()
def test_stop_closes_socket_error(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.close.side_effect = socket.error
network.Connection.stop(self.mock, sentinel.reason)
self.mock.sock.close.assert_called_once_with()
def test_stop_stops_actor(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.mock.actor_ref.stop.assert_called_once_with()
def test_stop_handles_actor_already_being_stopped(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.actor_ref.stop.side_effect = pykka.ActorDeadError()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.mock.actor_ref.stop.assert_called_once_with()
def test_stop_sets_stopping_to_true(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.assertEqual(True, self.mock.stopping)
def test_stop_does_not_proceed_when_already_stopping(self):
self.mock.stopping = True
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
self.assertEqual(0, self.mock.actor_ref.stop.call_count)
self.assertEqual(0, self.mock.sock.close.call_count)
@patch.object(network.logger, 'log', new=Mock())
def test_stop_logs_reason(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
network.logger.log.assert_called_once_with(
logging.DEBUG, sentinel.reason)
@patch.object(network.logger, 'log', new=Mock())
def test_stop_logs_reason_with_level(self):
self.mock.stopping = False
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason,
level=sentinel.level)
network.logger.log.assert_called_once_with(
sentinel.level, sentinel.reason)
@patch.object(network.logger, 'log', new=Mock())
def test_stop_logs_that_it_is_calling_itself(self):
self.mock.stopping = True
self.mock.actor_ref = Mock()
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.stop(self.mock, sentinel.reason)
network.logger.log(any_int, any_unicode)
@patch.object(gobject, 'io_add_watch', new=Mock())
def test_enable_recv_registers_with_gobject(self):
self.mock.recv_id = None
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.fileno.return_value = sentinel.fileno
gobject.io_add_watch.return_value = sentinel.tag
network.Connection.enable_recv(self.mock)
gobject.io_add_watch.assert_called_once_with(sentinel.fileno,
gobject.IO_IN | gobject.IO_ERR | gobject.IO_HUP,
self.mock.recv_callback)
self.assertEqual(sentinel.tag, self.mock.recv_id)
@patch.object(gobject, 'io_add_watch', new=Mock())
def test_enable_recv_already_registered(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.recv_id = sentinel.tag
network.Connection.enable_recv(self.mock)
self.assertEqual(0, gobject.io_add_watch.call_count)
def test_enable_recv_does_not_change_tag(self):
self.mock.recv_id = sentinel.tag
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.enable_recv(self.mock)
self.assertEqual(sentinel.tag, self.mock.recv_id)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_recv_deregisters(self):
self.mock.recv_id = sentinel.tag
network.Connection.disable_recv(self.mock)
gobject.source_remove.assert_called_once_with(sentinel.tag)
self.assertEqual(None, self.mock.recv_id)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_recv_already_deregistered(self):
self.mock.recv_id = None
network.Connection.disable_recv(self.mock)
self.assertEqual(0, gobject.source_remove.call_count)
self.assertEqual(None, self.mock.recv_id)
def test_enable_recv_on_closed_socket(self):
self.mock.recv_id = None
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.fileno.side_effect = socket.error(errno.EBADF, '')
network.Connection.enable_recv(self.mock)
self.mock.stop.assert_called_once_with(any_unicode)
self.assertEqual(None, self.mock.recv_id)
@patch.object(gobject, 'io_add_watch', new=Mock())
def test_enable_send_registers_with_gobject(self):
self.mock.send_id = None
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.fileno.return_value = sentinel.fileno
gobject.io_add_watch.return_value = sentinel.tag
network.Connection.enable_send(self.mock)
gobject.io_add_watch.assert_called_once_with(sentinel.fileno,
gobject.IO_OUT | gobject.IO_ERR | gobject.IO_HUP,
self.mock.send_callback)
self.assertEqual(sentinel.tag, self.mock.send_id)
@patch.object(gobject, 'io_add_watch', new=Mock())
def test_enable_send_already_registered(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.send_id = sentinel.tag
network.Connection.enable_send(self.mock)
self.assertEqual(0, gobject.io_add_watch.call_count)
def test_enable_send_does_not_change_tag(self):
self.mock.send_id = sentinel.tag
self.mock.sock = Mock(spec=socket.SocketType)
network.Connection.enable_send(self.mock)
self.assertEqual(sentinel.tag, self.mock.send_id)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_send_deregisters(self):
self.mock.send_id = sentinel.tag
network.Connection.disable_send(self.mock)
gobject.source_remove.assert_called_once_with(sentinel.tag)
self.assertEqual(None, self.mock.send_id)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_send_already_deregistered(self):
self.mock.send_id = None
network.Connection.disable_send(self.mock)
self.assertEqual(0, gobject.source_remove.call_count)
self.assertEqual(None, self.mock.send_id)
def test_enable_send_on_closed_socket(self):
self.mock.send_id = None
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.fileno.side_effect = socket.error(errno.EBADF, '')
network.Connection.enable_send(self.mock)
self.assertEqual(None, self.mock.send_id)
@patch.object(gobject, 'timeout_add_seconds', new=Mock())
def test_enable_timeout_clears_existing_timeouts(self):
self.mock.timeout = 10
network.Connection.enable_timeout(self.mock)
self.mock.disable_timeout.assert_called_once_with()
@patch.object(gobject, 'timeout_add_seconds', new=Mock())
def test_enable_timeout_add_gobject_timeout(self):
self.mock.timeout = 10
gobject.timeout_add_seconds.return_value = sentinel.tag
network.Connection.enable_timeout(self.mock)
gobject.timeout_add_seconds.assert_called_once_with(10,
self.mock.timeout_callback)
self.assertEqual(sentinel.tag, self.mock.timeout_id)
@patch.object(gobject, 'timeout_add_seconds', new=Mock())
def test_enable_timeout_does_not_add_timeout(self):
self.mock.timeout = 0
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, gobject.timeout_add_seconds.call_count)
self.mock.timeout = -1
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, gobject.timeout_add_seconds.call_count)
self.mock.timeout = None
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, gobject.timeout_add_seconds.call_count)
def test_enable_timeout_does_not_call_disable_for_invalid_timeout(self):
self.mock.timeout = 0
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, self.mock.disable_timeout.call_count)
self.mock.timeout = -1
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, self.mock.disable_timeout.call_count)
self.mock.timeout = None
network.Connection.enable_timeout(self.mock)
self.assertEqual(0, self.mock.disable_timeout.call_count)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_timeout_deregisters(self):
self.mock.timeout_id = sentinel.tag
network.Connection.disable_timeout(self.mock)
gobject.source_remove.assert_called_once_with(sentinel.tag)
self.assertEqual(None, self.mock.timeout_id)
@patch.object(gobject, 'source_remove', new=Mock())
def test_disable_timeout_already_deregistered(self):
self.mock.timeout_id = None
network.Connection.disable_timeout(self.mock)
self.assertEqual(0, gobject.source_remove.call_count)
self.assertEqual(None, self.mock.timeout_id)
def test_send_acquires_and_releases_lock(self):
self.mock.send_lock = Mock()
self.mock.send_buffer = ''
network.Connection.send(self.mock, 'data')
self.mock.send_lock.acquire.assert_called_once_with(True)
self.mock.send_lock.release.assert_called_once_with()
def test_send_appends_to_send_buffer(self):
self.mock.send_lock = Mock()
self.mock.send_buffer = ''
network.Connection.send(self.mock, 'abc')
self.assertEqual('abc', self.mock.send_buffer)
network.Connection.send(self.mock, 'def')
self.assertEqual('abcdef', self.mock.send_buffer)
network.Connection.send(self.mock, '')
self.assertEqual('abcdef', self.mock.send_buffer)
def test_send_calls_enable_send(self):
self.mock.send_lock = Mock()
self.mock.send_buffer = ''
network.Connection.send(self.mock, 'data')
self.mock.enable_send.assert_called_once_with()
def test_recv_callback_respects_io_err(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.actor_ref = Mock()
self.assertTrue(network.Connection.recv_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_ERR))
self.mock.stop.assert_called_once_with(any_unicode)
def test_recv_callback_respects_io_hup(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.actor_ref = Mock()
self.assertTrue(network.Connection.recv_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_HUP))
self.mock.stop.assert_called_once_with(any_unicode)
def test_recv_callback_respects_io_hup_and_io_err(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.actor_ref = Mock()
self.assertTrue(network.Connection.recv_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR))
self.mock.stop.assert_called_once_with(any_unicode)
def test_recv_callback_sends_data_to_actor(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.recv.return_value = 'data'
self.mock.actor_ref = Mock()
self.assertTrue(network.Connection.recv_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.actor_ref.send_one_way.assert_called_once_with(
{'received': 'data'})
def test_recv_callback_handles_dead_actors(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.recv.return_value = 'data'
self.mock.actor_ref = Mock()
self.mock.actor_ref.send_one_way.side_effect = pykka.ActorDeadError()
self.assertTrue(network.Connection.recv_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.stop.assert_called_once_with(any_unicode)
def test_recv_callback_gets_no_data(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.recv.return_value = ''
self.mock.actor_ref = Mock()
self.assertTrue(network.Connection.recv_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.stop.assert_called_once_with(any_unicode)
def test_recv_callback_recoverable_error(self):
self.mock.sock = Mock(spec=socket.SocketType)
for error in (errno.EWOULDBLOCK, errno.EINTR):
self.mock.sock.recv.side_effect = socket.error(error, '')
self.assertTrue(network.Connection.recv_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.assertEqual(0, self.mock.stop.call_count)
def test_recv_callback_unrecoverable_error(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.recv.side_effect = socket.error
self.assertTrue(network.Connection.recv_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.stop.assert_called_once_with(any_unicode)
def test_send_callback_respects_io_err(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 1
self.mock.send_lock = Mock()
self.mock.actor_ref = Mock()
self.mock.send_buffer = ''
self.assertTrue(network.Connection.send_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_ERR))
self.mock.stop.assert_called_once_with(any_unicode)
def test_send_callback_respects_io_hup(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 1
self.mock.send_lock = Mock()
self.mock.actor_ref = Mock()
self.mock.send_buffer = ''
self.assertTrue(network.Connection.send_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_HUP))
self.mock.stop.assert_called_once_with(any_unicode)
def test_send_callback_respects_io_hup_and_io_err(self):
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 1
self.mock.send_lock = Mock()
self.mock.actor_ref = Mock()
self.mock.send_buffer = ''
self.assertTrue(network.Connection.send_callback(self.mock,
sentinel.fd, gobject.IO_IN | gobject.IO_HUP | gobject.IO_ERR))
self.mock.stop.assert_called_once_with(any_unicode)
def test_send_callback_acquires_and_releases_lock(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = True
self.mock.send_buffer = ''
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 0
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.send_lock.acquire.assert_called_once_with(False)
self.mock.send_lock.release.assert_called_once_with()
def test_send_callback_fails_to_acquire_lock(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = False
self.mock.send_buffer = ''
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 0
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.send_lock.acquire.assert_called_once_with(False)
self.assertEqual(0, self.mock.sock.send.call_count)
def test_send_callback_sends_all_data(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = True
self.mock.send_buffer = 'data'
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 4
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.disable_send.assert_called_once_with()
self.mock.sock.send.assert_called_once_with('data')
self.assertEqual('', self.mock.send_buffer)
def test_send_callback_sends_partial_data(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = True
self.mock.send_buffer = 'data'
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.return_value = 2
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.sock.send.assert_called_once_with('data')
self.assertEqual('ta', self.mock.send_buffer)
def test_send_callback_recoverable_error(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = True
self.mock.send_buffer = 'data'
self.mock.sock = Mock(spec=socket.SocketType)
for error in (errno.EWOULDBLOCK, errno.EINTR):
self.mock.sock.send.side_effect = socket.error(error, '')
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.assertEqual(0, self.mock.stop.call_count)
def test_send_callback_unrecoverable_error(self):
self.mock.send_lock = Mock()
self.mock.send_lock.acquire.return_value = True
self.mock.send_buffer = 'data'
self.mock.sock = Mock(spec=socket.SocketType)
self.mock.sock.send.side_effect = socket.error
self.assertTrue(network.Connection.send_callback(
self.mock, sentinel.fd, gobject.IO_IN))
self.mock.stop.assert_called_once_with(any_unicode)
def test_timeout_callback(self):
self.mock.timeout = 10
self.assertFalse(network.Connection.timeout_callback(self.mock))
self.mock.stop.assert_called_once_with(any_unicode)

View File

@ -0,0 +1,239 @@
#encoding: utf-8
import unittest
from mopidy.utils import network
from mock import sentinel, Mock
class LineProtocolTest(unittest.TestCase):
def setUp(self):
self.mock = Mock(spec=network.LineProtocol)
self.mock.terminator = network.LineProtocol.terminator
self.mock.encoding = network.LineProtocol.encoding
def test_init_stores_values_in_attributes(self):
network.LineProtocol.__init__(self.mock, sentinel.connection)
self.assertEqual(sentinel.connection, self.mock.connection)
self.assertEqual('', self.mock.recv_buffer)
def test_on_receive_no_new_lines_adds_to_recv_buffer(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = []
network.LineProtocol.on_receive(self.mock, {'received': 'data'})
self.assertEqual('data', self.mock.recv_buffer)
self.mock.parse_lines.assert_called_once_with()
self.assertEqual(0, self.mock.on_line_received.call_count)
def test_on_receive_toggles_timeout(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = []
network.LineProtocol.on_receive(self.mock, {'received': 'data'})
self.mock.connection.disable_timeout.assert_called_once_with()
self.mock.connection.enable_timeout.assert_called_once_with()
def test_on_receive_no_new_lines_calls_parse_lines(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = []
network.LineProtocol.on_receive(self.mock, {'received': 'data'})
self.mock.parse_lines.assert_called_once_with()
self.assertEqual(0, self.mock.on_line_received.call_count)
def test_on_receive_with_new_line_calls_decode(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = [sentinel.line]
network.LineProtocol.on_receive(self.mock, {'received': 'data\n'})
self.mock.parse_lines.assert_called_once_with()
self.mock.decode.assert_called_once_with(sentinel.line)
def test_on_receive_with_new_line_calls_on_recieve(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = [sentinel.line]
self.mock.decode.return_value = sentinel.decoded
network.LineProtocol.on_receive(self.mock, {'received': 'data\n'})
self.mock.on_line_received.assert_called_once_with(sentinel.decoded)
def test_on_receive_with_new_lines_calls_on_recieve(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.recv_buffer = ''
self.mock.parse_lines.return_value = ['line1', 'line2']
self.mock.decode.return_value = sentinel.decoded
network.LineProtocol.on_receive(self.mock,
{'received': 'line1\nline2\n'})
self.assertEqual(2, self.mock.on_line_received.call_count)
def test_parse_lines_emtpy_buffer(self):
self.mock.recv_buffer = ''
lines = network.LineProtocol.parse_lines(self.mock)
self.assertRaises(StopIteration, lines.next)
def test_parse_lines_no_terminator(self):
self.mock.recv_buffer = 'data'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertRaises(StopIteration, lines.next)
def test_parse_lines_termintor(self):
self.mock.recv_buffer = 'data\n'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual('data', lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('', self.mock.recv_buffer)
def test_parse_lines_no_data_before_terminator(self):
self.mock.recv_buffer = '\n'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual('', lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('', self.mock.recv_buffer)
def test_parse_lines_extra_data_after_terminator(self):
self.mock.recv_buffer = 'data1\ndata2'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual('data1', lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('data2', self.mock.recv_buffer)
def test_parse_lines_unicode(self):
self.mock.recv_buffer = u'æøå\n'.encode('utf-8')
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual(u'æøå'.encode('utf-8'), lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('', self.mock.recv_buffer)
def test_parse_lines_multiple_lines(self):
self.mock.recv_buffer = 'abc\ndef\nghi\njkl'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual('abc', lines.next())
self.assertEqual('def', lines.next())
self.assertEqual('ghi', lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('jkl', self.mock.recv_buffer)
def test_parse_lines_multiple_calls(self):
self.mock.recv_buffer = 'data1'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertRaises(StopIteration, lines.next)
self.assertEqual('data1', self.mock.recv_buffer)
self.mock.recv_buffer += '\ndata2'
lines = network.LineProtocol.parse_lines(self.mock)
self.assertEqual('data1', lines.next())
self.assertRaises(StopIteration, lines.next)
self.assertEqual('data2', self.mock.recv_buffer)
def test_send_lines_called_with_no_lines(self):
self.mock.connection = Mock(spec=network.Connection)
network.LineProtocol.send_lines(self.mock, [])
self.assertEqual(0, self.mock.encode.call_count)
self.assertEqual(0, self.mock.connection.send.call_count)
def test_send_lines_calls_join_lines(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.join_lines.return_value = 'lines'
network.LineProtocol.send_lines(self.mock, sentinel.lines)
self.mock.join_lines.assert_called_once_with(sentinel.lines)
def test_send_line_encodes_joined_lines_with_final_terminator(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.join_lines.return_value = u'lines\n'
network.LineProtocol.send_lines(self.mock, sentinel.lines)
self.mock.encode.assert_called_once_with(u'lines\n')
def test_send_lines_sends_encoded_string(self):
self.mock.connection = Mock(spec=network.Connection)
self.mock.join_lines.return_value = 'lines'
self.mock.encode.return_value = sentinel.data
network.LineProtocol.send_lines(self.mock, sentinel.lines)
self.mock.connection.send.assert_called_once_with(sentinel.data)
def test_join_lines_returns_empty_string_for_no_lines(self):
self.assertEqual(u'', network.LineProtocol.join_lines(self.mock, []))
def test_join_lines_returns_joined_lines(self):
self.assertEqual(u'1\n2\n', network.LineProtocol.join_lines(
self.mock, [u'1', u'2']))
def test_decode_calls_decode_on_string(self):
string = Mock()
network.LineProtocol.decode(self.mock, string)
string.decode.assert_called_once_with(self.mock.encoding)
def test_decode_plain_ascii(self):
result = network.LineProtocol.decode(self.mock, 'abc')
self.assertEqual(u'abc', result)
self.assertEqual(unicode, type(result))
def test_decode_utf8(self):
result = network.LineProtocol.decode(
self.mock, u'æøå'.encode('utf-8'))
self.assertEqual(u'æøå', result)
self.assertEqual(unicode, type(result))
def test_decode_invalid_data(self):
string = Mock()
string.decode.side_effect = UnicodeError
network.LineProtocol.decode(self.mock, string)
self.mock.stop.assert_called_once_with()
def test_encode_calls_encode_on_string(self):
string = Mock()
network.LineProtocol.encode(self.mock, string)
string.encode.assert_called_once_with(self.mock.encoding)
def test_encode_plain_ascii(self):
result = network.LineProtocol.encode(self.mock, u'abc')
self.assertEqual('abc', result)
self.assertEqual(str, type(result))
def test_encode_utf8(self):
result = network.LineProtocol.encode(self.mock, u'æøå')
self.assertEqual(u'æøå'.encode('utf-8'), result)
self.assertEqual(str, type(result))
def test_encode_invalid_data(self):
string = Mock()
string.encode.side_effect = UnicodeError
network.LineProtocol.encode(self.mock, string)
self.mock.stop.assert_called_once_with()
def test_host_property(self):
mock = Mock(spec=network.Connection)
mock.host = sentinel.host
lineprotocol = network.LineProtocol(mock)
self.assertEqual(sentinel.host, lineprotocol.host)
def test_port_property(self):
mock = Mock(spec=network.Connection)
mock.port = sentinel.port
lineprotocol = network.LineProtocol(mock)
self.assertEqual(sentinel.port, lineprotocol.port)

View File

@ -0,0 +1,186 @@
import errno
import gobject
import socket
import unittest
from mopidy.utils import network
from mock import patch, sentinel, Mock
from tests import any_int
class ServerTest(unittest.TestCase):
def setUp(self):
self.mock = Mock(spec=network.Server)
def test_init_calls_create_server_socket(self):
network.Server.__init__(self.mock, sentinel.host,
sentinel.port, sentinel.protocol)
self.mock.create_server_socket.assert_called_once_with(
sentinel.host, sentinel.port)
def test_init_calls_register_server(self):
sock = Mock(spec=socket.SocketType)
sock.fileno.return_value = sentinel.fileno
self.mock.create_server_socket.return_value = sock
network.Server.__init__(self.mock, sentinel.host,
sentinel.port, sentinel.protocol)
self.mock.register_server_socket.assert_called_once_with(
sentinel.fileno)
def test_init_fails_on_fileno_call(self):
sock = Mock(spec=socket.SocketType)
sock.fileno.side_effect = socket.error
self.mock.create_server_socket.return_value = sock
self.assertRaises(socket.error, network.Server.__init__,
self.mock, sentinel.host, sentinel.port, sentinel.protocol)
def test_init_stores_values_in_attributes(self):
# This need to be a mock and no a sentinel as fileno() is called on it
sock = Mock(spec=socket.SocketType)
self.mock.create_server_socket.return_value = sock
network.Server.__init__(self.mock, sentinel.host, sentinel.port,
sentinel.protocol, max_connections=sentinel.max_connections,
timeout=sentinel.timeout)
self.assertEqual(sentinel.protocol, self.mock.protocol)
self.assertEqual(sentinel.max_connections, self.mock.max_connections)
self.assertEqual(sentinel.timeout, self.mock.timeout)
self.assertEqual(sock, self.mock.server_socket)
@patch.object(network, 'create_socket', spec=socket.SocketType)
def test_create_server_socket_sets_up_listener(self, create_socket):
sock = create_socket.return_value
network.Server.create_server_socket(self.mock,
sentinel.host, sentinel.port)
sock.setblocking.assert_called_once_with(False)
sock.bind.assert_called_once_with((sentinel.host, sentinel.port))
sock.listen.assert_called_once_with(any_int)
@patch.object(network, 'create_socket', new=Mock())
def test_create_server_socket_fails(self):
network.create_socket.side_effect = socket.error
self.assertRaises(socket.error, network.Server.create_server_socket,
self.mock, sentinel.host, sentinel.port)
@patch.object(network, 'create_socket', new=Mock())
def test_create_server_bind_fails(self):
sock = network.create_socket.return_value
sock.bind.side_effect = socket.error
self.assertRaises(socket.error, network.Server.create_server_socket,
self.mock, sentinel.host, sentinel.port)
@patch.object(network, 'create_socket', new=Mock())
def test_create_server_listen_fails(self):
sock = network.create_socket.return_value
sock.listen.side_effect = socket.error
self.assertRaises(socket.error, network.Server.create_server_socket,
self.mock, sentinel.host, sentinel.port)
@patch.object(gobject, 'io_add_watch', new=Mock())
def test_register_server_socket_sets_up_io_watch(self):
network.Server.register_server_socket(self.mock, sentinel.fileno)
gobject.io_add_watch.assert_called_once_with(sentinel.fileno,
gobject.IO_IN, self.mock.handle_connection)
def test_handle_connection(self):
self.mock.accept_connection.return_value = (
sentinel.sock, sentinel.addr)
self.mock.maximum_connections_exceeded.return_value = False
self.assertTrue(network.Server.handle_connection(
self.mock, sentinel.fileno, gobject.IO_IN))
self.mock.accept_connection.assert_called_once_with()
self.mock.maximum_connections_exceeded.assert_called_once_with()
self.mock.init_connection.assert_called_once_with(
sentinel.sock, sentinel.addr)
self.assertEqual(0, self.mock.reject_connection.call_count)
def test_handle_connection_exceeded_connections(self):
self.mock.accept_connection.return_value = (
sentinel.sock, sentinel.addr)
self.mock.maximum_connections_exceeded.return_value = True
self.assertTrue(network.Server.handle_connection(
self.mock, sentinel.fileno, gobject.IO_IN))
self.mock.accept_connection.assert_called_once_with()
self.mock.maximum_connections_exceeded.assert_called_once_with()
self.mock.reject_connection.assert_called_once_with(
sentinel.sock, sentinel.addr)
self.assertEqual(0, self.mock.init_connection.call_count)
def test_accept_connection(self):
sock = Mock(spec=socket.SocketType)
sock.accept.return_value = (sentinel.sock, sentinel.addr)
self.mock.server_socket = sock
sock, addr = network.Server.accept_connection(self.mock)
self.assertEqual(sentinel.sock, sock)
self.assertEqual(sentinel.addr, addr)
def test_accept_connection_recoverable_error(self):
sock = Mock(spec=socket.SocketType)
self.mock.server_socket = sock
for error in (errno.EAGAIN, errno.EINTR):
sock.accept.side_effect = socket.error(error, '')
self.assertRaises(network.ShouldRetrySocketCall,
network.Server.accept_connection, self.mock)
# FIXME decide if this should be allowed to propegate
def test_accept_connection_unrecoverable_error(self):
sock = Mock(spec=socket.SocketType)
self.mock.server_socket = sock
sock.accept.side_effect = socket.error
self.assertRaises(socket.error,
network.Server.accept_connection, self.mock)
def test_maximum_connections_exceeded(self):
self.mock.max_connections = 10
self.mock.number_of_connections.return_value = 11
self.assertTrue(network.Server.maximum_connections_exceeded(self.mock))
self.mock.number_of_connections.return_value = 10
self.assertTrue(network.Server.maximum_connections_exceeded(self.mock))
self.mock.number_of_connections.return_value = 9
self.assertFalse(network.Server.maximum_connections_exceeded(self.mock))
@patch('pykka.registry.ActorRegistry.get_by_class')
def test_number_of_connections(self, get_by_class):
self.mock.protocol = sentinel.protocol
get_by_class.return_value = [1, 2, 3]
self.assertEqual(3, network.Server.number_of_connections(self.mock))
get_by_class.return_value = []
self.assertEqual(0, network.Server.number_of_connections(self.mock))
@patch.object(network, 'Connection', new=Mock())
def test_init_connection(self):
self.mock.protocol = sentinel.protocol
self.mock.timeout = sentinel.timeout
network.Server.init_connection(self.mock, sentinel.sock, sentinel.addr)
network.Connection.assert_called_once_with(sentinel.protocol,
sentinel.sock, sentinel.addr, sentinel.timeout)
def test_reject_connection(self):
sock = Mock(spec=socket.SocketType)
network.Server.reject_connection(self.mock, sock,
(sentinel.host, sentinel.port))
sock.close.assert_called_once_with()
def test_reject_connection_error(self):
sock = Mock(spec=socket.SocketType)
sock.close.side_effect = socket.error
network.Server.reject_connection(self.mock, sock,
(sentinel.host, sentinel.port))
sock.close.assert_called_once_with()

View File

@ -1,52 +1,52 @@
import mock
import socket
import unittest
from mopidy.utils import network
from mock import patch, Mock
from tests import SkipTest
class FormatHostnameTest(unittest.TestCase):
@mock.patch('mopidy.utils.network.has_ipv6', True)
@patch('mopidy.utils.network.has_ipv6', True)
def test_format_hostname_prefixes_ipv4_addresses_when_ipv6_available(self):
network.has_ipv6 = True
self.assertEqual(network.format_hostname('0.0.0.0'), '::ffff:0.0.0.0')
self.assertEqual(network.format_hostname('1.0.0.1'), '::ffff:1.0.0.1')
@mock.patch('mopidy.utils.network.has_ipv6', False)
@patch('mopidy.utils.network.has_ipv6', False)
def test_format_hostname_does_nothing_when_only_ipv4_available(self):
network.has_ipv6 = False
self.assertEquals(network.format_hostname('0.0.0.0'), '0.0.0.0')
self.assertEqual(network.format_hostname('0.0.0.0'), '0.0.0.0')
class TryIPv6SocketTest(unittest.TestCase):
@mock.patch('socket.has_ipv6', False)
@patch('socket.has_ipv6', False)
def test_system_that_claims_no_ipv6_support(self):
self.assertFalse(network._try_ipv6_socket())
self.assertFalse(network.try_ipv6_socket())
@mock.patch('socket.has_ipv6', True)
@mock.patch('socket.socket')
@patch('socket.has_ipv6', True)
@patch('socket.socket')
def test_system_with_broken_ipv6(self, socket_mock):
socket_mock.side_effect = IOError()
self.assertFalse(network._try_ipv6_socket())
self.assertFalse(network.try_ipv6_socket())
@mock.patch('socket.has_ipv6', True)
@mock.patch('socket.socket')
@patch('socket.has_ipv6', True)
@patch('socket.socket')
def test_with_working_ipv6(self, socket_mock):
socket_mock.return_value = mock.Mock()
self.assertTrue(network._try_ipv6_socket())
socket_mock.return_value = Mock()
self.assertTrue(network.try_ipv6_socket())
class CreateSocketTest(unittest.TestCase):
@mock.patch('mopidy.utils.network.has_ipv6', False)
@mock.patch('socket.socket')
@patch('mopidy.utils.network.has_ipv6', False)
@patch('socket.socket')
def test_ipv4_socket(self, socket_mock):
network.create_socket()
self.assertEqual(socket_mock.call_args[0],
(socket.AF_INET, socket.SOCK_STREAM))
@mock.patch('mopidy.utils.network.has_ipv6', True)
@mock.patch('socket.socket')
@patch('mopidy.utils.network.has_ipv6', True)
@patch('socket.socket')
def test_ipv6_socket(self, socket_mock):
network.create_socket()
self.assertEqual(socket_mock.call_args[0],