Merge branch 'feature/exit-on-uncaught-exception' into develop

This commit is contained in:
Stein Magnus Jodal 2010-10-24 20:09:09 +02:00
commit bade4a01e0
9 changed files with 61 additions and 35 deletions

View File

@ -41,6 +41,8 @@ No description yet.
sound output work with GStreamer >= 0.10.29, which includes the versions used
in Ubuntu 10.10 and on OS X if using Homebrew. (Fixes: :issue:`21`,
:issue:`24`, contributes to :issue:`14`)
- Improved handling of uncaught exceptions in threads. The entire process
should now exit immediately.
0.1.0 (2010-08-23)

View File

@ -19,12 +19,8 @@ class LibspotifySessionManager(SpotifySessionManager, BaseThread):
def __init__(self, username, password, core_queue, output):
SpotifySessionManager.__init__(self, username, password)
BaseThread.__init__(self)
BaseThread.__init__(self, core_queue)
self.name = 'LibspotifySMThread'
# Run as a daemon thread, so Mopidy won't wait for this thread to exit
# before Mopidy exits.
self.daemon = True
self.core_queue = core_queue
self.output = output
self.connected = threading.Event()
self.session = None

View File

@ -1,20 +1,22 @@
import logging
import multiprocessing
import optparse
import sys
from mopidy import get_version, settings, OptionalDependencyError
from mopidy.utils import get_class
from mopidy.utils.log import setup_logging
from mopidy.utils.path import get_or_create_folder, get_or_create_file
from mopidy.utils.process import BaseProcess
from mopidy.utils.process import BaseThread
from mopidy.utils.settings import list_settings_optparse_callback
logger = logging.getLogger('mopidy.core')
class CoreProcess(BaseProcess):
class CoreProcess(BaseThread):
def __init__(self):
super(CoreProcess, self).__init__(name='CoreProcess')
self.core_queue = multiprocessing.Queue()
super(CoreProcess, self).__init__(self.core_queue)
self.name = 'CoreProcess'
self.options = self.parse_options()
self.output = None
self.backend = None
@ -79,7 +81,9 @@ class CoreProcess(BaseProcess):
return frontends
def process_message(self, message):
if message.get('to') == 'output':
if message.get('to') == 'core':
self.process_message_to_core(message)
elif message.get('to') == 'output':
self.output.process_message(message)
elif message.get('to') == 'frontend':
for frontend in self.frontends:
@ -92,3 +96,12 @@ class CoreProcess(BaseProcess):
self.backend.stored_playlists.playlists = message['playlists']
else:
logger.warning(u'Cannot handle message: %s', message)
def process_message_to_core(self, message):
assert message['to'] == 'core', u'Message recipient must be "core".'
if message['command'] == 'exit':
if message['reason'] is not None:
logger.info(u'Exiting (%s)', message['reason'])
sys.exit(message['status'])
else:
logger.warning(u'Cannot handle message: %s', message)

View File

@ -45,7 +45,7 @@ class LastfmFrontend(BaseFrontend):
def __init__(self, *args, **kwargs):
super(LastfmFrontend, self).__init__(*args, **kwargs)
(self.connection, other_end) = multiprocessing.Pipe()
self.thread = LastfmFrontendThread(other_end)
self.thread = LastfmFrontendThread(self.core_queue, other_end)
def start(self):
self.thread.start()
@ -58,10 +58,9 @@ class LastfmFrontend(BaseFrontend):
class LastfmFrontendThread(BaseThread):
def __init__(self, connection):
super(LastfmFrontendThread, self).__init__()
def __init__(self, core_queue, connection):
super(LastfmFrontendThread, self).__init__(core_queue)
self.name = u'LastfmFrontendThread'
self.daemon = True
self.connection = connection
self.lastfm = None
self.scrobbler = None

View File

@ -196,6 +196,7 @@ def _list_build_query(field, mpd_query):
query = {}
while tokens:
key = tokens[0].lower()
key = str(key) # Needed for kwargs keys on OS X and Windows
value = tokens[1]
tokens = tokens[2:]
if key not in (u'artist', u'album', u'date', u'genre'):

View File

