Merge branch 'feature/threads-not-processes' into develop

This commit is contained in:
Stein Magnus Jodal 2010-08-27 12:19:30 +02:00
commit 71014beb46
7 changed files with 88 additions and 51 deletions

View File

@ -5,12 +5,13 @@ import threading
from spotify.manager import SpotifySessionManager from spotify.manager import SpotifySessionManager
from mopidy import get_version, settings from mopidy import get_version, settings
from mopidy.models import Playlist
from mopidy.backends.libspotify.translator import LibspotifyTranslator from mopidy.backends.libspotify.translator import LibspotifyTranslator
from mopidy.models import Playlist
from mopidy.utils.process import BaseThread
logger = logging.getLogger('mopidy.backends.libspotify.session_manager') logger = logging.getLogger('mopidy.backends.libspotify.session_manager')
class LibspotifySessionManager(SpotifySessionManager, threading.Thread): class LibspotifySessionManager(SpotifySessionManager, BaseThread):
cache_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) cache_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE)
settings_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) settings_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE)
appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key') appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key')
@ -18,7 +19,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
def __init__(self, username, password, core_queue, output): def __init__(self, username, password, core_queue, output):
SpotifySessionManager.__init__(self, username, password) SpotifySessionManager.__init__(self, username, password)
threading.Thread.__init__(self) BaseThread.__init__(self)
self.name = 'LibspotifySMThread' self.name = 'LibspotifySMThread'
# Run as a daemon thread, so Mopidy won't wait for this thread to exit # Run as a daemon thread, so Mopidy won't wait for this thread to exit
# before Mopidy exits. # before Mopidy exits.
@ -28,7 +29,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread):
self.connected = threading.Event() self.connected = threading.Event()
self.session = None self.session = None
def run(self): def run_inside_try(self):
self.connect() self.connect()
def logged_in(self, session, error): def logged_in(self, session, error):

View File

@ -11,7 +11,7 @@ except ImportError as e:
from mopidy import get_version, settings, SettingsError from mopidy import get_version, settings, SettingsError
from mopidy.frontends.base import BaseFrontend from mopidy.frontends.base import BaseFrontend
from mopidy.utils.process import BaseProcess from mopidy.utils.process import BaseThread
logger = logging.getLogger('mopidy.frontends.lastfm') logger = logging.getLogger('mopidy.frontends.lastfm')
@ -45,22 +45,22 @@ class LastfmFrontend(BaseFrontend):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(LastfmFrontend, self).__init__(*args, **kwargs) super(LastfmFrontend, self).__init__(*args, **kwargs)
(self.connection, other_end) = multiprocessing.Pipe() (self.connection, other_end) = multiprocessing.Pipe()
self.process = LastfmFrontendProcess(other_end) self.thread = LastfmFrontendThread(other_end)
def start(self): def start(self):
self.process.start() self.thread.start()
def destroy(self): def destroy(self):
self.process.destroy() self.thread.destroy()
def process_message(self, message): def process_message(self, message):
self.connection.send(message) self.connection.send(message)
class LastfmFrontendProcess(BaseProcess): class LastfmFrontendThread(BaseThread):
def __init__(self, connection): def __init__(self, connection):
super(LastfmFrontendProcess, self).__init__() super(LastfmFrontendThread, self).__init__()
self.name = u'LastfmFrontendProcess' self.name = u'LastfmFrontendThread'
self.daemon = True self.daemon = True
self.connection = connection self.connection = connection
self.lastfm = None self.lastfm = None

View File

@ -2,7 +2,7 @@ import logging
from mopidy.frontends.base import BaseFrontend from mopidy.frontends.base import BaseFrontend
from mopidy.frontends.mpd.dispatcher import MpdDispatcher from mopidy.frontends.mpd.dispatcher import MpdDispatcher
from mopidy.frontends.mpd.process import MpdProcess from mopidy.frontends.mpd.thread import MpdThread
from mopidy.utils.process import unpickle_connection from mopidy.utils.process import unpickle_connection
logger = logging.getLogger('mopidy.frontends.mpd') logger = logging.getLogger('mopidy.frontends.mpd')
@ -19,17 +19,17 @@ class MpdFrontend(BaseFrontend):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(MpdFrontend, self).__init__(*args, **kwargs) super(MpdFrontend, self).__init__(*args, **kwargs)
self.process = None self.thread = None
self.dispatcher = MpdDispatcher(self.backend) self.dispatcher = MpdDispatcher(self.backend)
def start(self): def start(self):
"""Starts the MPD server.""" """Starts the MPD server."""
self.process = MpdProcess(self.core_queue) self.thread = MpdThread(self.core_queue)
self.process.start() self.thread.start()
def destroy(self): def destroy(self):
"""Destroys the MPD server.""" """Destroys the MPD server."""
self.process.destroy() self.thread.destroy()
def process_message(self, message): def process_message(self, message):
""" """

View File

@ -1,18 +0,0 @@
import asyncore
import logging
from mopidy.frontends.mpd.server import MpdServer
from mopidy.utils.process import BaseProcess
logger = logging.getLogger('mopidy.frontends.mpd.process')
class MpdProcess(BaseProcess):
def __init__(self, core_queue):
super(MpdProcess, self).__init__()
self.name = 'MpdProcess'
self.core_queue = core_queue
def run_inside_try(self):
server = MpdServer(self.core_queue)
server.start()
asyncore.loop()

