diff --git a/docs/changes.rst b/docs/changes.rst index c8e4c912..d38a135a 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -35,6 +35,9 @@ Another great release. - :meth:`mopidy.backends.base.BaseLibraryController.search()` now accepts keyword arguments of the form ``search(artist=['foo', 'fighters'], album=['bar', 'grooves'])``. + - :meth:`mopidy.backends.base.BaseBackend()` now accepts an + ``output_queue`` which it can use to send messages (i.e. audio data) + to the output process. diff --git a/mopidy/backends/base/__init__.py b/mopidy/backends/base/__init__.py index e79aceae..3a484865 100644 --- a/mopidy/backends/base/__init__.py +++ b/mopidy/backends/base/__init__.py @@ -23,13 +23,16 @@ class BaseBackend(object): :param core_queue: a queue for sending messages to :class:`mopidy.process.CoreProcess` :type core_queue: :class:`multiprocessing.Queue` + :param output_queue: a queue for sending messages to the output process + :type output_queue: :class:`multiprocessing.Queue` :param mixer: either a mixer instance, or :class:`None` to use the mixer defined in settings :type mixer: :class:`mopidy.mixers.BaseMixer` or :class:`None` """ - def __init__(self, core_queue=None, mixer=None): + def __init__(self, core_queue=None, output_queue=None, mixer=None): self.core_queue = core_queue + self.output_queue = output_queue if mixer is not None: self.mixer = mixer else: diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index 017a4155..c256b55d 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -1,5 +1,4 @@ import datetime as dt -import gobject import logging import os import multiprocessing @@ -14,8 +13,8 @@ from mopidy.backends.base import (BaseBackend, BaseCurrentPlaylistController, BaseLibraryController, BasePlaybackController, BaseStoredPlaylistsController) from mopidy.models import Artist, Album, Track, Playlist +from mopidy.process import pickle_connection -import gst logger = logging.getLogger('mopidy.backends.libspotify') ENCODING = 'utf-8' @@ -44,7 +43,6 @@ class LibspotifyBackend(BaseBackend): self.stored_playlists = LibspotifyStoredPlaylistsController( backend=self) self.uri_handlers = [u'spotify:', u'http://open.spotify.com/'] - self.gstreamer_pipeline = gst.Pipeline("spotify_pipeline") self.spotify = self._connect() def _connect(self): @@ -52,7 +50,7 @@ class LibspotifyBackend(BaseBackend): spotify = LibspotifySessionManager( settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD, core_queue=self.core_queue, - gstreamer_pipeline=self.gstreamer_pipeline) + output_queue=self.output_queue) spotify.start() return spotify @@ -94,13 +92,22 @@ class LibspotifyLibraryController(BaseLibraryController): class LibspotifyPlaybackController(BasePlaybackController): + def _set_output_state(self, state_name): + logger.debug(u'Setting output state to %s ...', state_name) + (my_end, other_end) = multiprocessing.Pipe() + self.backend.output_queue.put({ + 'command': 'set_state', + 'state': state_name, + 'reply_to': pickle_connection(other_end), + }) + my_end.poll(None) + return my_end.recv() + def _pause(self): - result = self.backend.gstreamer_pipeline.set_state(gst.STATE_PAUSED) - logger.debug('Changed gstreamer state to paused. Result was: %s' % result) - return result == gst.STATE_CHANGE_SUCCESS + return self._set_output_state('PAUSED') def _play(self, track): - self.backend.gstreamer_pipeline.set_state(gst.STATE_READY) + self._set_output_state('READY') if self.state == self.PLAYING: self.stop() if track.uri is None: @@ -109,24 +116,22 @@ class LibspotifyPlaybackController(BasePlaybackController): self.backend.spotify.session.load( Link.from_string(track.uri).as_track()) self.backend.spotify.session.play(1) - self.backend.gstreamer_pipeline.set_state(gst.STATE_PLAYING) + self._set_output_state('PLAYING') return True except SpotifyError as e: logger.warning('Play %s failed: %s', track.uri, e) return False def _resume(self): - result = self.backend.gstreamer_pipeline.set_state(gst.STATE_PLAYING) - logger.debug('Changed gstreamer state to playing. Result was: %s' % result) - return result == gst.STATE_CHANGE_SUCCESS + return self._set_output_state('PLAYING') def _seek(self, time_position): pass # TODO def _stop(self): - self.backend.gstreamer_pipeline.set_state(gst.STATE_READY) + result = self._set_output_state('READY') self.backend.spotify.session.play(0) - return True + return result class LibspotifyStoredPlaylistsController(BaseStoredPlaylistsController): @@ -196,57 +201,19 @@ class LibspotifyTranslator(object): tracks=[cls.to_mopidy_track(t) for t in spotify_playlist], ) -class GstreamerMessageBusProcess(threading.Thread): - def __init__(self, core_queue, pipeline): - super(GstreamerMessageBusProcess, self).__init__() - self.core_queue = core_queue - self.bus = pipeline.get_bus() - - def run(self): - loop = gobject.MainLoop() - gobject.threads_init() - context = loop.get_context() - while True: - message = self.bus.pop_filtered(gst.MESSAGE_EOS) - if message is not None: - self.core_queue.put({'command': 'end_of_track'}) - logger.debug('Got and handled Gstreamer message of type: %s' % message.type) - context.iteration(True) - class LibspotifySessionManager(SpotifySessionManager, threading.Thread): cache_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) settings_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) appkey_file = os.path.expanduser(settings.SPOTIFY_LIB_APPKEY) user_agent = 'Mopidy %s' % get_version() - def __init__(self, username, password, core_queue, gstreamer_pipeline): + def __init__(self, username, password, core_queue, output_queue): SpotifySessionManager.__init__(self, username, password) threading.Thread.__init__(self) self.core_queue = core_queue + self.output_queue = output_queue self.connected = threading.Event() self.session = None - self.gstreamer_pipeline = gstreamer_pipeline - - cap_string = """audio/x-raw-int, - endianness=(int)1234, - channels=(int)2, - width=(int)16, - depth=(int)16, - signed=True, - rate=(int)44100""" - caps = gst.caps_from_string(cap_string) - - self.gsrc = gst.element_factory_make("appsrc", "app-source") - self.gsrc.set_property('caps', caps) - - self.gsink = gst.element_factory_make("autoaudiosink", "autosink") - - self.gstreamer_pipeline.add(self.gsrc, self.gsink) - - gst.element_link_many(self.gsrc, self.gsink) - - message_process = GstreamerMessageBusProcess(self.core_queue, self.gstreamer_pipeline) - message_process.start() def run(self): self.connect() @@ -288,17 +255,21 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def music_delivery(self, session, frames, frame_size, num_frames, sample_type, sample_rate, channels): """Callback used by pyspotify""" - cap_string = """audio/x-raw-int, - endianness=(int)1234, - channels=(int)2, - width=(int)16, - depth=(int)16, - signed=True, - rate=(int)44100""" - caps = gst.caps_from_string(cap_string) - b = gst.Buffer(frames) - b.set_caps(caps) - self.gsrc.emit('push-buffer', b) + # TODO Base caps_string on arguments + caps_string = """ + audio/x-raw-int, + endianness=(int)1234, + channels=(int)2, + width=(int)16, + depth=(int)16, + signed=True, + rate=(int)44100 + """ + self.output_queue.put({ + 'command': 'deliver_data', + 'caps': caps_string, + 'data': bytes(frames), + }) def play_token_lost(self, session): """Callback used by pyspotify""" @@ -311,9 +282,8 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def end_of_track(self, session): """Callback used by pyspotify""" - logger.debug('End of track.') - self.gsrc.emit('end-of-stream') - logger.debug('End of stream sent to gstreamer.') + logger.debug('End of data stream.') + self.output_queue.put({'command': 'end_of_data_stream'}) def search(self, query, connection): """Search method used by Mopidy backend""" diff --git a/mopidy/frontends/mpd/server.py b/mopidy/frontends/mpd/server.py index 57b6211f..5bdbb85a 100644 --- a/mopidy/frontends/mpd/server.py +++ b/mopidy/frontends/mpd/server.py @@ -8,7 +8,8 @@ import sys from mopidy import get_mpd_protocol_version, settings from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR -from mopidy.utils import indent, pickle_connection +from mopidy.process import pickle_connection +from mopidy.utils import indent logger = logging.getLogger('mopidy.frontends.mpd.server') diff --git a/mopidy/mixers/gstreamer.py b/mopidy/mixers/gstreamer.py new file mode 100644 index 00000000..3be94db0 --- /dev/null +++ b/mopidy/mixers/gstreamer.py @@ -0,0 +1,14 @@ +from mopidy.mixers import BaseMixer + +class GStreamerMixer(BaseMixer): + """Mixer which uses GStreamer to control volume.""" + + def __init__(self, *args, **kwargs): + super(GStreamerMixer, self).__init__(*args, **kwargs) + + def _get_volume(self): + pass # TODO Get volume from GStreamerProcess + + def _set_volume(self, volume): + pass # TODO Send volume to GStreamerProcess + diff --git a/mopidy/mixers/nad.py b/mopidy/mixers/nad.py index 1f7f4710..56958005 100644 --- a/mopidy/mixers/nad.py +++ b/mopidy/mixers/nad.py @@ -83,7 +83,7 @@ class NadTalker(BaseProcess): self.pipe = pipe self._device = None - def _run(self): + def run_inside_try(self): self._open_connection() self._set_device_to_known_state() while self.pipe.poll(None): diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py new file mode 100644 index 00000000..65b65504 --- /dev/null +++ b/mopidy/outputs/gstreamer.py @@ -0,0 +1,201 @@ +import gobject +gobject.threads_init() + +import pygst +pygst.require('0.10') +import gst + +import logging +import threading + +from mopidy.process import BaseProcess, unpickle_connection + +logger = logging.getLogger('mopidy.outputs.gstreamer') + +class GStreamerOutput(object): + """ + Audio output through GStreamer. + + Starts the :class:`GStreamerProcess`. + """ + + def __init__(self, core_queue, output_queue): + process = GStreamerProcess(core_queue, output_queue) + process.start() + +class GStreamerMessagesThread(threading.Thread): + def run(self): + gobject.MainLoop().run() + +class GStreamerProcess(BaseProcess): + """ + A process for all work related to GStreamer. + + The main loop processes events from both Mopidy and GStreamer. + + Make sure this subprocess is started by the MainThread in the top-most + parent process, and not some other thread. If not, we can get into the + problems described at + http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. + """ + + def __init__(self, core_queue, output_queue): + super(GStreamerProcess, self).__init__() + self.core_queue = core_queue + self.output_queue = output_queue + self.gst_pipeline = None + self.gst_bus = None + self.gst_bus_id = None + self.gst_uri_src = None + self.gst_data_src = None + self.gst_volume = None + self.gst_sink = None + + def run_inside_try(self): + self.setup() + while True: + message = self.output_queue.get() + self.process_mopidy_message(message) + + def setup(self): + logger.debug(u'Setting up GStreamer pipeline') + + # Start a helper thread that can run the gobject.MainLoop + messages_thread = GStreamerMessagesThread() + messages_thread.daemon = True + messages_thread.start() + + # A pipeline consisting of many elements + self.gst_pipeline = gst.Pipeline("pipeline") + + # Setup bus and message processor + self.gst_bus = self.gst_pipeline.get_bus() + self.gst_bus.add_signal_watch() + self.gst_bus_id = self.gst_bus.connect('message', + self.process_gst_message) + + # Bin for playing audio URIs + #self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') + #self.gst_pipeline.add(self.gst_uri_src) + + # Bin for playing audio data + self.gst_data_src = gst.element_factory_make('appsrc', 'data_src') + self.gst_pipeline.add(self.gst_data_src) + + # Volume filter + self.gst_volume = gst.element_factory_make('volume', 'volume') + self.gst_pipeline.add(self.gst_volume) + + # Audio output sink + self.gst_sink = gst.element_factory_make('autoaudiosink', 'sink') + self.gst_pipeline.add(self.gst_sink) + + # Add callback that will link uri_src output with volume filter input + # when the output pad is ready. + # See http://stackoverflow.com/questions/2993777 for details. + def on_new_decoded_pad(dbin, pad, is_last): + uri_src = pad.get_parent() + pipeline = uri_src.get_parent() + volume = pipeline.get_by_name('volume') + uri_src.link(volume) + logger.debug("Linked uri_src's new decoded pad to volume filter") + # FIXME uridecodebin got no new-decoded-pad signal, but it's + # subcomponent decodebin2 got that signal. Fixing this is postponed + # till after data_src is up and running perfectly + #self.gst_uri_src.connect('new-decoded-pad', on_new_decoded_pad) + + # Link data source output with volume filter input + self.gst_data_src.link(self.gst_volume) + + # Link volume filter output to audio sink input + self.gst_volume.link(self.gst_sink) + + def process_mopidy_message(self, message): + """Process messages from the rest of Mopidy.""" + if message['command'] == 'play_uri': + response = self.play_uri(message['uri']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + elif message['command'] == 'deliver_data': + self.deliver_data(message['caps'], message['data']) + elif message['command'] == 'end_of_data_stream': + self.end_of_data_stream() + elif message['command'] == 'set_state': + response = self.set_state(message['state']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + else: + logger.warning(u'Cannot handle message: %s', message) + + def process_gst_message(self, bus, message): + """Process messages from GStreamer.""" + if message.type == gst.MESSAGE_EOS: + logger.debug(u'GStreamer signalled end-of-stream. ' + 'Sending end_of_track to core_queue ...') + self.core_queue.put({'command': 'end_of_track'}) + elif message.type == gst.MESSAGE_ERROR: + self.set_state('NULL') + error, debug = message.parse_error() + logger.error(u'%s %s', error, debug) + # FIXME Should we send 'stop_playback' to core here? Can we + # differentiate on how serious the error is? + + def play_uri(self, uri): + """Play audio at URI""" + self.set_state('READY') + self.gst_uri_src.set_property('uri', uri) + self.set_state('PLAYING') + # TODO Return status + + def deliver_data(self, caps_string, data): + """Deliver audio data to be played""" + caps = gst.caps_from_string(caps_string) + buffer_ = gst.Buffer(buffer(data)) + buffer_.set_caps(caps) + self.gst_data_src.set_property('caps', caps) + self.gst_data_src.emit('push-buffer', buffer_) + + def end_of_data_stream(self): + """ + Add end-of-stream token to source. + + We will get a GStreamer message when the stream playback reaches the + token, and can then do any end-of-stream related tasks. + """ + self.gst_data_src.emit('end-of-stream') + + def set_state(self, state_name): + """ + Set the GStreamer state. Returns :class:`True` if successful. + + .. digraph:: gst_state_transitions + + "NULL" -> "READY" + "PAUSED" -> "PLAYING" + "PAUSED" -> "READY" + "PLAYING" -> "PAUSED" + "READY" -> "NULL" + "READY" -> "PAUSED" + + :param state_name: NULL, READY, PAUSED, or PLAYING + :type state_name: string + :rtype: :class:`True` or :class:`False` + """ + result = self.gst_pipeline.set_state( + getattr(gst, 'STATE_' + state_name)) + if result == gst.STATE_CHANGE_SUCCESS: + logger.debug('Setting GStreamer state to %s: OK', state_name) + return True + else: + logger.warning('Setting GStreamer state to %s: failed', state_name) + return False + + def get_volume(self): + """Get volume in range [0..100]""" + gst_volume = self.gst_volume.get_property('volume') + return int(gst_volume * 100) + + def set_volume(self, volume): + """Set volume in range [0..100]""" + gst_volume = volume / 100.0 + self.gst_volume.set_property('volume', gst_volume) diff --git a/mopidy/process.py b/mopidy/process.py index d3c1d03e..9759c4e6 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -1,16 +1,27 @@ import logging import multiprocessing +from multiprocessing.reduction import reduce_connection +import pickle import sys from mopidy import settings, SettingsError -from mopidy.utils import get_class, unpickle_connection +from mopidy.utils import get_class logger = logging.getLogger('mopidy.process') +def pickle_connection(connection): + return pickle.dumps(reduce_connection(connection)) + +def unpickle_connection(pickled_connection): + # From http://stackoverflow.com/questions/1446004 + (func, args) = pickle.loads(pickled_connection) + return func(*args) + + class BaseProcess(multiprocessing.Process): def run(self): try: - self._run() + self.run_inside_try() except KeyboardInterrupt: logger.info(u'Interrupted by user') sys.exit(0) @@ -18,7 +29,7 @@ class BaseProcess(multiprocessing.Process): logger.error(e.message) sys.exit(1) - def _run(self): + def run_inside_try(self): raise NotImplementedError @@ -26,30 +37,37 @@ class CoreProcess(BaseProcess): def __init__(self, core_queue): super(CoreProcess, self).__init__() self.core_queue = core_queue - self._backend = None - self._frontend = None + self.output_queue = None + self.output = None + self.backend = None + self.frontend = None - def _run(self): - self._setup() + def run_inside_try(self): + self.setup() while True: message = self.core_queue.get() - self._process_message(message) + self.process_message(message) - def _setup(self): - self._backend = get_class(settings.BACKENDS[0])( - core_queue=self.core_queue) - self._frontend = get_class(settings.FRONTEND)(backend=self._backend) + def setup(self): + self.output_queue = multiprocessing.Queue() + self.output = get_class(settings.OUTPUT)(self.core_queue, + self.output_queue) + self.backend = get_class(settings.BACKENDS[0])(self.core_queue, + self.output_queue) + self.frontend = get_class(settings.FRONTEND)(self.backend) - def _process_message(self, message): - if message['command'] == 'mpd_request': - response = self._frontend.handle_request(message['request']) + def process_message(self, message): + if message.get('to') == 'output': + self.output_queue.put(message) + elif message['command'] == 'mpd_request': + response = self.frontend.handle_request(message['request']) connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'end_of_track': - self._backend.playback.end_of_track_callback() + self.backend.playback.end_of_track_callback() elif message['command'] == 'stop_playback': - self._backend.playback.stop() + self.backend.playback.stop() elif message['command'] == 'set_stored_playlists': - self._backend.stored_playlists.playlists = message['playlists'] + self.backend.stored_playlists.playlists = message['playlists'] else: logger.warning(u'Cannot handle message: %s', message) diff --git a/mopidy/settings.py b/mopidy/settings.py index 1192c28d..8fdc3535 100644 --- a/mopidy/settings.py +++ b/mopidy/settings.py @@ -105,6 +105,11 @@ MIXER_EXT_SPEAKERS_A = None #: Default: :class:`None`. MIXER_EXT_SPEAKERS_B = None +#: Audio output handler to use. Default:: +#: +#: OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput' +OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput' + #: Server to use. Default:: #: #: SERVER = u'mopidy.frontends.mpd.server.MpdServer' diff --git a/mopidy/utils.py b/mopidy/utils.py index 7eac9239..ff032b4e 100644 --- a/mopidy/utils.py +++ b/mopidy/utils.py @@ -1,7 +1,5 @@ import logging -from multiprocessing.reduction import reduce_connection import os -import pickle import sys import urllib @@ -54,14 +52,6 @@ def indent(string, places=4, linebreak='\n'): result += linebreak + ' ' * places + line return result -def pickle_connection(connection): - return pickle.dumps(reduce_connection(connection)) - -def unpickle_connection(pickled_connection): - # From http://stackoverflow.com/questions/1446004 - (func, args) = pickle.loads(pickled_connection) - return func(*args) - def parse_m3u(file_path): """ Convert M3U file list of uris