Merge branch 'gstreamer-local-backend' into develop

This commit is contained in:
Stein Magnus Jodal 2010-08-21 23:28:06 +02:00
commit 8358345c6c
8 changed files with 179 additions and 124 deletions

View File

@ -440,7 +440,10 @@ class BasePlaybackController(object):
:param time_position: time position in milliseconds
:type time_position: int
:rtype: :class:`True` if successful, else :class:`False`
"""
# FIXME I think return value is only really useful for internal
# testing, as such it should probably not be exposed in API.
if self.state == self.STOPPED:
self.play()
elif self.state == self.PAUSED:
@ -455,7 +458,7 @@ class BasePlaybackController(object):
self._play_time_started = self._current_wall_time
self._play_time_accumulated = time_position
self._seek(time_position)
return self._seek(time_position)
def _seek(self, time_position):
"""

View File

@ -1,34 +1,18 @@
import gobject
gobject.threads_init()
# FIXME make sure we don't get hit by
# http://jameswestby.net/
# weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html
import pygst
pygst.require('0.10')
import gst
import logging
import os
import glob
import logging
import multiprocessing
import os
import shutil
import threading
from mopidy import settings
from mopidy.backends.base import *
from mopidy.models import Playlist, Track, Album
from mopidy.utils.process import pickle_connection
from .translator import parse_m3u, parse_mpd_tag_cache
logger = logging.getLogger(u'mopidy.backends.local')
class LocalMessages(threading.Thread):
def run(self):
gobject.MainLoop().run()
message_thread = LocalMessages()
message_thread.daemon = True
message_thread.start()
class LocalBackend(BaseBackend):
"""
A backend for playing music from a local music archive.
@ -49,71 +33,40 @@ class LocalBackend(BaseBackend):
class LocalPlaybackController(BasePlaybackController):
def __init__(self, backend):
super(LocalPlaybackController, self).__init__(backend)
self._bin = gst.element_factory_make("playbin", "player")
self._bus = self._bin.get_bus()
sink = gst.element_factory_make("fakesink", "fakesink")
# FIXME cleanup fakesink?
self._bin.set_property("video-sink", sink)
self._bus.add_signal_watch()
self._bus_id = self._bus.connect('message', self._message)
self.stop()
def _set_state(self, state):
self._bin.set_state(state)
(_, new, _) = self._bin.get_state()
return new == state
def _send_recv(self, message):
(my_end, other_end) = multiprocessing.Pipe()
message.update({'reply_to': pickle_connection(other_end)})
self.backend.output_queue.put(message)
my_end.poll(None)
return my_end.recv()
def _message(self, bus, message):
if message.type == gst.MESSAGE_EOS:
self.on_end_of_track()
elif message.type == gst.MESSAGE_ERROR:
self._bin.set_state(gst.STATE_NULL)
error, debug = message.parse_error()
logger.error('%s %s', error, debug)
def _send(self, message):
self.backend.output_queue.put(message)
def _set_state(self, state):
return self._send_recv({'command': 'set_state', 'state': state})
def _play(self, track):
self._bin.set_state(gst.STATE_READY)
self._bin.set_property('uri', track.uri)
return self._set_state(gst.STATE_PLAYING)
return self._send_recv({'command': 'play_uri', 'uri': track.uri})
def _stop(self):
return self._set_state(gst.STATE_READY)
return self._set_state('READY')
def _pause(self):
return self._set_state(gst.STATE_PAUSED)
return self._set_state('PAUSED')
def _resume(self):
return self._set_state(gst.STATE_PLAYING)
return self._set_state('PLAYING')
def _seek(self, time_position):
self._bin.seek_simple(gst.Format(gst.FORMAT_TIME),
gst.SEEK_FLAG_FLUSH, time_position * gst.MSECOND)
self._set_state(gst.STATE_PLAYING)
return self._send_recv({'command': 'set_position',
'position': time_position})
@property
def time_position(self):
try:
return self._bin.query_position(gst.FORMAT_TIME)[0] // gst.MSECOND
except gst.QueryError, e:
logger.error('time_position failed: %s', e)
return 0
def destroy(self):
playbin, self._bin = self._bin, None
bus, self._bus = self._bus, None
bus.disconnect(self._bus_id)
bus.remove_signal_watch()
playbin.get_state()
playbin.set_state(gst.STATE_NULL)
bus.set_flushing(True)
del bus
del playbin
return self._send_recv({'command': 'get_position'})
class LocalStoredPlaylistsController(BaseStoredPlaylistsController):

View File

@ -8,6 +8,7 @@ import gst
import logging
import threading
from mopidy import settings
from mopidy.utils.process import BaseProcess, unpickle_connection
logger = logging.getLogger('mopidy.outputs.gstreamer')
@ -42,22 +43,11 @@ class GStreamerProcess(BaseProcess):
http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html.
"""
pipeline_description = ' ! '.join([
'appsrc name=src',
'volume name=volume',
'autoaudiosink name=sink',
])
def __init__(self, core_queue, output_queue):
super(GStreamerProcess, self).__init__(name='GStreamerProcess')
self.core_queue = core_queue
self.output_queue = output_queue
self.gst_pipeline = None
self.gst_bus = None
self.gst_bus_id = None
self.gst_uri_bin = None
self.gst_data_src = None
self.gst_volume = None
def run_inside_try(self):
self.setup()
@ -73,16 +63,30 @@ class GStreamerProcess(BaseProcess):
messages_thread.daemon = True
messages_thread.start()
self.gst_pipeline = gst.parse_launch(self.pipeline_description)
self.gst_data_src = self.gst_pipeline.get_by_name('src')
#self.gst_uri_bin = self.gst_pipeline.get_by_name('uri')
self.gst_volume = self.gst_pipeline.get_by_name('volume')
self.gst_pipeline = gst.parse_launch(' ! '.join([
'audioconvert name=convert',
'volume name=volume',
settings.GSTREAMER_AUDIO_SINK,
]))
pad = self.gst_pipeline.get_by_name('convert').get_pad('sink')
if settings.BACKENDS[0] == 'mopidy.backends.local.LocalBackend':
uri_bin = gst.element_factory_make('uridecodebin', 'uri')
uri_bin.connect('pad-added', self.process_new_pad, pad)
self.gst_pipeline.add(uri_bin)
else:
app_src = gst.element_factory_make('appsrc', 'src')
self.gst_pipeline.add(app_src)
app_src.get_pad('src').link(pad)
# Setup bus and message processor
self.gst_bus = self.gst_pipeline.get_bus()
self.gst_bus.add_signal_watch()
self.gst_bus_id = self.gst_bus.connect('message',
self.process_gst_message)
gst_bus = self.gst_pipeline.get_bus()
gst_bus.add_signal_watch()
gst_bus.connect('message', self.process_gst_message)
def process_new_pad(self, source, pad, target_pad):
pad.link(target_pad)
def process_mopidy_message(self, message):
"""Process messages from the rest of Mopidy."""
@ -104,6 +108,14 @@ class GStreamerProcess(BaseProcess):
connection.send(volume)
elif message['command'] == 'set_volume':
self.set_volume(message['volume'])
elif message['command'] == 'set_position':
response = self.set_position(message['position'])
connection = unpickle_connection(message['reply_to'])
connection.send(response)
elif message['command'] == 'get_position':
response = self.get_position()
connection = unpickle_connection(message['reply_to'])
connection.send(response)
else:
logger.warning(u'Cannot handle message: %s', message)
@ -123,16 +135,17 @@ class GStreamerProcess(BaseProcess):
def play_uri(self, uri):
"""Play audio at URI"""
self.set_state('READY')
self.gst_uri_bin.set_property('uri', uri)
self.gst_pipeline.get_by_name('uri').set_property('uri', uri)
return self.set_state('PLAYING')
def deliver_data(self, caps_string, data):
"""Deliver audio data to be played"""
data_src = self.gst_pipeline.get_by_name('src')
caps = gst.caps_from_string(caps_string)
buffer_ = gst.Buffer(buffer(data))
buffer_.set_caps(caps)
self.gst_data_src.set_property('caps', caps)
self.gst_data_src.emit('push-buffer', buffer_)
data_src.set_property('caps', caps)
data_src.emit('push-buffer', buffer_)
def end_of_data_stream(self):
"""
@ -141,7 +154,7 @@ class GStreamerProcess(BaseProcess):
We will get a GStreamer message when the stream playback reaches the
token, and can then do any end-of-stream related tasks.
"""
self.gst_data_src.emit('end-of-stream')
self.gst_pipeline.get_by_name('src').emit('end-of-stream')
def set_state(self, state_name):
"""
@ -171,10 +184,25 @@ class GStreamerProcess(BaseProcess):
def get_volume(self):
"""Get volume in range [0..100]"""
gst_volume = self.gst_volume.get_property('volume')
return int(gst_volume * 100)
gst_volume = self.gst_pipeline.get_by_name('volume')
return int(gst_volume.get_property('volume') * 100)
def set_volume(self, volume):
"""Set volume in range [0..100]"""
gst_volume = volume / 100.0
self.gst_volume.set_property('volume', gst_volume)
gst_volume = self.gst_pipeline.get_by_name('volume')
gst_volume.set_property('volume', volume / 100.0)
def set_position(self, position):
self.gst_pipeline.get_state() # block until state changes are done
handeled = self.gst_pipeline.seek_simple(gst.Format(gst.FORMAT_TIME),
gst.SEEK_FLAG_FLUSH, position * gst.MSECOND)
self.gst_pipeline.get_state() # block until seek is done
return handeled
def get_position(self):
try:
position = self.gst_pipeline.query_position(gst.FORMAT_TIME)[0]
return position // gst.MSECOND
except gst.QueryError, e:
logger.error('time_position failed: %s', e)
return 0

View File

@ -51,6 +51,13 @@ DUMP_LOG_FILENAME = u'dump.log'
#: Currently only the first frontend in the list is used.
FRONTENDS = (u'mopidy.frontends.mpd.MpdFrontend',)
#: Which GStreamer audio sink to use in :mod:`mopidy.outputs.gstreamer`.
#:
#: Default::
#:
#: GSTREAMER_AUDIO_SINK = u'autoaudiosink'
GSTREAMER_AUDIO_SINK = u'autoaudiosink'
#: Path to folder with local music.
#:
#: Used by :mod:`mopidy.backends.local`.

View File

@ -9,6 +9,11 @@ except ImportError:
class SkipTest(Exception):
pass
from mopidy import settings
# Nuke any local settings to ensure same test env all over
settings.local.clear()
def data_folder(name):
folder = os.path.dirname(__file__)
folder = os.path.join(folder, 'data')

View File

@ -1,13 +1,14 @@
import multiprocessing
import os
import random
import shutil
import tempfile
import threading
import time
from mopidy import settings
from mopidy.mixers.dummy import DummyMixer
from mopidy.models import Playlist, Track, Album, Artist
from mopidy.utils import get_class
from tests import SkipTest, data_folder
@ -32,7 +33,10 @@ class BaseCurrentPlaylistControllerTest(object):
backend_class = None
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
self.output = get_class(settings.OUTPUT)(self.core_queue, self.output_queue)
self.backend = self.backend_class(self.core_queue, self.output_queue, DummyMixer)
self.controller = self.backend.current_playlist
self.playback = self.backend.playback
@ -40,6 +44,7 @@ class BaseCurrentPlaylistControllerTest(object):
def tearDown(self):
self.backend.destroy()
self.output.destroy()
def test_add(self):
for track in self.tracks:
@ -275,7 +280,10 @@ class BasePlaybackControllerTest(object):
backend_class = None
def setUp(self):
self.backend = self.backend_class(mixer_class=DummyMixer)
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
self.output = get_class(settings.OUTPUT)(self.core_queue, self.output_queue)
self.backend = self.backend_class(self.core_queue, self.output_queue, DummyMixer)
self.playback = self.backend.playback
self.current_playlist = self.backend.current_playlist
@ -286,6 +294,7 @@ class BasePlaybackControllerTest(object):
def tearDown(self):
self.backend.destroy()
self.output.destroy()
def test_initial_state_is_stopped(self):
self.assertEqual(self.playback.state, self.playback.STOPPED)
@ -335,6 +344,17 @@ class BasePlaybackControllerTest(object):
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(track, self.playback.current_track)
@populate_playlist
def test_play_when_pause_after_next(self):
self.playback.play()
self.playback.next()
self.playback.next()
track = self.playback.current_track
self.playback.pause()
self.playback.play()
self.assertEqual(self.playback.state, self.playback.PLAYING)
self.assertEqual(track, self.playback.current_track)
@populate_playlist
def test_play_sets_current_track(self):
self.playback.play()
@ -772,23 +792,12 @@ class BasePlaybackControllerTest(object):
self.assert_(wrapper.called)
@populate_playlist
def test_on_end_of_track_gets_called(self):
on_end_of_track = self.playback.on_end_of_track
event = threading.Event()
def wrapper():
result = on_end_of_track()
event.set()
return result
self.playback.on_end_of_track = wrapper
def test_end_of_track_callback_gets_called(self):
self.playback.play()
self.playback.seek(self.tracks[0].length - 10)
event.wait(5)
self.assert_(event.is_set())
result = self.playback.seek(self.tracks[0].length - 10)
self.assert_(result, 'Seek failed')
message = self.core_queue.get()
self.assertEqual('end_of_track', message['command'])
@populate_playlist
def test_on_current_playlist_change_when_playing(self):
@ -871,11 +880,20 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_stopped(self):
result = self.playback.seek(1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_stopped_updates_position(self):
self.playback.seek(1000)
position = self.playback.time_position
self.assert_(position >= 990, position)
def test_seek_on_empty_playlist(self):
result = self.playback.seek(0)
self.assert_(not result, 'Seek return value was %s' % result)
def test_seek_on_empty_playlist_updates_position(self):
self.playback.seek(0)
self.assertEqual(self.playback.state, self.playback.STOPPED)
@ -886,6 +904,12 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_playing(self):
self.playback.play()
result = self.playback.seek(self.tracks[0].length - 1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_playing_updates_position(self):
length = self.backend.current_playlist.tracks[0].length
self.playback.play()
self.playback.seek(length - 1000)
@ -894,6 +918,13 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_when_paused(self):
self.playback.play()
self.playback.pause()
result = self.playback.seek(self.tracks[0].length - 1000)
self.assert_(result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_when_paused_updates_position(self):
length = self.backend.current_playlist.tracks[0].length
self.playback.play()
self.playback.pause()
@ -910,6 +941,13 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_beyond_end_of_song(self):
raise SkipTest # FIXME need to decide return value
self.playback.play()
result = self.playback.seek(self.tracks[0].length*100)
self.assert_(not result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_beyond_end_of_song_jumps_to_next_song(self):
self.playback.play()
self.playback.seek(self.tracks[0].length*100)
self.assertEqual(self.playback.current_track, self.tracks[1])
@ -922,17 +960,19 @@ class BasePlaybackControllerTest(object):
@populate_playlist
def test_seek_beyond_start_of_song(self):
raise SkipTest # FIXME need to decide return value
self.playback.play()
result = self.playback.seek(-1000)
self.assert_(not result, 'Seek return value was %s' % result)
@populate_playlist
def test_seek_beyond_start_of_song_update_postion(self):
self.playback.play()
self.playback.seek(-1000)
position = self.playback.time_position
self.assert_(position >= 0, position)
self.assertEqual(self.playback.state, self.playback.PLAYING)
@populate_playlist
def test_seek_return_value(self):
self.playback.play()
self.assertEqual(self.playback.seek(0), None)
@populate_playlist
def test_stop_when_stopped(self):
self.playback.stop()

View File

@ -27,6 +27,15 @@ class LocalCurrentPlaylistControllerTest(BaseCurrentPlaylistControllerTest,
backend_class = LocalBackend
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
super(LocalCurrentPlaylistControllerTest, self).setUp()
def tearDown(self):
super(LocalCurrentPlaylistControllerTest, self).tearDown()
settings.BACKENDS = settings.original_backends
class LocalPlaybackControllerTest(BasePlaybackControllerTest,
unittest.TestCase):
@ -35,10 +44,17 @@ class LocalPlaybackControllerTest(BasePlaybackControllerTest,
backend_class = LocalBackend
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
super(LocalPlaybackControllerTest, self).setUp()
# Two tests does not work at all when using the fake sink
#self.backend.playback.use_fake_sink()
def tearDown(self):
super(LocalPlaybackControllerTest, self).tearDown()
settings.BACKENDS = settings.original_backends
def add_track(self, path):
uri = path_to_uri(data_folder(path))
track = Track(uri=uri, length=4464)

View File

@ -1,6 +1,7 @@
import multiprocessing
import unittest
from mopidy import settings
from mopidy.outputs.gstreamer import GStreamerOutput
from mopidy.utils.path import path_to_uri
from mopidy.utils.process import pickle_connection
@ -9,6 +10,8 @@ from tests import data_folder, SkipTest
class GStreamerOutputTest(unittest.TestCase):
def setUp(self):
self.original_backends = settings.BACKENDS
settings.BACKENDS = ('mopidy.backends.local.LocalBackend',)
self.song_uri = path_to_uri(data_folder('song1.wav'))
self.output_queue = multiprocessing.Queue()
self.core_queue = multiprocessing.Queue()
@ -16,6 +19,7 @@ class GStreamerOutputTest(unittest.TestCase):
def tearDown(self):
self.output.destroy()
settings.BACKENDS = settings.original_backends
def send_recv(self, message):
(my_end, other_end) = multiprocessing.Pipe()
@ -24,15 +28,14 @@ class GStreamerOutputTest(unittest.TestCase):
my_end.poll(None)
return my_end.recv()
def send(self, message):
self.output_queue.put(message)
@SkipTest
def test_play_uri_existing_file(self):
message = {'command': 'play_uri', 'uri': self.song_uri}
self.assertEqual(True, self.send_recv(message))
@SkipTest
def test_play_uri_non_existing_file(self):
message = {'command': 'play_uri', 'uri': self.song_uri + 'bogus'}
self.assertEqual(False, self.send_recv(message))