import multiprocessing import unittest from tests import SkipTest # FIXME Our Windows build server does not support GStreamer yet import sys if sys.platform == 'win32': raise SkipTest from mopidy import settings from mopidy.outputs.gstreamer import GStreamerOutput from mopidy.utils.path import path_to_uri from mopidy.utils.process import pickle_connection from tests import data_folder class GStreamerOutputTest(unittest.TestCase): def setUp(self): self.original_backends = settings.BACKENDS settings.BACKENDS = ('mopidy.backends.local.LocalBackend',) self.song_uri = path_to_uri(data_folder('song1.wav')) self.core_queue = multiprocessing.Queue() self.output = GStreamerOutput(self.core_queue) self.output.start() def tearDown(self): self.output.destroy() settings.BACKENDS = settings.original_backends def send_recv(self, message): (my_end, other_end) = multiprocessing.Pipe() message.update({ 'to': 'output', 'reply_to': pickle_connection(other_end), }) self.output.process_message(message) my_end.poll(None) return my_end.recv() def send(self, message): message.update({'to': 'output'}) self.output.process_message(message) def test_play_uri_existing_file(self): message = {'command': 'play_uri', 'uri': self.song_uri} self.assertEqual(True, self.send_recv(message)) def test_play_uri_non_existing_file(self): message = {'command': 'play_uri', 'uri': self.song_uri + 'bogus'} self.assertEqual(False, self.send_recv(message)) def test_default_get_volume_result(self): message = {'command': 'get_volume'} self.assertEqual(100, self.send_recv(message)) def test_set_volume(self): self.send({'command': 'set_volume', 'volume': 50}) self.assertEqual(50, self.send_recv({'command': 'get_volume'})) def test_set_volume_to_zero(self): self.send({'command': 'set_volume', 'volume': 0}) self.assertEqual(0, self.send_recv({'command': 'get_volume'})) def test_set_volume_to_one_hundred(self): self.send({'command': 'set_volume', 'volume': 100}) self.assertEqual(100, self.send_recv({'command': 'get_volume'})) @SkipTest def test_set_state(self): raise NotImplementedError