Merge branch 'multiprocessing'

This commit is contained in:
Stein Magnus Jodal 2010-03-20 01:29:30 +01:00
commit 71031701d6
8 changed files with 109 additions and 32 deletions

View File

@ -1,11 +1,35 @@
import logging
from multiprocessing.reduction import reduce_connection
import pickle
from mopidy import settings as raw_settings
logger = logging.getLogger('mopidy')
def get_version():
return u'0.1.dev'
def get_mpd_protocol_version():
return u'0.16.0'
def get_class(name):
module_name = name[:name.rindex('.')]
class_name = name[name.rindex('.') + 1:]
logger.info('Loading: %s from %s', class_name, module_name)
module = __import__(module_name, globals(), locals(), [class_name], -1)
class_object = getattr(module, class_name)
return class_object
def pickle_connection(connection):
return pickle.dumps(reduce_connection(connection))
def unpickle_connection(pickled_connection):
# From http://stackoverflow.com/questions/1446004
unpickled = pickle.loads(pickled_connection)
func = unpickled[0]
args = unpickled[1]
return func(*args)
class SettingsError(Exception):
pass

View File

@ -1,21 +1,23 @@
import asyncore
import logging
import multiprocessing
import os
import sys
sys.path.insert(0,
os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
from mopidy import settings, SettingsError
from mopidy.mpd.server import MpdServer
from mopidy import get_class, settings, SettingsError
from mopidy.core import CoreProcess
logger = logging.getLogger('mopidy')
logger = logging.getLogger('mopidy.main')
def main():
_setup_logging(2)
mixer = _get_class(settings.MIXER)()
backend = _get_class(settings.BACKENDS[0])(mixer=mixer)
MpdServer(backend=backend)
core_queue = multiprocessing.Queue()
core = CoreProcess(core_queue)
core.start()
get_class(settings.SERVER)(core_queue=core_queue)
asyncore.loop()
def _setup_logging(verbosity_level):
@ -30,14 +32,6 @@ def _setup_logging(verbosity_level):
level=level,
)
def _get_class(name):
module_name = name[:name.rindex('.')]
class_name = name[name.rindex('.') + 1:]
logger.info('Loading: %s from %s', class_name, module_name)
module = __import__(module_name, globals(), locals(), [class_name], -1)
class_object = getattr(module, class_name)
return class_object
if __name__ == '__main__':
try:
main()

View File

@ -3,13 +3,22 @@ import logging
import random
import time
from mopidy import get_class, settings
from mopidy.models import Playlist
logger = logging.getLogger('backends.base')
class BaseBackend(object):
def __init__(self, mixer=None):
self.mixer = mixer
def __init__(self, core_queue=None, mixer=None):
self.core_queue = core_queue
if mixer is not None:
self.mixer = mixer
else:
self.mixer = get_class(settings.MIXER)()
#: A :class:`multiprocessing.Queue` which can be used by e.g. library
#: callbacks to send messages to the core.
core_queue = None
#: The current playlist controller. An instance of
#: :class:`BaseCurrentPlaylistController`.
@ -379,6 +388,13 @@ class BasePlaybackController(object):
def volume(self, volume):
self.backend.mixer.volume = volume
def end_of_track_callback(self):
"""Tell the playback controller that end of track is reached."""
if self.next_track is not None:
self.next()
else:
self.stop()
def new_playlist_loaded_callback(self):
"""Tell the playback controller that a new playlist has been loaded."""
self.current_track = None

View File

@ -53,7 +53,8 @@ class DespotifyBackend(BaseBackend):
logger.info(u'Connecting to Spotify')
return DespotifySessionManager(
settings.SPOTIFY_USERNAME.encode(ENCODING),
settings.SPOTIFY_PASSWORD.encode(ENCODING))
settings.SPOTIFY_PASSWORD.encode(ENCODING),
core_queue=self.core_queue)
class DespotifyCurrentPlaylistController(BaseCurrentPlaylistController):
@ -164,9 +165,10 @@ class DespotifySessionManager(spytify.Spytify):
def __init__(self, *args, **kwargs):
kwargs['callback'] = self.callback
self.core_queue = kwargs.pop('core_queue')
super(DespotifySessionManager, self).__init__(*args, **kwargs)
def callback(self, signal, data):
if signal == self.DESPOTIFY_END_OF_PLAYLIST:
logger.debug('Despotify signalled end of playlist')
# TODO Ask backend to play next track
self.core_queue.put({'command': 'end_of_track'})

25
mopidy/core.py Normal file
View File

