diff --git a/mopidy/backends/libspotify/session_manager.py b/mopidy/backends/libspotify/session_manager.py index fda5216a..9554fa3f 100644 --- a/mopidy/backends/libspotify/session_manager.py +++ b/mopidy/backends/libspotify/session_manager.py @@ -5,12 +5,13 @@ import threading from spotify.manager import SpotifySessionManager from mopidy import get_version, settings -from mopidy.models import Playlist from mopidy.backends.libspotify.translator import LibspotifyTranslator +from mopidy.models import Playlist +from mopidy.utils.process import BaseThread logger = logging.getLogger('mopidy.backends.libspotify.session_manager') -class LibspotifySessionManager(SpotifySessionManager, threading.Thread): +class LibspotifySessionManager(SpotifySessionManager, BaseThread): cache_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) settings_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) appkey_file = os.path.join(os.path.dirname(__file__), 'spotify_appkey.key') @@ -18,7 +19,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def __init__(self, username, password, core_queue, output): SpotifySessionManager.__init__(self, username, password) - threading.Thread.__init__(self) + BaseThread.__init__(self) self.name = 'LibspotifySMThread' # Run as a daemon thread, so Mopidy won't wait for this thread to exit # before Mopidy exits. @@ -28,7 +29,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): self.connected = threading.Event() self.session = None - def run(self): + def run_inside_try(self): self.connect() def logged_in(self, session, error): diff --git a/mopidy/frontends/lastfm.py b/mopidy/frontends/lastfm.py index 03aebc45..bba69a5b 100644 --- a/mopidy/frontends/lastfm.py +++ b/mopidy/frontends/lastfm.py @@ -11,7 +11,7 @@ except ImportError as e: from mopidy import get_version, settings, SettingsError from mopidy.frontends.base import BaseFrontend -from mopidy.utils.process import BaseProcess +from mopidy.utils.process import BaseThread logger = logging.getLogger('mopidy.frontends.lastfm') @@ -45,22 +45,22 @@ class LastfmFrontend(BaseFrontend): def __init__(self, *args, **kwargs): super(LastfmFrontend, self).__init__(*args, **kwargs) (self.connection, other_end) = multiprocessing.Pipe() - self.process = LastfmFrontendProcess(other_end) + self.thread = LastfmFrontendThread(other_end) def start(self): - self.process.start() + self.thread.start() def destroy(self): - self.process.destroy() + self.thread.destroy() def process_message(self, message): self.connection.send(message) -class LastfmFrontendProcess(BaseProcess): +class LastfmFrontendThread(BaseThread): def __init__(self, connection): - super(LastfmFrontendProcess, self).__init__() - self.name = u'LastfmFrontendProcess' + super(LastfmFrontendThread, self).__init__() + self.name = u'LastfmFrontendThread' self.daemon = True self.connection = connection self.lastfm = None diff --git a/mopidy/frontends/mpd/__init__.py b/mopidy/frontends/mpd/__init__.py index 6450889e..ce9abc6d 100644 --- a/mopidy/frontends/mpd/__init__.py +++ b/mopidy/frontends/mpd/__init__.py @@ -2,7 +2,7 @@ import logging from mopidy.frontends.base import BaseFrontend from mopidy.frontends.mpd.dispatcher import MpdDispatcher -from mopidy.frontends.mpd.process import MpdProcess +from mopidy.frontends.mpd.thread import MpdThread from mopidy.utils.process import unpickle_connection logger = logging.getLogger('mopidy.frontends.mpd') @@ -19,17 +19,17 @@ class MpdFrontend(BaseFrontend): def __init__(self, *args, **kwargs): super(MpdFrontend, self).__init__(*args, **kwargs) - self.process = None + self.thread = None self.dispatcher = MpdDispatcher(self.backend) def start(self): """Starts the MPD server.""" - self.process = MpdProcess(self.core_queue) - self.process.start() + self.thread = MpdThread(self.core_queue) + self.thread.start() def destroy(self): """Destroys the MPD server.""" - self.process.destroy() + self.thread.destroy() def process_message(self, message): """ diff --git a/mopidy/frontends/mpd/process.py b/mopidy/frontends/mpd/process.py deleted file mode 100644 index 95f3e271..00000000 --- a/mopidy/frontends/mpd/process.py +++ /dev/null @@ -1,18 +0,0 @@ -import asyncore -import logging - -from mopidy.frontends.mpd.server import MpdServer -from mopidy.utils.process import BaseProcess - -logger = logging.getLogger('mopidy.frontends.mpd.process') - -class MpdProcess(BaseProcess): - def __init__(self, core_queue): - super(MpdProcess, self).__init__() - self.name = 'MpdProcess' - self.core_queue = core_queue - - def run_inside_try(self): - server = MpdServer(self.core_queue) - server.start() - asyncore.loop() diff --git a/mopidy/frontends/mpd/thread.py b/mopidy/frontends/mpd/thread.py new file mode 100644 index 00000000..0fb048ec --- /dev/null +++ b/mopidy/frontends/mpd/thread.py @@ -0,0 +1,20 @@ +import asyncore +import logging + +from mopidy.frontends.mpd.server import MpdServer +from mopidy.utils.process import BaseThread + +logger = logging.getLogger('mopidy.frontends.mpd.thread') + +class MpdThread(BaseThread): + def __init__(self, core_queue): + super(MpdThread, self).__init__() + self.name = u'MpdThread' + self.daemon = True + self.core_queue = core_queue + + def run_inside_try(self): + logger.debug(u'Starting MPD server thread') + server = MpdServer(self.core_queue) + server.start() + asyncore.loop() diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index def37f72..346f6254 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -7,11 +7,10 @@ import gst import logging import multiprocessing -import threading from mopidy import settings from mopidy.outputs.base import BaseOutput -from mopidy.utils.process import (BaseProcess, pickle_connection, +from mopidy.utils.process import (BaseThread, pickle_connection, unpickle_connection) logger = logging.getLogger('mopidy.outputs.gstreamer') @@ -20,23 +19,30 @@ class GStreamerOutput(BaseOutput): """ Audio output through GStreamer. - Starts the :class:`GStreamerProcess`. + Starts :class:`GStreamerMessagesThread` and :class:`GStreamerPlayerThread`. **Settings:** - :attr:`mopidy.settings.GSTREAMER_AUDIO_SINK` """ - def __init__(self, core_queue): - super(GStreamerOutput, self).__init__(core_queue) + def __init__(self, *args, **kwargs): + super(GStreamerOutput, self).__init__(*args, **kwargs) + # Start a helper thread that can run the gobject.MainLoop + self.messages_thread = GStreamerMessagesThread() + + # Start a helper thread that can process the output_queue self.output_queue = multiprocessing.Queue() - self.process = GStreamerProcess(core_queue, self.output_queue) + self.player_thread = GStreamerPlayerThread(self.core_queue, + self.output_queue) def start(self): - self.process.start() + self.messages_thread.start() + self.player_thread.start() def destroy(self): - self.process.terminate() + self.messages_thread.destroy() + self.player_thread.destroy() def process_message(self, message): assert message['to'] == 'output', \ @@ -84,12 +90,17 @@ class GStreamerOutput(BaseOutput): return self._send_recv({'command': 'set_volume', 'volume': volume}) -class GStreamerMessagesThread(threading.Thread): - def run(self): +class GStreamerMessagesThread(BaseThread): + def __init__(self): + super(GStreamerMessagesThread, self).__init__() + self.name = u'GStreamerMessagesThread' + self.daemon = True + + def run_inside_try(self): gobject.MainLoop().run() -class GStreamerProcess(BaseProcess): +class GStreamerPlayerThread(BaseThread): """ A process for all work related to GStreamer. @@ -102,7 +113,9 @@ class GStreamerProcess(BaseProcess): """ def __init__(self, core_queue, output_queue): - super(GStreamerProcess, self).__init__(name='GStreamerProcess') + super(GStreamerPlayerThread, self).__init__() + self.name = u'GStreamerPlayerThread' + self.daemon = True self.core_queue = core_queue self.output_queue = output_queue self.gst_pipeline = None @@ -116,11 +129,6 @@ class GStreamerProcess(BaseProcess): def setup(self): logger.debug(u'Setting up GStreamer pipeline') - # Start a helper thread that can run the gobject.MainLoop - messages_thread = GStreamerMessagesThread() - messages_thread.daemon = True - messages_thread.start() - self.gst_pipeline = gst.parse_launch(' ! '.join([ 'audioconvert name=convert', 'volume name=volume', diff --git a/mopidy/utils/process.py b/mopidy/utils/process.py index c66a8ee8..09446c93 100644 --- a/mopidy/utils/process.py +++ b/mopidy/utils/process.py @@ -1,5 +1,6 @@ import logging import multiprocessing +import multiprocessing.dummy from multiprocessing.reduction import reduce_connection import pickle import sys @@ -40,3 +41,28 @@ class BaseProcess(multiprocessing.Process): def destroy(self): self.terminate() + + +class BaseThread(multiprocessing.dummy.Process): + def run(self): + logger.debug(u'%s: Starting process', self.name) + try: + self.run_inside_try() + except KeyboardInterrupt: + logger.info(u'%s: Interrupted by user', self.name) + sys.exit(0) + except SettingsError as e: + logger.error(e.message) + sys.exit(1) + except ImportError as e: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.exception(e) + raise e + + def run_inside_try(self): + raise NotImplementedError + + def destroy(self): + pass