View File

@ -0,0 +1,20 @@
import asyncore
import logging
from mopidy.frontends.mpd.server import MpdServer
from mopidy.utils.process import BaseThread
logger = logging.getLogger('mopidy.frontends.mpd.thread')
class MpdThread(BaseThread):
def __init__(self, core_queue):
super(MpdThread, self).__init__()
self.name = u'MpdThread'
self.daemon = True
self.core_queue = core_queue
def run_inside_try(self):
logger.debug(u'Starting MPD server thread')
server = MpdServer(self.core_queue)
server.start()
asyncore.loop()

View File

@ -7,11 +7,10 @@ import gst
import logging import logging
import multiprocessing import multiprocessing
import threading
from mopidy import settings from mopidy import settings
from mopidy.outputs.base import BaseOutput from mopidy.outputs.base import BaseOutput
from mopidy.utils.process import (BaseProcess, pickle_connection, from mopidy.utils.process import (BaseThread, pickle_connection,
unpickle_connection) unpickle_connection)
logger = logging.getLogger('mopidy.outputs.gstreamer') logger = logging.getLogger('mopidy.outputs.gstreamer')
@ -20,23 +19,30 @@ class GStreamerOutput(BaseOutput):
""" """
Audio output through GStreamer. Audio output through GStreamer.
Starts the :class:`GStreamerProcess`. Starts :class:`GStreamerMessagesThread` and :class:`GStreamerPlayerThread`.
**Settings:** **Settings:**
- :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK` - :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK`
""" """
def __init__(self, core_queue): def __init__(self, *args, **kwargs):
super(GStreamerOutput, self).__init__(core_queue) super(GStreamerOutput, self).__init__(*args, **kwargs)
# Start a helper thread that can run the gobject.MainLoop
self.messages_thread = GStreamerMessagesThread()
# Start a helper thread that can process the output_queue
self.output_queue = multiprocessing.Queue() self.output_queue = multiprocessing.Queue()
self.process = GStreamerProcess(core_queue, self.output_queue) self.player_thread = GStreamerPlayerThread(self.core_queue,
self.output_queue)
def start(self): def start(self):
self.process.start() self.messages_thread.start()
self.player_thread.start()
def destroy(self): def destroy(self):
self.process.terminate() self.messages_thread.destroy()
self.player_thread.destroy()
def process_message(self, message): def process_message(self, message):
assert message['to'] == 'output', \ assert message['to'] == 'output', \
@ -84,12 +90,17 @@ class GStreamerOutput(BaseOutput):
return self._send_recv({'command': 'set_volume', 'volume': volume}) return self._send_recv({'command': 'set_volume', 'volume': volume})
class GStreamerMessagesThread(threading.Thread): class GStreamerMessagesThread(BaseThread):
def run(self): def __init__(self):
super(GStreamerMessagesThread, self).__init__()
self.name = u'GStreamerMessagesThread'
self.daemon = True
def run_inside_try(self):
gobject.MainLoop().run() gobject.MainLoop().run()
class GStreamerProcess(BaseProcess): class GStreamerPlayerThread(BaseThread):
""" """
A process for all work related to GStreamer. A process for all work related to GStreamer.
@ -102,7 +113,9 @@ class GStreamerProcess(BaseProcess):
""" """
def __init__(self, core_queue, output_queue): def __init__(self, core_queue, output_queue):
super(GStreamerProcess, self).__init__(name='GStreamerProcess') super(GStreamerPlayerThread, self).__init__()
self.name = u'GStreamerPlayerThread'
self.daemon = True
self.core_queue = core_queue self.core_queue = core_queue
self.output_queue = output_queue self.output_queue = output_queue
self.gst_pipeline = None self.gst_pipeline = None
@ -116,11 +129,6 @@ class GStreamerProcess(BaseProcess):
def setup(self): def setup(self):
logger.debug(u'Setting up GStreamer pipeline') logger.debug(u'Setting up GStreamer pipeline')
# Start a helper thread that can run the gobject.MainLoop
messages_thread = GStreamerMessagesThread()
messages_thread.daemon = True
messages_thread.start()
self.gst_pipeline = gst.parse_launch(' ! '.join([ self.gst_pipeline = gst.parse_launch(' ! '.join([
'audioconvert name=convert', 'audioconvert name=convert',
'volume name=volume', 'volume name=volume',

View File

@ -1,5 +1,6 @@
import logging import logging
import multiprocessing import multiprocessing
import multiprocessing.dummy
from multiprocessing.reduction import reduce_connection from multiprocessing.reduction import reduce_connection
import pickle import pickle
import sys import sys
@ -40,3 +41,28 @@ class BaseProcess(multiprocessing.Process):
def destroy(self): def destroy(self):
self.terminate() self.terminate()
class BaseThread(multiprocessing.dummy.Process):
def run(self):
logger.debug(u'%s: Starting process', self.name)
try:
self.run_inside_try()
except KeyboardInterrupt:
logger.info(u'%s: Interrupted by user', self.name)
sys.exit(0)
except SettingsError as e:
logger.error(e.message)
sys.exit(1)
except ImportError as e:
logger.error(e)
sys.exit(1)
except Exception as e:
logger.exception(e)
raise e
def run_inside_try(self):
raise NotImplementedError
def destroy(self):
pass