@ -0,0 +1,25 @@
import logging
import multiprocessing
from mopidy import get_class, settings, unpickle_connection
logger = logging.getLogger('mopidy.core')
class CoreProcess(multiprocessing.Process):
def __init__(self, core_queue):
multiprocessing.Process.__init__(self)
self.core_queue = core_queue
def run(self):
backend = get_class(settings.BACKENDS[0])(core_queue=self.core_queue)
frontend = get_class(settings.FRONTEND)(backend=backend)
while True:
message = self.core_queue.get()
if message['command'] == 'mpd_request':
response = frontend.handle_request(message['request'])
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'end_of_track':
backend.playback.end_of_track_callback()
else:
logger.warning(u'Cannot handle message: %s', message)

View File

@ -10,23 +10,23 @@ from mopidy.mpd.session import MpdSession
logger = logging.getLogger(u'mpd.server')
class MpdServer(asyncore.dispatcher):
def __init__(self, session_class=MpdSession, backend=None):
def __init__(self, session_class=MpdSession, core_queue=None):
asyncore.dispatcher.__init__(self)
self.session_class = session_class
self.backend = backend
self.core_queue = core_queue
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT))
self.bind((settings.SERVER_HOSTNAME, settings.SERVER_PORT))
self.listen(1)
self.started_at = int(time.time())
logger.info(u'Please connect to %s port %s using an MPD client.',
settings.MPD_SERVER_HOSTNAME, settings.MPD_SERVER_PORT)
settings.SERVER_HOSTNAME, settings.SERVER_PORT)
def handle_accept(self):
(client_socket, client_address) = self.accept()
logger.info(u'Connection from: [%s]:%s', *client_address)
self.session_class(self, client_socket, client_address,
backend=self.backend)
core_queue=self.core_queue)
def handle_close(self):
self.close()

View File

@ -1,9 +1,9 @@
import asynchat
import logging
import multiprocessing
from mopidy import get_mpd_protocol_version
from mopidy import get_mpd_protocol_version, pickle_connection
from mopidy.mpd import MpdAckError
from mopidy.mpd.handler import MpdHandler
logger = logging.getLogger(u'mpd.session')
@ -22,14 +22,13 @@ def indent(string, places=4, linebreak=LINE_TERMINATOR):
return result
class MpdSession(asynchat.async_chat):
def __init__(self, server, client_socket, client_address, backend,
handler_class=MpdHandler):
def __init__(self, server, client_socket, client_address, core_queue):
asynchat.async_chat.__init__(self, sock=client_socket)
self.server = server
self.client_address = client_address
self.core_queue = core_queue
self.input_buffer = []
self.set_terminator(LINE_TERMINATOR.encode(ENCODING))
self.handler = handler_class(session=self, backend=backend)
self.send_response(u'OK MPD %s' % get_mpd_protocol_version())
def do_close(self):
@ -51,7 +50,14 @@ class MpdSession(asynchat.async_chat):
def handle_request(self, input):
try:
response = self.handler.handle_request(input)
my_end, other_end = multiprocessing.Pipe()
self.core_queue.put({
'command': 'mpd_request',
'request': input,
'reply_to': pickle_connection(other_end),
})
my_end.poll(None)
response = my_end.recv()
if response is not None:
self.handle_response(response)
except MpdAckError, e:

View File

@ -23,7 +23,12 @@ BACKENDS = (
#: The log format used on the console. See
#: http://docs.python.org/library/logging.html#formatter-objects for details on
#: the format.
CONSOLE_LOG_FORMAT = u'%(levelname)-8s %(asctime)s [%(threadName)s] %(name)s\n %(message)s'
CONSOLE_LOG_FORMAT = u'%(levelname)-8s %(asctime)s [%(process)d:%(threadName)s] %(name)s\n %(message)s'
#: Protocol frontend to use. Default::
#:
#: FRONTEND = u'mopidy.mpd.handler.MpdHandler'
FRONTEND = u'mopidy.mpd.handler.MpdHandler'
#: Sound mixer to use. See :mod:`mopidy.mixers` for all available mixers.
#:
@ -65,16 +70,21 @@ MIXER_EXT_SPEAKERS_A = None
#: *Default:* :class:`None`.
MIXER_EXT_SPEAKERS_B = None
#: Server to use. Default::
#:
#: SERVER = u'mopidy.mpd.server.MpdServer'
SERVER = u'mopidy.mpd.server.MpdServer'
#: Which address Mopidy should bind to. Examples:
#:
#: ``localhost``
#: Listens only on the loopback interface. *Default.*
#: ``0.0.0.0``
#: Listens on all interfaces.
MPD_SERVER_HOSTNAME = u'localhost'
SERVER_HOSTNAME = u'localhost'
#: Which TCP port Mopidy should listen to. *Default: 6600*
MPD_SERVER_PORT = 6600
SERVER_PORT = 6600
#: Your Spotify Premium username. Used by all Spotify backends.
SPOTIFY_USERNAME = u''