From 94fdac04a12348ca1efa1427d6a18414008b8f24 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Mon, 10 Sep 2012 00:31:03 +0200 Subject: [PATCH 1/8] Cleanup after GStreamer actor Unregister callbacks and release pipeline resources when GStreamer actor shuts down. Fixes #185. --- mopidy/gstreamer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index d9157a02..c25dde47 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -47,6 +47,10 @@ class GStreamer(ThreadingActor): self._setup_mixer() self._setup_message_processor() + def on_stop(self): + self._teardown_message_processor() + self._teardown_pipeline() + def _setup_pipeline(self): # TODO: replace with and input bin so we simply have an input bin we # connect to an output bin with a mixer on the side. set_uri on bin? @@ -65,6 +69,9 @@ class GStreamer(ThreadingActor): self._uridecodebin.connect('pad-added', self._on_new_pad, self._pipeline.get_by_name('queue').get_pad('sink')) + def _teardown_pipeline(self): + self._pipeline.set_state(gst.STATE_NULL) + def _setup_output(self): # This will raise a gobject.GError if the description is bad. self._output = gst.parse_bin_from_description( @@ -118,6 +125,10 @@ class GStreamer(ThreadingActor): bus.add_signal_watch() bus.connect('message', self._on_message) + def _teardown_message_processor(self): + bus = self._pipeline.get_bus() + bus.remove_signal_watch() + def _on_new_source(self, element, pad): self._source = element.get_property('source') try: From 1c4ee46c4cf382bcc111e09b4c511743d4d8dbab Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Mon, 10 Sep 2012 00:31:41 +0200 Subject: [PATCH 2/8] Move GStreamer setup back into the actor thread Make sure to terminate the whole process on GError exceptions, so that we fail quickly on non-working output pipelines. --- mopidy/gstreamer.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index c25dde47..a1ddc93e 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -1,6 +1,7 @@ import pygst pygst.require('0.10') import gst +import gobject import logging @@ -9,7 +10,10 @@ from pykka.registry import ActorRegistry from mopidy import settings, utils from mopidy.backends.base import Backend -from mopidy import mixers # Trigger install of gst mixer plugins. +from mopidy.utils import process + +# Trigger install of gst mixer plugins +from mopidy import mixers logger = logging.getLogger('mopidy.gstreamer') @@ -42,10 +46,15 @@ class GStreamer(ThreadingActor): self._output = None self._mixer = None - self._setup_pipeline() - self._setup_output() - self._setup_mixer() - self._setup_message_processor() + def on_start(self): + try: + self._setup_pipeline() + self._setup_output() + self._setup_mixer() + self._setup_message_processor() + except gobject.GError as ex: + logger.exception(ex) + process.exit_process() def on_stop(self): self._teardown_message_processor() From 4bffea8b1f44c21e5e10ba9b2029d669791642bb Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Mon, 10 Sep 2012 00:32:36 +0200 Subject: [PATCH 3/8] Test the GStreamer class as an actor The test should use the same interface and code paths as production code. --- tests/gstreamer_test.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/gstreamer_test.py b/tests/gstreamer_test.py index 790394f5..ce20d2b4 100644 --- a/tests/gstreamer_test.py +++ b/tests/gstreamer_test.py @@ -14,9 +14,10 @@ class GStreamerTest(unittest.TestCase): settings.MIXER = 'fakemixer track_max_volume=65536' settings.OUTPUT = 'fakesink' self.song_uri = path_to_uri(path_to_data_dir('song1.wav')) - self.gstreamer = GStreamer() + self.gstreamer = GStreamer.start().proxy() def tearDown(self): + self.gstreamer.stop() settings.runtime.clear() def prepare_uri(self, uri): @@ -25,21 +26,21 @@ class GStreamerTest(unittest.TestCase): def test_start_playback_existing_file(self): self.prepare_uri(self.song_uri) - self.assertTrue(self.gstreamer.start_playback()) + self.assertTrue(self.gstreamer.start_playback().get()) def test_start_playback_non_existing_file(self): self.prepare_uri(self.song_uri + 'bogus') - self.assertFalse(self.gstreamer.start_playback()) + self.assertFalse(self.gstreamer.start_playback().get()) def test_pause_playback_while_playing(self): self.prepare_uri(self.song_uri) self.gstreamer.start_playback() - self.assertTrue(self.gstreamer.pause_playback()) + self.assertTrue(self.gstreamer.pause_playback().get()) def test_stop_playback_while_playing(self): self.prepare_uri(self.song_uri) self.gstreamer.start_playback() - self.assertTrue(self.gstreamer.stop_playback()) + self.assertTrue(self.gstreamer.stop_playback().get()) @unittest.SkipTest def test_deliver_data(self): @@ -51,8 +52,8 @@ class GStreamerTest(unittest.TestCase): def test_set_volume(self): for value in range(0, 101): - self.assertTrue(self.gstreamer.set_volume(value)) - self.assertEqual(value, self.gstreamer.get_volume()) + self.assertTrue(self.gstreamer.set_volume(value).get()) + self.assertEqual(value, self.gstreamer.get_volume().get()) @unittest.SkipTest def test_set_state_encapsulation(self): From 2f33a6c4ffd780ac9c2fe4d881f516e265032d9a Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Mon, 10 Sep 2012 23:19:40 +0200 Subject: [PATCH 4/8] Only remove signal watch if it has been added If removing unconditionally, we get the following error message: GStreamer-CRITICAL **: Bus bus2 has no signal watches attached --- mopidy/gstreamer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index a1ddc93e..b8b30d14 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -32,6 +32,7 @@ class GStreamer(ThreadingActor): def __init__(self): super(GStreamer, self).__init__() + self._default_caps = gst.Caps(""" audio/x-raw-int, endianness=(int)1234, @@ -40,12 +41,15 @@ class GStreamer(ThreadingActor): depth=(int)16, signed=(boolean)true, rate=(int)44100""") + self._pipeline = None self._source = None self._uridecodebin = None self._output = None self._mixer = None + self._message_processor_set_up = False + def on_start(self): try: self._setup_pipeline() @@ -133,10 +137,12 @@ class GStreamer(ThreadingActor): bus = self._pipeline.get_bus() bus.add_signal_watch() bus.connect('message', self._on_message) + self._message_processor_set_up = True def _teardown_message_processor(self): - bus = self._pipeline.get_bus() - bus.remove_signal_watch() + if self._message_processor_set_up: + bus = self._pipeline.get_bus() + bus.remove_signal_watch() def _on_new_source(self, element, pad): self._source = element.get_property('source') From 0e56b15fccbfeadfca13e459197e7102861b6386 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Tue, 11 Sep 2012 10:08:59 +0200 Subject: [PATCH 5/8] Teardown GStreamer mixer --- mopidy/gstreamer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index b8b30d14..5bdb7b39 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -62,6 +62,7 @@ class GStreamer(ThreadingActor): def on_stop(self): self._teardown_message_processor() + self._teardown_mixer() self._teardown_pipeline() def _setup_pipeline(self): @@ -133,6 +134,11 @@ class GStreamer(ThreadingActor): gst.interfaces.MIXER_TRACK_OUTPUT): return track + def _teardown_mixer(self): + if self._mixer is not None: + (mixer, track) = self._mixer + mixer.set_state(gst.STATE_NULL) + def _setup_message_processor(self): bus = self._pipeline.get_bus() bus.add_signal_watch() From 51256e18949bbb2acdc9fac97e934511bcfc831b Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Tue, 11 Sep 2012 11:18:08 +0200 Subject: [PATCH 6/8] Log warning message if mixer creation fails --- mopidy/gstreamer.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 5bdb7b39..2ebe2d71 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -101,8 +101,12 @@ class GStreamer(ThreadingActor): logger.info('Not setting up mixer.') return - # This will raise a gobject.GError if the description is bad. - mixerbin = gst.parse_bin_from_description(settings.MIXER, False) + try: + mixerbin = gst.parse_bin_from_description(settings.MIXER, False) + except gobject.GError as ex: + logger.warning('Failed to create mixer "%s": %s', + settings.MIXER, ex) + return # We assume that the bin will contain a single mixer. mixer = mixerbin.get_by_interface('GstMixer') From a1146c2964162ef4cb71e297688dfe34c7ec9797 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Tue, 11 Sep 2012 13:52:43 +0200 Subject: [PATCH 7/8] Log error and exit if output creation fails --- mopidy/gstreamer.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index 2ebe2d71..f131fdfc 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -87,9 +87,14 @@ class GStreamer(ThreadingActor): self._pipeline.set_state(gst.STATE_NULL) def _setup_output(self): - # This will raise a gobject.GError if the description is bad. - self._output = gst.parse_bin_from_description( - settings.OUTPUT, ghost_unconnected_pads=True) + try: + self._output = gst.parse_bin_from_description( + settings.OUTPUT, ghost_unconnected_pads=True) + except gobject.GError as ex: + logger.error('Failed to create output "%s": %s', + settings.OUTPUT, ex) + process.exit_process() + return self._pipeline.add(self._output) gst.element_link_many(self._pipeline.get_by_name('queue'), From a3ea1bc97a0781c3cb845244b1fd042c1767633f Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Tue, 11 Sep 2012 15:37:46 +0200 Subject: [PATCH 8/8] Use kwarg to make meaning obvious --- mopidy/gstreamer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mopidy/gstreamer.py b/mopidy/gstreamer.py index f131fdfc..6dc7b0aa 100644 --- a/mopidy/gstreamer.py +++ b/mopidy/gstreamer.py @@ -107,7 +107,8 @@ class GStreamer(ThreadingActor): return try: - mixerbin = gst.parse_bin_from_description(settings.MIXER, False) + mixerbin = gst.parse_bin_from_description(settings.MIXER, + ghost_unconnected_pads=False) except gobject.GError as ex: logger.warning('Failed to create mixer "%s": %s', settings.MIXER, ex)