GStreamerOutput: Now working for data deliveries from e.g. LibspotifyBackend

This commit is contained in:
Stein Magnus Jodal 2010-08-12 01:29:07 +02:00
parent e55e560827
commit f488b0fe2d

View File

@ -1,10 +1,12 @@
import gobject
gobject.threads_init()
import pygst
pygst.require('0.10')
import gst
import logging
import threading
from mopidy.process import BaseProcess, unpickle_connection
@ -21,18 +23,26 @@ class GStreamerOutput(object):
process = GStreamerProcess(core_queue, input_connection)
process.start()
class GStreamerMessagesThread(threading.Thread):
def run(self):
gobject.MainLoop().run()
class GStreamerProcess(BaseProcess):
"""
A process for all work related to GStreamer.
The main loop processes events from both Mopidy and GStreamer.
Make sure this subprocess is started by the MainThread in the top-most
parent process, and not some other thread. If not, we can get into the
problems described at
http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html.
"""
def __init__(self, core_queue, input_connection):
super(GStreamerProcess, self).__init__()
self.core_queue = core_queue
self.input_connection = input_connection
self.gobject_context = None
self.gst_pipeline = None
self.gst_bus = None
self.gst_bus_id = None
@ -44,18 +54,17 @@ class GStreamerProcess(BaseProcess):
def run_inside_try(self):
self.setup()
while True:
# FIXME Should we block on poll() or not? Need to see iteration()
# behaviour first.
if self.input_connection.poll():
if self.input_connection.poll(None):
message = self.input_connection.recv()
self.process_mopidy_message(message)
self.gobject_context.iteration(True)
def setup(self):
# See http://www.jejik.com/articles/2007/01/
# python-gstreamer_threading_and_the_main_loop/ for details.
gobject.threads_init()
self.gobject_context = gobject.MainLoop().get_context()
logger.debug(u'Setting up GStreamer pipeline')
# Start a helper thread that can run the gobject.MainLoop
messages_thread = GStreamerMessagesThread()
messages_thread.daemon = True
messages_thread.start()
# A pipeline consisting of many elements
self.gst_pipeline = gst.Pipeline("pipeline")
@ -67,8 +76,8 @@ class GStreamerProcess(BaseProcess):
self.process_gst_message)
# Bin for playing audio URIs
self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src')
self.gst_pipeline.add(self.gst_uri_src)
#self.gst_uri_src = gst.element_factory_make('uridecodebin', 'uri_src')
#self.gst_pipeline.add(self.gst_uri_src)
# Bin for playing audio data
self.gst_data_src = gst.element_factory_make('appsrc', 'data_src')
@ -109,8 +118,9 @@ class GStreamerProcess(BaseProcess):
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'deliver_data':
# FIXME Do we care about sending responses for every data delivery?
self.deliver_data(message['caps'], message['data'])
elif message['command'] == 'end_of_data_stream':
self.end_of_data_stream()
elif message['command'] == 'set_state':
response = self.set_state(message['state'])
connection = unpickle_connection(message['reply_to'])
@ -121,7 +131,9 @@ class GStreamerProcess(BaseProcess):
def process_gst_message(self, bus, message):
"""Process messages from GStreamer."""
if message.type == gst.MESSAGE_EOS:
self.core_queue.put({'message': 'end_of_track'})
logger.debug(u'GStreamer signalled end-of-stream. '
'Sending end_of_track to core_queue ...')
self.core_queue.put({'command': 'end_of_track'})
elif message.type == gst.MESSAGE_ERROR:
self.set_state('NULL')
error, debug = message.parse_error()
@ -129,13 +141,6 @@ class GStreamerProcess(BaseProcess):
# FIXME Should we send 'stop_playback' to core here? Can we
# differentiate on how serious the error is?
def deliver_data(self, caps_string, data):
"""Deliver audio data to be played"""
caps = gst.caps_from_string(caps_string)
buffer_ = gst.Buffer(data)
buffer_.set_caps(caps)
self.gst_data_src.emit('push-buffer', buffer_)
def play_uri(self, uri):
"""Play audio at URI"""
self.set_state('READY')
@ -143,6 +148,23 @@ class GStreamerProcess(BaseProcess):
self.set_state('PLAYING')
# TODO Return status
def deliver_data(self, caps_string, data):
"""Deliver audio data to be played"""
caps = gst.caps_from_string(caps_string)
buffer_ = gst.Buffer(buffer(data))
buffer_.set_caps(caps)
self.gst_data_src.set_property('caps', caps)
self.gst_data_src.emit('push-buffer', buffer_)
def end_of_data_stream(self):
"""
Add end-of-stream token to source.
We will get a GStreamer message when the stream playback reaches the
token, and can then do any end-of-stream related tasks.
"""
self.gst_data_src.emit('end-of-stream')
def set_state(self, state_name):
"""
Set the GStreamer state. Returns :class:`True` if successful.