Pass around an 'output' instead of an 'output_queue'
This commit is contained in:
parent
5d61bb1f7d
commit
1ebd95e879
@ -23,17 +23,17 @@ class BaseBackend(object):
|
||||
:param core_queue: a queue for sending messages to
|
||||
:class:`mopidy.process.CoreProcess`
|
||||
:type core_queue: :class:`multiprocessing.Queue`
|
||||
:param output_queue: a queue for sending messages to the output process
|
||||
:type output_queue: :class:`multiprocessing.Queue`
|
||||
:param output: the audio output
|
||||
:type output: :class:`mopidy.outputs.gstreamer.GStreamerOutput` or similar
|
||||
:param mixer_class: either a mixer class, or :class:`None` to use the mixer
|
||||
defined in settings
|
||||
:type mixer_class: a subclass of :class:`mopidy.mixers.BaseMixer` or
|
||||
:class:`None`
|
||||
"""
|
||||
|
||||
def __init__(self, core_queue=None, output_queue=None, mixer_class=None):
|
||||
def __init__(self, core_queue=None, output=None, mixer_class=None):
|
||||
self.core_queue = core_queue
|
||||
self.output_queue = output_queue
|
||||
self.output = output
|
||||
if mixer_class is None:
|
||||
mixer_class = get_class(settings.MIXER)
|
||||
self.mixer = mixer_class(self)
|
||||
|
||||
@ -55,6 +55,6 @@ class LibspotifyBackend(BaseBackend):
|
||||
spotify = LibspotifySessionManager(
|
||||
settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD,
|
||||
core_queue=self.core_queue,
|
||||
output_queue=self.output_queue)
|
||||
output=self.output)
|
||||
spotify.start()
|
||||
return spotify
|
||||
|
||||
@ -12,7 +12,7 @@ class LibspotifyPlaybackController(BasePlaybackController):
|
||||
def _set_output_state(self, state_name):
|
||||
logger.debug(u'Setting output state to %s ...', state_name)
|
||||
(my_end, other_end) = multiprocessing.Pipe()
|
||||
self.backend.output_queue.put({
|
||||
self.backend.output.process_message({
|
||||
'command': 'set_state',
|
||||
'state': state_name,
|
||||
'reply_to': pickle_connection(other_end),
|
||||
|
||||
@ -16,14 +16,14 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
|
||||
appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key')
|
||||
user_agent = 'Mopidy %s' % get_version()
|
||||
|
||||
def __init__(self, username, password, core_queue, output_queue):
|
||||
def __init__(self, username, password, core_queue, output):
|
||||
SpotifySessionManager.__init__(self, username, password)
|
||||
threading.Thread.__init__(self, name='LibspotifySessionManagerThread')
|
||||
# Run as a daemon thread, so Mopidy won't wait for this thread to exit
|
||||
# before Mopidy exits.
|
||||
self.daemon = True
|
||||
self.core_queue = core_queue
|
||||
self.output_queue = output_queue
|
||||
self.output = output
|
||||
self.connected = threading.Event()
|
||||
self.session = None
|
||||
|
||||
@ -48,6 +48,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
|
||||
playlists.append(
|
||||
LibspotifyTranslator.to_mopidy_playlist(spotify_playlist))
|
||||
self.core_queue.put({
|
||||
'to': 'output',
|
||||
'command': 'set_stored_playlists',
|
||||
'playlists': playlists,
|
||||
})
|
||||
@ -77,7 +78,8 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
|
||||
signed=True,
|
||||
rate=(int)44100
|
||||
"""
|
||||
self.output_queue.put({
|
||||
self.output.process_message({
|
||||
'to': 'output',
|
||||
'command': 'deliver_data',
|
||||
'caps': caps_string,
|
||||
'data': bytes(frames),
|
||||
@ -95,7 +97,10 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
|
||||
def end_of_track(self, session):
|
||||
"""Callback used by pyspotify"""
|
||||
logger.debug('End of data stream.')
|
||||
self.output_queue.put({'command': 'end_of_data_stream'})
|
||||
self.output.process_message({
|
||||
'to': 'output',
|
||||
'command': 'end_of_data_stream',
|
||||
})
|
||||
|
||||
def search(self, query, connection):
|
||||
"""Search method used by Mopidy backend"""
|
||||
|
||||
@ -43,13 +43,16 @@ class LocalPlaybackController(BasePlaybackController):
|
||||
|
||||
def _send_recv(self, message):
|
||||
(my_end, other_end) = multiprocessing.Pipe()
|
||||
message.update({'reply_to': pickle_connection(other_end)})
|
||||
self.backend.output_queue.put(message)
|
||||
message.update({
|
||||
'to': 'output',
|
||||
'reply_to': pickle_connection(other_end),
|
||||
})
|
||||
self.backend.output.process_message(message)
|
||||
my_end.poll(None)
|
||||
return my_end.recv()
|
||||
|
||||
def _send(self, message):
|
||||
self.backend.output_queue.put(message)
|
||||
self.backend.output.process_message(message)
|
||||
|
||||
def _set_state(self, state):
|
||||
return self._send_recv({'command': 'set_state', 'state': state})
|
||||
|
||||
@ -16,7 +16,7 @@ class CoreProcess(BaseProcess):
|
||||
super(CoreProcess, self).__init__(name='CoreProcess')
|
||||
self.core_queue = multiprocessing.Queue()
|
||||
self.options = self.parse_options()
|
||||
self.output_queue = None
|
||||
self.output = None
|
||||
self.backend = None
|
||||
self.frontend = None
|
||||
|
||||
@ -46,8 +46,8 @@ class CoreProcess(BaseProcess):
|
||||
def setup(self):
|
||||
self.setup_logging()
|
||||
self.setup_settings()
|
||||
self.output_queue = self.setup_output(self.core_queue)
|
||||
self.backend = self.setup_backend(self.core_queue, self.output_queue)
|
||||
self.output = self.setup_output(self.core_queue)
|
||||
self.backend = self.setup_backend(self.core_queue, self.output)
|
||||
self.frontend = self.setup_frontend(self.core_queue, self.backend)
|
||||
|
||||
def setup_logging(self):
|
||||
@ -59,12 +59,12 @@ class CoreProcess(BaseProcess):
|
||||
settings.validate()
|
||||
|
||||
def setup_output(self, core_queue):
|
||||
output_queue = multiprocessing.Queue()
|
||||
get_class(settings.OUTPUT)(core_queue, output_queue)
|
||||
return output_queue
|
||||
output = get_class(settings.OUTPUT)(core_queue)
|
||||
output.start()
|
||||
return output
|
||||
|
||||
def setup_backend(self, core_queue, output_queue):
|
||||
return get_class(settings.BACKENDS[0])(core_queue, output_queue)
|
||||
def setup_backend(self, core_queue, output):
|
||||
return get_class(settings.BACKENDS[0])(core_queue, output)
|
||||
|
||||
def setup_frontend(self, core_queue, backend):
|
||||
frontend = get_class(settings.FRONTENDS[0])(core_queue, backend)
|
||||
@ -73,7 +73,7 @@ class CoreProcess(BaseProcess):
|
||||
|
||||
def process_message(self, message):
|
||||
if message.get('to') == 'output':
|
||||
self.output_queue.put(message)
|
||||
self.output.process_message(message)
|
||||
elif message.get('to') == 'frontend':
|
||||
self.frontend.process_message(message)
|
||||
elif message['command'] == 'end_of_track':
|
||||
|
||||
@ -6,14 +6,16 @@ pygst.require('0.10')
|
||||
import gst
|
||||
|
||||
import logging
|
||||
import multiprocessing
|
||||
import threading
|
||||
|
||||
from mopidy import settings
|
||||
from mopidy.outputs.base import BaseOutput
|
||||
from mopidy.utils.process import BaseProcess, unpickle_connection
|
||||
|
||||
logger = logging.getLogger('mopidy.outputs.gstreamer')
|
||||
|
||||
class GStreamerOutput(object):
|
||||
class GStreamerOutput(BaseOutput):
|
||||
"""
|
||||
Audio output through GStreamer.
|
||||
|
||||
@ -24,13 +26,25 @@ class GStreamerOutput(object):
|
||||
- :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK`
|
||||
"""
|
||||
|
||||
def __init__(self, core_queue, output_queue):
|
||||
self.process = GStreamerProcess(core_queue, output_queue)
|
||||
def __init__(self, core_queue):
|
||||
super(GStreamerOutput, self).__init__(core_queue)
|
||||
self.output_queue = multiprocessing.Queue()
|
||||
self.process = GStreamerProcess(core_queue, self.output_queue)
|
||||
|
||||
def start(self):
|
||||
self.process.start()
|
||||
|
||||
def destroy(self):
|
||||
self.process.terminate()
|
||||
|
||||
def process_message(self, message):
|
||||
"""
|
||||
Processes messages with the GStreamer output as destination.
|
||||
"""
|
||||
assert message['to'] == 'output', \
|
||||
u'Message recipient must be "output".'
|
||||
self.output_queue.put(message)
|
||||
|
||||
class GStreamerMessagesThread(threading.Thread):
|
||||
def run(self):
|
||||
gobject.MainLoop().run()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user