diff --git a/mopidy/backends/base/playback.py b/mopidy/backends/base/playback.py index d1acc05a..933424ad 100644 --- a/mopidy/backends/base/playback.py +++ b/mopidy/backends/base/playback.py @@ -440,7 +440,10 @@ class BasePlaybackController(object): :param time_position: time position in milliseconds :type time_position: int + :rtype: :class:`True` if successful, else :class:`False` """ + # FIXME I think return value is only really useful for internal + # testing, as such it should probably not be exposed in API. if self.state == self.STOPPED: self.play() elif self.state == self.PAUSED: @@ -455,7 +458,7 @@ class BasePlaybackController(object): self._play_time_started = self._current_wall_time self._play_time_accumulated = time_position - self._seek(time_position) + return self._seek(time_position) def _seek(self, time_position): """ diff --git a/mopidy/backends/local/__init__.py b/mopidy/backends/local/__init__.py index 45e74e5d..e9e86f34 100644 --- a/mopidy/backends/local/__init__.py +++ b/mopidy/backends/local/__init__.py @@ -1,34 +1,18 @@ -import gobject -gobject.threads_init() -# FIXME make sure we don't get hit by -# http://jameswestby.net/ -# weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html - -import pygst -pygst.require('0.10') - -import gst -import logging -import os import glob +import logging +import multiprocessing +import os import shutil -import threading from mopidy import settings from mopidy.backends.base import * from mopidy.models import Playlist, Track, Album +from mopidy.utils.process import pickle_connection + from .translator import parse_m3u, parse_mpd_tag_cache logger = logging.getLogger(u'mopidy.backends.local') -class LocalMessages(threading.Thread): - def run(self): - gobject.MainLoop().run() - -message_thread = LocalMessages() -message_thread.daemon = True -message_thread.start() - class LocalBackend(BaseBackend): """ A backend for playing music from a local music archive. @@ -49,71 +33,40 @@ class LocalBackend(BaseBackend): class LocalPlaybackController(BasePlaybackController): def __init__(self, backend): super(LocalPlaybackController, self).__init__(backend) - - self._bin = gst.element_factory_make("playbin", "player") - self._bus = self._bin.get_bus() - sink = gst.element_factory_make("fakesink", "fakesink") - - # FIXME cleanup fakesink? - - self._bin.set_property("video-sink", sink) - self._bus.add_signal_watch() - self._bus_id = self._bus.connect('message', self._message) - self.stop() - def _set_state(self, state): - self._bin.set_state(state) - (_, new, _) = self._bin.get_state() - return new == state + def _send_recv(self, message): + (my_end, other_end) = multiprocessing.Pipe() + message.update({'reply_to': pickle_connection(other_end)}) + self.backend.output_queue.put(message) + my_end.poll(None) + return my_end.recv() - def _message(self, bus, message): - if message.type == gst.MESSAGE_EOS: - self.on_end_of_track() - elif message.type == gst.MESSAGE_ERROR: - self._bin.set_state(gst.STATE_NULL) - error, debug = message.parse_error() - logger.error('%s %s', error, debug) + def _send(self, message): + self.backend.output_queue.put(message) + + def _set_state(self, state): + return self._send_recv({'command': 'set_state', 'state': state}) def _play(self, track): - self._bin.set_state(gst.STATE_READY) - self._bin.set_property('uri', track.uri) - return self._set_state(gst.STATE_PLAYING) + return self._send_recv({'command': 'play_uri', 'uri': track.uri}) def _stop(self): - return self._set_state(gst.STATE_READY) + return self._set_state('READY') def _pause(self): - return self._set_state(gst.STATE_PAUSED) + return self._set_state('PAUSED') def _resume(self): - return self._set_state(gst.STATE_PLAYING) + return self._set_state('PLAYING') def _seek(self, time_position): - self._bin.seek_simple(gst.Format(gst.FORMAT_TIME), - gst.SEEK_FLAG_FLUSH, time_position * gst.MSECOND) - self._set_state(gst.STATE_PLAYING) + return self._send_recv({'command': 'set_position', + 'position': time_position}) @property def time_position(self): - try: - return self._bin.query_position(gst.FORMAT_TIME)[0] // gst.MSECOND - except gst.QueryError, e: - logger.error('time_position failed: %s', e) - return 0 - - def destroy(self): - playbin, self._bin = self._bin, None - bus, self._bus = self._bus, None - - bus.disconnect(self._bus_id) - bus.remove_signal_watch() - playbin.get_state() - playbin.set_state(gst.STATE_NULL) - bus.set_flushing(True) - - del bus - del playbin + return self._send_recv({'command': 'get_position'}) class LocalStoredPlaylistsController(BaseStoredPlaylistsController): diff --git a/mopidy/outputs/gstreamer.py b/mopidy/outputs/gstreamer.py index a1544f87..453747d6 100644 --- a/mopidy/outputs/gstreamer.py +++ b/mopidy/outputs/gstreamer.py @@ -8,6 +8,7 @@ import gst import logging import threading +from mopidy import settings from mopidy.utils.process import BaseProcess, unpickle_connection logger = logging.getLogger('mopidy.outputs.gstreamer') @@ -42,22 +43,11 @@ class GStreamerProcess(BaseProcess): http://jameswestby.net/weblog/tech/14-caution-python-multiprocessing-and-glib-dont-mix.html. """ - pipeline_description = ' ! '.join([ - 'appsrc name=src', - 'volume name=volume', - 'autoaudiosink name=sink', - ]) - def __init__(self, core_queue, output_queue): super(GStreamerProcess, self).__init__(name='GStreamerProcess') self.core_queue = core_queue self.output_queue = output_queue self.gst_pipeline = None - self.gst_bus = None - self.gst_bus_id = None - self.gst_uri_bin = None - self.gst_data_src = None - self.gst_volume = None def run_inside_try(self): self.setup() @@ -73,16 +63,30 @@ class GStreamerProcess(BaseProcess): messages_thread.daemon = True messages_thread.start() - self.gst_pipeline = gst.parse_launch(self.pipeline_description) - self.gst_data_src = self.gst_pipeline.get_by_name('src') - #self.gst_uri_bin = self.gst_pipeline.get_by_name('uri') - self.gst_volume = self.gst_pipeline.get_by_name('volume') + self.gst_pipeline = gst.parse_launch(' ! '.join([ + 'audioconvert name=convert', + 'volume name=volume', + settings.GSTREAMER_AUDIO_SINK, + ])) + + pad = self.gst_pipeline.get_by_name('convert').get_pad('sink') + + if settings.BACKENDS[0] == 'mopidy.backends.local.LocalBackend': + uri_bin = gst.element_factory_make('uridecodebin', 'uri') + uri_bin.connect('pad-added', self.process_new_pad, pad) + self.gst_pipeline.add(uri_bin) + else: + app_src = gst.element_factory_make('appsrc', 'src') + self.gst_pipeline.add(app_src) + app_src.get_pad('src').link(pad) # Setup bus and message processor - self.gst_bus = self.gst_pipeline.get_bus() - self.gst_bus.add_signal_watch() - self.gst_bus_id = self.gst_bus.connect('message', - self.process_gst_message) + gst_bus = self.gst_pipeline.get_bus() + gst_bus.add_signal_watch() + gst_bus.connect('message', self.process_gst_message) + + def process_new_pad(self, source, pad, target_pad): + pad.link(target_pad) def process_mopidy_message(self, message): """Process messages from the rest of Mopidy.""" @@ -104,6 +108,14 @@ class GStreamerProcess(BaseProcess): connection.send(volume) elif message['command'] == 'set_volume': self.set_volume(message['volume']) + elif message['command'] == 'set_position': + response = self.set_position(message['position']) + connection = unpickle_connection(message['reply_to']) + connection.send(response) + elif message['command'] == 'get_position': + response = self.get_position() + connection = unpickle_connection(message['reply_to']) + connection.send(response) else: logger.warning(u'Cannot handle message: %s', message) @@ -123,16 +135,17 @@ class GStreamerProcess(BaseProcess): def play_uri(self, uri): """Play audio at URI""" self.set_state('READY') - self.gst_uri_bin.set_property('uri', uri) + self.gst_pipeline.get_by_name('uri').set_property('uri', uri) return self.set_state('PLAYING') def deliver_data(self, caps_string, data): """Deliver audio data to be played""" + data_src = self.gst_pipeline.get_by_name('src') caps = gst.caps_from_string(caps_string) buffer_ = gst.Buffer(buffer(data)) buffer_.set_caps(caps) - self.gst_data_src.set_property('caps', caps) - self.gst_data_src.emit('push-buffer', buffer_) + data_src.set_property('caps', caps) + data_src.emit('push-buffer', buffer_) def end_of_data_stream(self): """ @@ -141,7 +154,7 @@ class GStreamerProcess(BaseProcess): We will get a GStreamer message when the stream playback reaches the token, and can then do any end-of-stream related tasks. """ - self.gst_data_src.emit('end-of-stream') + self.gst_pipeline.get_by_name('src').emit('end-of-stream') def set_state(self, state_name): """ @@ -171,10 +184,25 @@ class GStreamerProcess(BaseProcess): def get_volume(self): """Get volume in range [0..100]""" - gst_volume = self.gst_volume.get_property('volume') - return int(gst_volume * 100) + gst_volume = self.gst_pipeline.get_by_name('volume') + return int(gst_volume.get_property('volume') * 100) def set_volume(self, volume): """Set volume in range [0..100]""" - gst_volume = volume / 100.0 - self.gst_volume.set_property('volume', gst_volume) + gst_volume = self.gst_pipeline.get_by_name('volume') + gst_volume.set_property('volume', volume / 100.0) + + def set_position(self, position): + self.gst_pipeline.get_state() # block until state changes are done + handeled = self.gst_pipeline.seek_simple(gst.Format(gst.FORMAT_TIME), + gst.SEEK_FLAG_FLUSH, position * gst.MSECOND) + self.gst_pipeline.get_state() # block until seek is done + return handeled + + def get_position(self): + try: + position = self.gst_pipeline.query_position(gst.FORMAT_TIME)[0] + return position // gst.MSECOND + except gst.QueryError, e: + logger.error('time_position failed: %s', e) + return 0 diff --git a/mopidy/settings.py b/mopidy/settings.py index 8a0ae862..699eb16a 100644 --- a/mopidy/settings.py +++ b/mopidy/settings.py @@ -51,6 +51,13 @@ DUMP_LOG_FILENAME = u'dump.log' #: Currently only the first frontend in the list is used. FRONTENDS = (u'mopidy.frontends.mpd.MpdFrontend',) +#: Which GStreamer audio sink to use in :mod:`mopidy.outputs.gstreamer`. +#: +#: Default:: +#: +#: GSTREAMER_AUDIO_SINK = u'autoaudiosink' +GSTREAMER_AUDIO_SINK = u'autoaudiosink' + #: Path to folder with local music. #: #: Used by :mod:`mopidy.backends.local`. diff --git a/tests/__init__.py b/tests/__init__.py index b08afb01..c8618f3f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -9,6 +9,11 @@ except ImportError: class SkipTest(Exception): pass +from mopidy import settings + +# Nuke any local settings to ensure same test env all over +settings.local.clear() + def data_folder(name): folder = os.path.dirname(__file__) folder = os.path.join(folder, 'data') diff --git a/tests/backends/base.py b/tests/backends/base.py index 733c63cc..e9b78453 100644 --- a/tests/backends/base.py +++ b/tests/backends/base.py @@ -1,13 +1,14 @@ +import multiprocessing import os import random import shutil import tempfile -import threading import time from mopidy import settings from mopidy.mixers.dummy import DummyMixer from mopidy.models import Playlist, Track, Album, Artist +from mopidy.utils import get_class from tests import SkipTest, data_folder @@ -32,7 +33,10 @@ class BaseCurrentPlaylistControllerTest(object): backend_class = None def setUp(self): - self.backend = self.backend_class(mixer_class=DummyMixer) + self.output_queue = multiprocessing.Queue() + self.core_queue = multiprocessing.Queue() + self.output = get_class(settings.OUTPUT)(self.core_queue, self.output_queue) + self.backend = self.backend_class(self.core_queue, self.output_queue, DummyMixer) self.controller = self.backend.current_playlist self.playback = self.backend.playback @@ -40,6 +44,7 @@ class BaseCurrentPlaylistControllerTest(object): def tearDown(self): self.backend.destroy() + self.output.destroy() def test_add(self): for track in self.tracks: @@ -275,7 +280,10 @@ class BasePlaybackControllerTest(object): backend_class = None def setUp(self): - self.backend = self.backend_class(mixer_class=DummyMixer) + self.output_queue = multiprocessing.Queue() + self.core_queue = multiprocessing.Queue() + self.output = get_class(settings.OUTPUT)(self.core_queue, self.output_queue) + self.backend = self.backend_class(self.core_queue, self.output_queue, DummyMixer) self.playback = self.backend.playback self.current_playlist = self.backend.current_playlist @@ -286,6 +294,7 @@ class BasePlaybackControllerTest(object): def tearDown(self): self.backend.destroy() + self.output.destroy() def test_initial_state_is_stopped(self): self.assertEqual(self.playback.state, self.playback.STOPPED) @@ -335,6 +344,17 @@ class BasePlaybackControllerTest(object): self.assertEqual(self.playback.state, self.playback.PLAYING) self.assertEqual(track, self.playback.current_track) + @populate_playlist + def test_play_when_pause_after_next(self): + self.playback.play() + self.playback.next() + self.playback.next() + track = self.playback.current_track + self.playback.pause() + self.playback.play() + self.assertEqual(self.playback.state, self.playback.PLAYING) + self.assertEqual(track, self.playback.current_track) + @populate_playlist def test_play_sets_current_track(self): self.playback.play() @@ -772,23 +792,12 @@ class BasePlaybackControllerTest(object): self.assert_(wrapper.called) @populate_playlist - def test_on_end_of_track_gets_called(self): - on_end_of_track = self.playback.on_end_of_track - event = threading.Event() - - def wrapper(): - result = on_end_of_track() - event.set() - return result - - self.playback.on_end_of_track = wrapper - + def test_end_of_track_callback_gets_called(self): self.playback.play() - self.playback.seek(self.tracks[0].length - 10) - - event.wait(5) - - self.assert_(event.is_set()) + result = self.playback.seek(self.tracks[0].length - 10) + self.assert_(result, 'Seek failed') + message = self.core_queue.get() + self.assertEqual('end_of_track', message['command']) @populate_playlist def test_on_current_playlist_change_when_playing(self): @@ -871,11 +880,20 @@ class BasePlaybackControllerTest(object): @populate_playlist def test_seek_when_stopped(self): + result = self.playback.seek(1000) + self.assert_(result, 'Seek return value was %s' % result) + + @populate_playlist + def test_seek_when_stopped_updates_position(self): self.playback.seek(1000) position = self.playback.time_position self.assert_(position >= 990, position) def test_seek_on_empty_playlist(self): + result = self.playback.seek(0) + self.assert_(not result, 'Seek return value was %s' % result) + + def test_seek_on_empty_playlist_updates_position(self): self.playback.seek(0) self.assertEqual(self.playback.state, self.playback.STOPPED) @@ -886,6 +904,12 @@ class BasePlaybackControllerTest(object): @populate_playlist def test_seek_when_playing(self): + self.playback.play() + result = self.playback.seek(self.tracks[0].length - 1000) + self.assert_(result, 'Seek return value was %s' % result) + + @populate_playlist + def test_seek_when_playing_updates_position(self): length = self.backend.current_playlist.tracks[0].length self.playback.play() self.playback.seek(length - 1000) @@ -894,6 +918,13 @@ class BasePlaybackControllerTest(object): @populate_playlist def test_seek_when_paused(self): + self.playback.play() + self.playback.pause() + result = self.playback.seek(self.tracks[0].length - 1000) + self.assert_(result, 'Seek return value was %s' % result) + + @populate_playlist + def test_seek_when_paused_updates_position(self): length = self.backend.current_playlist.tracks[0].length self.playback.play() self.playback.pause() @@ -910,6 +941,13 @@ class BasePlaybackControllerTest(object): @populate_playlist def test_seek_beyond_end_of_song(self): + raise SkipTest # FIXME need to decide return value + self.playback.play() + result = self.playback.seek(self.tracks[0].length*100) + self.assert_(not result, 'Seek return value was %s' % result) + + @populate_playlist + def test_seek_beyond_end_of_song_jumps_to_next_song(self): self.playback.play() self.playback.seek(self.tracks[0].length*100) self.assertEqual(self.playback.current_track, self.tracks[1]) @@ -922,17 +960,19 @@ class BasePlaybackControllerTest(object): @populate_playlist def test_seek_beyond_start_of_song(self): + raise SkipTest # FIXME need to decide return value + self.playback.play() + result = self.playback.seek(-1000) + self.assert_(not result, 'Seek return value was %s' % result) + + @populate_playlist + def test_seek_beyond_start_of_song_update_postion(self): self.playback.play() self.playback.seek(-1000) position = self.playback.time_position self.assert_(position >= 0, position) self.assertEqual(self.playback.state, self.playback.PLAYING) - @populate_playlist - def test_seek_return_value(self): - self.playback.play() - self.assertEqual(self.playback.seek(0), None) - @populate_playlist def test_stop_when_stopped(self): self.playback.stop() diff --git a/tests/backends/local/backend_test.py b/tests/backends/local/backend_test.py index aff84658..b95c6dde 100644 --- a/tests/backends/local/backend_test.py +++ b/tests/backends/local/backend_test.py @@ -27,6 +27,15 @@ class LocalCurrentPlaylistControllerTest(BaseCurrentPlaylistControllerTest, backend_class = LocalBackend + def setUp(self): + self.original_backends = settings.BACKENDS + settings.BACKENDS = ('mopidy.backends.local.LocalBackend',) + super(LocalCurrentPlaylistControllerTest, self).setUp() + + def tearDown(self): + super(LocalCurrentPlaylistControllerTest, self).tearDown() + settings.BACKENDS = settings.original_backends + class LocalPlaybackControllerTest(BasePlaybackControllerTest, unittest.TestCase): @@ -35,10 +44,17 @@ class LocalPlaybackControllerTest(BasePlaybackControllerTest, backend_class = LocalBackend def setUp(self): + self.original_backends = settings.BACKENDS + settings.BACKENDS = ('mopidy.backends.local.LocalBackend',) + super(LocalPlaybackControllerTest, self).setUp() # Two tests does not work at all when using the fake sink #self.backend.playback.use_fake_sink() + def tearDown(self): + super(LocalPlaybackControllerTest, self).tearDown() + settings.BACKENDS = settings.original_backends + def add_track(self, path): uri = path_to_uri(data_folder(path)) track = Track(uri=uri, length=4464) diff --git a/tests/outputs/gstreamer_test.py b/tests/outputs/gstreamer_test.py index 5f681f23..52d1fbe1 100644 --- a/tests/outputs/gstreamer_test.py +++ b/tests/outputs/gstreamer_test.py @@ -1,6 +1,7 @@ import multiprocessing import unittest +from mopidy import settings from mopidy.outputs.gstreamer import GStreamerOutput from mopidy.utils.path import path_to_uri from mopidy.utils.process import pickle_connection @@ -9,6 +10,8 @@ from tests import data_folder, SkipTest class GStreamerOutputTest(unittest.TestCase): def setUp(self): + self.original_backends = settings.BACKENDS + settings.BACKENDS = ('mopidy.backends.local.LocalBackend',) self.song_uri = path_to_uri(data_folder('song1.wav')) self.output_queue = multiprocessing.Queue() self.core_queue = multiprocessing.Queue() @@ -16,6 +19,7 @@ class GStreamerOutputTest(unittest.TestCase): def tearDown(self): self.output.destroy() + settings.BACKENDS = settings.original_backends def send_recv(self, message): (my_end, other_end) = multiprocessing.Pipe() @@ -24,15 +28,14 @@ class GStreamerOutputTest(unittest.TestCase): my_end.poll(None) return my_end.recv() + def send(self, message): self.output_queue.put(message) - @SkipTest def test_play_uri_existing_file(self): message = {'command': 'play_uri', 'uri': self.song_uri} self.assertEqual(True, self.send_recv(message)) - @SkipTest def test_play_uri_non_existing_file(self): message = {'command': 'play_uri', 'uri': self.song_uri + 'bogus'} self.assertEqual(False, self.send_recv(message))