diff --git a/mopidy/backends/base/__init__.py b/mopidy/backends/base/__init__.py index 80c4d0c0..491c5b73 100644 --- a/mopidy/backends/base/__init__.py +++ b/mopidy/backends/base/__init__.py @@ -23,17 +23,17 @@ class BaseBackend(object): :param core_queue: a queue for sending messages to :class:`mopidy.process.CoreProcess` :type core_queue: :class:`multiprocessing.Queue` - :param output_queue: a queue for sending messages to the output process - :type output_queue: :class:`multiprocessing.Queue` + :param output: the audio output + :type output: :class:`mopidy.outputs.gstreamer.GStreamerOutput` or similar :param mixer_class: either a mixer class, or :class:`None` to use the mixer defined in settings :type mixer_class: a subclass of :class:`mopidy.mixers.BaseMixer` or :class:`None` """ - def __init__(self, core_queue=None, output_queue=None, mixer_class=None): + def __init__(self, core_queue=None, output=None, mixer_class=None): self.core_queue = core_queue - self.output_queue = output_queue + self.output = output if mixer_class is None: mixer_class = get_class(settings.MIXER) self.mixer = mixer_class(self) diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index 07f3e2f7..0d7e5d0b 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -55,6 +55,6 @@ class LibspotifyBackend(BaseBackend): spotify = LibspotifySessionManager( settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD, core_queue=self.core_queue, - output_queue=self.output_queue) + output=self.output) spotify.start() return spotify diff --git a/mopidy/backends/libspotify/playback.py b/mopidy/backends/libspotify/playback.py index ed5ba697..58f6ec3a 100644 --- a/mopidy/backends/libspotify/playback.py +++ b/mopidy/backends/libspotify/playback.py @@ -12,7 +12,7 @@ class LibspotifyPlaybackController(BasePlaybackController): def _set_output_state(self, state_name): logger.debug(u'Setting output state to %s ...', state_name) (my_end, other_end) = multiprocessing.Pipe() - self.backend.output_queue.put({ + self.backend.output.process_message({ 'command': 'set_state', 'state': state_name, 'reply_to': pickle_connection(other_end), diff --git a/mopidy/backends/libspotify/session_manager.py b/mopidy/backends/libspotify/session_manager.py index 22cbb0a0..f58a32f8 100644 --- a/mopidy/backends/libspotify/session_manager.py +++ b/mopidy/backends/libspotify/session_manager.py @@ -16,14 +16,14 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key') user_agent = 'Mopidy %s' % get_version() - def __init__(self, username, password, core_queue, output_queue): + def __init__(self, username, password, core_queue, output): SpotifySessionManager.__init__(self, username, password) threading.Thread.__init__(self, name='LibspotifySessionManagerThread') # Run as a daemon thread, so Mopidy won't wait for this thread to exit # before Mopidy exits. self.daemon = True self.core_queue = core_queue - self.output_queue = output_queue + self.output = output self.connected = threading.Event() self.session = None @@ -48,6 +48,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): playlists.append( LibspotifyTranslator.to_mopidy_playlist(spotify_playlist)) self.core_queue.put({ + 'to': 'output', 'command': 'set_stored_playlists', 'playlists': playlists, }) @@ -77,7 +78,8 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): signed=True, rate=(int)44100 """ - self.output_queue.put({ + self.output.process_message({ + 'to': 'output', 'command': 'deliver_data', 'caps': caps_string, 'data': bytes(frames), @@ -95,7 +97,10 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def end_of_track(self, session): """Callback used by pyspotify""" logger.debug('End of data stream.') - self.output_queue.put({'command': 'end_of_data_stream'}) + self.output.process_message({ + 'to': 'output', + 'command': 'end_of_data_stream', + }) def search(self, query, connection): """Search method used by Mopidy backend""" diff --git a/mopidy/backends/local/__init__.py b/mopidy/backends/local/__init__.py index 50b3d84d..7112fdf1 100644 --- a/mopidy/backends/local/__init__.py +++ b/mopidy/backends/local/__init__.py @@ -43,13 +43,16 @@ class LocalPlaybackController(BasePlaybackController): def _send_recv(self, message): (my_end, other_end) = multiprocessing.Pipe() - message.update({'reply_to': pickle_connection(other_end)}) - self.backend.output_queue.put(message) + message.update({ + 'to': 'output', + 'reply_to': pickle_connection(other_end), + }) + self.backend.output.process_message(message) my_end.poll(None) return my_end.recv() def _send(self, message): - self.backend.output_queue.put(message) + self.backend.output.process_message(message) def _set_state(self, state): return self._send_recv({'command': 'set_state', 'state': state}) diff --git a/mopidy/core.py b/mopidy/core.py index 260c8f8c..d3b2c94f 100644 --- a/mopidy/core.py +++ b/mopidy/core.py @@ -16,7 +16,7 @@ class CoreProcess(BaseProcess): super(CoreProcess, self).__init__(name='CoreProcess') self.core_queue = multiprocessing.Queue() self.options = self.parse_options() - self.output_queue = None + self.output = None self.backend = None self.frontend = None @@ -46,8 +46,8 @@ class CoreProcess(BaseProcess): def setup(self): self.setup_logging() self.setup_settings() - self.output_queue = self.setup_output(self.core_queue) - self.backend = self.setup_backend(self.core_queue, self.output_queue) + self.output = self.setup_output(self.core_queue) + self.backend = self.setup_backend(self.core_queue, self.output) self.frontend = self.setup_frontend(self.core_queue, self.backend) def setup_logging(self): @@ -59,12 +59,12 @@ class CoreProcess(BaseProcess): settings.validate() def setup_output(self, core_queue): - output_queue = multiprocessing.Queue() - get_class(settings.OUTPUT)(core_queue, output_queue) - return output_queue + output = get_class(settings.OUTPUT)(core_queue) + output.start() + return output - def setup_backend(self, core_queue, output_queue): - return get_class(settings.BACKENDS[0])(core_queue, output_queue) + def setup_backend(self, core_queue, output): + return get_class(settings.BACKENDS[0])(core_queue, output) def setup_frontend(self, core_queue, backend): frontend = get_class(settings.FRONTENDS[0])(core_queue, backend) @@ -73,7 +73,7 @@ class CoreProcess(BaseProcess): def process_message(self, message): if message.get('to') == 'output': - self.output_queue.put(message) + self.output.process_message(message) elif message.get('to') == 'frontend': self.frontend.process_message(message) elif message['command'] == 'end_of_track': diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 554e986e..9e9a843b 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -6,14 +6,16 @@ pygst.require('0.10') import gst import logging +import multiprocessing import threading from mopidy import settings +from mopidy.outputs.base import BaseOutput from mopidy.utils.process import BaseProcess, unpickle_connection logger = logging.getLogger('mopidy.outputs.gstreamer') -class GStreamerOutput(object): +class GStreamerOutput(BaseOutput): """ Audio output through GStreamer. @@ -24,13 +26,25 @@ class GStreamerOutput(object): - :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK` """ - def __init__(self, core_queue, output_queue): - self.process = GStreamerProcess(core_queue, output_queue) + def __init__(self, core_queue): + super(GStreamerOutput, self).__init__(core_queue) + self.output_queue = multiprocessing.Queue() + self.process = GStreamerProcess(core_queue, self.output_queue) + + def start(self): self.process.start() def destroy(self): self.process.terminate() + def process_message(self, message): + """ + Processes messages with the GStreamer output as destination. + """ + assert message['to'] == 'output', \ + u'Message recipient must be "output".' + self.output_queue.put(message) + class GStreamerMessagesThread(threading.Thread): def run(self): gobject.MainLoop().run()