Merge pull request #128 from adamcik/feature/prevent-send-context-switch
This commit is contained in:
commit
0bfaff9c50
@ -161,12 +161,24 @@ class Connection(object):
|
||||
except socket.error:
|
||||
pass
|
||||
|
||||
def send(self, data):
|
||||
"""Send data to client exactly as is."""
|
||||
def queue_send(self, data):
|
||||
"""Try to send data to client exactly as is and queue rest."""
|
||||
self.send_lock.acquire(True)
|
||||
self.send_buffer += data
|
||||
self.send_buffer = self.send(self.send_buffer + data)
|
||||
self.send_lock.release()
|
||||
self.enable_send()
|
||||
if self.send_buffer:
|
||||
self.enable_send()
|
||||
|
||||
def send(self, data):
|
||||
"""Send data to client, return any unsent data."""
|
||||
try:
|
||||
sent = self.sock.send(data)
|
||||
return data[sent:]
|
||||
except socket.error as e:
|
||||
if e.errno in (errno.EWOULDBLOCK, errno.EINTR):
|
||||
return data
|
||||
self.stop(u'Unexpected client error: %s' % e)
|
||||
return ''
|
||||
|
||||
def enable_timeout(self):
|
||||
"""Reactivate timeout mechanism."""
|
||||
@ -253,13 +265,9 @@ class Connection(object):
|
||||
return True
|
||||
|
||||
try:
|
||||
sent = self.sock.send(self.send_buffer)
|
||||
self.send_buffer = self.send_buffer[sent:]
|
||||
self.send_buffer = self.send(self.send_buffer)
|
||||
if not self.send_buffer:
|
||||
self.disable_send()
|
||||
except socket.error as e:
|
||||
if e.errno not in (errno.EWOULDBLOCK, errno.EINTR):
|
||||
self.stop(u'Unexpected client error: %s' % e)
|
||||
finally:
|
||||
self.send_lock.release()
|
||||
|
||||
@ -383,4 +391,4 @@ class LineProtocol(ThreadingActor):
|
||||
return
|
||||
|
||||
data = self.join_lines(lines)
|
||||
self.connection.send(self.encode(data))
|
||||
self.connection.queue_send(self.encode(data))
|
||||
|
||||
@ -14,7 +14,7 @@ class MockConnetion(mock.Mock):
|
||||
self.port = mock.sentinel.port
|
||||
self.response = []
|
||||
|
||||
def send(self, data):
|
||||
def queue_send(self, data):
|
||||
lines = (line for line in data.split('\n') if line)
|
||||
self.response.extend(lines)
|
||||
|
||||
|
||||
@ -8,7 +8,7 @@ import unittest
|
||||
from mopidy.utils import network
|
||||
|
||||
from mock import patch, sentinel, Mock
|
||||
from tests import any_int, any_unicode
|
||||
from tests import any_int, any_unicode, SkipTest
|
||||
|
||||
class ConnectionTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
@ -314,33 +314,43 @@ class ConnectionTest(unittest.TestCase):
|
||||
self.assertEqual(0, gobject.source_remove.call_count)
|
||||
self.assertEqual(None, self.mock.timeout_id)
|
||||
|
||||
def test_send_acquires_and_releases_lock(self):
|
||||
def test_queue_send_acquires_and_releases_lock(self):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_buffer = ''
|
||||
|
||||
network.Connection.send(self.mock, 'data')
|
||||
network.Connection.queue_send(self.mock, 'data')
|
||||
self.mock.send_lock.acquire.assert_called_once_with(True)
|
||||
self.mock.send_lock.release.assert_called_once_with()
|
||||
|
||||
def test_send_appends_to_send_buffer(self):
|
||||
self.mock.send_lock = Mock()
|
||||
def test_queue_send_calls_send(self):
|
||||
self.mock.send_buffer = ''
|
||||
|
||||
network.Connection.send(self.mock, 'abc')
|
||||
self.assertEqual('abc', self.mock.send_buffer)
|
||||
|
||||
network.Connection.send(self.mock, 'def')
|
||||
self.assertEqual('abcdef', self.mock.send_buffer)
|
||||
|
||||
network.Connection.send(self.mock, '')
|
||||
self.assertEqual('abcdef', self.mock.send_buffer)
|
||||
|
||||
def test_send_calls_enable_send(self):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_buffer = ''
|
||||
self.mock.send.return_value = ''
|
||||
|
||||
network.Connection.send(self.mock, 'data')
|
||||
network.Connection.queue_send(self.mock, 'data')
|
||||
self.mock.send.assert_called_once_with('data')
|
||||
self.assertEqual(0, self.mock.enable_send.call_count)
|
||||
self.assertEqual('', self.mock.send_buffer)
|
||||
|
||||
def test_queue_send_calls_enable_send_for_partial_send(self):
|
||||
self.mock.send_buffer = ''
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send.return_value = 'ta'
|
||||
|
||||
network.Connection.queue_send(self.mock, 'data')
|
||||
self.mock.send.assert_called_once_with('data')
|
||||
self.mock.enable_send.assert_called_once_with()
|
||||
self.assertEqual('ta', self.mock.send_buffer)
|
||||
|
||||
def test_queue_send_calls_send_with_existing_buffer(self):
|
||||
self.mock.send_buffer = 'foo'
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send.return_value = ''
|
||||
|
||||
network.Connection.queue_send(self.mock, 'bar')
|
||||
self.mock.send.assert_called_once_with('foobar')
|
||||
self.assertEqual(0, self.mock.enable_send.call_count)
|
||||
self.assertEqual('', self.mock.send_buffer)
|
||||
|
||||
def test_recv_callback_respects_io_err(self):
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
@ -473,48 +483,53 @@ class ConnectionTest(unittest.TestCase):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_lock.acquire.return_value = True
|
||||
self.mock.send_buffer = 'data'
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
self.mock.sock.send.return_value = 4
|
||||
self.mock.send.return_value = ''
|
||||
|
||||
self.assertTrue(network.Connection.send_callback(
|
||||
self.mock, sentinel.fd, gobject.IO_IN))
|
||||
self.mock.disable_send.assert_called_once_with()
|
||||
self.mock.sock.send.assert_called_once_with('data')
|
||||
self.mock.send.assert_called_once_with('data')
|
||||
self.assertEqual('', self.mock.send_buffer)
|
||||
|
||||
def test_send_callback_sends_partial_data(self):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_lock.acquire.return_value = True
|
||||
self.mock.send_buffer = 'data'
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
self.mock.sock.send.return_value = 2
|
||||
self.mock.send.return_value = 'ta'
|
||||
|
||||
self.assertTrue(network.Connection.send_callback(
|
||||
self.mock, sentinel.fd, gobject.IO_IN))
|
||||
self.mock.sock.send.assert_called_once_with('data')
|
||||
self.mock.send.assert_called_once_with('data')
|
||||
self.assertEqual('ta', self.mock.send_buffer)
|
||||
|
||||
def test_send_callback_recoverable_error(self):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_lock.acquire.return_value = True
|
||||
self.mock.send_buffer = 'data'
|
||||
def test_send_recoverable_error(self):
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
|
||||
for error in (errno.EWOULDBLOCK, errno.EINTR):
|
||||
self.mock.sock.send.side_effect = socket.error(error, '')
|
||||
self.assertTrue(network.Connection.send_callback(
|
||||
self.mock, sentinel.fd, gobject.IO_IN))
|
||||
|
||||
network.Connection.send(self.mock, 'data')
|
||||
self.assertEqual(0, self.mock.stop.call_count)
|
||||
|
||||
def test_send_callback_unrecoverable_error(self):
|
||||
self.mock.send_lock = Mock()
|
||||
self.mock.send_lock.acquire.return_value = True
|
||||
self.mock.send_buffer = 'data'
|
||||
def test_send_calls_socket_send(self):
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
self.mock.sock.send.return_value = 4
|
||||
|
||||
self.assertEqual('', network.Connection.send(self.mock, 'data'))
|
||||
self.mock.sock.send.assert_called_once_with('data')
|
||||
|
||||
def test_send_calls_socket_send_partial_send(self):
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
self.mock.sock.send.return_value = 2
|
||||
|
||||
self.assertEqual('ta', network.Connection.send(self.mock, 'data'))
|
||||
self.mock.sock.send.assert_called_once_with('data')
|
||||
|
||||
def test_send_unrecoverable_error(self):
|
||||
self.mock.sock = Mock(spec=socket.SocketType)
|
||||
self.mock.sock.send.side_effect = socket.error
|
||||
self.assertTrue(network.Connection.send_callback(
|
||||
self.mock, sentinel.fd, gobject.IO_IN))
|
||||
|
||||
self.assertEqual('', network.Connection.send(self.mock, 'data'))
|
||||
self.mock.stop.assert_called_once_with(any_unicode)
|
||||
|
||||
def test_timeout_callback(self):
|
||||
|
||||
@ -196,7 +196,7 @@ class LineProtocolTest(unittest.TestCase):
|
||||
|
||||
network.LineProtocol.send_lines(self.mock, [])
|
||||
self.assertEqual(0, self.mock.encode.call_count)
|
||||
self.assertEqual(0, self.mock.connection.send.call_count)
|
||||
self.assertEqual(0, self.mock.connection.queue_send.call_count)
|
||||
|
||||
def test_send_lines_calls_join_lines(self):
|
||||
self.mock.connection = Mock(spec=network.Connection)
|
||||
@ -218,7 +218,7 @@ class LineProtocolTest(unittest.TestCase):
|
||||
self.mock.encode.return_value = sentinel.data
|
||||
|
||||
network.LineProtocol.send_lines(self.mock, sentinel.lines)
|
||||
self.mock.connection.send.assert_called_once_with(sentinel.data)
|
||||
self.mock.connection.queue_send.assert_called_once_with(sentinel.data)
|
||||
|
||||
def test_join_lines_returns_empty_string_for_no_lines(self):
|
||||
self.assertEqual(u'', network.LineProtocol.join_lines(self.mock, []))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user