Merge pull request #188 from jodal/feature/gstreamer-resource-cleanup

GStreamer resource cleanup
This commit is contained in:
Stein Magnus Jodal 2012-09-11 06:46:04 -07:00
commit 32987d7a00
2 changed files with 60 additions and 17 deletions

View File

@ -1,6 +1,7 @@
import pygst
pygst.require('0.10')
import gst
import gobject
import logging
@ -9,7 +10,10 @@ from pykka.registry import ActorRegistry
from mopidy import settings, utils
from mopidy.backends.base import Backend
from mopidy import mixers # Trigger install of gst mixer plugins.
from mopidy.utils import process
# Trigger install of gst mixer plugins
from mopidy import mixers
logger = logging.getLogger('mopidy.gstreamer')
@ -28,6 +32,7 @@ class GStreamer(ThreadingActor):
def __init__(self):
super(GStreamer, self).__init__()
self._default_caps = gst.Caps("""
audio/x-raw-int,
endianness=(int)1234,
@ -36,16 +41,29 @@ class GStreamer(ThreadingActor):
depth=(int)16,
signed=(boolean)true,
rate=(int)44100""")
self._pipeline = None
self._source = None
self._uridecodebin = None
self._output = None
self._mixer = None
self._setup_pipeline()
self._setup_output()
self._setup_mixer()
self._setup_message_processor()
self._message_processor_set_up = False
def on_start(self):
try:
self._setup_pipeline()
self._setup_output()
self._setup_mixer()
self._setup_message_processor()
except gobject.GError as ex:
logger.exception(ex)
process.exit_process()
def on_stop(self):
self._teardown_message_processor()
self._teardown_mixer()
self._teardown_pipeline()
def _setup_pipeline(self):
# TODO: replace with and input bin so we simply have an input bin we
@ -65,10 +83,18 @@ class GStreamer(ThreadingActor):
self._uridecodebin.connect('pad-added', self._on_new_pad,
self._pipeline.get_by_name('queue').get_pad('sink'))
def _teardown_pipeline(self):
self._pipeline.set_state(gst.STATE_NULL)
def _setup_output(self):
# This will raise a gobject.GError if the description is bad.
self._output = gst.parse_bin_from_description(
settings.OUTPUT, ghost_unconnected_pads=True)
try:
self._output = gst.parse_bin_from_description(
settings.OUTPUT, ghost_unconnected_pads=True)
except gobject.GError as ex:
logger.error('Failed to create output "%s": %s',
settings.OUTPUT, ex)
process.exit_process()
return
self._pipeline.add(self._output)
gst.element_link_many(self._pipeline.get_by_name('queue'),
@ -80,8 +106,13 @@ class GStreamer(ThreadingActor):
logger.info('Not setting up mixer.')
return
# This will raise a gobject.GError if the description is bad.
mixerbin = gst.parse_bin_from_description(settings.MIXER, False)
try:
mixerbin = gst.parse_bin_from_description(settings.MIXER,
ghost_unconnected_pads=False)
except gobject.GError as ex:
logger.warning('Failed to create mixer "%s": %s',
settings.MIXER, ex)
return
# We assume that the bin will contain a single mixer.
mixer = mixerbin.get_by_interface('GstMixer')
@ -113,10 +144,21 @@ class GStreamer(ThreadingActor):
gst.interfaces.MIXER_TRACK_OUTPUT):
return track
def _teardown_mixer(self):
if self._mixer is not None:
(mixer, track) = self._mixer
mixer.set_state(gst.STATE_NULL)
def _setup_message_processor(self):
bus = self._pipeline.get_bus()
bus.add_signal_watch()
bus.connect('message', self._on_message)
self._message_processor_set_up = True
def _teardown_message_processor(self):
if self._message_processor_set_up:
bus = self._pipeline.get_bus()
bus.remove_signal_watch()
def _on_new_source(self, element, pad):
self._source = element.get_property('source')

View File

@ -14,9 +14,10 @@ class GStreamerTest(unittest.TestCase):
settings.MIXER = 'fakemixer track_max_volume=65536'
settings.OUTPUT = 'fakesink'
self.song_uri = path_to_uri(path_to_data_dir('song1.wav'))
self.gstreamer = GStreamer()
self.gstreamer = GStreamer.start().proxy()
def tearDown(self):
self.gstreamer.stop()
settings.runtime.clear()
def prepare_uri(self, uri):
@ -25,21 +26,21 @@ class GStreamerTest(unittest.TestCase):
def test_start_playback_existing_file(self):
self.prepare_uri(self.song_uri)
self.assertTrue(self.gstreamer.start_playback())
self.assertTrue(self.gstreamer.start_playback().get())
def test_start_playback_non_existing_file(self):
self.prepare_uri(self.song_uri + 'bogus')
self.assertFalse(self.gstreamer.start_playback())
self.assertFalse(self.gstreamer.start_playback().get())
def test_pause_playback_while_playing(self):
self.prepare_uri(self.song_uri)
self.gstreamer.start_playback()
self.assertTrue(self.gstreamer.pause_playback())
self.assertTrue(self.gstreamer.pause_playback().get())
def test_stop_playback_while_playing(self):
self.prepare_uri(self.song_uri)
self.gstreamer.start_playback()
self.assertTrue(self.gstreamer.stop_playback())
self.assertTrue(self.gstreamer.stop_playback().get())
@unittest.SkipTest
def test_deliver_data(self):
@ -51,8 +52,8 @@ class GStreamerTest(unittest.TestCase):
def test_set_volume(self):
for value in range(0, 101):
self.assertTrue(self.gstreamer.set_volume(value))
self.assertEqual(value, self.gstreamer.get_volume())
self.assertTrue(self.gstreamer.set_volume(value).get())
self.assertEqual(value, self.gstreamer.get_volume().get())
@unittest.SkipTest
def test_set_state_encapsulation(self):