Merge pull request #1376 from dublok/fix/310-persist-mopidy-state-between-runs

core: Persist mopidy state between runs. Fix #310
This commit is contained in:
Stein Magnus Jodal 2016-09-17 22:06:12 +02:00 committed by GitHub
commit 70e510e459
25 changed files with 1091 additions and 64 deletions

View File

@ -53,7 +53,6 @@ in core see :class:`~mopidy.core.CoreListener`.
.. automethod:: get_version
Tracklist controller
====================

View File

@ -10,6 +10,9 @@ v2.1.0 (UNRELEASED)
Feature release.
- Core: Mopidy restores its last state when started. Can be enabled by setting
the config value :confval:`core/restore_state` to ``true``.
- MPD: Fix MPD protocol for ``replay_gain_status`` command. The actual command
remains unimplemented. (PR: :issue:`1520`)

View File

@ -111,6 +111,13 @@ Core config section
The original MPD server only supports 10000 tracks in the tracklist. Some
MPD clients will crash if this limit is exceeded.
.. confval:: core/restore_state
When set to ``true``, Mopidy restores its last state when started.
The restored state includes the tracklist, playback history,
the playback state, the volume, and mute state.
Default is ``false``.
.. _audio-config:

View File

@ -295,6 +295,7 @@ class RootCommand(Command):
mixer_class = self.get_mixer_class(config, args.registry['mixer'])
backend_classes = args.registry['backend']
frontend_classes = args.registry['frontend']
core = None
exit_status_code = 0
try:
@ -321,7 +322,7 @@ class RootCommand(Command):
finally:
loop.quit()
self.stop_frontends(frontend_classes)
self.stop_core()
self.stop_core(core)
self.stop_backends(backend_classes)
self.stop_audio()
if mixer_class is not None:
@ -397,8 +398,10 @@ class RootCommand(Command):
def start_core(self, config, mixer, backends, audio):
logger.info('Starting Mopidy core')
return Core.start(
core = Core.start(
config=config, mixer=mixer, backends=backends, audio=audio).proxy()
core.setup().get()
return core
def start_frontends(self, config, frontend_classes, core):
logger.info(
@ -415,8 +418,10 @@ class RootCommand(Command):
for frontend_class in frontend_classes:
process.stop_actors_by_class(frontend_class)
def stop_core(self):
def stop_core(self, core):
logger.info('Stopping Mopidy core')
if core:
core.teardown().get()
process.stop_actors_by_class(Core)
def stop_backends(self, backend_classes):

View File

@ -24,6 +24,7 @@ _core_schema['config_dir'] = Path()
_core_schema['data_dir'] = Path()
# MPD supports at most 10k tracks, some clients segfault when this is exceeded.
_core_schema['max_tracklist_length'] = Integer(minimum=1, maximum=10000)
_core_schema['restore_state'] = Boolean(optional=True)
_logging_schema = ConfigSchema('logging')
_logging_schema['color'] = Boolean()

View File

@ -3,6 +3,7 @@ cache_dir = $XDG_CACHE_DIR/mopidy
config_dir = $XDG_CONFIG_DIR/mopidy
data_dir = $XDG_DATA_DIR/mopidy
max_tracklist_length = 10000
restore_state = false
[logging]
color = true

View File

@ -3,9 +3,12 @@ from __future__ import absolute_import, unicode_literals
import collections
import itertools
import logging
import os
import pykka
import mopidy
from mopidy import audio, backend, mixer
from mopidy.audio import PlaybackState
from mopidy.core.history import HistoryController
@ -15,8 +18,9 @@ from mopidy.core.mixer import MixerController
from mopidy.core.playback import PlaybackController
from mopidy.core.playlists import PlaylistsController
from mopidy.core.tracklist import TracklistController
from mopidy.internal import versioning
from mopidy.internal import path, storage, validation, versioning
from mopidy.internal.deprecation import deprecated_property
from mopidy.internal.models import CoreState
logger = logging.getLogger(__name__)
@ -136,6 +140,91 @@ class Core(
self.playback._stream_title = title
CoreListener.send('stream_title_changed', title=title)
def setup(self):
"""Do not call this function. It is for internal use at startup."""
try:
coverage = []
if self._config and 'restore_state' in self._config['core']:
if self._config['core']['restore_state']:
coverage = ['tracklist', 'mode', 'play-last', 'mixer',
'history']
if len(coverage):
self._load_state(coverage)
except Exception as e:
logger.warn('Restore state: Unexpected error: %s', str(e))
def teardown(self):
"""Do not call this function. It is for internal use at shutdown."""
try:
if self._config and 'restore_state' in self._config['core']:
if self._config['core']['restore_state']:
self._save_state()
except Exception as e:
logger.warn('Unexpected error while saving state: %s', str(e))
def _get_data_dir(self):
# get or create data director for core
data_dir_path = os.path.join(self._config['core']['data_dir'], b'core')
path.get_or_create_dir(data_dir_path)
return data_dir_path
def _save_state(self):
"""
Save current state to disk.
"""
file_name = os.path.join(self._get_data_dir(), b'state.json.gz')
logger.info('Saving state to %s', file_name)
data = {}
data['version'] = mopidy.__version__
data['state'] = CoreState(
tracklist=self.tracklist._save_state(),
history=self.history._save_state(),
playback=self.playback._save_state(),
mixer=self.mixer._save_state())
storage.dump(file_name, data)
logger.debug('Saving state done')
def _load_state(self, coverage):
"""
Restore state from disk.
Load state from disk and restore it. Parameter ``coverage``
limits the amount of data to restore. Possible
values for ``coverage`` (list of one or more of):
- 'tracklist' fill the tracklist
- 'mode' set tracklist properties (consume, random, repeat, single)
- 'play-last' restore play state ('tracklist' also required)
- 'mixer' set mixer volume and mute state
- 'history' restore history
:param coverage: amount of data to restore
:type coverage: list of strings
"""
file_name = os.path.join(self._get_data_dir(), b'state.json.gz')
logger.info('Loading state from %s', file_name)
data = storage.load(file_name)
try:
# Try only once. If something goes wrong, the next start is clean.
os.remove(file_name)
except OSError:
logger.info('Failed to delete %s', file_name)
if 'state' in data:
core_state = data['state']
validation.check_instance(core_state, CoreState)
self.history._load_state(core_state.history, coverage)
self.tracklist._load_state(core_state.tracklist, coverage)
self.mixer._load_state(core_state.mixer, coverage)
# playback after tracklist
self.playback._load_state(core_state.playback, coverage)
logger.debug('Loading state done')
class Backends(list):

View File

@ -5,7 +5,7 @@ import logging
import time
from mopidy import models
from mopidy.internal.models import HistoryState, HistoryTrack
logger = logging.getLogger(__name__)
@ -57,3 +57,21 @@ class HistoryController(object):
:rtype: list of (timestamp, :class:`mopidy.models.Ref`) tuples
"""
return copy.copy(self._history)
def _save_state(self):
# 500 tracks a 3 minutes -> 24 hours history
count_max = 500
count = 1
history_list = []
for timestamp, track in self._history:
history_list.append(
HistoryTrack(timestamp=timestamp, track=track))
count += 1
if count_max < count:
logger.info('Limiting history to %s tracks', count_max)
break
return HistoryState(history=history_list)
def _load_state(self, state, coverage):
if state and 'history' in coverage:
self._history = [(h.timestamp, h.track) for h in state.history]

View File

@ -5,6 +5,7 @@ import logging
from mopidy import exceptions
from mopidy.internal import validation
from mopidy.internal.models import MixerState
logger = logging.getLogger(__name__)
@ -99,3 +100,13 @@ class MixerController(object):
return result
return False
def _save_state(self):
return MixerState(volume=self.get_volume(),
mute=self.get_mute())
def _load_state(self, state, coverage):
if state and 'mixer' in coverage:
self.set_mute(state.mute)
if state.volume:
self.set_volume(state.volume)

View File

@ -2,11 +2,10 @@ from __future__ import absolute_import, unicode_literals
import logging
from mopidy import models
from mopidy.audio import PlaybackState
from mopidy.compat import urllib
from mopidy.core import listener
from mopidy.internal import deprecation, validation
from mopidy.internal import deprecation, models, validation
logger = logging.getLogger(__name__)
@ -30,6 +29,9 @@ class PlaybackController(object):
self._last_position = None
self._previous = False
self._start_at_position = None
self._start_paused = False
if self._audio:
self._audio.set_about_to_finish_callback(
self._on_about_to_finish_callback)
@ -226,6 +228,13 @@ class PlaybackController(object):
if self._pending_position is None:
self.set_state(PlaybackState.PLAYING)
self._trigger_track_playback_started()
seek_ok = False
if self._start_at_position:
seek_ok = self.seek(self._start_at_position)
self._start_at_position = None
if not seek_ok and self._start_paused:
self.pause()
self._start_paused = False
else:
self._seek(self._pending_position)
@ -233,6 +242,9 @@ class PlaybackController(object):
if self._pending_position is not None:
self._trigger_seeked(self._pending_position)
self._pending_position = None
if self._start_paused:
self._start_paused = False
self.pause()
def _on_about_to_finish_callback(self):
"""Callback that performs a blocking actor call to the real callback.
@ -596,3 +608,17 @@ class PlaybackController(object):
# TODO: Trigger this from audio events?
logger.debug('Triggering seeked event')
listener.CoreListener.send('seeked', time_position=time_position)
def _save_state(self):
return models.PlaybackState(
tlid=self.get_current_tlid(),
time_position=self.get_time_position(),
state=self.get_state())
def _load_state(self, state, coverage):
if state and 'play-last' in coverage and state.tlid is not None:
if state.state == PlaybackState.PAUSED:
self._start_paused = True
if state.state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
self._start_at_position = state.time_position
self.play(tlid=state.tlid)

View File

@ -6,6 +6,7 @@ import random
from mopidy import exceptions
from mopidy.core import listener
from mopidy.internal import deprecation, validation
from mopidy.internal.models import TracklistState
from mopidy.models import TlTrack, Track
logger = logging.getLogger(__name__)
@ -646,3 +647,24 @@ class TracklistController(object):
def _trigger_options_changed(self):
logger.debug('Triggering options changed event')
listener.CoreListener.send('options_changed')
def _save_state(self):
return TracklistState(
tl_tracks=self._tl_tracks,
next_tlid=self._next_tlid,
consume=self.get_consume(),
random=self.get_random(),
repeat=self.get_repeat(),
single=self.get_single())
def _load_state(self, state, coverage):
if state:
if 'mode' in coverage:
self.set_consume(state.consume)
self.set_random(state.random)
self.set_repeat(state.repeat)
self.set_single(state.single)
if 'tracklist' in coverage:
self._next_tlid = max(state.next_tlid, self._next_tlid)
self._tl_tracks = list(state.tl_tracks)
self._increase_version()

144
mopidy/internal/models.py Normal file
View File

@ -0,0 +1,144 @@
from __future__ import absolute_import, unicode_literals
from mopidy.internal import validation
from mopidy.models import Ref, TlTrack, fields
from mopidy.models.immutable import ValidatedImmutableObject
class HistoryTrack(ValidatedImmutableObject):
"""
A history track. Wraps a :class:`Ref` and its timestamp.
:param timestamp: the timestamp
:type timestamp: int
:param track: the track reference
:type track: :class:`Ref`
"""
# The timestamp. Read-only.
timestamp = fields.Integer()
# The track reference. Read-only.
track = fields.Field(type=Ref)
class HistoryState(ValidatedImmutableObject):
"""
State of the history controller.
Internally used for save/load state.
:param history: the track history
:type history: list of :class:`HistoryTrack`
"""
# The tracks. Read-only.
history = fields.Collection(type=HistoryTrack, container=tuple)
class MixerState(ValidatedImmutableObject):
"""
State of the mixer controller.
Internally used for save/load state.
:param volume: the volume
:type volume: int
:param mute: the mute state
:type mute: int
"""
# The volume. Read-only.
volume = fields.Integer(min=0, max=100)
# The mute state. Read-only.
mute = fields.Boolean(default=False)
class PlaybackState(ValidatedImmutableObject):
"""
State of the playback controller.
Internally used for save/load state.
:param tlid: current track tlid
:type tlid: int
:param time_position: play position
:type time_position: int
:param state: playback state
:type state: :class:`validation.PLAYBACK_STATES`
"""
# The tlid of current playing track. Read-only.
tlid = fields.Integer(min=1)
# The playback position. Read-only.
time_position = fields.Integer(min=0)
# The playback state. Read-only.
state = fields.Field(choices=validation.PLAYBACK_STATES)
class TracklistState(ValidatedImmutableObject):
"""
State of the tracklist controller.
Internally used for save/load state.
:param repeat: the repeat mode
:type repeat: bool
:param consume: the consume mode
:type consume: bool
:param random: the random mode
:type random: bool
:param single: the single mode
:type single: bool
:param next_tlid: the id for the next added track
:type next_tlid: int
:param tl_tracks: the list of tracks
:type tl_tracks: list of :class:`TlTrack`
"""
# The repeat mode. Read-only.
repeat = fields.Boolean()
# The consume mode. Read-only.
consume = fields.Boolean()
# The random mode. Read-only.
random = fields.Boolean()
# The single mode. Read-only.
single = fields.Boolean()
# The id of the track to play. Read-only.
next_tlid = fields.Integer(min=0)
# The list of tracks. Read-only.
tl_tracks = fields.Collection(type=TlTrack, container=tuple)
class CoreState(ValidatedImmutableObject):
"""
State of all Core controller.
Internally used for save/load state.
:param history: State of the history controller
:type history: :class:`HistorState`
:param mixer: State of the mixer controller
:type mixer: :class:`MixerState`
:param playback: State of the playback controller
:type playback: :class:`PlaybackState`
:param tracklist: State of the tracklist controller
:type tracklist: :class:`TracklistState`
"""
# State of the history controller.
history = fields.Field(type=HistoryState)
# State of the mixer controller.
mixer = fields.Field(type=MixerState)
# State of the playback controller.
playback = fields.Field(type=PlaybackState)
# State of the tracklist controller.
tracklist = fields.Field(type=TracklistState)

View File

@ -0,0 +1,60 @@
from __future__ import absolute_import, unicode_literals
import gzip
import json
import logging
import os
import tempfile
from mopidy import models
from mopidy.internal import encoding
logger = logging.getLogger(__name__)
def load(path):
"""
Deserialize data from file.
:param path: full path to import file
:type path: bytes
:return: deserialized data
:rtype: dict
"""
# Todo: raise an exception in case of error?
if not os.path.isfile(path):
logger.info('File does not exist: %s', path)
return {}
try:
with gzip.open(path, 'rb') as fp:
return json.load(fp, object_hook=models.model_json_decoder)
except (IOError, ValueError) as error:
logger.warning(
'Loading JSON failed: %s',
encoding.locale_decode(error))
return {}
def dump(path, data):
"""
Serialize data to file.
:param path: full path to export file
:type path: bytes
:param data: dictionary containing data to save
:type data: dict
"""
directory, basename = os.path.split(path)
# TODO: cleanup directory/basename.* files.
tmp = tempfile.NamedTemporaryFile(
prefix=basename + '.', dir=directory, delete=False)
try:
with gzip.GzipFile(fileobj=tmp, mode='wb') as fp:
json.dump(data, fp, cls=models.ModelJSONEncoder,
indent=2, separators=(',', ': '))
os.rename(tmp.name, path)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)

View File

@ -1,60 +1,22 @@
from __future__ import absolute_import, absolute_import, unicode_literals
import collections
import gzip
import json
import logging
import os
import re
import sys
import tempfile
import mopidy
from mopidy import compat, local, models
from mopidy.internal import encoding, timer
from mopidy.internal import storage as internal_storage
from mopidy.internal import timer
from mopidy.local import search, storage, translator
logger = logging.getLogger(__name__)
# TODO: move to load and dump in models?
def load_library(json_file):
if not os.path.isfile(json_file):
logger.info(
'No local library metadata cache found at %s. Please run '
'`mopidy local scan` to index your local music library. '
'If you do not have a local music collection, you can disable the '
'local backend to hide this message.',
json_file)
return {}
try:
with gzip.open(json_file, 'rb') as fp:
return json.load(fp, object_hook=models.model_json_decoder)
except (IOError, ValueError) as error:
logger.warning(
'Loading JSON local library failed: %s',
encoding.locale_decode(error))
return {}
def write_library(json_file, data):
data['version'] = mopidy.__version__
directory, basename = os.path.split(json_file)
# TODO: cleanup directory/basename.* files.
tmp = tempfile.NamedTemporaryFile(
prefix=basename + '.', dir=directory, delete=False)
try:
with gzip.GzipFile(fileobj=tmp, mode='wb') as fp:
json.dump(data, fp, cls=models.ModelJSONEncoder,
indent=2, separators=(',', ': '))
os.rename(tmp.name, json_file)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
class _BrowseCache(object):
encoding = sys.getfilesystemencoding()
splitpath_re = re.compile(r'([^/]+)')
@ -128,8 +90,18 @@ class JsonLibrary(local.Library):
def load(self):
logger.debug('Loading library: %s', self._json_file)
with timer.time_logger('Loading tracks'):
library = load_library(self._json_file)
self._tracks = dict((t.uri, t) for t in library.get('tracks', []))
if not os.path.isfile(self._json_file):
logger.info(
'No local library metadata cache found at %s. Please run '
'`mopidy local scan` to index your local music library. '
'If you do not have a local music collection, you can '
'disable the local backend to hide this message.',
self._json_file)
self._tracks = {}
else:
library = internal_storage.load(self._json_file)
self._tracks = dict((t.uri, t) for t in
library.get('tracks', []))
with timer.time_logger('Building browse cache'):
self._browse_cache = _BrowseCache(sorted(self._tracks.keys()))
return len(self._tracks)
@ -195,7 +167,10 @@ class JsonLibrary(local.Library):
self._tracks.pop(uri, None)
def close(self):
write_library(self._json_file, {'tracks': self._tracks.values()})
internal_storage.dump(self._json_file, {
'version': mopidy.__version__,
'tracks': self._tracks.values()
})
def clear(self):
try:

View File

@ -138,6 +138,17 @@ class Integer(Field):
return value
class Boolean(Field):
"""
:class:`Field` for storing boolean values
:param default: default value for field
"""
def __init__(self, default=None):
super(Boolean, self).__init__(type=bool, default=default)
class Collection(Field):
"""
:class:`Field` for storing collections of a given type.

View File

@ -8,6 +8,10 @@ from mopidy.internal import deprecation
from mopidy.models.fields import Field
# Registered models for automatic deserialization
_models = {}
class ImmutableObject(object):
"""
Superclass for immutable objects whose fields can only be modified via the
@ -150,9 +154,14 @@ class _ValidatedImmutableObjectMeta(type):
attrs['_instances'] = weakref.WeakValueDictionary()
attrs['__slots__'] = list(attrs.get('__slots__', [])) + fields.values()
return super(_ValidatedImmutableObjectMeta, cls).__new__(
clsc = super(_ValidatedImmutableObjectMeta, cls).__new__(
cls, name, bases, attrs)
if clsc.__name__ != 'ValidatedImmutableObject':
_models[clsc.__name__] = clsc
return clsc
def __call__(cls, *args, **kwargs): # noqa: N805
instance = super(_ValidatedImmutableObjectMeta, cls).__call__(
*args, **kwargs)

View File

@ -4,8 +4,6 @@ import json
from mopidy.models import immutable
_MODELS = ['Ref', 'Artist', 'Album', 'Track', 'TlTrack', 'Playlist']
class ModelJSONEncoder(json.JSONEncoder):
@ -40,8 +38,8 @@ def model_json_decoder(dct):
"""
if '__model__' in dct:
from mopidy import models
model_name = dct.pop('__model__')
if model_name in _MODELS:
return getattr(models, model_name)(**dct)
if model_name in immutable._models:
cls = immutable._models[model_name]
return cls(**dct)
return dct

View File

@ -1,13 +1,20 @@
from __future__ import absolute_import, unicode_literals
import os
import shutil
import tempfile
import unittest
import mock
import pykka
import mopidy
from mopidy.core import Core
from mopidy.internal import versioning
from mopidy.internal import models, storage, versioning
from mopidy.models import Track
from tests import dummy_mixer
class CoreActorTest(unittest.TestCase):
@ -43,3 +50,106 @@ class CoreActorTest(unittest.TestCase):
def test_version(self):
self.assertEqual(self.core.version, versioning.get_version())
class CoreActorSaveLoadStateTest(unittest.TestCase):
def setUp(self):
self.temp_dir = tempfile.mkdtemp()
self.state_file = os.path.join(self.temp_dir,
b'core', b'state.json.gz')
config = {
'core': {
'max_tracklist_length': 10000,
'restore_state': True,
'data_dir': self.temp_dir,
}
}
os.mkdir(os.path.join(self.temp_dir, b'core'))
self.mixer = dummy_mixer.create_proxy()
self.core = Core(
config=config, mixer=self.mixer, backends=[])
def tearDown(self): # noqa: N802
pykka.ActorRegistry.stop_all()
shutil.rmtree(self.temp_dir)
def test_save_state(self):
self.core.teardown()
assert os.path.isfile(self.state_file)
reload_data = storage.load(self.state_file)
data = {}
data['version'] = mopidy.__version__
data['state'] = models.CoreState(
tracklist=models.TracklistState(
repeat=False, random=False,
consume=False, single=False,
next_tlid=1),
history=models.HistoryState(),
playback=models.PlaybackState(state='stopped',
time_position=0),
mixer=models.MixerState())
assert data == reload_data
def test_load_state_no_file(self):
self.core.setup()
assert self.core.mixer.get_mute() is None
assert self.core.mixer.get_volume() is None
assert self.core.tracklist._next_tlid == 1
assert self.core.tracklist.get_repeat() is False
assert self.core.tracklist.get_random() is False
assert self.core.tracklist.get_consume() is False
assert self.core.tracklist.get_single() is False
assert self.core.tracklist.get_length() == 0
assert self.core.playback._start_paused is False
assert self.core.playback._start_at_position is None
assert self.core.history.get_length() == 0
def test_load_state_with_data(self):
data = {}
data['version'] = mopidy.__version__
data['state'] = models.CoreState(
tracklist=models.TracklistState(
repeat=True, random=True,
consume=False, single=False,
tl_tracks=[models.TlTrack(tlid=12, track=Track(uri='a:a'))],
next_tlid=14),
history=models.HistoryState(history=[
models.HistoryTrack(
timestamp=12,
track=models.Ref.track(uri='a:a', name='a')),
models.HistoryTrack(
timestamp=13,
track=models.Ref.track(uri='a:b', name='b'))]),
playback=models.PlaybackState(tlid=12, state='paused',
time_position=432),
mixer=models.MixerState(mute=True, volume=12))
storage.dump(self.state_file, data)
self.core.setup()
assert self.core.mixer.get_mute() is True
assert self.core.mixer.get_volume() == 12
assert self.core.tracklist._next_tlid == 14
assert self.core.tracklist.get_repeat() is True
assert self.core.tracklist.get_random() is True
assert self.core.tracklist.get_consume() is False
assert self.core.tracklist.get_single() is False
assert self.core.tracklist.get_length() == 1
assert self.core.playback._start_paused is True
assert self.core.playback._start_at_position == 432
assert self.core.history.get_length() == 2
def test_delete_state_file_on_restore(self):
data = {}
storage.dump(self.state_file, data)
assert os.path.isfile(self.state_file)
self.core.setup()
assert not os.path.isfile(self.state_file)

View File

@ -4,7 +4,8 @@ import unittest
from mopidy import compat
from mopidy.core import HistoryController
from mopidy.models import Artist, Track
from mopidy.internal.models import HistoryState, HistoryTrack
from mopidy.models import Artist, Ref, Track
class PlaybackHistoryTest(unittest.TestCase):
@ -46,3 +47,60 @@ class PlaybackHistoryTest(unittest.TestCase):
self.assertIn(track.name, ref.name)
for artist in track.artists:
self.assertIn(artist.name, ref.name)
class CoreHistorySaveLoadStateTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.tracks = [
Track(uri='dummy1:a', name='foober'),
Track(uri='dummy2:a', name='foo'),
Track(uri='dummy3:a', name='bar')
]
self.refs = []
for t in self.tracks:
self.refs.append(Ref.track(uri=t.uri, name=t.name))
self.history = HistoryController()
def test_save(self):
self.history._add_track(self.tracks[2])
self.history._add_track(self.tracks[1])
value = self.history._save_state()
self.assertEqual(len(value.history), 2)
# last in, first out
self.assertEqual(value.history[0].track, self.refs[1])
self.assertEqual(value.history[1].track, self.refs[2])
def test_load(self):
state = HistoryState(history=[
HistoryTrack(timestamp=34, track=self.refs[0]),
HistoryTrack(timestamp=45, track=self.refs[2]),
HistoryTrack(timestamp=56, track=self.refs[1])])
coverage = ['history']
self.history._load_state(state, coverage)
hist = self.history.get_history()
self.assertEqual(len(hist), 3)
self.assertEqual(hist[0], (34, self.refs[0]))
self.assertEqual(hist[1], (45, self.refs[2]))
self.assertEqual(hist[2], (56, self.refs[1]))
# after import, adding more tracks must be possible
self.history._add_track(self.tracks[1])
hist = self.history.get_history()
self.assertEqual(len(hist), 4)
self.assertEqual(hist[0][1], self.refs[1])
self.assertEqual(hist[1], (34, self.refs[0]))
self.assertEqual(hist[2], (45, self.refs[2]))
self.assertEqual(hist[3], (56, self.refs[1]))
def test_load_invalid_type(self):
with self.assertRaises(TypeError):
self.history._load_state(11, None)
def test_load_none(self):
self.history._load_state(None, None)

View File

@ -7,6 +7,7 @@ import mock
import pykka
from mopidy import core, mixer
from mopidy.internal.models import MixerState
from tests import dummy_mixer
@ -154,3 +155,68 @@ class SetMuteBadBackendTest(MockBackendCoreMixerBase):
def test_backend_returns_wrong_type(self):
self.mixer.set_mute.return_value.get.return_value = 'done'
self.assertFalse(self.core.mixer.set_mute(True))
class CoreMixerSaveLoadStateTest(unittest.TestCase):
def setUp(self): # noqa: N802
self.mixer = dummy_mixer.create_proxy()
self.core = core.Core(mixer=self.mixer, backends=[])
def test_save_mute(self):
volume = 32
mute = False
target = MixerState(volume=volume, mute=mute)
self.core.mixer.set_volume(volume)
self.core.mixer.set_mute(mute)
value = self.core.mixer._save_state()
self.assertEqual(target, value)
def test_save_unmute(self):
volume = 33
mute = True
target = MixerState(volume=volume, mute=mute)
self.core.mixer.set_volume(volume)
self.core.mixer.set_mute(mute)
value = self.core.mixer._save_state()
self.assertEqual(target, value)
def test_load(self):
self.core.mixer.set_volume(11)
volume = 45
target = MixerState(volume=volume)
coverage = ['mixer']
self.core.mixer._load_state(target, coverage)
self.assertEqual(volume, self.core.mixer.get_volume())
def test_load_not_covered(self):
self.core.mixer.set_volume(21)
self.core.mixer.set_mute(True)
target = MixerState(volume=56, mute=False)
coverage = ['other']
self.core.mixer._load_state(target, coverage)
self.assertEqual(21, self.core.mixer.get_volume())
self.assertEqual(True, self.core.mixer.get_mute())
def test_load_mute_on(self):
self.core.mixer.set_mute(False)
self.assertEqual(False, self.core.mixer.get_mute())
target = MixerState(mute=True)
coverage = ['mixer']
self.core.mixer._load_state(target, coverage)
self.assertEqual(True, self.core.mixer.get_mute())
def test_load_mute_off(self):
self.core.mixer.set_mute(True)
self.assertEqual(True, self.core.mixer.get_mute())
target = MixerState(mute=False)
coverage = ['mixer']
self.core.mixer._load_state(target, coverage)
self.assertEqual(False, self.core.mixer.get_mute())
def test_load_invalid_type(self):
with self.assertRaises(TypeError):
self.core.mixer._load_state(11, None)
def test_load_none(self):
self.core.mixer._load_state(None, None)

View File

@ -8,6 +8,7 @@ import pykka
from mopidy import backend, core
from mopidy.internal import deprecation
from mopidy.internal.models import PlaybackState
from mopidy.models import Track
from tests import dummy_audio
@ -1132,6 +1133,62 @@ class TestBug1177Regression(unittest.TestCase):
b.playback.change_track.assert_called_once_with(track2)
class TestCorePlaybackSaveLoadState(BaseTest):
def test_save(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.play(tl_tracks[1])
self.replay_events()
state = PlaybackState(
time_position=0, state='playing', tlid=tl_tracks[1].tlid)
value = self.core.playback._save_state()
self.assertEqual(state, value)
def test_load(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.stop()
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
state = PlaybackState(
time_position=0, state='playing', tlid=tl_tracks[2].tlid)
coverage = ['play-last']
self.core.playback._load_state(state, coverage)
self.replay_events()
self.assertEqual('playing', self.core.playback.get_state())
self.assertEqual(tl_tracks[2],
self.core.playback.get_current_tl_track())
def test_load_not_covered(self):
tl_tracks = self.core.tracklist.get_tl_tracks()
self.core.playback.stop()
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
state = PlaybackState(
time_position=0, state='playing', tlid=tl_tracks[2].tlid)
coverage = ['other']
self.core.playback._load_state(state, coverage)
self.replay_events()
self.assertEqual('stopped', self.core.playback.get_state())
self.assertEqual(None,
self.core.playback.get_current_tl_track())
def test_load_invalid_type(self):
with self.assertRaises(TypeError):
self.core.playback._load_state(11, None)
def test_load_none(self):
self.core.playback._load_state(None, None)
class TestBug1352Regression(BaseTest):
tracks = [
Track(uri='dummy:a', length=40000),

View File

@ -6,6 +6,7 @@ import mock
from mopidy import backend, core
from mopidy.internal import deprecation
from mopidy.internal.models import TracklistState
from mopidy.models import TlTrack, Track
@ -177,3 +178,119 @@ class TracklistIndexTest(unittest.TestCase):
self.assertEqual(0, self.core.tracklist.index())
self.assertEqual(1, self.core.tracklist.index())
self.assertEqual(2, self.core.tracklist.index())
class TracklistSaveLoadStateTest(unittest.TestCase):
def setUp(self): # noqa: N802
config = {
'core': {
'max_tracklist_length': 10000,
}
}
self.tracks = [
Track(uri='dummy1:a', name='foo'),
Track(uri='dummy1:b', name='foo'),
Track(uri='dummy1:c', name='bar'),
]
self.tl_tracks = [
TlTrack(tlid=4, track=Track(uri='first', name='First')),
TlTrack(tlid=5, track=Track(uri='second', name='Second')),
TlTrack(tlid=6, track=Track(uri='third', name='Third')),
TlTrack(tlid=8, track=Track(uri='last', name='Last'))
]
def lookup(uris):
return {u: [t for t in self.tracks if t.uri == u] for u in uris}
self.core = core.Core(config, mixer=None, backends=[])
self.core.library = mock.Mock(spec=core.LibraryController)
self.core.library.lookup.side_effect = lookup
self.core.playback = mock.Mock(spec=core.PlaybackController)
def test_save(self):
tl_tracks = self.core.tracklist.add(uris=[
t.uri for t in self.tracks])
consume = True
next_tlid = len(tl_tracks) + 1
self.core.tracklist.set_consume(consume)
target = TracklistState(consume=consume,
repeat=False,
single=False,
random=False,
next_tlid=next_tlid,
tl_tracks=tl_tracks)
value = self.core.tracklist._save_state()
self.assertEqual(target, value)
def test_load(self):
old_version = self.core.tracklist.get_version()
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tl_tracks=self.tl_tracks)
coverage = ['mode', 'tracklist']
self.core.tracklist._load_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(True, self.core.tracklist.get_repeat())
self.assertEqual(True, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(12, self.core.tracklist._next_tlid)
self.assertEqual(4, self.core.tracklist.get_length())
self.assertEqual(self.tl_tracks, self.core.tracklist.get_tl_tracks())
self.assertGreater(self.core.tracklist.get_version(), old_version)
# after load, adding more tracks must be possible
self.core.tracklist.add(uris=[self.tracks[1].uri])
self.assertEqual(13, self.core.tracklist._next_tlid)
self.assertEqual(5, self.core.tracklist.get_length())
def test_load_mode_only(self):
old_version = self.core.tracklist.get_version()
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tl_tracks=self.tl_tracks)
coverage = ['mode']
self.core.tracklist._load_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(True, self.core.tracklist.get_repeat())
self.assertEqual(True, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(1, self.core.tracklist._next_tlid)
self.assertEqual(0, self.core.tracklist.get_length())
self.assertEqual([], self.core.tracklist.get_tl_tracks())
self.assertEqual(self.core.tracklist.get_version(), old_version)
def test_load_tracklist_only(self):
old_version = self.core.tracklist.get_version()
target = TracklistState(consume=False,
repeat=True,
single=True,
random=False,
next_tlid=12,
tl_tracks=self.tl_tracks)
coverage = ['tracklist']
self.core.tracklist._load_state(target, coverage)
self.assertEqual(False, self.core.tracklist.get_consume())
self.assertEqual(False, self.core.tracklist.get_repeat())
self.assertEqual(False, self.core.tracklist.get_single())
self.assertEqual(False, self.core.tracklist.get_random())
self.assertEqual(12, self.core.tracklist._next_tlid)
self.assertEqual(4, self.core.tracklist.get_length())
self.assertEqual(self.tl_tracks, self.core.tracklist.get_tl_tracks())
self.assertGreater(self.core.tracklist.get_version(), old_version)
def test_load_invalid_type(self):
with self.assertRaises(TypeError):
self.core.tracklist._load_state(11, None)
def test_load_none(self):
self.core.tracklist._load_state(None, None)

View File

@ -0,0 +1,218 @@
from __future__ import absolute_import, unicode_literals
import json
import unittest
from mopidy.internal.models import (
HistoryState, HistoryTrack, MixerState, PlaybackState, TracklistState)
from mopidy.models import (
ModelJSONEncoder, Ref, TlTrack, Track, model_json_decoder)
class HistoryTrackTest(unittest.TestCase):
def test_track(self):
track = Ref.track()
result = HistoryTrack(track=track)
self.assertEqual(result.track, track)
with self.assertRaises(AttributeError):
result.track = None
def test_timestamp(self):
timestamp = 1234
result = HistoryTrack(timestamp=timestamp)
self.assertEqual(result.timestamp, timestamp)
with self.assertRaises(AttributeError):
result.timestamp = None
def test_to_json_and_back(self):
result = HistoryTrack(track=Ref.track(), timestamp=1234)
serialized = json.dumps(result, cls=ModelJSONEncoder)
deserialized = json.loads(serialized, object_hook=model_json_decoder)
self.assertEqual(result, deserialized)
class HistoryStateTest(unittest.TestCase):
def test_history_list(self):
history = (HistoryTrack(),
HistoryTrack())
result = HistoryState(history=history)
self.assertEqual(result.history, history)
with self.assertRaises(AttributeError):
result.history = None
def test_history_string_fail(self):
history = 'not_a_valid_history'
with self.assertRaises(TypeError):
HistoryState(history=history)
def test_to_json_and_back(self):
result = HistoryState(history=(HistoryTrack(), HistoryTrack()))
serialized = json.dumps(result, cls=ModelJSONEncoder)
deserialized = json.loads(serialized, object_hook=model_json_decoder)
self.assertEqual(result, deserialized)
class MixerStateTest(unittest.TestCase):
def test_volume(self):
volume = 37
result = MixerState(volume=volume)
self.assertEqual(result.volume, volume)
with self.assertRaises(AttributeError):
result.volume = None
def test_volume_invalid(self):
volume = 105
with self.assertRaises(ValueError):
MixerState(volume=volume)
def test_mute_false(self):
mute = False
result = MixerState(mute=mute)
self.assertEqual(result.mute, mute)
with self.assertRaises(AttributeError):
result.mute = None
def test_mute_true(self):
mute = True
result = MixerState(mute=mute)
self.assertEqual(result.mute, mute)
with self.assertRaises(AttributeError):
result.mute = False
def test_mute_default(self):
result = MixerState()
self.assertEqual(result.mute, False)
def test_to_json_and_back(self):
result = MixerState(volume=77)
serialized = json.dumps(result, cls=ModelJSONEncoder)
deserialized = json.loads(serialized, object_hook=model_json_decoder)
self.assertEqual(result, deserialized)
class PlaybackStateTest(unittest.TestCase):
def test_position(self):
time_position = 123456
result = PlaybackState(time_position=time_position)
self.assertEqual(result.time_position, time_position)
with self.assertRaises(AttributeError):
result.time_position = None
def test_position_invalid(self):
time_position = -1
with self.assertRaises(ValueError):
PlaybackState(time_position=time_position)
def test_tl_track(self):
tlid = 42
result = PlaybackState(tlid=tlid)
self.assertEqual(result.tlid, tlid)
with self.assertRaises(AttributeError):
result.tlid = None
def test_tl_track_none(self):
tlid = None
result = PlaybackState(tlid=tlid)
self.assertEqual(result.tlid, tlid)
with self.assertRaises(AttributeError):
result.tl_track = None
def test_tl_track_invalid(self):
tl_track = Track()
with self.assertRaises(TypeError):
PlaybackState(tlid=tl_track)
def test_state(self):
state = 'playing'
result = PlaybackState(state=state)
self.assertEqual(result.state, state)
with self.assertRaises(AttributeError):
result.state = None
def test_state_invalid(self):
state = 'not_a_state'
with self.assertRaises(TypeError):
PlaybackState(state=state)
def test_to_json_and_back(self):
result = PlaybackState(state='playing', tlid=4321)
serialized = json.dumps(result, cls=ModelJSONEncoder)
deserialized = json.loads(serialized, object_hook=model_json_decoder)
self.assertEqual(result, deserialized)
class TracklistStateTest(unittest.TestCase):
def test_repeat_true(self):
repeat = True
result = TracklistState(repeat=repeat)
self.assertEqual(result.repeat, repeat)
with self.assertRaises(AttributeError):
result.repeat = None
def test_repeat_false(self):
repeat = False
result = TracklistState(repeat=repeat)
self.assertEqual(result.repeat, repeat)
with self.assertRaises(AttributeError):
result.repeat = None
def test_repeat_invalid(self):
repeat = 33
with self.assertRaises(TypeError):
TracklistState(repeat=repeat)
def test_consume_true(self):
val = True
result = TracklistState(consume=val)
self.assertEqual(result.consume, val)
with self.assertRaises(AttributeError):
result.repeat = None
def test_random_true(self):
val = True
result = TracklistState(random=val)
self.assertEqual(result.random, val)
with self.assertRaises(AttributeError):
result.random = None
def test_single_true(self):
val = True
result = TracklistState(single=val)
self.assertEqual(result.single, val)
with self.assertRaises(AttributeError):
result.single = None
def test_next_tlid(self):
val = 654
result = TracklistState(next_tlid=val)
self.assertEqual(result.next_tlid, val)
with self.assertRaises(AttributeError):
result.next_tlid = None
def test_next_tlid_invalid(self):
val = -1
with self.assertRaises(ValueError):
TracklistState(next_tlid=val)
def test_tracks(self):
tracks = (TlTrack(), TlTrack())
result = TracklistState(tl_tracks=tracks)
self.assertEqual(result.tl_tracks, tracks)
with self.assertRaises(AttributeError):
result.tl_tracks = None
def test_tracks_invalid(self):
tracks = (Track(), Track())
with self.assertRaises(TypeError):
TracklistState(tl_tracks=tracks)
def test_to_json_and_back(self):
result = TracklistState(tl_tracks=(TlTrack(), TlTrack()), next_tlid=4)
serialized = json.dumps(result, cls=ModelJSONEncoder)
deserialized = json.loads(serialized, object_hook=model_json_decoder)
self.assertEqual(result, deserialized)

View File

@ -4,7 +4,8 @@ from __future__ import absolute_import, unicode_literals
import unittest
from mopidy.models.fields import Collection, Field, Identifier, Integer, String
from mopidy.models.fields import (Boolean, Collection, Field, Identifier,
Integer, String)
def create_instance(field):
@ -211,6 +212,27 @@ class IntegerTest(unittest.TestCase):
instance.attr = 11
class BooleanTest(unittest.TestCase):
def test_default_handling(self):
instance = create_instance(Boolean(default=True))
self.assertEqual(True, instance.attr)
def test_true_allowed(self):
instance = create_instance(Boolean())
instance.attr = True
self.assertEqual(True, instance.attr)
def test_false_allowed(self):
instance = create_instance(Boolean())
instance.attr = False
self.assertEqual(False, instance.attr)
def test_int_forbidden(self):
instance = create_instance(Boolean())
with self.assertRaises(TypeError):
instance.attr = 1
class CollectionTest(unittest.TestCase):
def test_container_instance_is_default(self):
instance = create_instance(Collection(type=int, container=frozenset))

View File

@ -4,8 +4,8 @@ import json
import unittest
from mopidy.models import (
Album, Artist, Image, ModelJSONEncoder, Playlist, Ref, SearchResult,
TlTrack, Track, model_json_decoder)
Album, Artist, Image, ModelJSONEncoder, Playlist,
Ref, SearchResult, TlTrack, Track, model_json_decoder)
class InheritanceTest(unittest.TestCase):