Add unit tests for export/restore core state

Fix issues shown by test code
This commit is contained in:
Jens Luetjen 2016-01-03 18:29:35 +01:00
parent a5a9178b06
commit e56c39ee78
10 changed files with 357 additions and 60 deletions

View File

@ -138,18 +138,19 @@ class Core(
def on_start(self):
logger.debug("core on_start")
try:
amount = self._config['core']['restore_state']
coverage = []
if not amount or 'off' == amount:
pass
elif 'load' == amount:
coverage = ['tracklist', 'mode', 'volume', 'history']
elif 'play' == amount:
coverage = ['tracklist', 'mode', 'autoplay', 'volume',
'history']
else:
logger.warn('Unknown value for config '
'core.restore_state: %s', amount)
if self._config and 'restore_state' in self._config['core']:
amount = self._config['core']['restore_state']
if not amount or 'off' == amount:
pass
elif 'load' == amount:
coverage = ['tracklist', 'mode', 'volume', 'history']
elif 'play' == amount:
coverage = ['tracklist', 'mode', 'autoplay', 'volume',
'history']
else:
logger.warn('Unknown value for config '
'core.restore_state: %s', amount)
if len(coverage):
self.load_state('persistent', coverage)
except Exception as e:
@ -159,9 +160,10 @@ class Core(
def on_stop(self):
logger.debug("core on_stop")
try:
amount = self._config['core']['restore_state']
if amount and 'off' != amount:
self.save_state('persistent')
if self._config and 'restore_state' in self._config['core']:
amount = self._config['core']['restore_state']
if amount and 'off' != amount:
self.save_state('persistent')
except Exception as e:
logger.warn('on_stop: Unexpected error: %s', str(e))
pykka.ThreadingActor.on_stop(self)
@ -183,10 +185,10 @@ class Core(
logger.info('Save state to "%s"', file_name)
data = {}
self.tracklist._state_export(data)
self.history._state_export(data)
self.playback._state_export(data)
self.mixer._state_export(data)
data['tracklist'] = self.tracklist._export_state()
data['history'] = self.history._export_state()
data['playback'] = self.playback._export_state()
data['mixer'] = self.mixer._export_state()
storage.save(file_name, data)
def load_state(self, name, coverage):
@ -217,11 +219,15 @@ class Core(
file_name += '.state'
data = storage.load(file_name)
self.history._state_import(data, coverage)
self.tracklist._state_import(data, coverage)
self.playback._state_import(data, coverage)
self.mixer._state_import(data, coverage)
logger.info('Load state done')
if 'history' in data:
self.history._restore_state(data['history'], coverage)
if 'tracklist' in data:
self.tracklist._restore_state(data['tracklist'], coverage)
if 'playback' in data:
self.playback._restore_state(data['playback'], coverage)
if 'mixer' in data:
self.mixer._restore_state(data['mixer'], coverage)
logger.debug('Load state done. file_name="%s"', file_name)
class Backends(list):

View File

@ -58,19 +58,20 @@ class HistoryController(object):
"""
return copy.copy(self._history)
def _state_export(self, data):
def _export_state(self):
"""Internal method for :class:`mopidy.Core`."""
history_list = []
for timestamp, track in self._history:
history_list.append(models.HistoryTrack(
timestamp=timestamp, track=track))
data['history'] = models.HistoryState(history=history_list)
return models.HistoryState(history=history_list)
def _state_import(self, data, coverage):
def _restore_state(self, state, coverage):
"""Internal method for :class:`mopidy.Core`."""
if 'history' in data:
hstate = data['history']
if state:
if not isinstance(state, models.HistoryState):
raise TypeError('Expect an argument of type "HistoryState"')
if 'history' in coverage:
self._history = []
for htrack in hstate.history:
for htrack in state.history:
self._history.append((htrack.timestamp, htrack.track))

View File

@ -101,13 +101,15 @@ class MixerController(object):
return False
def _state_export(self, data):
def _export_state(self):
"""Internal method for :class:`mopidy.Core`."""
data['mixer'] = MixerState(volume=self.get_volume())
return MixerState(volume=self.get_volume())
def _state_import(self, data, coverage):
def _restore_state(self, state, coverage):
"""Internal method for :class:`mopidy.Core`."""
if 'mixer' in data:
ms = data['mixer']
if state:
if not isinstance(state, MixerState):
raise TypeError('Expect an argument of type "MixerState"')
if 'volume' in coverage:
self.set_volume(ms.volume)
if state.volume:
self.set_volume(state.volume)

View File

@ -524,20 +524,19 @@ class PlaybackController(object):
logger.debug('Triggering seeked event')
listener.CoreListener.send('seeked', time_position=time_position)
def _state_export(self, data):
def _export_state(self):
"""Internal method for :class:`mopidy.Core`."""
data['playback'] = models.PlaybackState(
return models.PlaybackState(
tl_track=self.get_current_tl_track(),
position=self.get_time_position(),
state=self.get_state())
def _state_import(self, data, coverage):
def _restore_state(self, state, coverage):
"""Internal method for :class:`mopidy.Core`."""
if 'playback' in data:
ps = data['playback']
if state:
if not isinstance(state, models.PlaybackState):
raise TypeError('Expect an argument of type "PlaybackState"')
if 'autoplay' in coverage:
tl_track = ps.tl_track
if tl_track is not None:
self.play(tl_track=tl_track)
# TODO: Seek not working. It seeks to early.
# self.seek(ps.position)
if state.tl_track is not None:
self.play(tl_track=state.tl_track)
# TODO: seek to state.position?

View File

@ -645,9 +645,9 @@ class TracklistController(object):
logger.debug('Triggering options changed event')
listener.CoreListener.send('options_changed')
def _state_export(self, data):
def _export_state(self):
"""Internal method for :class:`mopidy.Core`."""
data['tracklist'] = TracklistState(
return TracklistState(
tracks=self._tl_tracks,
next_tlid=self._next_tlid,
consume=self.get_consume(),
@ -655,19 +655,20 @@ class TracklistController(object):
repeat=self.get_repeat(),
single=self.get_single())
def _state_import(self, data, coverage):
def _restore_state(self, state, coverage):
"""Internal method for :class:`mopidy.Core`."""
if 'tracklist' in data:
tls = data['tracklist']
if state:
if not isinstance(state, TracklistState):
raise TypeError('Expect an argument of type "TracklistState"')
if 'mode' in coverage:
self.set_consume(tls.consume)
self.set_random(tls.random)
self.set_repeat(tls.repeat)
self.set_single(tls.single)
self.set_consume(state.consume)
self.set_random(state.random)
self.set_repeat(state.repeat)
self.set_single(state.single)
if 'tracklist' in coverage:
if tls.next_tlid > self._next_tlid:
self._next_tlid = tls.next_tlid
if state.next_tlid > self._next_tlid:
self._next_tlid = state.next_tlid
self._tl_tracks = []
for track in tls.tracks:
for track in state.tracks:
self._tl_tracks.append(track)
self._trigger_tracklist_changed()

View File

@ -1,5 +1,7 @@
from __future__ import absolute_import, unicode_literals
import shutil
import tempfile
import unittest
import mock
@ -43,3 +45,29 @@ class CoreActorTest(unittest.TestCase):
def test_version(self):
self.assertEqual(self.core.version, versioning.get_version())
class CoreActorExportRestoreTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
config = {
'core': {
'max_tracklist_length': 10000,
'restore_state': 'play',
'data_dir': self.temp_dir,
}
}
self.core = Core.start(
config=config, mixer=None, backends=[]).proxy()
def tearDown(self): # noqa: N802
pykka.ActorRegistry.stop_all()
shutil.rmtree(self.temp_dir)
def test_restore_on_start(self):
# cover mopidy.core.actor.on_start and .on_stop
# starting the actor by calling any function:
self.core.get_version()
pass

View File

@ -4,7 +4,7 @@ import unittest
from mopidy import compat
from mopidy.core import HistoryController
from mopidy.models import Artist, Track
from mopidy.models import Artist, HistoryState, HistoryTrack, Ref, Track
class PlaybackHistoryTest(unittest.TestCase):
@ -46,3 +46,60 @@ class PlaybackHistoryTest(unittest.TestCase):
self.assertIn(track.name, ref.name)
for artist in track.artists:
self.assertIn(artist.name, ref.name)
class CoreHistoryExportRestoreTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.tracks = [
Track(uri='dummy1:a', name='foober'),
Track(uri='dummy2:a', name='foo'),
Track(uri='dummy3:a', name='bar')
]
self.refs = []
for t in self.tracks:
self.refs.append(Ref.track(uri=t.uri, name=t.name))
self.history = HistoryController()
def test_export(self):
self.history._add_track(self.tracks[2])
self.history._add_track(self.tracks[1])
value = self.history._export_state()
self.assertEqual(len(value.history), 2)
# last in, first out
self.assertEqual(value.history[0].track, self.refs[1])
self.assertEqual(value.history[1].track, self.refs[2])
def test_import(self):
state = HistoryState(history=[
HistoryTrack(timestamp=34, track=self.refs[0]),
HistoryTrack(timestamp=45, track=self.refs[2]),
HistoryTrack(timestamp=56, track=self.refs[1])])
coverage = ['history']
self.history._restore_state(state, coverage)
hist = self.history.get_history()
self.assertEqual(len(hist), 3)
self.assertEqual(hist[0], (34, self.refs[0]))
self.assertEqual(hist[1], (45, self.refs[2]))
self.assertEqual(hist[2], (56, self.refs[1]))
# after import, adding more tracks must be possible
self.history._add_track(self.tracks[1])
hist = self.history.get_history()
self.assertEqual(len(hist), 4)
self.assertEqual(hist[0][1], self.refs[1])
self.assertEqual(hist[1], (34, self.refs[0]))
self.assertEqual(hist[2], (45, self.refs[2]))
self.assertEqual(hist[3], (56, self.refs[1]))
def test_import_invalid_type(self):
with self.assertRaises(TypeError):
self.history._restore_state(11, None)
def test_import_none(self):
self.history._restore_state(None, None)

View File

@ -7,6 +7,7 @@ import mock
import pykka
from mopidy import core, mixer
from mopidy.models import MixerState
from tests import dummy_mixer
@ -154,3 +155,39 @@ class SetMuteBadBackendTest(MockBackendCoreMixerBase):
def test_backend_returns_wrong_type(self):
self.mixer.set_mute.return_value.get.return_value = 'done'
self.assertFalse(self.core.mixer.set_mute(True))
class CoreMixerExportRestoreTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.mixer = dummy_mixer.create_proxy()
self.core = core.Core(mixer=self.mixer, backends=[])
def test_export(self):
volume = 32
target = MixerState(volume=volume)
self.core.mixer.set_volume(volume)
value = self.core.mixer._export_state()
self.assertEqual(target, value)
def test_import(self):
self.core.mixer.set_volume(11)
volume = 45
target = MixerState(volume=volume)
coverage = ['volume']
self.core.mixer._restore_state(target, coverage)
self.assertEqual(volume, self.core.mixer.get_volume())
def test_import_not_covered(self):
self.core.mixer.set_volume(21)
target = MixerState(volume=56)
coverage = ['other']
self.core.mixer._restore_state(target, coverage)
self.assertEqual(21, self.core.mixer.get_volume())
def test_import_invalid_type(self):
with self.assertRaises(TypeError):
self.core.mixer._restore_state(11, None)
def test_import_none(self):
self.core.mixer._restore_state(None, None)

View File

@ -8,7 +8,7 @@ import pykka
from mopidy import backend, core
from mopidy.internal import deprecation
from mopidy.models import Track
from mopidy.models import PlaybackState, Track
from tests import dummy_audio
@ -874,3 +874,59 @@ class Bug1177RegressionTest(unittest.TestCase):
c.playback.pause()
c.playback.next()
b.playback.change_track.assert_called_once_with(track2)
class CorePlaybackExportRestoreTest(BaseTest):
def test_export(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.play(tl_tracks[1])
self.replay_events()
state = PlaybackState(
position=0, state='playing', tl_track=tl_tracks[1])
value = self.core.playback._export_state()
self.assertEqual(state, value)
def test_import(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.stop()
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
state = PlaybackState(
position=0, state='playing', tl_track=tl_tracks[2])
coverage = ['autoplay']
self.core.playback._restore_state(state, coverage)
self.replay_events()
self.assertEqual('playing', self.core.playback.get_state())
self.assertEqual(tl_tracks[2],
self.core.playback.get_current_tl_track())
def test_import_not_covered(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.stop()
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
state = PlaybackState(
position=0, state='playing', tl_track=tl_tracks[2])
coverage = ['other']
self.core.playback._restore_state(state, coverage)
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
self.assertEqual(None,
self.core.playback.get_current_tl_track())
def test_import_invalid_type(self):
with self.assertRaises(TypeError):
self.core.playback._restore_state(11, None)
def test_import_none(self):
self.core.playback._restore_state(None, None)

View File

@ -6,7 +6,7 @@ import mock
from mopidy import backend, core
from mopidy.internal import deprecation
from mopidy.models import TlTrack, Track
from mopidy.models import TlTrack, Track, TracklistState
class TracklistTest(unittest.TestCase):
@ -177,3 +177,113 @@ class TracklistIndexTest(unittest.TestCase):
self.assertEqual(0, self.core.tracklist.index())
self.assertEqual(1, self.core.tracklist.index())
self.assertEqual(2, self.core.tracklist.index())
class TracklistExportRestoreTest(unittest.TestCase):
def setUp(self): # noqa: N802
config = {
'core': {
'max_tracklist_length': 10000,
}
}
self.tracks = [
Track(uri='dummy1:a', name='foo'),
Track(uri='dummy1:b', name='foo'),
Track(uri='dummy1:c', name='bar'),
]
self.tl_tracks = [
TlTrack(tlid=4, track=Track(uri='first', name='First')),
TlTrack(tlid=5, track=Track(uri='second', name='Second')),
TlTrack(tlid=6, track=Track(uri='third', name='Third')),
TlTrack(tlid=8, track=Track(uri='last', name='Last'))
]
def lookup(uris):
return {u: [t for t in self.tracks if t.uri == u] for u in uris}
self.core = core.Core(config, mixer=None, backends=[])
self.core.library = mock.Mock(spec=core.LibraryController)
self.core.library.lookup.side_effect = lookup
self.core.playback = mock.Mock(spec=core.PlaybackController)
def test_export(self):
tl_tracks = self.core.tracklist.add(uris=[
t.uri for t in self.tracks])
consume = True
next_tlid = len(tl_tracks) + 1
self.core.tracklist.set_consume(consume)
target = TracklistState(consume=consume,
repeat=False,
single=False,
random=False,
next_tlid=next_tlid,
tracks=tl_tracks)
value = self.core.tracklist._export_state()
self.assertEqual(target, value)
def test_import(self):
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tracks=self.tl_tracks)
coverage = ['mode', 'tracklist']
self.core.tracklist._restore_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(True, self.core.tracklist.get_repeat())
self.assertEqual(True, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(12, self.core.tracklist._next_tlid)
self.assertEqual(4, self.core.tracklist.get_length())
self.assertEqual(self.tl_tracks, self.core.tracklist.get_tl_tracks())
# after import, adding more tracks must be possible
self.core.tracklist.add(uris=[self.tracks[1].uri])
self.assertEqual(13, self.core.tracklist._next_tlid)
self.assertEqual(5, self.core.tracklist.get_length())
def test_import_mode_only(self):
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tracks=self.tl_tracks)
coverage = ['mode']
self.core.tracklist._restore_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(True, self.core.tracklist.get_repeat())
self.assertEqual(True, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(1, self.core.tracklist._next_tlid)
self.assertEqual(0, self.core.tracklist.get_length())
self.assertEqual([], self.core.tracklist.get_tl_tracks())
def test_import_tracklist_only(self):
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tracks=self.tl_tracks)
coverage = ['tracklist']
self.core.tracklist._restore_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(False, self.core.tracklist.get_repeat())
self.assertEqual(False, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(12, self.core.tracklist._next_tlid)
self.assertEqual(4, self.core.tracklist.get_length())
self.assertEqual(self.tl_tracks, self.core.tracklist.get_tl_tracks())
def test_import_invalid_type(self):
with self.assertRaises(TypeError):
self.core.tracklist._restore_state(11, None)
def test_import_none(self):
self.core.tracklist._restore_state(None, None)