From 0e0a9e67dd0fb6028fc56617bb8ac4e536180dd4 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Sat, 20 Mar 2010 00:36:43 +0100 Subject: [PATCH] Move MpdHandler usage from MpdSession to CoreProcess --- mopidy/__main__.py | 3 +-- mopidy/core.py | 23 +++++++++++++---------- mopidy/mpd/server.py | 6 +++--- mopidy/mpd/session.py | 18 ++++++++++++------ 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/mopidy/__main__.py b/mopidy/__main__.py index 84b1d6a5..ecd82216 100644 --- a/mopidy/__main__.py +++ b/mopidy/__main__.py @@ -19,7 +19,6 @@ def main(): # multiprocessing branch plan # --------------------------- # - # TODO Init MpdHandler from backend/core # TODO Init MpdServer in MainThread or in new Process? main_queue = Queue() @@ -31,7 +30,7 @@ def main(): while True: message = main_queue.get() if message['command'] == 'core_ready': - MpdServer(backend=None) + MpdServer(core_queue=core_queue) asyncore.loop() else: logger.warning(u'Cannot handle message: %s', message) diff --git a/mopidy/core.py b/mopidy/core.py index bd48c818..bb1674ed 100644 --- a/mopidy/core.py +++ b/mopidy/core.py @@ -1,23 +1,26 @@ import logging -from multiprocessing import Process, Queue +import multiprocessing -from mopidy import get_class, settings +from mopidy import get_class, settings, unpickle_connection logger = logging.getLogger('mopidy.core') -class CoreProcess(Process): +class CoreProcess(multiprocessing.Process): def __init__(self, core_queue=None, main_queue=None, server_queue=None): - Process.__init__(self) + multiprocessing.Process.__init__(self) self.queue = core_queue self.main_queue = main_queue self.server_queue = server_queue def run(self): - self._setup() + backend = get_class(settings.BACKENDS[0])() + frontend = get_class(settings.FRONTEND)(backend=backend) + self.main_queue.put({'command': 'core_ready'}) while True: message = self.queue.get() - # TODO Do something with the message - - def _setup(self): - self.backend = get_class(settings.BACKENDS[0])() - self.main_queue.put({'command': 'core_ready'}) + if message['command'] == 'mpd_request': + response = frontend.handle_request(message['request']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + else: + logger.warning(u'Cannot handle message: %s', message) diff --git a/mopidy/mpd/server.py b/mopidy/mpd/server.py index e0173574..270f1f6b 100644 --- a/mopidy/mpd/server.py +++ b/mopidy/mpd/server.py @@ -10,10 +10,10 @@ from mopidy.mpd.session import MpdSession logger = logging.getLogger(u'mpd.server') class MpdServer(asyncore.dispatcher): - def __init__(self, session_class=MpdSession, backend=None): + def __init__(self, session_class=MpdSession, core_queue=None): asyncore.dispatcher.__init__(self) self.session_class = session_class - self.backend = backend + self.core_queue = core_queue self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT)) @@ -26,7 +26,7 @@ class MpdServer(asyncore.dispatcher): (client_socket, client_address) = self.accept() logger.info(u'Connection from: [%s]:%s', *client_address) self.session_class(self, client_socket, client_address, - backend=self.backend) + core_queue=self.core_queue) def handle_close(self): self.close() diff --git a/mopidy/mpd/session.py b/mopidy/mpd/session.py index 19d8c1c5..4c9bb0c5 100644 --- a/mopidy/mpd/session.py +++ b/mopidy/mpd/session.py @@ -1,9 +1,9 @@ import asynchat import logging +import multiprocessing -from mopidy import get_mpd_protocol_version +from mopidy import get_mpd_protocol_version, pickle_connection from mopidy.mpd import MpdAckError -from mopidy.mpd.handler import MpdHandler logger = logging.getLogger(u'mpd.session') @@ -22,14 +22,13 @@ def indent(string, places=4, linebreak=LINE_TERMINATOR): return result class MpdSession(asynchat.async_chat): - def __init__(self, server, client_socket, client_address, backend, - handler_class=MpdHandler): + def __init__(self, server, client_socket, client_address, core_queue): asynchat.async_chat.__init__(self, sock=client_socket) self.server = server self.client_address = client_address + self.core_queue = core_queue self.input_buffer = [] self.set_terminator(LINE_TERMINATOR.encode(ENCODING)) - self.handler = handler_class(session=self, backend=backend) self.send_response(u'OK MPD %s' % get_mpd_protocol_version()) def do_close(self): @@ -51,7 +50,14 @@ class MpdSession(asynchat.async_chat): def handle_request(self, input): try: - response = self.handler.handle_request(input) + my_end, other_end = multiprocessing.Pipe() + self.core_queue.put({ + 'command': 'mpd_request', + 'request': input, + 'reply_to': pickle_connection(other_end), + }) + my_end.poll(None) + response = my_end.recv() if response is not None: self.handle_response(response) except MpdAckError, e: