From 6e5e5a8930feecf74f808e3eecab75b3bf999037 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 00:29:44 +0200 Subject: [PATCH 01/21] [WIP] Add GStreamerOutput --- mopidy/outputs/__init__.py | 0 mopidy/outputs/gstreamer.py | 130 ++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 mopidy/outputs/__init__.py create mode 100644 mopidy/outputs/gstreamer.py diff --git a/mopidy/outputs/__init__.py b/mopidy/outputs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py new file mode 100644 index 00000000..16844adf --- /dev/null +++ b/mopidy/outputs/gstreamer.py @@ -0,0 +1,130 @@ +import gobject + +import pygst +pygst.require('0.10') + +import gst +import logging + +from mopidy.process import BaseProcess + +logger = logging.getLogger('mopidy.outputs.gstreamer') + +class GStreamerOutput(object): + """ + Audio output through GStreamer. + + Starts the :class:`GStreamerProcess`. + """ + + def __init__(self, core_queue): + process = GStreamerProcess(core_queue) + process.start() + +class GStreamerProcess(BaseProcess): + """ + A process for all work related to GStreamer. + + The main loop polls for events from both Mopidy and GStreamer. + """ + + def __init__(self, core_queue): + self.core_queue = core_queue + + # See http://www.jejik.com/articles/2007/01/ + # python-gstreamer_threading_and_the_main_loop/ for details. + gobject.threads_init() + self.gobject_context = gobject.MainLoop().get_context() + + # A pipeline consisting of many elements + self.gst_pipeline = gst.Pipeline("pipeline") + + # Setup bus and message processor + self.gst_bus = self.gst_pipeline.get_bus() + self.gst_bus.add_signal_watch() + self.gst_bus_id = self.gst_bus.connect('message', self.process_message) + + # Bin for playing audio URIs + self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') + self.gst_pipeline.add(self.gst_uri_src) + + # Bin for playing audio data + self.gst_data_src = gst.element_factory_make('appsrc', 'data_src') + self.gst_pipeline.add(self.gst_data_src) + + # Volume filter + self.gst_volume = gst.element_factory_make('volume', 'volume') + self.gst_pipeline.add(self.gst_volume) + + # Audio output sink + self.gst_sink = gst.element_factory_make('autoaudiosink', 'sink') + self.gst_pipeline.add(self.gst_sink) + + # The audio URI chain + gst.element_link_many(self.gst_uri_src, self.gst_volume, self.gst_sink) + + # The audio data chain + gst.element_link_many(self.gst_data_src, self.gst_volume, + self.gst_sink) + + def _run(self): + while True: + # TODO Handle commands + self.gobject_context.iteration(True) + + def process_message(self, bus, message): + if message.type == gst.MESSAGE_EOS: + pass # TODO Handle end of track/stream + elif message.type == gst.MESSAGE_ERROR: + self.gst_bin.set_state(gst.STATE_NULL) + error, debug = message.parse_error() + logger.error(u'%s %s', error, debug) + + def deliver_data(self, caps_string, data): + """Deliver audio data to be played""" + caps = gst.caps_from_string(caps_string) + buffer_ = gst.Buffer(data) + buffer_.set_caps(caps) + self.gst_data_src.emit('push-buffer', buffer_) + + def play_uri(self, uri): + """Play audio at URI""" + self.gst_uri_src.set_state(gst.STATE_READY) + self.gst_uri_src.set_property('uri', uri) + self.gst_uri_src.set_state(gst.STATE_PLAYING) + # TODO Return status + + def state_playing(self): + """ + Set the state to PLAYING. + + Previous state should be READY or PAUSED. + """ + result = self.gst_uri_src.set_state(gst.STATE_PLAYING) + return result == gst.STATE_CHANGE_SUCCESS + + def state_paused(self): + """ + Set the state to PAUSED. + + Previous state should be PLAYING. + """ + result = self.gst_uri_src.set_state(gst.STATE_PAUSED) + return result == gst.STATE_CHANGE_SUCCESS + + def state_ready(self): + """ + Set the state to READY. + """ + result = self.gst_uri_src.set_state(gst.STATE_READY) + return result == gst.STATE_CHANGE_SUCCESS + + def get_volume(self): + """Get volume in range [0..100]""" + gst_volume = self.gst_volume.get_property('volume') + return int(gst_volume * 100) + + def set_volume(self, volume): + """Set volume in range [0..100]""" + gst_volume = volume / 100.0 + self.gst_volume.set_property('volume', gst_volume) From 93e6aa13cf7eff38a0cce394db9439e84839725b Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 00:29:51 +0200 Subject: [PATCH 02/21] [WIP] Add GStreamerMixer --- mopidy/mixers/gstreamer.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 mopidy/mixers/gstreamer.py diff --git a/mopidy/mixers/gstreamer.py b/mopidy/mixers/gstreamer.py new file mode 100644 index 00000000..83394bd0 --- /dev/null +++ b/mopidy/mixers/gstreamer.py @@ -0,0 +1,14 @@ +from mopidy.mixers import BaseMixer + +class GStreamerMixer(BaseMixer): + """Mixer which uses GStreamer to control volume.""" + + def __init__(self, *args, **kwargs): + super(GStreamerMixer, self).__init__(*args, **kwargs) + + def _get_volume(self): + # TODO Get volume from GStreamerProcess + + def _set_volume(self, volume): + # TODO Send volume to GStreamerProcess + From 420bdee1a0e10a43e62c264caeadbdd254bbf6b3 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:01:46 +0200 Subject: [PATCH 03/21] GstreamerProcess: Add method for processing msgs from core --- mopidy/outputs/gstreamer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 16844adf..824eb877 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -42,7 +42,8 @@ class GStreamerProcess(BaseProcess): # Setup bus and message processor self.gst_bus = self.gst_pipeline.get_bus() self.gst_bus.add_signal_watch() - self.gst_bus_id = self.gst_bus.connect('message', self.process_message) + self.gst_bus_id = self.gst_bus.connect('message', + self.process_gst_message) # Bin for playing audio URIs self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') @@ -72,7 +73,12 @@ class GStreamerProcess(BaseProcess): # TODO Handle commands self.gobject_context.iteration(True) - def process_message(self, bus, message): + def process_core_message(self, message): + """Processes messages from the rest of Mopidy.""" + pass # TODO + + def process_gst_message(self, bus, message): + """Processes message from GStreamer.""" if message.type == gst.MESSAGE_EOS: pass # TODO Handle end of track/stream elif message.type == gst.MESSAGE_ERROR: From dcdc22702eaf6e79416e23ff08f3c955ab6de384 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:04:07 +0200 Subject: [PATCH 04/21] GStreamerProcess: Log if state changes succeeds or fails --- mopidy/outputs/gstreamer.py | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 824eb877..2a282d95 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -95,9 +95,9 @@ class GStreamerProcess(BaseProcess): def play_uri(self, uri): """Play audio at URI""" - self.gst_uri_src.set_state(gst.STATE_READY) + self.state_ready() self.gst_uri_src.set_property('uri', uri) - self.gst_uri_src.set_state(gst.STATE_PLAYING) + self.state_playing() # TODO Return status def state_playing(self): @@ -107,7 +107,12 @@ class GStreamerProcess(BaseProcess): Previous state should be READY or PAUSED. """ result = self.gst_uri_src.set_state(gst.STATE_PLAYING) - return result == gst.STATE_CHANGE_SUCCESS + if result == gst.STATE_CHANGE_SUCCESS: + logger.debug('Setting GStreamer state to PLAYING: OK') + return True + else: + logger.warning('Setting GStreamer state to PLAYING: failed') + return False def state_paused(self): """ @@ -116,14 +121,24 @@ class GStreamerProcess(BaseProcess): Previous state should be PLAYING. """ result = self.gst_uri_src.set_state(gst.STATE_PAUSED) - return result == gst.STATE_CHANGE_SUCCESS + if result == gst.STATE_CHANGE_SUCCESS: + logger.debug('Setting GStreamer state to PAUSED: OK') + return True + else: + logger.warning('Setting GStreamer state to PAUSED: failed') + return False def state_ready(self): """ Set the state to READY. """ result = self.gst_uri_src.set_state(gst.STATE_READY) - return result == gst.STATE_CHANGE_SUCCESS + if result == gst.STATE_CHANGE_SUCCESS: + logger.debug('Setting GStreamer state to READY: OK') + return True + else: + logger.warning('Setting GStreamer state to READY: failed') + return False def get_volume(self): """Get volume in range [0..100]""" From 7721ae3db8b2c0a7dc4f6041358b1d9e50c91719 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:04:36 +0200 Subject: [PATCH 05/21] GStreamerProcess: Move all gst init till after the process has started --- mopidy/outputs/gstreamer.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 2a282d95..d3746729 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -29,8 +29,25 @@ class GStreamerProcess(BaseProcess): """ def __init__(self, core_queue): + super(GStreamerProcess, self).__init__() self.core_queue = core_queue + self.gobject_context = None + self.gst_pipeline = None + self.gst_bus = None + self.gst_bus_id = None + self.gst_uri_src = None + self.gst_data_src = None + self.gst_volume = None + self.gst_sink = None + def _run(self): + self.setup() + while True: + message = self.core_queue.get() + self.process_core_message(message) + self.gobject_context.iteration(True) + + def setup(self): # See http://www.jejik.com/articles/2007/01/ # python-gstreamer_threading_and_the_main_loop/ for details. gobject.threads_init() @@ -68,11 +85,6 @@ class GStreamerProcess(BaseProcess): gst.element_link_many(self.gst_data_src, self.gst_volume, self.gst_sink) - def _run(self): - while True: - # TODO Handle commands - self.gobject_context.iteration(True) - def process_core_message(self, message): """Processes messages from the rest of Mopidy.""" pass # TODO From 3bc1d751a34c01f15f0941281895e01c374eec5e Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:07:12 +0200 Subject: [PATCH 06/21] Unbreak tests by requiring a GStreamer version before importing 'gst' --- mopidy/backends/libspotify/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index 0b1f2483..b098d0ac 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -5,6 +5,10 @@ import os import multiprocessing import threading +import pygst +pygst.require('0.10') +import gst + from spotify import Link, SpotifyError from spotify.manager import SpotifySessionManager from spotify.alsahelper import AlsaController @@ -15,7 +19,6 @@ from mopidy.backends.base import (BaseBackend, BaseCurrentPlaylistController, BaseStoredPlaylistsController) from mopidy.models import Artist, Album, Track, Playlist -import gst logger = logging.getLogger('mopidy.backends.libspotify') ENCODING = 'utf-8' From 55b5645ba8caef2098fbb706197c459760c646cb Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:19:03 +0200 Subject: [PATCH 07/21] GStreamerMixer: Fix syntax error --- mopidy/mixers/gstreamer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mopidy/mixers/gstreamer.py b/mopidy/mixers/gstreamer.py index 83394bd0..3be94db0 100644 --- a/mopidy/mixers/gstreamer.py +++ b/mopidy/mixers/gstreamer.py @@ -7,8 +7,8 @@ class GStreamerMixer(BaseMixer): super(GStreamerMixer, self).__init__(*args, **kwargs) def _get_volume(self): - # TODO Get volume from GStreamerProcess + pass # TODO Get volume from GStreamerProcess def _set_volume(self, volume): - # TODO Send volume to GStreamerProcess + pass # TODO Send volume to GStreamerProcess From 5f16538f7e7f83864daf8c5b3937c012ca5a3991 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:19:55 +0200 Subject: [PATCH 08/21] Move (un)pickle_connection from mopidy.{utils => process}. Utils should be as small as possible. --- mopidy/frontends/mpd/server.py | 3 ++- mopidy/process.py | 13 ++++++++++++- mopidy/utils.py | 10 ---------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/mopidy/frontends/mpd/server.py b/mopidy/frontends/mpd/server.py index 57b6211f..5bdbb85a 100644 --- a/mopidy/frontends/mpd/server.py +++ b/mopidy/frontends/mpd/server.py @@ -8,7 +8,8 @@ import sys from mopidy import get_mpd_protocol_version, settings from mopidy.frontends.mpd.protocol import ENCODING, LINE_TERMINATOR -from mopidy.utils import indent, pickle_connection +from mopidy.process import pickle_connection +from mopidy.utils import indent logger = logging.getLogger('mopidy.frontends.mpd.server') diff --git a/mopidy/process.py b/mopidy/process.py index d3c1d03e..4a4fa1ae 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -1,12 +1,23 @@ import logging import multiprocessing +from multiprocessing.reduction import reduce_connection +import pickle import sys from mopidy import settings, SettingsError -from mopidy.utils import get_class, unpickle_connection +from mopidy.utils import get_class logger = logging.getLogger('mopidy.process') +def pickle_connection(connection): + return pickle.dumps(reduce_connection(connection)) + +def unpickle_connection(pickled_connection): + # From http://stackoverflow.com/questions/1446004 + (func, args) = pickle.loads(pickled_connection) + return func(*args) + + class BaseProcess(multiprocessing.Process): def run(self): try: diff --git a/mopidy/utils.py b/mopidy/utils.py index 7eac9239..ff032b4e 100644 --- a/mopidy/utils.py +++ b/mopidy/utils.py @@ -1,7 +1,5 @@ import logging -from multiprocessing.reduction import reduce_connection import os -import pickle import sys import urllib @@ -54,14 +52,6 @@ def indent(string, places=4, linebreak='\n'): result += linebreak + ' ' * places + line return result -def pickle_connection(connection): - return pickle.dumps(reduce_connection(connection)) - -def unpickle_connection(pickled_connection): - # From http://stackoverflow.com/questions/1446004 - (func, args) = pickle.loads(pickled_connection) - return func(*args) - def parse_m3u(file_path): """ Convert M3U file list of uris From 1bc62c50468a3b360b6ca5568ee426b60b99fcea Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:44:13 +0200 Subject: [PATCH 09/21] GStreamerProcess: One method is better than three --- mopidy/outputs/gstreamer.py | 44 +++++++++---------------------------- 1 file changed, 10 insertions(+), 34 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index d3746729..3accaaf1 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -107,49 +107,25 @@ class GStreamerProcess(BaseProcess): def play_uri(self, uri): """Play audio at URI""" - self.state_ready() + self.set_state('READY') self.gst_uri_src.set_property('uri', uri) - self.state_playing() + self.set_state('PLAYING') # TODO Return status - def state_playing(self): + def set_state(self, state_name): """ - Set the state to PLAYING. + Set the GStreamer state. Returns :class:`True` if successful. - Previous state should be READY or PAUSED. + :param state_name: READY, PLAYING, or PAUSED + :type state_name: string + :rtype: :class:`True` or :class:`False` """ - result = self.gst_uri_src.set_state(gst.STATE_PLAYING) + result = self.gst_uri_src.set_state(getattr(gst, 'STATE_' + state_name) if result == gst.STATE_CHANGE_SUCCESS: - logger.debug('Setting GStreamer state to PLAYING: OK') + logger.debug('Setting GStreamer state to %s: OK', state_name) return True else: - logger.warning('Setting GStreamer state to PLAYING: failed') - return False - - def state_paused(self): - """ - Set the state to PAUSED. - - Previous state should be PLAYING. - """ - result = self.gst_uri_src.set_state(gst.STATE_PAUSED) - if result == gst.STATE_CHANGE_SUCCESS: - logger.debug('Setting GStreamer state to PAUSED: OK') - return True - else: - logger.warning('Setting GStreamer state to PAUSED: failed') - return False - - def state_ready(self): - """ - Set the state to READY. - """ - result = self.gst_uri_src.set_state(gst.STATE_READY) - if result == gst.STATE_CHANGE_SUCCESS: - logger.debug('Setting GStreamer state to READY: OK') - return True - else: - logger.warning('Setting GStreamer state to READY: failed') + logger.warning('Setting GStreamer state to %s: failed', state_name) return False def get_volume(self): From 5dfc41e3eb2bc032a4e12ea72da8a802fd501580 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:44:44 +0200 Subject: [PATCH 10/21] GStreamerProcess: Add initial core_queue message processing --- mopidy/outputs/gstreamer.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 3accaaf1..16dd2f16 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -6,7 +6,7 @@ pygst.require('0.10') import gst import logging -from mopidy.process import BaseProcess +from mopidy.process import BaseProcess, unpickle_connection logger = logging.getLogger('mopidy.outputs.gstreamer') @@ -25,7 +25,7 @@ class GStreamerProcess(BaseProcess): """ A process for all work related to GStreamer. - The main loop polls for events from both Mopidy and GStreamer. + The main loop processes events from both Mopidy and GStreamer. """ def __init__(self, core_queue): @@ -86,11 +86,24 @@ class GStreamerProcess(BaseProcess): self.gst_sink) def process_core_message(self, message): - """Processes messages from the rest of Mopidy.""" - pass # TODO + """Process messages from the rest of Mopidy.""" + assert message['to'] == 'gstreamer', 'Message must be addressed to us' + if message['command'] == 'play_uri': + response = self.play_uri(message['uri']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + elif message['command'] == 'deliver_data': + # TODO Do we care about sending responses for every data delivery? + self.deliver_data(message['caps'], message['data']) + elif message['command'] == 'set_state': + response = self.set_state(message['state']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + else: + logger.warning(u'Cannot handle message: %s', message) def process_gst_message(self, bus, message): - """Processes message from GStreamer.""" + """Process messages from GStreamer.""" if message.type == gst.MESSAGE_EOS: pass # TODO Handle end of track/stream elif message.type == gst.MESSAGE_ERROR: From 485e1eb1e53039a6fb8818ad965fb7ec26aac582 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:52:32 +0200 Subject: [PATCH 11/21] Rename mopidy.process.BaseProcess.{_run => run_inside_try} --- mopidy/mixers/nad.py | 2 +- mopidy/outputs/gstreamer.py | 5 +++-- mopidy/process.py | 6 +++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/mopidy/mixers/nad.py b/mopidy/mixers/nad.py index 1f7f4710..56958005 100644 --- a/mopidy/mixers/nad.py +++ b/mopidy/mixers/nad.py @@ -83,7 +83,7 @@ class NadTalker(BaseProcess): self.pipe = pipe self._device = None - def _run(self): + def run_inside_try(self): self._open_connection() self._set_device_to_known_state() while self.pipe.poll(None): diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 16dd2f16..49c9d5af 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -40,7 +40,7 @@ class GStreamerProcess(BaseProcess): self.gst_volume = None self.gst_sink = None - def _run(self): + def run_inside_try(self): self.setup() while True: message = self.core_queue.get() @@ -133,7 +133,8 @@ class GStreamerProcess(BaseProcess): :type state_name: string :rtype: :class:`True` or :class:`False` """ - result = self.gst_uri_src.set_state(getattr(gst, 'STATE_' + state_name) + result = self.gst_uri_src.set_state( + getattr(gst, 'STATE_' + state_name)) if result == gst.STATE_CHANGE_SUCCESS: logger.debug('Setting GStreamer state to %s: OK', state_name) return True diff --git a/mopidy/process.py b/mopidy/process.py index 4a4fa1ae..8f11f3a8 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -21,7 +21,7 @@ def unpickle_connection(pickled_connection): class BaseProcess(multiprocessing.Process): def run(self): try: - self._run() + self.run_inside_try() except KeyboardInterrupt: logger.info(u'Interrupted by user') sys.exit(0) @@ -29,7 +29,7 @@ class BaseProcess(multiprocessing.Process): logger.error(e.message) sys.exit(1) - def _run(self): + def run_inside_try(self): raise NotImplementedError @@ -40,7 +40,7 @@ class CoreProcess(BaseProcess): self._backend = None self._frontend = None - def _run(self): + def run_inside_try(self): self._setup() while True: message = self.core_queue.get() From fa4c710007c2fe6779b974013d05a033a2e69eac Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 19:54:04 +0200 Subject: [PATCH 12/21] Get rid of all the _-prefixes in mopidy.process.CoreProcess. This is not Java --- mopidy/process.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/mopidy/process.py b/mopidy/process.py index 8f11f3a8..f23ba6fc 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -37,30 +37,30 @@ class CoreProcess(BaseProcess): def __init__(self, core_queue): super(CoreProcess, self).__init__() self.core_queue = core_queue - self._backend = None - self._frontend = None + self.backend = None + self.frontend = None def run_inside_try(self): - self._setup() + self.setup() while True: message = self.core_queue.get() - self._process_message(message) + self.process_message(message) - def _setup(self): - self._backend = get_class(settings.BACKENDS[0])( + def setup(self): + self.backend = get_class(settings.BACKENDS[0])( core_queue=self.core_queue) - self._frontend = get_class(settings.FRONTEND)(backend=self._backend) + self.frontend = get_class(settings.FRONTEND)(backend=self.backend) - def _process_message(self, message): + def process_message(self, message): if message['command'] == 'mpd_request': - response = self._frontend.handle_request(message['request']) + response = self.frontend.handle_request(message['request']) connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'end_of_track': - self._backend.playback.end_of_track_callback() + self.backend.playback.end_of_track_callback() elif message['command'] == 'stop_playback': - self._backend.playback.stop() + self.backend.playback.stop() elif message['command'] == 'set_stored_playlists': - self._backend.stored_playlists.playlists = message['playlists'] + self.backend.stored_playlists.playlists = message['playlists'] else: logger.warning(u'Cannot handle message: %s', message) From 296af3c2af470577aa7f49d7ecaab80a1e8e5d13 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 22:54:46 +0200 Subject: [PATCH 13/21] GStreamerProcess: Now partly tested for the first time. Bunch of fixes. --- mopidy/outputs/gstreamer.py | 58 +++++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 49c9d5af..acae6493 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -17,8 +17,8 @@ class GStreamerOutput(object): Starts the :class:`GStreamerProcess`. """ - def __init__(self, core_queue): - process = GStreamerProcess(core_queue) + def __init__(self, core_queue, input_connection): + process = GStreamerProcess(core_queue, input_connection) process.start() class GStreamerProcess(BaseProcess): @@ -28,9 +28,10 @@ class GStreamerProcess(BaseProcess): The main loop processes events from both Mopidy and GStreamer. """ - def __init__(self, core_queue): + def __init__(self, core_queue, input_connection): super(GStreamerProcess, self).__init__() self.core_queue = core_queue + self.input_connection = input_connection self.gobject_context = None self.gst_pipeline = None self.gst_bus = None @@ -43,8 +44,11 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - message = self.core_queue.get() - self.process_core_message(message) + # FIXME Should we block on poll() or not? Need to see iteration() + # behaviour first. + if self.input_connection.poll(): + message = self.input_connection.recv() + self.process_core_message(message) self.gobject_context.iteration(True) def setup(self): @@ -78,12 +82,25 @@ class GStreamerProcess(BaseProcess): self.gst_sink = gst.element_factory_make('autoaudiosink', 'sink') self.gst_pipeline.add(self.gst_sink) - # The audio URI chain - gst.element_link_many(self.gst_uri_src, self.gst_volume, self.gst_sink) + # Add callback that will link uri_src output with volume filter input + # when the output pad is ready. + # See http://stackoverflow.com/questions/2993777 for details. + def on_new_decoded_pad(dbin, pad, is_last): + uri_src = pad.get_parent() + pipeline = uri_src.get_parent() + volume = pipeline.get_by_name('volume') + uri_src.link(volume) + logger.debug("Linked uri_src's new decoded pad to volume filter") + # FIXME uridecodebin got no new-decoded-pad signal, but it's + # subcomponent decodebin2 got that signal. Fixing this is postponed + # till after data_src is up and running perfectly + #self.gst_uri_src.connect('new-decoded-pad', on_new_decoded_pad) - # The audio data chain - gst.element_link_many(self.gst_data_src, self.gst_volume, - self.gst_sink) + # Link data source output with volume filter input + self.gst_data_src.link(self.gst_volume) + + # Link volume filter output to audio sink input + self.gst_volume.link(self.gst_sink) def process_core_message(self, message): """Process messages from the rest of Mopidy.""" @@ -93,7 +110,7 @@ class GStreamerProcess(BaseProcess): connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'deliver_data': - # TODO Do we care about sending responses for every data delivery? + # FIXME Do we care about sending responses for every data delivery? self.deliver_data(message['caps'], message['data']) elif message['command'] == 'set_state': response = self.set_state(message['state']) @@ -105,11 +122,13 @@ class GStreamerProcess(BaseProcess): def process_gst_message(self, bus, message): """Process messages from GStreamer.""" if message.type == gst.MESSAGE_EOS: - pass # TODO Handle end of track/stream + self.core_queue.put({'message': 'end_of_track'}) elif message.type == gst.MESSAGE_ERROR: - self.gst_bin.set_state(gst.STATE_NULL) + self.set_state('NULL') error, debug = message.parse_error() logger.error(u'%s %s', error, debug) + # FIXME Should we send 'stop_playback' to core here? Can we + # differentiate on how serious the error is? def deliver_data(self, caps_string, data): """Deliver audio data to be played""" @@ -129,11 +148,20 @@ class GStreamerProcess(BaseProcess): """ Set the GStreamer state. Returns :class:`True` if successful. - :param state_name: READY, PLAYING, or PAUSED + .. digraph:: gst_state_transitions + + "NULL" -> "READY" + "PAUSED" -> "PLAYING" + "PAUSED" -> "READY" + "PLAYING" -> "PAUSED" + "READY" -> "NULL" + "READY" -> "PAUSED" + + :param state_name: NULL, READY, PAUSED, or PLAYING :type state_name: string :rtype: :class:`True` or :class:`False` """ - result = self.gst_uri_src.set_state( + result = self.gst_pipeline.set_state( getattr(gst, 'STATE_' + state_name)) if result == gst.STATE_CHANGE_SUCCESS: logger.debug('Setting GStreamer state to %s: OK', state_name) From 6a88f62211faf21ccf015dec26f219f2dffc9f14 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 22:55:54 +0200 Subject: [PATCH 14/21] Add new setting 'OUTPUT' for selecting output handler --- mopidy/settings.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mopidy/settings.py b/mopidy/settings.py index 1192c28d..8fdc3535 100644 --- a/mopidy/settings.py +++ b/mopidy/settings.py @@ -105,6 +105,11 @@ MIXER_EXT_SPEAKERS_A = None #: Default: :class:`None`. MIXER_EXT_SPEAKERS_B = None +#: Audio output handler to use. Default:: +#: +#: OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput' +OUTPUT = u'mopidy.outputs.gstreamer.GStreamerOutput' + #: Server to use. Default:: #: #: SERVER = u'mopidy.frontends.mpd.server.MpdServer' From 1e2dd6f46c6f5b455acd66034de47db2e501a3f1 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 22:57:12 +0200 Subject: [PATCH 15/21] Backend API: Add optional argument 'output_connection' to BaseBackend constructor --- docs/changes.rst | 3 +++ mopidy/backends/base/__init__.py | 6 +++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/docs/changes.rst b/docs/changes.rst index c8e4c912..811eb482 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -35,6 +35,9 @@ Another great release. - :meth:`mopidy.backends.base.BaseLibraryController.search()` now accepts keyword arguments of the form ``search(artist=['foo', 'fighters'], album=['bar', 'grooves'])``. + - :meth:`mopidy.backends.base.BaseBackend()` now accepts an + ``output_connection`` which it can use to send messages (i.e. audio data) + to the output process. diff --git a/mopidy/backends/base/__init__.py b/mopidy/backends/base/__init__.py index e79aceae..942b190e 100644 --- a/mopidy/backends/base/__init__.py +++ b/mopidy/backends/base/__init__.py @@ -23,13 +23,17 @@ class BaseBackend(object): :param core_queue: a queue for sending messages to :class:`mopidy.process.CoreProcess` :type core_queue: :class:`multiprocessing.Queue` + :param output_connection: a connection for sending messages to the + output process + :type output_connection: :class:`multiprocessing.Connection` :param mixer: either a mixer instance, or :class:`None` to use the mixer defined in settings :type mixer: :class:`mopidy.mixers.BaseMixer` or :class:`None` """ - def __init__(self, core_queue=None, mixer=None): + def __init__(self, core_queue=None, output_connection=None, mixer=None): self.core_queue = core_queue + self.output_connection = output_connection if mixer is not None: self.mixer = mixer else: From c81a1162a8e8e5964b989033ae2bba98881292d5 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 22:58:42 +0200 Subject: [PATCH 16/21] Instantiate GStreamerOutput. Create a pipe from CoreProcess and Backend to GStreamerOutput. --- mopidy/process.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/mopidy/process.py b/mopidy/process.py index f23ba6fc..94da3a58 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -37,6 +37,8 @@ class CoreProcess(BaseProcess): def __init__(self, core_queue): super(CoreProcess, self).__init__() self.core_queue = core_queue + self.output_connection = None + self.output = None self.backend = None self.frontend = None @@ -47,12 +49,15 @@ class CoreProcess(BaseProcess): self.process_message(message) def setup(self): - self.backend = get_class(settings.BACKENDS[0])( - core_queue=self.core_queue) - self.frontend = get_class(settings.FRONTEND)(backend=self.backend) + (recv_end, self.output_connection) = multiprocessing.Pipe(False) + self.output = get_class(settings.OUTPUT)(self.core_queue, recv_end) + self.backend = get_class(settings.BACKENDS[0])(self.core_queue) + self.frontend = get_class(settings.FRONTEND)(self.backend) def process_message(self, message): - if message['command'] == 'mpd_request': + if message.get('to') == 'output': + self.output_connection.send(message) + elif message['command'] == 'mpd_request': response = self.frontend.handle_request(message['request']) connection = unpickle_connection(message['reply_to']) connection.send(response) From 6d799a2bfacfa8d84397dd6f71c576849f3ab53f Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Wed, 11 Aug 2010 23:02:50 +0200 Subject: [PATCH 17/21] GStreamerProcess: Do not assert on a to-field in the message as the pipe is only used for sending messages to 'output' anyway --- mopidy/outputs/gstreamer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index acae6493..5415a5ee 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -48,7 +48,7 @@ class GStreamerProcess(BaseProcess): # behaviour first. if self.input_connection.poll(): message = self.input_connection.recv() - self.process_core_message(message) + self.process_mopidy_message(message) self.gobject_context.iteration(True) def setup(self): @@ -102,9 +102,8 @@ class GStreamerProcess(BaseProcess): # Link volume filter output to audio sink input self.gst_volume.link(self.gst_sink) - def process_core_message(self, message): + def process_mopidy_message(self, message): """Process messages from the rest of Mopidy.""" - assert message['to'] == 'gstreamer', 'Message must be addressed to us' if message['command'] == 'play_uri': response = self.play_uri(message['uri']) connection = unpickle_connection(message['reply_to']) From e55e560827452fd87708f41280babf52d7664d2e Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 12 Aug 2010 01:28:06 +0200 Subject: [PATCH 18/21] Helps to remember to send the output_connection to the backend --- mopidy/process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mopidy/process.py b/mopidy/process.py index 94da3a58..ff30160c 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -51,7 +51,8 @@ class CoreProcess(BaseProcess): def setup(self): (recv_end, self.output_connection) = multiprocessing.Pipe(False) self.output = get_class(settings.OUTPUT)(self.core_queue, recv_end) - self.backend = get_class(settings.BACKENDS[0])(self.core_queue) + self.backend = get_class(settings.BACKENDS[0])(self.core_queue, + self.output_connection) self.frontend = get_class(settings.FRONTEND)(self.backend) def process_message(self, message): From f488b0fe2d49db0d93118839924dadf59136032b Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 12 Aug 2010 01:29:07 +0200 Subject: [PATCH 19/21] GStreamerOutput: Now working for data deliveries from e.g. LibspotifyBackend --- mopidy/outputs/gstreamer.py | 64 +++++++++++++++++++++++++------------ 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index 5415a5ee..bda04b25 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -1,10 +1,12 @@ import gobject +gobject.threads_init() import pygst pygst.require('0.10') - import gst + import logging +import threading from mopidy.process import BaseProcess, unpickle_connection @@ -21,18 +23,26 @@ class GStreamerOutput(object): process = GStreamerProcess(core_queue, input_connection) process.start() +class GStreamerMessagesThread(threading.Thread): + def run(self): + gobject.MainLoop().run() + class GStreamerProcess(BaseProcess): """ A process for all work related to GStreamer. The main loop processes events from both Mopidy and GStreamer. + + Make sure this subprocess is started by the MainThread in the top-most + parent process, and not some other thread. If not, we can get into the + problems described at + http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. """ def __init__(self, core_queue, input_connection): super(GStreamerProcess, self).__init__() self.core_queue = core_queue self.input_connection = input_connection - self.gobject_context = None self.gst_pipeline = None self.gst_bus = None self.gst_bus_id = None @@ -44,18 +54,17 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - # FIXME Should we block on poll() or not? Need to see iteration() - # behaviour first. - if self.input_connection.poll(): + if self.input_connection.poll(None): message = self.input_connection.recv() self.process_mopidy_message(message) - self.gobject_context.iteration(True) def setup(self): - # See http://www.jejik.com/articles/2007/01/ - # python-gstreamer_threading_and_the_main_loop/ for details. - gobject.threads_init() - self.gobject_context = gobject.MainLoop().get_context() + logger.debug(u'Setting up GStreamer pipeline') + + # Start a helper thread that can run the gobject.MainLoop + messages_thread = GStreamerMessagesThread() + messages_thread.daemon = True + messages_thread.start() # A pipeline consisting of many elements self.gst_pipeline = gst.Pipeline("pipeline") @@ -67,8 +76,8 @@ class GStreamerProcess(BaseProcess): self.process_gst_message) # Bin for playing audio URIs - self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') - self.gst_pipeline.add(self.gst_uri_src) + #self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src') + #self.gst_pipeline.add(self.gst_uri_src) # Bin for playing audio data self.gst_data_src = gst.element_factory_make('appsrc', 'data_src') @@ -109,8 +118,9 @@ class GStreamerProcess(BaseProcess): connection = unpickle_connection(message['reply_to']) connection.send(response) elif message['command'] == 'deliver_data': - # FIXME Do we care about sending responses for every data delivery? self.deliver_data(message['caps'], message['data']) + elif message['command'] == 'end_of_data_stream': + self.end_of_data_stream() elif message['command'] == 'set_state': response = self.set_state(message['state']) connection = unpickle_connection(message['reply_to']) @@ -121,7 +131,9 @@ class GStreamerProcess(BaseProcess): def process_gst_message(self, bus, message): """Process messages from GStreamer.""" if message.type == gst.MESSAGE_EOS: - self.core_queue.put({'message': 'end_of_track'}) + logger.debug(u'GStreamer signalled end-of-stream. ' + 'Sending end_of_track to core_queue ...') + self.core_queue.put({'command': 'end_of_track'}) elif message.type == gst.MESSAGE_ERROR: self.set_state('NULL') error, debug = message.parse_error() @@ -129,13 +141,6 @@ class GStreamerProcess(BaseProcess): # FIXME Should we send 'stop_playback' to core here? Can we # differentiate on how serious the error is? - def deliver_data(self, caps_string, data): - """Deliver audio data to be played""" - caps = gst.caps_from_string(caps_string) - buffer_ = gst.Buffer(data) - buffer_.set_caps(caps) - self.gst_data_src.emit('push-buffer', buffer_) - def play_uri(self, uri): """Play audio at URI""" self.set_state('READY') @@ -143,6 +148,23 @@ class GStreamerProcess(BaseProcess): self.set_state('PLAYING') # TODO Return status + def deliver_data(self, caps_string, data): + """Deliver audio data to be played""" + caps = gst.caps_from_string(caps_string) + buffer_ = gst.Buffer(buffer(data)) + buffer_.set_caps(caps) + self.gst_data_src.set_property('caps', caps) + self.gst_data_src.emit('push-buffer', buffer_) + + def end_of_data_stream(self): + """ + Add end-of-stream token to source. + + We will get a GStreamer message when the stream playback reaches the + token, and can then do any end-of-stream related tasks. + """ + self.gst_data_src.emit('end-of-stream') + def set_state(self, state_name): """ Set the GStreamer state. Returns :class:`True` if successful. From 816f877c4b2749990e5438a139ef12665035a282 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 12 Aug 2010 01:30:20 +0200 Subject: [PATCH 20/21] Change LibspotifyBackend from using GStreamer directly to just sending it's data to GStreamerOutput --- mopidy/backends/libspotify/__init__.py | 109 +++++++++---------------- 1 file changed, 38 insertions(+), 71 deletions(-) diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index b098d0ac..1fa07836 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -1,14 +1,9 @@ import datetime as dt -import gobject import logging import os import multiprocessing import threading -import pygst -pygst.require('0.10') -import gst - from spotify import Link, SpotifyError from spotify.manager import SpotifySessionManager from spotify.alsahelper import AlsaController @@ -18,6 +13,7 @@ from mopidy.backends.base import (BaseBackend, BaseCurrentPlaylistController, BaseLibraryController, BasePlaybackController, BaseStoredPlaylistsController) from mopidy.models import Artist, Album, Track, Playlist +from mopidy.process import pickle_connection logger = logging.getLogger('mopidy.backends.libspotify') @@ -47,7 +43,6 @@ class LibspotifyBackend(BaseBackend): self.stored_playlists = LibspotifyStoredPlaylistsController( backend=self) self.uri_handlers = [u'spotify:', u'http://open.spotify.com/'] - self.gstreamer_pipeline = gst.Pipeline("spotify_pipeline") self.spotify = self._connect() def _connect(self): @@ -55,7 +50,7 @@ class LibspotifyBackend(BaseBackend): spotify = LibspotifySessionManager( settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD, core_queue=self.core_queue, - gstreamer_pipeline=self.gstreamer_pipeline) + output_connection=self.output_connection) spotify.start() return spotify @@ -97,13 +92,22 @@ class LibspotifyLibraryController(BaseLibraryController): class LibspotifyPlaybackController(BasePlaybackController): + def _set_output_state(self, state_name): + logger.debug(u'Setting output state to %s ...', state_name) + (my_end, other_end) = multiprocessing.Pipe() + self.backend.output_connection.send({ + 'command': 'set_state', + 'state': state_name, + 'reply_to': pickle_connection(other_end), + }) + my_end.poll(None) + return my_end.recv() + def _pause(self): - result = self.backend.gstreamer_pipeline.set_state(gst.STATE_PAUSED) - logger.debug('Changed gstreamer state to paused. Result was: %s' % result) - return result == gst.STATE_CHANGE_SUCCESS + return self._set_output_state('PAUSED') def _play(self, track): - self.backend.gstreamer_pipeline.set_state(gst.STATE_READY) + self._set_output_state('READY') if self.state == self.PLAYING: self.stop() if track.uri is None: @@ -112,24 +116,22 @@ class LibspotifyPlaybackController(BasePlaybackController): self.backend.spotify.session.load( Link.from_string(track.uri).as_track()) self.backend.spotify.session.play(1) - self.backend.gstreamer_pipeline.set_state(gst.STATE_PLAYING) + self._set_output_state('PLAYING') return True except SpotifyError as e: logger.warning('Play %s failed: %s', track.uri, e) return False def _resume(self): - result = self.backend.gstreamer_pipeline.set_state(gst.STATE_PLAYING) - logger.debug('Changed gstreamer state to playing. Result was: %s' % result) - return result == gst.STATE_CHANGE_SUCCESS + return self._set_output_state('PLAYING') def _seek(self, time_position): pass # TODO def _stop(self): - self.backend.gstreamer_pipeline.set_state(gst.STATE_READY) + result = self._set_output_state('READY') self.backend.spotify.session.play(0) - return True + return result class LibspotifyStoredPlaylistsController(BaseStoredPlaylistsController): @@ -202,57 +204,19 @@ class LibspotifyTranslator(object): tracks=[cls.to_mopidy_track(t) for t in spotify_playlist], ) -class GstreamerMessageBusProcess(threading.Thread): - def __init__(self, core_queue, pipeline): - super(GstreamerMessageBusProcess, self).__init__() - self.core_queue = core_queue - self.bus = pipeline.get_bus() - - def run(self): - loop = gobject.MainLoop() - gobject.threads_init() - context = loop.get_context() - while True: - message = self.bus.pop_filtered(gst.MESSAGE_EOS) - if message is not None: - self.core_queue.put({'command': 'end_of_track'}) - logger.debug('Got and handled Gstreamer message of type: %s' % message.type) - context.iteration(True) - class LibspotifySessionManager(SpotifySessionManager, threading.Thread): cache_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) settings_location = os.path.expanduser(settings.SPOTIFY_LIB_CACHE) appkey_file = os.path.expanduser(settings.SPOTIFY_LIB_APPKEY) user_agent = 'Mopidy %s' % get_version() - def __init__(self, username, password, core_queue, gstreamer_pipeline): + def __init__(self, username, password, core_queue, output_connection): SpotifySessionManager.__init__(self, username, password) threading.Thread.__init__(self) self.core_queue = core_queue + self.output_connection = output_connection self.connected = threading.Event() self.session = None - self.gstreamer_pipeline = gstreamer_pipeline - - cap_string = """audio/x-raw-int, - endianness=(int)1234, - channels=(int)2, - width=(int)16, - depth=(int)16, - signed=True, - rate=(int)44100""" - caps = gst.caps_from_string(cap_string) - - self.gsrc = gst.element_factory_make("appsrc", "app-source") - self.gsrc.set_property('caps', caps) - - self.gsink = gst.element_factory_make("autoaudiosink", "autosink") - - self.gstreamer_pipeline.add(self.gsrc, self.gsink) - - gst.element_link_many(self.gsrc, self.gsink) - - message_process = GstreamerMessageBusProcess(self.core_queue, self.gstreamer_pipeline) - message_process.start() def run(self): self.connect() @@ -294,17 +258,21 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def music_delivery(self, session, frames, frame_size, num_frames, sample_type, sample_rate, channels): """Callback used by pyspotify""" - cap_string = """audio/x-raw-int, - endianness=(int)1234, - channels=(int)2, - width=(int)16, - depth=(int)16, - signed=True, - rate=(int)44100""" - caps = gst.caps_from_string(cap_string) - b = gst.Buffer(frames) - b.set_caps(caps) - self.gsrc.emit('push-buffer', b) + # TODO Base caps_string on arguments + caps_string = """ + audio/x-raw-int, + endianness=(int)1234, + channels=(int)2, + width=(int)16, + depth=(int)16, + signed=True, + rate=(int)44100 + """ + self.output_connection.send({ + 'command': 'deliver_data', + 'caps': caps_string, + 'data': bytes(frames), + }) def play_token_lost(self, session): """Callback used by pyspotify""" @@ -317,9 +285,8 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def end_of_track(self, session): """Callback used by pyspotify""" - logger.debug('End of track.') - self.gsrc.emit('end-of-stream') - logger.debug('End of stream sent to gstreamer.') + logger.debug('End of data stream.') + self.output_connection.send({'command': 'end_of_data_stream'}) def search(self, query, connection): """Search method used by Mopidy backend""" From 5b0faa196d67866658bcee32de1df12b24263130 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 12 Aug 2010 01:38:11 +0200 Subject: [PATCH 21/21] Switch from output_connection to output_queue, as we are going to have multiple producers with time --- docs/changes.rst | 2 +- mopidy/backends/base/__init__.py | 9 ++++----- mopidy/backends/libspotify/__init__.py | 12 ++++++------ mopidy/outputs/gstreamer.py | 13 ++++++------- mopidy/process.py | 11 ++++++----- 5 files changed, 23 insertions(+), 24 deletions(-) diff --git a/docs/changes.rst b/docs/changes.rst index 811eb482..d38a135a 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -36,7 +36,7 @@ Another great release. keyword arguments of the form ``search(artist=['foo', 'fighters'], album=['bar', 'grooves'])``. - :meth:`mopidy.backends.base.BaseBackend()` now accepts an - ``output_connection`` which it can use to send messages (i.e. audio data) + ``output_queue`` which it can use to send messages (i.e. audio data) to the output process. diff --git a/mopidy/backends/base/__init__.py b/mopidy/backends/base/__init__.py index 942b190e..3a484865 100644 --- a/mopidy/backends/base/__init__.py +++ b/mopidy/backends/base/__init__.py @@ -23,17 +23,16 @@ class BaseBackend(object): :param core_queue: a queue for sending messages to :class:`mopidy.process.CoreProcess` :type core_queue: :class:`multiprocessing.Queue` - :param output_connection: a connection for sending messages to the - output process - :type output_connection: :class:`multiprocessing.Connection` + :param output_queue: a queue for sending messages to the output process + :type output_queue: :class:`multiprocessing.Queue` :param mixer: either a mixer instance, or :class:`None` to use the mixer defined in settings :type mixer: :class:`mopidy.mixers.BaseMixer` or :class:`None` """ - def __init__(self, core_queue=None, output_connection=None, mixer=None): + def __init__(self, core_queue=None, output_queue=None, mixer=None): self.core_queue = core_queue - self.output_connection = output_connection + self.output_queue = output_queue if mixer is not None: self.mixer = mixer else: diff --git a/mopidy/backends/libspotify/__init__.py b/mopidy/backends/libspotify/__init__.py index 1fa07836..cc853314 100644 --- a/mopidy/backends/libspotify/__init__.py +++ b/mopidy/backends/libspotify/__init__.py @@ -50,7 +50,7 @@ class LibspotifyBackend(BaseBackend): spotify = LibspotifySessionManager( settings.SPOTIFY_USERNAME, settings.SPOTIFY_PASSWORD, core_queue=self.core_queue, - output_connection=self.output_connection) + output_queue=self.output_queue) spotify.start() return spotify @@ -95,7 +95,7 @@ class LibspotifyPlaybackController(BasePlaybackController): def _set_output_state(self, state_name): logger.debug(u'Setting output state to %s ...', state_name) (my_end, other_end) = multiprocessing.Pipe() - self.backend.output_connection.send({ + self.backend.output_queue.put({ 'command': 'set_state', 'state': state_name, 'reply_to': pickle_connection(other_end), @@ -210,11 +210,11 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): appkey_file = os.path.expanduser(settings.SPOTIFY_LIB_APPKEY) user_agent = 'Mopidy %s' % get_version() - def __init__(self, username, password, core_queue, output_connection): + def __init__(self, username, password, core_queue, output_queue): SpotifySessionManager.__init__(self, username, password) threading.Thread.__init__(self) self.core_queue = core_queue - self.output_connection = output_connection + self.output_queue = output_queue self.connected = threading.Event() self.session = None @@ -268,7 +268,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): signed=True, rate=(int)44100 """ - self.output_connection.send({ + self.output_queue.put({ 'command': 'deliver_data', 'caps': caps_string, 'data': bytes(frames), @@ -286,7 +286,7 @@ class LibspotifySessionManager(SpotifySessionManager, threading.Thread): def end_of_track(self, session): """Callback used by pyspotify""" logger.debug('End of data stream.') - self.output_connection.send({'command': 'end_of_data_stream'}) + self.output_queue.put({'command': 'end_of_data_stream'}) def search(self, query, connection): """Search method used by Mopidy backend""" diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index bda04b25..65b65504 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -19,8 +19,8 @@ class GStreamerOutput(object): Starts the :class:`GStreamerProcess`. """ - def __init__(self, core_queue, input_connection): - process = GStreamerProcess(core_queue, input_connection) + def __init__(self, core_queue, output_queue): + process = GStreamerProcess(core_queue, output_queue) process.start() class GStreamerMessagesThread(threading.Thread): @@ -39,10 +39,10 @@ class GStreamerProcess(BaseProcess): http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. """ - def __init__(self, core_queue, input_connection): + def __init__(self, core_queue, output_queue): super(GStreamerProcess, self).__init__() self.core_queue = core_queue - self.input_connection = input_connection + self.output_queue = output_queue self.gst_pipeline = None self.gst_bus = None self.gst_bus_id = None @@ -54,9 +54,8 @@ class GStreamerProcess(BaseProcess): def run_inside_try(self): self.setup() while True: - if self.input_connection.poll(None): - message = self.input_connection.recv() - self.process_mopidy_message(message) + message = self.output_queue.get() + self.process_mopidy_message(message) def setup(self): logger.debug(u'Setting up GStreamer pipeline') diff --git a/mopidy/process.py b/mopidy/process.py index ff30160c..9759c4e6 100644 --- a/mopidy/process.py +++ b/mopidy/process.py @@ -37,7 +37,7 @@ class CoreProcess(BaseProcess): def __init__(self, core_queue): super(CoreProcess, self).__init__() self.core_queue = core_queue - self.output_connection = None + self.output_queue = None self.output = None self.backend = None self.frontend = None @@ -49,15 +49,16 @@ class CoreProcess(BaseProcess): self.process_message(message) def setup(self): - (recv_end, self.output_connection) = multiprocessing.Pipe(False) - self.output = get_class(settings.OUTPUT)(self.core_queue, recv_end) + self.output_queue = multiprocessing.Queue() + self.output = get_class(settings.OUTPUT)(self.core_queue, + self.output_queue) self.backend = get_class(settings.BACKENDS[0])(self.core_queue, - self.output_connection) + self.output_queue) self.frontend = get_class(settings.FRONTEND)(self.backend) def process_message(self, message): if message.get('to') == 'output': - self.output_connection.send(message) + self.output_queue.put(message) elif message['command'] == 'mpd_request': response = self.frontend.handle_request(message['request']) connection = unpickle_connection(message['reply_to'])