@ -8,10 +8,8 @@ logger = logging.getLogger('mopidy.frontends.mpd.thread')
class MpdThread(BaseThread):
def __init__(self, core_queue):
super(MpdThread, self).__init__()
super(MpdThread, self).__init__(core_queue)
self.name = u'MpdThread'
self.daemon = True
self.core_queue = core_queue
def run_inside_try(self):
logger.debug(u'Starting MPD server thread')

View File

@ -4,7 +4,7 @@ from multiprocessing import Pipe
from mopidy import settings
from mopidy.mixers import BaseMixer
from mopidy.utils.process import BaseProcess
from mopidy.utils.process import BaseThread
logger = logging.getLogger('mopidy.mixers.nad')
@ -50,7 +50,7 @@ class NadMixer(BaseMixer):
self._pipe.send({'command': 'set_volume', 'volume': volume})
class NadTalker(BaseProcess):
class NadTalker(BaseThread):
"""
Independent process which does the communication with the NAD device.

View File

@ -29,7 +29,7 @@ class GStreamerOutput(BaseOutput):
def __init__(self, *args, **kwargs):
super(GStreamerOutput, self).__init__(*args, **kwargs)
# Start a helper thread that can run the gobject.MainLoop
self.messages_thread = GStreamerMessagesThread()
self.messages_thread = GStreamerMessagesThread(self.core_queue)
# Start a helper thread that can process the output_queue
self.output_queue = multiprocessing.Queue()
@ -91,10 +91,9 @@ class GStreamerOutput(BaseOutput):
class GStreamerMessagesThread(BaseThread):
def __init__(self):
super(GStreamerMessagesThread, self).__init__()
def __init__(self, core_queue):
super(GStreamerMessagesThread, self).__init__(core_queue)
self.name = u'GStreamerMessagesThread'
self.daemon = True
def run_inside_try(self):
gobject.MainLoop().run()
@ -113,10 +112,8 @@ class GStreamerPlayerThread(BaseThread):
"""
def __init__(self, core_queue, output_queue):
super(GStreamerPlayerThread, self).__init__()
super(GStreamerPlayerThread, self).__init__(core_queue)
self.name = u'GStreamerPlayerThread'
self.daemon = True
self.core_queue = core_queue
self.output_queue = output_queue
self.gst_pipeline = None

View File

@ -19,22 +19,26 @@ def unpickle_connection(pickled_connection):
class BaseProcess(multiprocessing.Process):
def __init__(self, core_queue):
super(BaseProcess, self).__init__()
self.core_queue = core_queue
def run(self):
logger.debug(u'%s: Starting process', self.name)
try:
self.run_inside_try()
except KeyboardInterrupt:
logger.info(u'%s: Interrupted by user', self.name)
sys.exit(0)
logger.info(u'Interrupted by user')
self.exit(0, u'Interrupted by user')
except SettingsError as e:
logger.error(e.message)
sys.exit(1)
self.exit(1, u'Settings error')
except ImportError as e:
logger.error(e)
sys.exit(1)
self.exit(2, u'Import error')
except Exception as e:
logger.exception(e)
raise e
self.exit(3, u'Unknown error')
def run_inside_try(self):
raise NotImplementedError
@ -42,27 +46,43 @@ class BaseProcess(multiprocessing.Process):
def destroy(self):
self.terminate()
def exit(self, status=0, reason=None):
self.core_queue.put({'to': 'core', 'command': 'exit',
'status': status, 'reason': reason})
self.destroy()
class BaseThread(multiprocessing.dummy.Process):
def __init__(self, core_queue):
super(BaseThread, self).__init__()
self.core_queue = core_queue
# No thread should block process from exiting
self.daemon = True
def run(self):
logger.debug(u'%s: Starting thread', self.name)
try:
self.run_inside_try()
except KeyboardInterrupt:
logger.info(u'%s: Interrupted by user', self.name)
sys.exit(0)
logger.info(u'Interrupted by user')
self.exit(0, u'Interrupted by user')
except SettingsError as e:
logger.error(e.message)
sys.exit(1)
self.exit(1, u'Settings error')
except ImportError as e:
logger.error(e)
sys.exit(1)
self.exit(2, u'Import error')
except Exception as e:
logger.exception(e)
raise e
self.exit(3, u'Unknown error')
def run_inside_try(self):
raise NotImplementedError
def destroy(self):
pass
def exit(self, status=0, reason=None):
self.core_queue.put({'to': 'core', 'command': 'exit',
'status': status, 'reason': reason})
self.destroy()