Switch from output_connection to output_queue, as we are going to have multiple producers with time

This commit is contained in:
Stein Magnus Jodal 2010-08-12 01:38:11 +02:00
parent 816f877c4b
commit 5b0faa196d
5 changed files with 23 additions and 24 deletions

View File

@ -36,7 +36,7 @@ Another great release.
keyword arguments of the form ``search(artist=['foo', 'fighters'],
album=['bar', 'grooves'])``.
- :meth:`mopidy.backends.base.BaseBackend()` now accepts an
``output_connection`` which it can use to send messages (i.e. audio data)
``output_queue`` which it can use to send messages (i.e. audio data)
to the output process.

View File

@ -23,17 +23,16 @@ class BaseBackend(object):
:param core_queue: a queue for sending messages to
:class:`mopidy.process.CoreProcess`
:type core_queue: :class:`multiprocessing.Queue`
:param output_connection: a connection for sending messages to the
output process
:type output_connection: :class:`multiprocessing.Connection`
:param output_queue: a queue for sending messages to the output process
:type output_queue: :class:`multiprocessing.Queue`
:param mixer: either a mixer instance, or :class:`None` to use the mixer
defined in settings
:type mixer: :class:`mopidy.mixers.BaseMixer` or :class:`None`
"""
def __init__(self, core_queue=None, output_connection=None, mixer=None):
def __init__(self, core_queue=None, output_queue=None, mixer=None):
self.core_queue = core_queue
self.output_connection = output_connection
self.output_queue = output_queue
if mixer is not None:
self.mixer = mixer
else:

View File

@ -50,7 +50,7 @@ class LibspotifyBackend(BaseBackend):
spotify = LibspotifySessionManager(
settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD,
core_queue=self.core_queue,
output_connection=self.output_connection)
output_queue=self.output_queue)
spotify.start()
return spotify
@ -95,7 +95,7 @@ class LibspotifyPlaybackController(BasePlaybackController):
def _set_output_state(self, state_name):
logger.debug(u'Setting output state to %s ...', state_name)
(my_end, other_end) = multiprocessing.Pipe()
self.backend.output_connection.send({
self.backend.output_queue.put({
'command': 'set_state',
'state': state_name,
'reply_to': pickle_connection(other_end),
@ -210,11 +210,11 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
appkey_file = os.path.expanduser(settings.SPOTIFY_LIB_APPKEY)
user_agent = 'Mopidy %s' % get_version()
def __init__(self, username, password, core_queue, output_connection):
def __init__(self, username, password, core_queue, output_queue):
SpotifySessionManager.__init__(self, username, password)
threading.Thread.__init__(self)
self.core_queue = core_queue
self.output_connection = output_connection
self.output_queue = output_queue
self.connected = threading.Event()
self.session = None
@ -268,7 +268,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
signed=True,
rate=(int)44100
"""
self.output_connection.send({
self.output_queue.put({
'command': 'deliver_data',
'caps': caps_string,
'data': bytes(frames),
@ -286,7 +286,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
def end_of_track(self, session):
"""Callback used by pyspotify"""
logger.debug('End of data stream.')
self.output_connection.send({'command': 'end_of_data_stream'})
self.output_queue.put({'command': 'end_of_data_stream'})
def search(self, query, connection):
"""Search method used by Mopidy backend"""

View File

@ -19,8 +19,8 @@ class GStreamerOutput(object):
Starts the :class:`GStreamerProcess`.
"""
def __init__(self, core_queue, input_connection):
process = GStreamerProcess(core_queue, input_connection)
def __init__(self, core_queue, output_queue):
process = GStreamerProcess(core_queue, output_queue)
process.start()
class GStreamerMessagesThread(threading.Thread):
@ -39,10 +39,10 @@ class GStreamerProcess(BaseProcess):
http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html.
"""
def __init__(self, core_queue, input_connection):
def __init__(self, core_queue, output_queue):
super(GStreamerProcess, self).__init__()
self.core_queue = core_queue
self.input_connection = input_connection
self.output_queue = output_queue
self.gst_pipeline = None
self.gst_bus = None
self.gst_bus_id = None
@ -54,9 +54,8 @@ class GStreamerProcess(BaseProcess):
def run_inside_try(self):
self.setup()
while True:
if self.input_connection.poll(None):
message = self.input_connection.recv()
self.process_mopidy_message(message)
message = self.output_queue.get()
self.process_mopidy_message(message)
def setup(self):
logger.debug(u'Setting up GStreamer pipeline')

View File

@ -37,7 +37,7 @@ class CoreProcess(BaseProcess):
def __init__(self, core_queue):
super(CoreProcess, self).__init__()
self.core_queue = core_queue
self.output_connection = None
self.output_queue = None
self.output = None
self.backend = None
self.frontend = None
@ -49,15 +49,16 @@ class CoreProcess(BaseProcess):
self.process_message(message)
def setup(self):
(recv_end, self.output_connection) = multiprocessing.Pipe(False)
self.output = get_class(settings.OUTPUT)(self.core_queue, recv_end)
self.output_queue = multiprocessing.Queue()
self.output = get_class(settings.OUTPUT)(self.core_queue,
self.output_queue)
self.backend = get_class(settings.BACKENDS[0])(self.core_queue,
self.output_connection)
self.output_queue)
self.frontend = get_class(settings.FRONTEND)(self.backend)
def process_message(self, message):
if message.get('to') == 'output':
self.output_connection.send(message)
self.output_queue.put(message)
elif message['command'] == 'mpd_request':
response = self.frontend.handle_request(message['request'])
connection = unpickle_connection(message['reply_to'])