diff --git a/docs/changes.rst b/docs/changes.rst index ed3cb080..ed05050c 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -41,6 +41,8 @@ No description yet. sound output work with GStreamer >= 0.10.29, which includes the versions used in Ubuntu 10.10 and on OS X if using Homebrew. (Fixes: :issue:`21`, :issue:`24`, contributes to :issue:`14`) +- Improved handling of uncaught exceptions in threads. The entire process + should now exit immediately. 0.1.0 (2010-08-23) diff --git a/mopidy/backends/libspotify/session_manager.py b/mopidy/backends/libspotify/session_manager.py index 3b6f9f0b..7f541236 100644 --- a/mopidy/backends/libspotify/session_manager.py +++ b/mopidy/backends/libspotify/session_manager.py @@ -19,12 +19,8 @@ class LibspotifySessionManager(SpotifySessionManager, BaseThread): def __init__(self, username, password, core_queue, output): SpotifySessionManager.__init__(self, username, password) - BaseThread.__init__(self) + BaseThread.__init__(self, core_queue) self.name = 'LibspotifySMThread' - # Run as a daemon thread, so Mopidy won't wait for this thread to exit - # before Mopidy exits. - self.daemon = True - self.core_queue = core_queue self.output = output self.connected = threading.Event() self.session = None diff --git a/mopidy/core.py b/mopidy/core.py index 5351e2a5..69760094 100644 --- a/mopidy/core.py +++ b/mopidy/core.py @@ -1,20 +1,22 @@ import logging import multiprocessing import optparse +import sys from mopidy import get_version, settings, OptionalDependencyError from mopidy.utils import get_class from mopidy.utils.log import setup_logging from mopidy.utils.path import get_or_create_folder, get_or_create_file -from mopidy.utils.process import BaseProcess +from mopidy.utils.process import BaseThread from mopidy.utils.settings import list_settings_optparse_callback logger = logging.getLogger('mopidy.core') -class CoreProcess(BaseProcess): +class CoreProcess(BaseThread): def __init__(self): - super(CoreProcess, self).__init__(name='CoreProcess') self.core_queue = multiprocessing.Queue() + super(CoreProcess, self).__init__(self.core_queue) + self.name = 'CoreProcess' self.options = self.parse_options() self.output = None self.backend = None @@ -79,7 +81,9 @@ class CoreProcess(BaseProcess): return frontends def process_message(self, message): - if message.get('to') == 'output': + if message.get('to') == 'core': + self.process_message_to_core(message) + elif message.get('to') == 'output': self.output.process_message(message) elif message.get('to') == 'frontend': for frontend in self.frontends: @@ -92,3 +96,12 @@ class CoreProcess(BaseProcess): self.backend.stored_playlists.playlists = message['playlists'] else: logger.warning(u'Cannot handle message: %s', message) + + def process_message_to_core(self, message): + assert message['to'] == 'core', u'Message recipient must be "core".' + if message['command'] == 'exit': + if message['reason'] is not None: + logger.info(u'Exiting (%s)', message['reason']) + sys.exit(message['status']) + else: + logger.warning(u'Cannot handle message: %s', message) diff --git a/mopidy/frontends/lastfm.py b/mopidy/frontends/lastfm.py index 42dd16c7..e91dd272 100644 --- a/mopidy/frontends/lastfm.py +++ b/mopidy/frontends/lastfm.py @@ -45,7 +45,7 @@ class LastfmFrontend(BaseFrontend): def __init__(self, *args, **kwargs): super(LastfmFrontend, self).__init__(*args, **kwargs) (self.connection, other_end) = multiprocessing.Pipe() - self.thread = LastfmFrontendThread(other_end) + self.thread = LastfmFrontendThread(self.core_queue, other_end) def start(self): self.thread.start() @@ -58,10 +58,9 @@ class LastfmFrontend(BaseFrontend): class LastfmFrontendThread(BaseThread): - def __init__(self, connection): - super(LastfmFrontendThread, self).__init__() + def __init__(self, core_queue, connection): + super(LastfmFrontendThread, self).__init__(core_queue) self.name = u'LastfmFrontendThread' - self.daemon = True self.connection = connection self.lastfm = None self.scrobbler = None diff --git a/mopidy/frontends/mpd/protocol/music_db.py b/mopidy/frontends/mpd/protocol/music_db.py index 4c2031aa..fb3a3a09 100644 --- a/mopidy/frontends/mpd/protocol/music_db.py +++ b/mopidy/frontends/mpd/protocol/music_db.py @@ -196,6 +196,7 @@ def _list_build_query(field, mpd_query): query = {} while tokens: key = tokens[0].lower() + key = str(key) # Needed for kwargs keys on OS X and Windows value = tokens[1] tokens = tokens[2:] if key not in (u'artist', u'album', u'date', u'genre'): diff --git a/mopidy/frontends/mpd/thread.py b/mopidy/frontends/mpd/thread.py index 0fb048ec..0ad5ee68 100644 --- a/mopidy/frontends/mpd/thread.py +++ b/mopidy/frontends/mpd/thread.py @@ -8,10 +8,8 @@ logger = logging.getLogger('mopidy.frontends.mpd.thread') class MpdThread(BaseThread): def __init__(self, core_queue): - super(MpdThread, self).__init__() + super(MpdThread, self).__init__(core_queue) self.name = u'MpdThread' - self.daemon = True - self.core_queue = core_queue def run_inside_try(self): logger.debug(u'Starting MPD server thread') diff --git a/mopidy/mixers/nad.py b/mopidy/mixers/nad.py index 929d2e1d..7a8f006e 100644 --- a/mopidy/mixers/nad.py +++ b/mopidy/mixers/nad.py @@ -4,7 +4,7 @@ from multiprocessing import Pipe from mopidy import settings from mopidy.mixers import BaseMixer -from mopidy.utils.process import BaseProcess +from mopidy.utils.process import BaseThread logger = logging.getLogger('mopidy.mixers.nad') @@ -50,7 +50,7 @@ class NadMixer(BaseMixer): self._pipe.send({'command': 'set_volume', 'volume': volume}) -class NadTalker(BaseProcess): +class NadTalker(BaseThread): """ Independent process which does the communication with the NAD device. diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index a53fcd20..3714fed6 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -29,7 +29,7 @@ class GStreamerOutput(BaseOutput): def __init__(self, *args, **kwargs): super(GStreamerOutput, self).__init__(*args, **kwargs) # Start a helper thread that can run the gobject.MainLoop - self.messages_thread = GStreamerMessagesThread() + self.messages_thread = GStreamerMessagesThread(self.core_queue) # Start a helper thread that can process the output_queue self.output_queue = multiprocessing.Queue() @@ -91,10 +91,9 @@ class GStreamerOutput(BaseOutput): class GStreamerMessagesThread(BaseThread): - def __init__(self): - super(GStreamerMessagesThread, self).__init__() + def __init__(self, core_queue): + super(GStreamerMessagesThread, self).__init__(core_queue) self.name = u'GStreamerMessagesThread' - self.daemon = True def run_inside_try(self): gobject.MainLoop().run() @@ -113,10 +112,8 @@ class GStreamerPlayerThread(BaseThread): """ def __init__(self, core_queue, output_queue): - super(GStreamerPlayerThread, self).__init__() + super(GStreamerPlayerThread, self).__init__(core_queue) self.name = u'GStreamerPlayerThread' - self.daemon = True - self.core_queue = core_queue self.output_queue = output_queue self.gst_pipeline = None diff --git a/mopidy/utils/process.py b/mopidy/utils/process.py index 0acccb4d..7855d69c 100644 --- a/mopidy/utils/process.py +++ b/mopidy/utils/process.py @@ -19,22 +19,26 @@ def unpickle_connection(pickled_connection): class BaseProcess(multiprocessing.Process): + def __init__(self, core_queue): + super(BaseProcess, self).__init__() + self.core_queue = core_queue + def run(self): logger.debug(u'%s: Starting process', self.name) try: self.run_inside_try() except KeyboardInterrupt: - logger.info(u'%s: Interrupted by user', self.name) - sys.exit(0) + logger.info(u'Interrupted by user') + self.exit(0, u'Interrupted by user') except SettingsError as e: logger.error(e.message) - sys.exit(1) + self.exit(1, u'Settings error') except ImportError as e: logger.error(e) - sys.exit(1) + self.exit(2, u'Import error') except Exception as e: logger.exception(e) - raise e + self.exit(3, u'Unknown error') def run_inside_try(self): raise NotImplementedError @@ -42,27 +46,43 @@ class BaseProcess(multiprocessing.Process): def destroy(self): self.terminate() + def exit(self, status=0, reason=None): + self.core_queue.put({'to': 'core', 'command': 'exit', + 'status': status, 'reason': reason}) + self.destroy() + class BaseThread(multiprocessing.dummy.Process): + def __init__(self, core_queue): + super(BaseThread, self).__init__() + self.core_queue = core_queue + # No thread should block process from exiting + self.daemon = True + def run(self): logger.debug(u'%s: Starting thread', self.name) try: self.run_inside_try() except KeyboardInterrupt: - logger.info(u'%s: Interrupted by user', self.name) - sys.exit(0) + logger.info(u'Interrupted by user') + self.exit(0, u'Interrupted by user') except SettingsError as e: logger.error(e.message) - sys.exit(1) + self.exit(1, u'Settings error') except ImportError as e: logger.error(e) - sys.exit(1) + self.exit(2, u'Import error') except Exception as e: logger.exception(e) - raise e + self.exit(3, u'Unknown error') def run_inside_try(self): raise NotImplementedError def destroy(self): pass + + def exit(self, status=0, reason=None): + self.core_queue.put({'to': 'core', 'command': 'exit', + 'status': status, 'reason': reason}) + self.destroy()