From 5b0faa196d67866658bcee32de1df12b24263130 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 12 Aug 2010 01:38:11 +0200 Subject: [PATCH] Switch from output_connection to output_queue, as we are going to have multiple producers with time --- docs/changes.rst | 2 +- mopidy/backends/base/__init__.py | 9 ++++----- mopidy/backends/libspotify/__init__.py | 12 ++++++------ mopidy/outputs/gstreamer.py | 13 ++++++------- mopidy/process.py | 11 ++++++----- 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/docs/changes.rst b/docs/changes.rst index 811eb482..d38a135a 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -36,7 +36,7 @@ Another great release. keyword arguments of the form ``search(artist=['foo', 'fighters'], album=['bar', 'grooves'])``. - :meth:`mopidy.backends.base.BaseBackend()` now accepts an - ``output_connection`` which it can use to send messages (i.e. audio data) + ``output_queue`` which it can use to send messages (i.e. audio data) to the output process. diff --git a/mopidy/backends/base/__init__.py b/mopidy/backends/base/__init__.py index 942b190e..3a484865 100644 --- a/mopidy/backends/base/__init__.py +++ b/mopidy/backends/base/__init__.py @@ -23,17 +23,16 @@ class BaseBackend(object): :param core_queue: a queue for sending messages to :class:`mopidy.process.CoreProcess` :type core_queue: :class:`multiprocessing.Queue` - :param output_connection: a connection for sending messages to the - output process - :type output_connection: :class:`multiprocessing.Connection` + :param output_queue: a queue for sending messages to the output process + :type output_queue: :class:`multiprocessing.Queue` :param mixer: either a mixer instance, or :class:`None` to use the mixer defined in settings :type mixer: :class:`mopidy.mixers.BaseMixer` or :class:`None` """ - def __init__(self, core_queue=None, output_connection=None, mixer=None): + def __init__(self, core_queue=None, output_queue=None, mixer=None): self.core_queue = core_queue - self.output_connection = output_connection + self.output_queue = output_queue if mixer is not None: self.mixer = mixer else: diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index 1fa07836..cc853314 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -50,7 +50,7 @@ class LibspotifyBackend(BaseBackend): spotify = LibspotifySessionManager( settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD, core_queue=self.core_queue, - output_connection=self.output_connection) + output_queue=self.output_queue) spotify.start() return spotify @@ -95,7 +95,7 @@ class LibspotifyPlaybackController(BasePlaybackController): def _set_output_state(self, state_name): logger.debug(u'Setting output state to %s ...', state_name) (my_end, other_end) = multiprocessing.Pipe() - self.backend.output_connection.send({ + self.backend.output_queue.put({ 'command': 'set_state', 'state': state_name, 'reply_to': pickle_connection(other_end), @@ -210,11 +210,11 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): appkey_file = os.path.expanduser(settings.SPOTIFY_LIB_APPKEY) user_agent = 'Mopidy %s' % get_version() - def __init__(self, username, password, core_queue, output_connection): + def __init__(self, username, password, core_queue, output_queue): SpotifySessionManager.__init__(self, username, password) threading.Thread.__init__(self) self.core_queue = core_queue - self.output_connection = output_connection + self.output_queue = output_queue self.connected = threading.Event() self.session = None @@ -268,7 +268,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): signed=True, rate=(int)44100 """ - self.output_connection.send({ + self.output_queue.put({ 'command': 'deliver_data', 'caps': caps_string, 'data': bytes(frames), @@ -286,7 +286,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def end_of_track(self, session): """Callback used by pyspotify""" logger.debug('End of data stream.') - self.output_connection.send({'command': 'end_of_data_stream'}) + self.output_queue.put({'command': 'end_of_data_stream'}) def search(self, query, connection): """Search method used by Mopidy backend""" diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index bda04b25..65b65504 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -19,8 +19,8 @@ class GStreamerOutput(object): Starts the :class:`GStreamerProcess`. """ - def __init__(self, core_queue, input_connection): - process = GStreamerProcess(core_queue, input_connection) + def __init__(self, core_queue, output_queue): + process = GStreamerProcess(core_queue, output_queue) process.start() class GStreamerMessagesThread(threading.Thread): @@ -39,10 +39,10 @@ class GStreamerProcess(BaseProcess): http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. """ - def __init__(self, core_queue, input_connection): + def __init__(self, core_queue, output_queue): super(GStreamerProcess, self).__init__() self.core_queue = core_queue - self.input_connection = input_connection + self.output_queue = output_queue self.gst_pipeline = None self.gst_bus = None self.gst_bus_id = None @@ -54,9 +54,8 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - if self.input_connection.poll(None): - message = self.input_connection.recv() - self.process_mopidy_message(message) + message = self.output_queue.get() + self.process_mopidy_message(message) def setup(self): logger.debug(u'Setting up GStreamer pipeline') diff --git a/mopidy/process.py b/mopidy/process.py index ff30160c..9759c4e6 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -37,7 +37,7 @@ class CoreProcess(BaseProcess): def __init__(self, core_queue): super(CoreProcess, self).__init__() self.core_queue = core_queue - self.output_connection = None + self.output_queue = None self.output = None self.backend = None self.frontend = None @@ -49,15 +49,16 @@ class CoreProcess(BaseProcess): self.process_message(message) def setup(self): - (recv_end, self.output_connection) = multiprocessing.Pipe(False) - self.output = get_class(settings.OUTPUT)(self.core_queue, recv_end) + self.output_queue = multiprocessing.Queue() + self.output = get_class(settings.OUTPUT)(self.core_queue, + self.output_queue) self.backend = get_class(settings.BACKENDS[0])(self.core_queue, - self.output_connection) + self.output_queue) self.frontend = get_class(settings.FRONTEND)(self.backend) def process_message(self, message): if message.get('to') == 'output': - self.output_connection.send(message) + self.output_queue.put(message) elif message['command'] == 'mpd_request': response = self.frontend.handle_request(message['request']) connection = unpickle_connection(message['reply_to'])