Move MpdHandler usage from MpdSession to CoreProcess
This commit is contained in:
parent
af4d1f702e
commit
0e0a9e67dd
@ -19,7 +19,6 @@ def main():
|
||||
# multiprocessing branch plan
|
||||
# ---------------------------
|
||||
#
|
||||
# TODO Init MpdHandler from backend/core
|
||||
# TODO Init MpdServer in MainThread or in new Process?
|
||||
|
||||
main_queue = Queue()
|
||||
@ -31,7 +30,7 @@ def main():
|
||||
while True:
|
||||
message = main_queue.get()
|
||||
if message['command'] == 'core_ready':
|
||||
MpdServer(backend=None)
|
||||
MpdServer(core_queue=core_queue)
|
||||
asyncore.loop()
|
||||
else:
|
||||
logger.warning(u'Cannot handle message: %s', message)
|
||||
|
||||
@ -1,23 +1,26 @@
|
||||
import logging
|
||||
from multiprocessing import Process, Queue
|
||||
import multiprocessing
|
||||
|
||||
from mopidy import get_class, settings
|
||||
from mopidy import get_class, settings, unpickle_connection
|
||||
|
||||
logger = logging.getLogger('mopidy.core')
|
||||
|
||||
class CoreProcess(Process):
|
||||
class CoreProcess(multiprocessing.Process):
|
||||
def __init__(self, core_queue=None, main_queue=None, server_queue=None):
|
||||
Process.__init__(self)
|
||||
multiprocessing.Process.__init__(self)
|
||||
self.queue = core_queue
|
||||
self.main_queue = main_queue
|
||||
self.server_queue = server_queue
|
||||
|
||||
def run(self):
|
||||
self._setup()
|
||||
backend = get_class(settings.BACKENDS[0])()
|
||||
frontend = get_class(settings.FRONTEND)(backend=backend)
|
||||
self.main_queue.put({'command': 'core_ready'})
|
||||
while True:
|
||||
message = self.queue.get()
|
||||
# TODO Do something with the message
|
||||
|
||||
def _setup(self):
|
||||
self.backend = get_class(settings.BACKENDS[0])()
|
||||
self.main_queue.put({'command': 'core_ready'})
|
||||
if message['command'] == 'mpd_request':
|
||||
response = frontend.handle_request(message['request'])
|
||||
connection = unpickle_connection(message['reply_to'])
|
||||
connection.send(response)
|
||||
else:
|
||||
logger.warning(u'Cannot handle message: %s', message)
|
||||
|
||||
@ -10,10 +10,10 @@ from mopidy.mpd.session import MpdSession
|
||||
logger = logging.getLogger(u'mpd.server')
|
||||
|
||||
class MpdServer(asyncore.dispatcher):
|
||||
def __init__(self, session_class=MpdSession, backend=None):
|
||||
def __init__(self, session_class=MpdSession, core_queue=None):
|
||||
asyncore.dispatcher.__init__(self)
|
||||
self.session_class = session_class
|
||||
self.backend = backend
|
||||
self.core_queue = core_queue
|
||||
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.set_reuse_addr()
|
||||
self.bind((settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT))
|
||||
@ -26,7 +26,7 @@ class MpdServer(asyncore.dispatcher):
|
||||
(client_socket, client_address) = self.accept()
|
||||
logger.info(u'Connection from: [%s]:%s', *client_address)
|
||||
self.session_class(self, client_socket, client_address,
|
||||
backend=self.backend)
|
||||
core_queue=self.core_queue)
|
||||
|
||||
def handle_close(self):
|
||||
self.close()
|
||||
|
||||
@ -1,9 +1,9 @@
|
||||
import asynchat
|
||||
import logging
|
||||
import multiprocessing
|
||||
|
||||
from mopidy import get_mpd_protocol_version
|
||||
from mopidy import get_mpd_protocol_version, pickle_connection
|
||||
from mopidy.mpd import MpdAckError
|
||||
from mopidy.mpd.handler import MpdHandler
|
||||
|
||||
logger = logging.getLogger(u'mpd.session')
|
||||
|
||||
@ -22,14 +22,13 @@ def indent(string, places=4, linebreak=LINE_TERMINATOR):
|
||||
return result
|
||||
|
||||
class MpdSession(asynchat.async_chat):
|
||||
def __init__(self, server, client_socket, client_address, backend,
|
||||
handler_class=MpdHandler):
|
||||
def __init__(self, server, client_socket, client_address, core_queue):
|
||||
asynchat.async_chat.__init__(self, sock=client_socket)
|
||||
self.server = server
|
||||
self.client_address = client_address
|
||||
self.core_queue = core_queue
|
||||
self.input_buffer = []
|
||||
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
|
||||
self.handler = handler_class(session=self, backend=backend)
|
||||
self.send_response(u'OK MPD %s' % get_mpd_protocol_version())
|
||||
|
||||
def do_close(self):
|
||||
@ -51,7 +50,14 @@ class MpdSession(asynchat.async_chat):
|
||||
|
||||
def handle_request(self, input):
|
||||
try:
|
||||
response = self.handler.handle_request(input)
|
||||
my_end, other_end = multiprocessing.Pipe()
|
||||
self.core_queue.put({
|
||||
'command': 'mpd_request',
|
||||
'request': input,
|
||||
'reply_to': pickle_connection(other_end),
|
||||
})
|
||||
my_end.poll(None)
|
||||
response = my_end.recv()
|
||||
if response is not None:
|
||||
self.handle_response(response)
|
||||
except MpdAckError, e:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user