diff --git a/mopidy/__init__.py b/mopidy/__init__.py index 176bef6a..5770716f 100644 --- a/mopidy/__init__.py +++ b/mopidy/__init__.py @@ -1,11 +1,35 @@ +import logging +from multiprocessing.reduction import reduce_connection +import pickle + from mopidy import settings as raw_settings +logger = logging.getLogger('mopidy') + def get_version(): return u'0.1.dev' def get_mpd_protocol_version(): return u'0.16.0' +def get_class(name): + module_name = name[:name.rindex('.')] + class_name = name[name.rindex('.') + 1:] + logger.info('Loading: %s from %s', class_name, module_name) + module = __import__(module_name, globals(), locals(), [class_name], -1) + class_object = getattr(module, class_name) + return class_object + +def pickle_connection(connection): + return pickle.dumps(reduce_connection(connection)) + +def unpickle_connection(pickled_connection): + # From http://stackoverflow.com/questions/1446004 + unpickled = pickle.loads(pickled_connection) + func = unpickled[0] + args = unpickled[1] + return func(*args) + class SettingsError(Exception): pass diff --git a/mopidy/__main__.py b/mopidy/__main__.py index d2cf09a7..0666d99b 100644 --- a/mopidy/__main__.py +++ b/mopidy/__main__.py @@ -1,21 +1,23 @@ import asyncore import logging +import multiprocessing import os import sys sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../'))) -from mopidy import settings, SettingsError -from mopidy.mpd.server import MpdServer +from mopidy import get_class, settings, SettingsError +from mopidy.core import CoreProcess -logger = logging.getLogger('mopidy') +logger = logging.getLogger('mopidy.main') def main(): _setup_logging(2) - mixer = _get_class(settings.MIXER)() - backend = _get_class(settings.BACKENDS[0])(mixer=mixer) - MpdServer(backend=backend) + core_queue = multiprocessing.Queue() + core = CoreProcess(core_queue) + core.start() + get_class(settings.SERVER)(core_queue=core_queue) asyncore.loop() def _setup_logging(verbosity_level): @@ -30,14 +32,6 @@ def _setup_logging(verbosity_level): level=level, ) -def _get_class(name): - module_name = name[:name.rindex('.')] - class_name = name[name.rindex('.') + 1:] - logger.info('Loading: %s from %s', class_name, module_name) - module = __import__(module_name, globals(), locals(), [class_name], -1) - class_object = getattr(module, class_name) - return class_object - if __name__ == '__main__': try: main() diff --git a/mopidy/backends/__init__.py b/mopidy/backends/__init__.py index 93b74498..16f41b36 100644 --- a/mopidy/backends/__init__.py +++ b/mopidy/backends/__init__.py @@ -3,13 +3,22 @@ import logging import random import time +from mopidy import get_class, settings from mopidy.models import Playlist logger = logging.getLogger('backends.base') class BaseBackend(object): - def __init__(self, mixer=None): - self.mixer = mixer + def __init__(self, core_queue=None, mixer=None): + self.core_queue = core_queue + if mixer is not None: + self.mixer = mixer + else: + self.mixer = get_class(settings.MIXER)() + + #: A :class:`multiprocessing.Queue` which can be used by e.g. library + #: callbacks to send messages to the core. + core_queue = None #: The current playlist controller. An instance of #: :class:`BaseCurrentPlaylistController`. @@ -379,6 +388,13 @@ class BasePlaybackController(object): def volume(self, volume): self.backend.mixer.volume = volume + def end_of_track_callback(self): + """Tell the playback controller that end of track is reached.""" + if self.next_track is not None: + self.next() + else: + self.stop() + def new_playlist_loaded_callback(self): """Tell the playback controller that a new playlist has been loaded.""" self.current_track = None diff --git a/mopidy/backends/despotify.py b/mopidy/backends/despotify.py index 5bd3552b..11ad45c5 100644 --- a/mopidy/backends/despotify.py +++ b/mopidy/backends/despotify.py @@ -53,7 +53,8 @@ class DespotifyBackend(BaseBackend): logger.info(u'Connecting to Spotify') return DespotifySessionManager( settings.SPOTIFY_USERNAME.encode(ENCODING), - settings.SPOTIFY_PASSWORD.encode(ENCODING)) + settings.SPOTIFY_PASSWORD.encode(ENCODING), + core_queue=self.core_queue) class DespotifyCurrentPlaylistController(BaseCurrentPlaylistController): @@ -164,9 +165,10 @@ class DespotifySessionManager(spytify.Spytify): def __init__(self, *args, **kwargs): kwargs['callback'] = self.callback + self.core_queue = kwargs.pop('core_queue') super(DespotifySessionManager, self).__init__(*args, **kwargs) def callback(self, signal, data): if signal == self.DESPOTIFY_END_OF_PLAYLIST: logger.debug('Despotify signalled end of playlist') - # TODO Ask backend to play next track + self.core_queue.put({'command': 'end_of_track'}) diff --git a/mopidy/core.py b/mopidy/core.py new file mode 100644 index 00000000..acb3b984 --- /dev/null +++ b/mopidy/core.py @@ -0,0 +1,25 @@ +import logging +import multiprocessing + +from mopidy import get_class, settings, unpickle_connection + +logger = logging.getLogger('mopidy.core') + +class CoreProcess(multiprocessing.Process): + def __init__(self, core_queue): + multiprocessing.Process.__init__(self) + self.core_queue = core_queue + + def run(self): + backend = get_class(settings.BACKENDS[0])(core_queue=self.core_queue) + frontend = get_class(settings.FRONTEND)(backend=backend) + while True: + message = self.core_queue.get() + if message['command'] == 'mpd_request': + response = frontend.handle_request(message['request']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + elif message['command'] == 'end_of_track': + backend.playback.end_of_track_callback() + else: + logger.warning(u'Cannot handle message: %s', message) diff --git a/mopidy/mpd/server.py b/mopidy/mpd/server.py index e0173574..a61e2aa8 100644 --- a/mopidy/mpd/server.py +++ b/mopidy/mpd/server.py @@ -10,23 +10,23 @@ from mopidy.mpd.session import MpdSession logger = logging.getLogger(u'mpd.server') class MpdServer(asyncore.dispatcher): - def __init__(self, session_class=MpdSession, backend=None): + def __init__(self, session_class=MpdSession, core_queue=None): asyncore.dispatcher.__init__(self) self.session_class = session_class - self.backend = backend + self.core_queue = core_queue self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() - self.bind((settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT)) + self.bind((settings.SERVER_HOSTNAME, settings.SERVER_PORT)) self.listen(1) self.started_at = int(time.time()) logger.info(u'Please connect to %s port %s using an MPD client.', - settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT) + settings.SERVER_HOSTNAME, settings.SERVER_PORT) def handle_accept(self): (client_socket, client_address) = self.accept() logger.info(u'Connection from: [%s]:%s', *client_address) self.session_class(self, client_socket, client_address, - backend=self.backend) + core_queue=self.core_queue) def handle_close(self): self.close() diff --git a/mopidy/mpd/session.py b/mopidy/mpd/session.py index 19d8c1c5..4c9bb0c5 100644 --- a/mopidy/mpd/session.py +++ b/mopidy/mpd/session.py @@ -1,9 +1,9 @@ import asynchat import logging +import multiprocessing -from mopidy import get_mpd_protocol_version +from mopidy import get_mpd_protocol_version, pickle_connection from mopidy.mpd import MpdAckError -from mopidy.mpd.handler import MpdHandler logger = logging.getLogger(u'mpd.session') @@ -22,14 +22,13 @@ def indent(string, places=4, linebreak=LINE_TERMINATOR): return result class MpdSession(asynchat.async_chat): - def __init__(self, server, client_socket, client_address, backend, - handler_class=MpdHandler): + def __init__(self, server, client_socket, client_address, core_queue): asynchat.async_chat.__init__(self, sock=client_socket) self.server = server self.client_address = client_address + self.core_queue = core_queue self.input_buffer = [] self.set_terminator(LINE_TERMINATOR.encode(ENCODING)) - self.handler = handler_class(session=self, backend=backend) self.send_response(u'OK MPD %s' % get_mpd_protocol_version()) def do_close(self): @@ -51,7 +50,14 @@ class MpdSession(asynchat.async_chat): def handle_request(self, input): try: - response = self.handler.handle_request(input) + my_end, other_end = multiprocessing.Pipe() + self.core_queue.put({ + 'command': 'mpd_request', + 'request': input, + 'reply_to': pickle_connection(other_end), + }) + my_end.poll(None) + response = my_end.recv() if response is not None: self.handle_response(response) except MpdAckError, e: diff --git a/mopidy/settings/default.py b/mopidy/settings/default.py index 7fd41fa6..874c2f87 100644 --- a/mopidy/settings/default.py +++ b/mopidy/settings/default.py @@ -23,7 +23,12 @@ BACKENDS = ( #: The log format used on the console. See #: http://docs.python.org/library/logging.html#formatter-objects for details on #: the format. -CONSOLE_LOG_FORMAT = u'%(levelname)-8s %(asctime)s [%(threadName)s] %(name)s\n %(message)s' +CONSOLE_LOG_FORMAT = u'%(levelname)-8s %(asctime)s [%(process)d:%(threadName)s] %(name)s\n %(message)s' + +#: Protocol frontend to use. Default:: +#: +#: FRONTEND = u'mopidy.mpd.handler.MpdHandler' +FRONTEND = u'mopidy.mpd.handler.MpdHandler' #: Sound mixer to use. See :mod:`mopidy.mixers` for all available mixers. #: @@ -65,16 +70,21 @@ MIXER_EXT_SPEAKERS_A = None #: *Default:* :class:`None`. MIXER_EXT_SPEAKERS_B = None +#: Server to use. Default:: +#: +#: SERVER = u'mopidy.mpd.server.MpdServer' +SERVER = u'mopidy.mpd.server.MpdServer' + #: Which address Mopidy should bind to. Examples: #: #: ``localhost`` #: Listens only on the loopback interface. *Default.* #: ``0.0.0.0`` #: Listens on all interfaces. -MPD_SERVER_HOSTNAME = u'localhost' +SERVER_HOSTNAME = u'localhost' #: Which TCP port Mopidy should listen to. *Default: 6600* -MPD_SERVER_PORT = 6600 +SERVER_PORT = 6600 #: Your Spotify Premium username. Used by all Spotify backends. SPOTIFY_USERNAME = u''