Merge pull request #814 from adamcik/feature/audio-actor-refactor
mopidy.audio.actor refactoring and cleanups
This commit is contained in:
commit
9182278bc6
@ -7,9 +7,11 @@ import gobject
|
||||
import pygst
|
||||
pygst.require('0.10')
|
||||
import gst # noqa
|
||||
import gst.pbutils
|
||||
|
||||
import pykka
|
||||
|
||||
from mopidy import exceptions
|
||||
from mopidy.audio import playlists, utils
|
||||
from mopidy.audio.constants import PlaybackState
|
||||
from mopidy.audio.listener import AudioListener
|
||||
@ -18,6 +20,11 @@ from mopidy.utils import process
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# This logger is only meant for debug logging of low level gstreamer info such
|
||||
# as callbacks, event, messages and direct interaction with GStreamer such as
|
||||
# set_state on a pipeline.
|
||||
gst_logger = logging.getLogger('mopidy.audio.gst')
|
||||
|
||||
playlists.register_typefinders()
|
||||
playlists.register_elements()
|
||||
|
||||
@ -45,7 +52,210 @@ PLAYBIN_FLAGS = (1 << 1) | (1 << 4) | (1 << 7)
|
||||
PLAYBIN_VIS_FLAGS = PLAYBIN_FLAGS | (1 << 3)
|
||||
|
||||
|
||||
# TODO: split out mixer as these are too intertwined right now
|
||||
class _Signals(object):
|
||||
"""Helper for tracking gobject signal registrations"""
|
||||
def __init__(self):
|
||||
self._ids = {}
|
||||
|
||||
def connect(self, element, event, func, *args):
|
||||
"""Connect a function + args to signal event on an element.
|
||||
|
||||
Each event may only be handled by one callback in this implementation.
|
||||
"""
|
||||
assert (element, event) not in self._ids
|
||||
self._ids[(element, event)] = element.connect(event, func, *args)
|
||||
|
||||
def disconnect(self, element, event):
|
||||
"""Disconnect whatever handler we have for and element+event pair.
|
||||
|
||||
Does nothing it the handler has already been removed.
|
||||
"""
|
||||
signal_id = self._ids.pop((element, event), None)
|
||||
if signal_id is not None:
|
||||
element.disconnect(signal_id)
|
||||
|
||||
def clear(self):
|
||||
"""Clear all registered signal handlers."""
|
||||
for element, event in self._ids.keys():
|
||||
element.disconnect(self._ids.pop((element, event)))
|
||||
|
||||
|
||||
# TODO: expose this as a property on audio?
|
||||
class _Appsrc(object):
|
||||
"""Helper class for dealing with appsrc based playback."""
|
||||
def __init__(self):
|
||||
self._signals = _Signals()
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
"""Reset the helper.
|
||||
|
||||
Should be called whenever the source changes and we are not setting up
|
||||
a new appsrc.
|
||||
"""
|
||||
self.prepare(None, None, None, None)
|
||||
|
||||
def prepare(self, caps, need_data, enough_data, seek_data):
|
||||
"""Store info we will need when the appsrc element gets installed."""
|
||||
self._signals.clear()
|
||||
self._source = None
|
||||
self._caps = caps
|
||||
self._need_data_callback = need_data
|
||||
self._seek_data_callback = seek_data
|
||||
self._enough_data_callback = enough_data
|
||||
|
||||
def configure(self, source):
|
||||
"""Configure the supplied source for use.
|
||||
|
||||
Should be called whenever we get a new appsrc.
|
||||
"""
|
||||
source.set_property('caps', self._caps)
|
||||
source.set_property('format', b'time')
|
||||
source.set_property('stream-type', b'seekable')
|
||||
source.set_property('max-bytes', 1 * MB)
|
||||
source.set_property('min-percent', 50)
|
||||
|
||||
if self._need_data_callback:
|
||||
self._signals.connect(source, 'need-data', self._on_signal,
|
||||
self._need_data_callback)
|
||||
if self._seek_data_callback:
|
||||
self._signals.connect(source, 'seek-data', self._on_signal,
|
||||
self._seek_data_callback)
|
||||
if self._enough_data_callback:
|
||||
self._signals.connect(source, 'enough-data', self._on_signal, None,
|
||||
self._enough_data_callback)
|
||||
|
||||
self._source = source
|
||||
|
||||
def push(self, buffer_):
|
||||
return self._source.emit('push-buffer', buffer_) == gst.FLOW_OK
|
||||
|
||||
def end_of_stream(self):
|
||||
self._source.emit('end-of-stream')
|
||||
|
||||
def _on_signal(self, element, clocktime, func):
|
||||
# This shim is used to ensure we always return true, and also handles
|
||||
# that not all the callbacks have a time argument.
|
||||
if clocktime is None:
|
||||
func()
|
||||
else:
|
||||
func(utils.clocktime_to_millisecond(clocktime))
|
||||
return True
|
||||
|
||||
|
||||
# TODO: expose this as a property on audio when #790 gets further along.
|
||||
class _Outputs(gst.Bin):
|
||||
def __init__(self):
|
||||
gst.Bin.__init__(self)
|
||||
|
||||
self._tee = gst.element_factory_make('tee')
|
||||
self.add(self._tee)
|
||||
|
||||
# Queue element to buy us time between the about to finish event and
|
||||
# the actual switch, i.e. about to switch can block for longer thanks
|
||||
# to this queue.
|
||||
# TODO: make the min-max values a setting?
|
||||
# TODO: move out of this class?
|
||||
queue = gst.element_factory_make('queue')
|
||||
queue.set_property('max-size-buffers', 0)
|
||||
queue.set_property('max-size-bytes', 0)
|
||||
queue.set_property('max-size-time', 5 * gst.SECOND)
|
||||
queue.set_property('min-threshold-time', 3 * gst.SECOND)
|
||||
self.add(queue)
|
||||
|
||||
queue.link(self._tee)
|
||||
|
||||
ghost_pad = gst.GhostPad('sink', queue.get_pad('sink'))
|
||||
self.add_pad(ghost_pad)
|
||||
|
||||
# Add an always connected fakesink so the tee doesn't fail.
|
||||
# XXX disabled for now as we get one stream changed per sink...
|
||||
# self._add(gst.element_factory_make('fakesink'))
|
||||
|
||||
def add_output(self, description):
|
||||
# XXX This only works for pipelines not in use until #790 gets done.
|
||||
try:
|
||||
output = gst.parse_bin_from_description(
|
||||
description, ghost_unconnected_pads=True)
|
||||
except gobject.GError as ex:
|
||||
logger.error(
|
||||
'Failed to create audio output "%s": %s', description, ex)
|
||||
raise exceptions.AudioException(bytes(ex))
|
||||
|
||||
self._add(output)
|
||||
logger.info('Audio output set to "%s"', description)
|
||||
|
||||
def _add(self, element):
|
||||
# All tee branches need a queue in front of them.
|
||||
queue = gst.element_factory_make('queue')
|
||||
self.add(element)
|
||||
self.add(queue)
|
||||
queue.link(element)
|
||||
self._tee.link(queue)
|
||||
|
||||
|
||||
class SoftwareMixer(object):
|
||||
pykka_traversable = True
|
||||
|
||||
def __init__(self, mixer):
|
||||
self._mixer = mixer
|
||||
self._element = None
|
||||
self._last_volume = None
|
||||
self._last_mute = None
|
||||
self._signals = _Signals()
|
||||
|
||||
def setup(self, element, mixer_ref):
|
||||
self._element = element
|
||||
|
||||
self._signals.connect(element, 'notify::volume', self._volume_changed)
|
||||
self._signals.connect(element, 'notify::mute', self._mute_changed)
|
||||
|
||||
self._mixer.setup(mixer_ref)
|
||||
|
||||
def teardown(self):
|
||||
self._signals.clear()
|
||||
self._mixer.teardown()
|
||||
|
||||
def get_volume(self):
|
||||
return int(round(self._element.get_property('volume') * 100))
|
||||
|
||||
def set_volume(self, volume):
|
||||
self._element.set_property('volume', volume / 100.0)
|
||||
|
||||
def get_mute(self):
|
||||
return self._element.get_property('mute')
|
||||
|
||||
def set_mute(self, mute):
|
||||
return self._element.set_property('mute', bool(mute))
|
||||
|
||||
def _volume_changed(self, element, property_):
|
||||
old_volume, self._last_volume = self._last_volume, self.get_volume()
|
||||
if old_volume != self._last_volume:
|
||||
gst_logger.debug('Notify volume: %s', self._last_volume / 100.0)
|
||||
self._mixer.trigger_volume_changed(self._last_volume)
|
||||
|
||||
def _mute_changed(self, element, property_):
|
||||
old_mute, self._last_mute = self._last_mute, self.get_mute()
|
||||
if old_mute != self._last_mute:
|
||||
gst_logger.debug('Notify mute: %s', self._last_mute)
|
||||
self._mixer.trigger_mute_changed(self._last_mute)
|
||||
|
||||
|
||||
def setup_proxy(element, config):
|
||||
# TODO: reuse in scanner code
|
||||
if not config.get('hostname'):
|
||||
return
|
||||
|
||||
proxy = "%s://%s:%d" % (config.get('scheme', 'http'),
|
||||
config.get('hostname'),
|
||||
config.get('port', 80))
|
||||
|
||||
element.set_property('proxy', proxy)
|
||||
element.set_property('proxy-id', config.get('username'))
|
||||
element.set_property('proxy-pw', config.get('password'))
|
||||
|
||||
|
||||
# TODO: create a player class which replaces the actors internals
|
||||
class Audio(pykka.ThreadingActor):
|
||||
"""
|
||||
Audio output through `GStreamer <http://gstreamer.freedesktop.org/>`_.
|
||||
@ -54,23 +264,25 @@ class Audio(pykka.ThreadingActor):
|
||||
#: The GStreamer state mapped to :class:`mopidy.audio.PlaybackState`
|
||||
state = PlaybackState.STOPPED
|
||||
|
||||
#: The software mixing interface :class:`mopidy.audio.actor.SoftwareMixer`
|
||||
mixer = None
|
||||
|
||||
def __init__(self, config, mixer):
|
||||
super(Audio, self).__init__()
|
||||
|
||||
self._config = config
|
||||
self._mixer = mixer
|
||||
self._target_state = gst.STATE_NULL
|
||||
self._buffering = False
|
||||
|
||||
self._playbin = None
|
||||
self._signal_ids = {} # {(element, event): signal_id}
|
||||
self._outputs = None
|
||||
self._about_to_finish_callback = None
|
||||
|
||||
self._appsrc = None
|
||||
self._appsrc_caps = None
|
||||
self._appsrc_need_data_callback = None
|
||||
self._appsrc_enough_data_callback = None
|
||||
self._appsrc_seek_data_callback = None
|
||||
self._appsrc = _Appsrc()
|
||||
self._signals = _Signals()
|
||||
|
||||
if mixer and self._config['audio']['mixer'] == 'software':
|
||||
self.mixer = SoftwareMixer(mixer)
|
||||
|
||||
def on_start(self):
|
||||
try:
|
||||
@ -89,18 +301,8 @@ class Audio(pykka.ThreadingActor):
|
||||
self._teardown_mixer()
|
||||
self._teardown_playbin()
|
||||
|
||||
# TODO: split out signal tracking helper class.
|
||||
def _connect(self, element, event, *args):
|
||||
"""Helper to keep track of signal ids based on element+event"""
|
||||
self._signal_ids[(element, event)] = element.connect(event, *args)
|
||||
|
||||
def _disconnect(self, element, event):
|
||||
"""Helper to disconnect signals created with _connect helper."""
|
||||
signal_id = self._signal_ids.pop((element, event), None)
|
||||
if signal_id is not None:
|
||||
element.disconnect(signal_id)
|
||||
|
||||
def _setup_preferences(self):
|
||||
# TODO: move out of audio actor?
|
||||
# Fix for https://github.com/mopidy/mopidy/issues/604
|
||||
registry = gst.registry_get_default()
|
||||
jacksink = registry.find_feature(
|
||||
@ -112,144 +314,56 @@ class Audio(pykka.ThreadingActor):
|
||||
playbin = gst.element_factory_make('playbin2')
|
||||
playbin.set_property('flags', PLAYBIN_FLAGS)
|
||||
|
||||
# TODO: turn into config values...
|
||||
playbin.set_property('buffer-size', 2*1024*1024)
|
||||
playbin.set_property('buffer-duration', 2*gst.SECOND)
|
||||
|
||||
self._connect(playbin, 'about-to-finish', self._on_about_to_finish)
|
||||
self._connect(playbin, 'notify::source', self._on_new_source)
|
||||
self._connect(playbin, 'source-setup', self._on_source_setup)
|
||||
self._signals.connect(playbin, 'source-setup', self._on_source_setup)
|
||||
self._signals.connect(
|
||||
playbin, 'about-to-finish', self._on_about_to_finish)
|
||||
|
||||
self._playbin = playbin
|
||||
|
||||
def _on_about_to_finish(self, element):
|
||||
source, self._appsrc = self._appsrc, None
|
||||
if source is not None:
|
||||
self._appsrc_caps = None
|
||||
self._disconnect(source, 'need-data')
|
||||
self._disconnect(source, 'enough-data')
|
||||
self._disconnect(source, 'seek-data')
|
||||
|
||||
if self._about_to_finish_callback:
|
||||
logger.debug('Calling about to finish callback.')
|
||||
self._about_to_finish_callback()
|
||||
|
||||
def _on_new_source(self, element, pad):
|
||||
uri = element.get_property('uri')
|
||||
if not uri or not uri.startswith('appsrc://'):
|
||||
return
|
||||
|
||||
source = element.get_property('source')
|
||||
source.set_property('caps', self._appsrc_caps)
|
||||
source.set_property('format', b'time')
|
||||
source.set_property('stream-type', b'seekable')
|
||||
source.set_property('max-bytes', 1 * MB)
|
||||
source.set_property('min-percent', 50)
|
||||
|
||||
self._connect(source, 'need-data', self._appsrc_on_need_data)
|
||||
self._connect(source, 'enough-data', self._appsrc_on_enough_data)
|
||||
self._connect(source, 'seek-data', self._appsrc_on_seek_data)
|
||||
|
||||
self._appsrc = source
|
||||
|
||||
def _on_source_setup(self, element, source):
|
||||
scheme = 'http'
|
||||
hostname = self._config['proxy']['hostname']
|
||||
port = 80
|
||||
|
||||
if hasattr(source.props, 'proxy') and hostname:
|
||||
if self._config['proxy']['port']:
|
||||
port = self._config['proxy']['port']
|
||||
if self._config['proxy']['scheme']:
|
||||
scheme = self._config['proxy']['scheme']
|
||||
|
||||
proxy = "%s://%s:%d" % (scheme, hostname, port)
|
||||
source.set_property('proxy', proxy)
|
||||
source.set_property('proxy-id', self._config['proxy']['username'])
|
||||
source.set_property('proxy-pw', self._config['proxy']['password'])
|
||||
|
||||
def _appsrc_on_need_data(self, appsrc, gst_length_hint):
|
||||
length_hint = utils.clocktime_to_millisecond(gst_length_hint)
|
||||
if self._appsrc_need_data_callback is not None:
|
||||
self._appsrc_need_data_callback(length_hint)
|
||||
return True
|
||||
|
||||
def _appsrc_on_enough_data(self, appsrc):
|
||||
if self._appsrc_enough_data_callback is not None:
|
||||
self._appsrc_enough_data_callback()
|
||||
return True
|
||||
|
||||
def _appsrc_on_seek_data(self, appsrc, gst_position):
|
||||
position = utils.clocktime_to_millisecond(gst_position)
|
||||
if self._appsrc_seek_data_callback is not None:
|
||||
self._appsrc_seek_data_callback(position)
|
||||
return True
|
||||
|
||||
def _teardown_playbin(self):
|
||||
self._disconnect(self._playbin, 'about-to-finish')
|
||||
self._disconnect(self._playbin, 'notify::source')
|
||||
self._disconnect(self._playbin, 'source-setup')
|
||||
self._signals.disconnect(self._playbin, 'about-to-finish')
|
||||
self._signals.disconnect(self._playbin, 'source-setup')
|
||||
self._playbin.set_state(gst.STATE_NULL)
|
||||
|
||||
def _on_about_to_finish(self, element):
|
||||
gst_logger.debug('Got about-to-finish event.')
|
||||
if self._about_to_finish_callback:
|
||||
self._about_to_finish_callback()
|
||||
|
||||
def _on_source_setup(self, element, source):
|
||||
gst_logger.debug('Got source-setup: element=%s', source)
|
||||
|
||||
if source.get_factory().get_name() == 'appsrc':
|
||||
self._appsrc.configure(source)
|
||||
else:
|
||||
self._appsrc.reset()
|
||||
|
||||
if hasattr(source.props, 'proxy'):
|
||||
setup_proxy(source, self._config['proxy'])
|
||||
|
||||
def _setup_output(self):
|
||||
output_desc = self._config['audio']['output']
|
||||
self._outputs = _Outputs()
|
||||
self._outputs.get_pad('sink').add_event_probe(self._on_pad_event)
|
||||
try:
|
||||
user_output = gst.parse_bin_from_description(
|
||||
output_desc, ghost_unconnected_pads=True)
|
||||
except gobject.GError as ex:
|
||||
logger.error(
|
||||
'Failed to create audio output "%s": %s', output_desc, ex)
|
||||
process.exit_process()
|
||||
|
||||
output = gst.Bin('output')
|
||||
|
||||
# Queue element to buy us time between the about to finish event and
|
||||
# the actual switch, i.e. about to switch can block for longer thanks
|
||||
# to this queue.
|
||||
# TODO: make the min-max values a setting?
|
||||
queue = gst.element_factory_make('queue')
|
||||
queue.set_property('max-size-buffers', 0)
|
||||
queue.set_property('max-size-bytes', 0)
|
||||
queue.set_property('max-size-time', 5 * gst.SECOND)
|
||||
queue.set_property('min-threshold-time', 3 * gst.SECOND)
|
||||
|
||||
queue.get_pad('src').add_event_probe(self._on_pad_event)
|
||||
|
||||
output.add(user_output)
|
||||
output.add(queue)
|
||||
|
||||
queue.link(user_output)
|
||||
ghost_pad = gst.GhostPad('sink', queue.get_pad('sink'))
|
||||
output.add_pad(ghost_pad)
|
||||
|
||||
logger.info('Audio output set to "%s"', output_desc)
|
||||
self._playbin.set_property('audio-sink', output)
|
||||
self._outputs.add_output(self._config['audio']['output'])
|
||||
except exceptions.AudioException:
|
||||
process.exit_process() # TODO: move this up the chain
|
||||
self._playbin.set_property('audio-sink', self._outputs)
|
||||
|
||||
def _setup_mixer(self):
|
||||
if self._config['audio']['mixer'] != 'software':
|
||||
return
|
||||
self._mixer.audio = self.actor_ref.proxy()
|
||||
self._connect(self._playbin, 'notify::volume', self._on_mixer_change)
|
||||
self._connect(self._playbin, 'notify::mute', self._on_mixer_change)
|
||||
|
||||
# The Mopidy startup procedure will set the initial volume of a mixer,
|
||||
# but this happens before the audio actor is injected into the software
|
||||
# mixer and has no effect. Thus, we need to set the initial volume
|
||||
# again.
|
||||
initial_volume = self._config['audio']['mixer_volume']
|
||||
if initial_volume is not None:
|
||||
self._mixer.set_volume(initial_volume)
|
||||
|
||||
def _on_mixer_change(self, element, gparamspec):
|
||||
self._mixer.trigger_events_for_changed_values()
|
||||
if self.mixer:
|
||||
self.mixer.setup(self._playbin, self.actor_ref.proxy().mixer)
|
||||
|
||||
def _teardown_mixer(self):
|
||||
if self._config['audio']['mixer'] != 'software':
|
||||
return
|
||||
self._disconnect(self._playbin, 'notify::volume')
|
||||
self._disconnect(self._playbin, 'notify::mute')
|
||||
self._mixer.audio = None
|
||||
if self.mixer:
|
||||
self.mixer.teardown()
|
||||
|
||||
def _setup_visualizer(self):
|
||||
# TODO: kill
|
||||
visualizer_element = self._config['audio']['visualizer']
|
||||
if not visualizer_element:
|
||||
return
|
||||
@ -266,22 +380,26 @@ class Audio(pykka.ThreadingActor):
|
||||
def _setup_message_processor(self):
|
||||
bus = self._playbin.get_bus()
|
||||
bus.add_signal_watch()
|
||||
self._connect(bus, 'message', self._on_message)
|
||||
self._signals.connect(bus, 'message', self._on_message)
|
||||
|
||||
def _teardown_message_processor(self):
|
||||
bus = self._playbin.get_bus()
|
||||
self._disconnect(bus, 'message')
|
||||
self._signals.disconnect(bus, 'message')
|
||||
bus.remove_signal_watch()
|
||||
|
||||
def _on_pad_event(self, pad, event):
|
||||
if event.type == gst.EVENT_NEWSEGMENT:
|
||||
# update, rate, format, start, stop, position
|
||||
position = event.parse_new_segment()[5] // gst.MSECOND
|
||||
logger.debug('Triggering event: position_changed(position=%s)',
|
||||
position)
|
||||
AudioListener.send('position_changed', position=position)
|
||||
update, rate, format_, start, stop, pos = event.parse_new_segment()
|
||||
gst_logger.debug('Got new-segment event: update=%s rate=%s '
|
||||
'format=%s start=%s stop=%s position=%s', update,
|
||||
rate, format_.value_name, start, stop, pos)
|
||||
pos_ms = pos // gst.MSECOND
|
||||
logger.debug('Triggering: position_changed(position=%s)', pos_ms)
|
||||
AudioListener.send('position_changed', position=pos_ms)
|
||||
|
||||
return True
|
||||
|
||||
# TODO: consider splitting this out while we are at it.
|
||||
def _on_message(self, bus, msg):
|
||||
if msg.type == gst.MESSAGE_STATE_CHANGED and msg.src == self._playbin:
|
||||
self._on_playbin_state_changed(*msg.parse_state_changed())
|
||||
@ -293,11 +411,19 @@ class Audio(pykka.ThreadingActor):
|
||||
self._on_error(*msg.parse_error())
|
||||
elif msg.type == gst.MESSAGE_WARNING:
|
||||
self._on_warning(*msg.parse_warning())
|
||||
elif msg.type == gst.MESSAGE_ASYNC_DONE:
|
||||
gst_logger.debug('Got async-done message.')
|
||||
elif msg.type == gst.MESSAGE_ELEMENT:
|
||||
if msg.structure.has_name('playbin2-stream-changed'):
|
||||
self._on_stream_changed(msg.structure['uri'])
|
||||
elif gst.pbutils.is_missing_plugin_message(msg):
|
||||
self._on_missing_plugin(msg)
|
||||
|
||||
def _on_playbin_state_changed(self, old_state, new_state, pending_state):
|
||||
gst_logger.debug('Got state-changed message: old=%s new=%s pending=%s',
|
||||
old_state.value_name, new_state.value_name,
|
||||
pending_state.value_name)
|
||||
|
||||
if new_state == gst.STATE_READY and pending_state == gst.STATE_NULL:
|
||||
# XXX: We're not called on the last state change when going down to
|
||||
# NULL, so we rewrite the second to last call to get the expected
|
||||
@ -318,15 +444,17 @@ class Audio(pykka.ThreadingActor):
|
||||
if target_state == new_state:
|
||||
target_state = None
|
||||
|
||||
logger.debug(
|
||||
'Triggering event: state_changed(old_state=%s, new_state=%s, '
|
||||
'target_state=%s)', old_state, new_state, target_state)
|
||||
logger.debug('Triggering: state_changed(old_state=%s, new_state=%s, '
|
||||
'target_state=%s)', old_state, new_state, target_state)
|
||||
AudioListener.send('state_changed', old_state=old_state,
|
||||
new_state=new_state, target_state=target_state)
|
||||
if new_state == PlaybackState.STOPPED:
|
||||
logger.debug('Triggering: stream_changed(uri=None)')
|
||||
AudioListener.send('stream_changed', uri=None)
|
||||
|
||||
def _on_buffering(self, percent):
|
||||
gst_logger.debug('Got buffering message: percent=%d%%', percent)
|
||||
|
||||
if percent < 10 and not self._buffering:
|
||||
self._playbin.set_state(gst.STATE_PAUSED)
|
||||
self._buffering = True
|
||||
@ -335,27 +463,41 @@ class Audio(pykka.ThreadingActor):
|
||||
if self._target_state == gst.STATE_PLAYING:
|
||||
self._playbin.set_state(gst.STATE_PLAYING)
|
||||
|
||||
logger.debug('Buffer %d%% full', percent)
|
||||
|
||||
def _on_end_of_stream(self):
|
||||
logger.debug('Audio event: reached_end_of_stream')
|
||||
gst_logger.debug('Got end-of-stream message.')
|
||||
logger.debug('Triggering: reached_end_of_stream()')
|
||||
AudioListener.send('reached_end_of_stream')
|
||||
|
||||
def _on_error(self, error, debug):
|
||||
logger.error(
|
||||
'%s Debug message: %s',
|
||||
str(error).decode('utf-8'), debug.decode('utf-8') or 'None')
|
||||
gst_logger.error(str(error).decode('utf-8'))
|
||||
if debug:
|
||||
gst_logger.debug(debug.decode('utf-8'))
|
||||
# TODO: is this needed?
|
||||
self.stop_playback()
|
||||
|
||||
def _on_warning(self, error, debug):
|
||||
logger.warning(
|
||||
'%s Debug message: %s',
|
||||
str(error).decode('utf-8'), debug.decode('utf-8') or 'None')
|
||||
gst_logger.warning(str(error).decode('utf-8'))
|
||||
if debug:
|
||||
gst_logger.debug(debug.decode('utf-8'))
|
||||
|
||||
def _on_stream_changed(self, uri):
|
||||
logger.debug('Triggering event: stream_changed(uri=%s)', uri)
|
||||
gst_logger.debug('Got stream-changed message: uri:%s', uri)
|
||||
logger.debug('Triggering: stream_changed(uri=%s)', uri)
|
||||
AudioListener.send('stream_changed', uri=uri)
|
||||
|
||||
def _on_missing_plugin(self, msg):
|
||||
desc = gst.pbutils.missing_plugin_message_get_description(msg)
|
||||
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
|
||||
|
||||
gst_logger.debug('Got missing-plugin message: description:%s', desc)
|
||||
logger.warning('Could not find a %s to handle media.', desc)
|
||||
if gst.pbutils.install_plugins_supported():
|
||||
logger.info('You might be able to fix this by running: '
|
||||
'gst-installer "%s"', debug)
|
||||
# TODO: store the missing plugins installer info in a file so we can
|
||||
# can provide a 'mopidy install-missing-plugins' if the system has the
|
||||
# required helper installed?
|
||||
|
||||
def set_uri(self, uri):
|
||||
"""
|
||||
Set URI of audio to be played.
|
||||
@ -385,12 +527,8 @@ class Audio(pykka.ThreadingActor):
|
||||
to continue playback
|
||||
:type seek_data: callable which takes time position in ms
|
||||
"""
|
||||
if isinstance(caps, unicode):
|
||||
caps = caps.encode('utf-8')
|
||||
self._appsrc_caps = gst.Caps(caps)
|
||||
self._appsrc_need_data_callback = need_data
|
||||
self._appsrc_enough_data_callback = enough_data
|
||||
self._appsrc_seek_data_callback = seek_data
|
||||
self._appsrc.prepare(
|
||||
gst.Caps(bytes(caps)), need_data, enough_data, seek_data)
|
||||
self._playbin.set_property('uri', 'appsrc://')
|
||||
|
||||
def emit_data(self, buffer_):
|
||||
@ -405,9 +543,7 @@ class Audio(pykka.ThreadingActor):
|
||||
:type buffer_: :class:`gst.Buffer`
|
||||
:rtype: boolean
|
||||
"""
|
||||
if not self._appsrc:
|
||||
return False
|
||||
return self._appsrc.emit('push-buffer', buffer_) == gst.FLOW_OK
|
||||
return self._appsrc.push(buffer_)
|
||||
|
||||
def emit_end_of_stream(self):
|
||||
"""
|
||||
@ -417,8 +553,8 @@ class Audio(pykka.ThreadingActor):
|
||||
We will get a GStreamer message when the stream playback reaches the
|
||||
token, and can then do any end-of-stream related tasks.
|
||||
"""
|
||||
# TODO: replace this with emit_data(None)?
|
||||
self._playbin.get_property('source').emit('end-of-stream')
|
||||
self._appsrc.end_of_stream()
|
||||
gst_logger.debug('Sent appsrc end-of-stream event.')
|
||||
|
||||
def set_about_to_finish_callback(self, callback):
|
||||
"""
|
||||
@ -443,6 +579,8 @@ class Audio(pykka.ThreadingActor):
|
||||
gst_position = self._playbin.query_position(gst.FORMAT_TIME)[0]
|
||||
return utils.clocktime_to_millisecond(gst_position)
|
||||
except gst.QueryError:
|
||||
# TODO: take state into account for this and possibly also return
|
||||
# None as the unknown value instead of zero?
|
||||
logger.debug('Position query failed')
|
||||
return 0
|
||||
|
||||
@ -454,9 +592,12 @@ class Audio(pykka.ThreadingActor):
|
||||
:type position: int
|
||||
:rtype: :class:`True` if successful, else :class:`False`
|
||||
"""
|
||||
# TODO: double check seek flags in use.
|
||||
gst_position = utils.millisecond_to_clocktime(position)
|
||||
return self._playbin.seek_simple(
|
||||
result = self._playbin.seek_simple(
|
||||
gst.Format(gst.FORMAT_TIME), gst.SEEK_FLAG_FLUSH, gst_position)
|
||||
gst_logger.debug('Sent flushing seek: position=%s', gst_position)
|
||||
return result
|
||||
|
||||
def start_playback(self):
|
||||
"""
|
||||
@ -536,63 +677,15 @@ class Audio(pykka.ThreadingActor):
|
||||
"""
|
||||
self._target_state = state
|
||||
result = self._playbin.set_state(state)
|
||||
gst_logger.debug('State change to %s: result=%s', state.value_name,
|
||||
result.value_name)
|
||||
|
||||
if result == gst.STATE_CHANGE_FAILURE:
|
||||
logger.warning(
|
||||
'Setting GStreamer state to %s failed', state.value_name)
|
||||
return False
|
||||
elif result == gst.STATE_CHANGE_ASYNC:
|
||||
logger.debug(
|
||||
'Setting GStreamer state to %s is async', state.value_name)
|
||||
return True
|
||||
else:
|
||||
logger.debug(
|
||||
'Setting GStreamer state to %s is OK', state.value_name)
|
||||
return True
|
||||
|
||||
def get_volume(self):
|
||||
"""
|
||||
Get volume level of the software mixer.
|
||||
|
||||
Example values:
|
||||
|
||||
0:
|
||||
Minimum volume.
|
||||
100:
|
||||
Maximum volume.
|
||||
|
||||
:rtype: int in range [0..100]
|
||||
"""
|
||||
return int(round(self._playbin.get_property('volume') * 100))
|
||||
|
||||
def set_volume(self, volume):
|
||||
"""
|
||||
Set volume level of the software mixer.
|
||||
|
||||
:param volume: the volume in the range [0..100]
|
||||
:type volume: int
|
||||
:rtype: :class:`True` if successful, else :class:`False`
|
||||
"""
|
||||
self._playbin.set_property('volume', volume / 100.0)
|
||||
return True
|
||||
|
||||
def get_mute(self):
|
||||
"""
|
||||
Get mute status of the software mixer.
|
||||
|
||||
:rtype: :class:`True` if muted, :class:`False` if unmuted,
|
||||
:class:`None` if no mixer is installed.
|
||||
"""
|
||||
return self._playbin.get_property('mute')
|
||||
|
||||
def set_mute(self, mute):
|
||||
"""
|
||||
Mute or unmute of the software mixer.
|
||||
|
||||
:param mute: Whether to mute the mixer or not.
|
||||
:type mute: bool
|
||||
:rtype: :class:`True` if successful, else :class:`False`
|
||||
"""
|
||||
self._playbin.set_property('mute', bool(mute))
|
||||
# TODO: at this point we could already emit stopped event instead
|
||||
# of faking it in the message handling when result=OK
|
||||
return True
|
||||
|
||||
def set_metadata(self, track):
|
||||
@ -626,3 +719,4 @@ class Audio(pykka.ThreadingActor):
|
||||
|
||||
event = gst.event_new_tag(taglist)
|
||||
self._playbin.send_event(event)
|
||||
gst_logger.debug('Sent tag event: track=%s', track.uri)
|
||||
|
||||
@ -34,3 +34,7 @@ class MixerError(MopidyException):
|
||||
|
||||
class ScannerError(MopidyException):
|
||||
pass
|
||||
|
||||
|
||||
class AudioException(MopidyException):
|
||||
pass
|
||||
|
||||
@ -17,40 +17,48 @@ class SoftwareMixer(pykka.ThreadingActor, mixer.Mixer):
|
||||
def __init__(self, config):
|
||||
super(SoftwareMixer, self).__init__(config)
|
||||
|
||||
self.audio = None
|
||||
self._last_volume = None
|
||||
self._last_mute = None
|
||||
self._audio_mixer = None
|
||||
self._initial_volume = None
|
||||
self._initial_mute = None
|
||||
|
||||
# TODO: shouldn't this be logged by thing that choose us?
|
||||
logger.info('Mixing using GStreamer software mixing')
|
||||
|
||||
def setup(self, mixer_ref):
|
||||
self._audio_mixer = mixer_ref
|
||||
|
||||
# The Mopidy startup procedure will set the initial volume of a
|
||||
# mixer, but this happens before the audio actor is injected into the
|
||||
# software mixer and has no effect. Thus, we need to set the initial
|
||||
# volume again.
|
||||
if self._initial_volume is not None:
|
||||
self.set_volume(self._initial_volume)
|
||||
if self._initial_mute is not None:
|
||||
self.set_mute(self._initial_mute)
|
||||
|
||||
def teardown(self):
|
||||
self._audio_mixer = None
|
||||
|
||||
def get_volume(self):
|
||||
if self.audio is None:
|
||||
if self._audio_mixer is None:
|
||||
return None
|
||||
return self.audio.get_volume().get()
|
||||
return self._audio_mixer.get_volume().get()
|
||||
|
||||
def set_volume(self, volume):
|
||||
if self.audio is None:
|
||||
if self._audio_mixer is None:
|
||||
self._initial_volume = volume
|
||||
return False
|
||||
self.audio.set_volume(volume)
|
||||
self._audio_mixer.set_volume(volume)
|
||||
return True
|
||||
|
||||
def get_mute(self):
|
||||
if self.audio is None:
|
||||
if self._audio_mixer is None:
|
||||
return None
|
||||
return self.audio.get_mute().get()
|
||||
return self._audio_mixer.get_mute().get()
|
||||
|
||||
def set_mute(self, mute):
|
||||
if self.audio is None:
|
||||
if self._audio_mixer is None:
|
||||
self._initial_mute = mute
|
||||
return False
|
||||
self.audio.set_mute(mute)
|
||||
self._audio_mixer.set_mute(mute)
|
||||
return True
|
||||
|
||||
def trigger_events_for_changed_values(self):
|
||||
old_volume, self._last_volume = self._last_volume, self.get_volume()
|
||||
old_mute, self._last_mute = self._last_mute, self.get_mute()
|
||||
|
||||
if old_volume != self._last_volume:
|
||||
self.trigger_volume_changed(self._last_volume)
|
||||
|
||||
if old_mute != self._last_mute:
|
||||
self.trigger_mute_changed(self._last_mute)
|
||||
|
||||
@ -31,3 +31,7 @@ class ExceptionsTest(unittest.TestCase):
|
||||
def test_scanner_error_is_a_mopidy_exception(self):
|
||||
self.assert_(issubclass(
|
||||
exceptions.ScannerError, exceptions.MopidyException))
|
||||
|
||||
def test_audio_error_is_a_mopidy_exception(self):
|
||||
self.assert_(issubclass(
|
||||
exceptions.AudioException, exceptions.MopidyException))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user