Merge pull request #915 from adamcik/feature/taglist-converter

Add support for tracking metadata changes in audio
This commit is contained in:
Stein Magnus Jodal 2014-12-28 20:21:37 +01:00
commit 097172d574
17 changed files with 693 additions and 476 deletions

View File

@ -35,3 +35,9 @@ Audio scanner
.. autoclass:: mopidy.audio.scan.Scanner
:members:
Audio utils
===========
.. automodule:: mopidy.audio.utils
:members:

View File

@ -63,12 +63,35 @@ v0.20.0 (UNRELEASED)
- Add foundation for trying to re-add multiple output support.
- Add internal helper for converting GStreamer data types to Python.
- Move MusicBrainz coverart code out of audio and into local.
- Reduce scope of audio scanner to just tags + duration. Mtime, uri and min
length handling are now outside of this class.
- Update scanner to operate with milliseconds for duration.
- Add :meth:`mopidy.audio.AudioListener.tags_changed`. Notifies core when new tags
are found.
- Add :meth:`mopidy.audio.Audio.get_current_tags` for looking up the current
tags of the playing media.
- Move and rename helper for converting tags to tracks.
- Helper now ignores albums without a name.
- Kill support for visualizers. Feature was originally added as a workaround for
all the people asking for ncmpcpp visualizer support. And since we could get
it almost for free thanks to GStreamer. But this feature didn't really ever
make sense for a server such as Mopidy. Currently the only way to find out if
it is in use and will be missed is to go ahead and remove it.
**Stream backend**
- Add basic tests for the stream library provider.
v0.19.5 (2014-12-23)
====================

View File

@ -303,6 +303,8 @@ class _Handler(object):
self.on_warning(*msg.parse_warning())
elif msg.type == gst.MESSAGE_ASYNC_DONE:
self.on_async_done()
elif msg.type == gst.MESSAGE_TAG:
self.on_tag(msg.parse_tag())
elif msg.type == gst.MESSAGE_ELEMENT:
if gst.pbutils.is_missing_plugin_message(msg):
self.on_missing_plugin(_get_missing_description(msg),
@ -370,6 +372,7 @@ class _Handler(object):
def on_end_of_stream(self):
gst_logger.debug('Got end-of-stream message.')
logger.debug('Audio event: reached_end_of_stream()')
self._audio._tags = {}
AudioListener.send('reached_end_of_stream')
def on_error(self, error, debug):
@ -387,6 +390,12 @@ class _Handler(object):
def on_async_done(self):
gst_logger.debug('Got async-done.')
def on_tag(self, taglist):
tags = utils.convert_taglist(taglist)
self._audio._tags.update(tags)
logger.debug('Audio event: tags_changed(tags=%r)', tags.keys())
AudioListener.send('tags_changed', tags=tags.keys())
def on_missing_plugin(self, msg):
desc = gst.pbutils.missing_plugin_message_get_description(msg)
debug = gst.pbutils.missing_plugin_message_get_installer_detail(msg)
@ -401,7 +410,7 @@ class _Handler(object):
# required helper installed?
def on_new_segment(self, update, rate, format_, start, stop, position):
gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s'
gst_logger.debug('Got new-segment event: update=%s rate=%s format=%s '
'start=%s stop=%s position=%s', update, rate,
format_.value_name, start, stop, position)
position_ms = position // gst.MSECOND
@ -432,6 +441,7 @@ class Audio(pykka.ThreadingActor):
self._config = config
self._target_state = gst.STATE_NULL
self._buffering = False
self._tags = {}
self._playbin = None
self._outputs = None
@ -538,6 +548,7 @@ class Audio(pykka.ThreadingActor):
:param uri: the URI to play
:type uri: string
"""
self._tags = {} # TODO: add test for this somehow
self._playbin.set_property('uri', uri)
def set_appsrc(
@ -725,6 +736,7 @@ class Audio(pykka.ThreadingActor):
# of faking it in the message handling when result=OK
return True
# TODO: bake this into setup appsrc perhaps?
def set_metadata(self, track):
"""
Set track metadata for currently playing song.
@ -755,5 +767,22 @@ class Audio(pykka.ThreadingActor):
taglist[gst.TAG_ALBUM] = track.album.name
event = gst.event_new_tag(taglist)
# TODO: check if we get this back on our own bus?
self._playbin.send_event(event)
gst_logger.debug('Sent tag event: track=%s', track.uri)
def get_current_tags(self):
"""
Get the currently playing media's tags.
If no tags have been found, or nothing is playing this returns an empty
dictionary. For each set of tags we collect a tags_changed event is
emitted with the keys of the changes tags. After such calls users may
call this function to get the updated values.
:rtype: {key: [values]} dict for the current media.
"""
# TODO: should this be a (deep) copy? most likely yes
# TODO: should we return None when stopped?
# TODO: support only fetching keys we care about?
return self._tags

View File

@ -21,9 +21,11 @@ class DummyAudio(pykka.ThreadingActor):
self._callback = None
self._uri = None
self._state_change_result = True
self._tags = {}
def set_uri(self, uri):
assert self._uri is None, 'prepare change not called before set'
self._tags = {}
self._uri = uri
def set_appsrc(self, *args, **kwargs):
@ -66,6 +68,9 @@ class DummyAudio(pykka.ThreadingActor):
def set_metadata(self, track):
pass
def get_current_tags(self):
return self._tags
def set_about_to_finish_callback(self, callback):
self._callback = callback
@ -91,6 +96,10 @@ class DummyAudio(pykka.ThreadingActor):
AudioListener.send('state_changed', old_state=old_state,
new_state=new_state, target_state=None)
if new_state == PlaybackState.PLAYING:
self._tags['audio-codec'] = [u'fake info...']
AudioListener.send('tags_changed', tags=['audio-codec'])
return self._state_change_result
def trigger_fake_playback_failure(self):
@ -104,6 +113,7 @@ class DummyAudio(pykka.ThreadingActor):
self._callback()
if not self._uri or not self._callback:
self._tags = {}
AudioListener.send('reached_end_of_stream')
else:
AudioListener.send('position_changed', position=0)

View File

@ -75,3 +75,21 @@ class AudioListener(listener.Listener):
field or :class:`None` if this is a final state.
"""
pass
def tags_changed(self, tags):
"""
Called whenever the current audio stream's tags change.
This event signals that some track metadata has been updated. This can
be metadata such as artists, titles, organization, or details about the
actual audio such as bit-rates, numbers of channels etc.
For the available tag keys please refer to GStreamer documentation for
tags.
*MAY* be implemented by actor.
:param tags: The tags that have just been updated.
:type tags: :class:`set` of strings
"""
pass

View File

@ -1,7 +1,5 @@
from __future__ import absolute_import, division, unicode_literals
import datetime
import os
import time
import pygst
@ -9,8 +7,8 @@ pygst.require('0.10')
import gst # noqa
from mopidy import exceptions
from mopidy.models import Album, Artist, Track
from mopidy.utils import encoding, path
from mopidy.audio import utils
from mopidy.utils import encoding
class Scanner(object):
@ -19,13 +17,10 @@ class Scanner(object):
:param timeout: timeout for scanning a URI in ms
:type event: int
:param min_duration: minimum duration of scanned URI in ms, -1 for all.
:type event: int
"""
def __init__(self, timeout=1000, min_duration=100):
def __init__(self, timeout=1000):
self._timeout_ms = timeout
self._min_duration_ms = min_duration
sink = gst.element_factory_make('fakesink')
@ -49,24 +44,19 @@ class Scanner(object):
:param uri: URI of the resource to scan.
:type event: string
:return: Dictionary of tags, duration, mtime and uri information.
:return: (tags, duration) pair. tags is a dictionary of lists for all
the tags we found and duration is the length of the URI in
milliseconds, or :class:`None` if the URI has no duration.
"""
tags, duration = None, None
try:
self._setup(uri)
tags = self._collect() # Ensure collect before queries.
data = {'uri': uri, 'tags': tags,
'mtime': self._query_mtime(uri),
'duration': self._query_duration()}
tags = self._collect()
duration = self._query_duration()
finally:
self._reset()
if self._min_duration_ms is None:
return data
elif data['duration'] >= self._min_duration_ms * gst.MSECOND:
return data
raise exceptions.ScannerError('Rejecting file with less than %dms '
'audio data.' % self._min_duration_ms)
return tags, duration
def _setup(self, uri):
"""Primes the pipeline for collection."""
@ -81,7 +71,7 @@ class Scanner(object):
def _collect(self):
"""Polls for messages to collect data."""
start = time.time()
timeout_s = self._timeout_ms / 1000.
timeout_s = self._timeout_ms / 1000.0
tags = {}
while time.time() - start < timeout_s:
@ -98,16 +88,9 @@ class Scanner(object):
if message.src == self._pipe:
return tags
elif message.type == gst.MESSAGE_TAG:
# Taglists are not really dicts, hence the lack of .items() and
# explicit .keys. We only keep the last tag for each key, as we
# assume this is the best, some formats will produce multiple
# taglists. Lastly we force everything to lists for conformity.
taglist = message.parse_tag()
for key in taglist.keys():
value = taglist[key]
if not isinstance(value, list):
value = [value]
tags[key] = value
# Note that this will only keep the last tag.
tags.update(utils.convert_taglist(taglist))
raise exceptions.ScannerError('Timeout after %dms' % self._timeout_ms)
@ -118,93 +101,11 @@ class Scanner(object):
def _query_duration(self):
try:
return self._pipe.query_duration(gst.FORMAT_TIME, None)[0]
duration = self._pipe.query_duration(gst.FORMAT_TIME, None)[0]
except gst.QueryError:
return None
def _query_mtime(self, uri):
if not uri.startswith('file:'):
if duration < 0:
return None
return os.path.getmtime(path.uri_to_path(uri))
def _artists(tags, artist_name, artist_id=None):
# Name missing, don't set artist
if not tags.get(artist_name):
return None
# One artist name and id, provide artist with id.
if len(tags[artist_name]) == 1 and artist_id in tags:
return [Artist(name=tags[artist_name][0],
musicbrainz_id=tags[artist_id][0])]
# Multiple artist, provide artists without id.
return [Artist(name=name) for name in tags[artist_name]]
def _date(tags):
if not tags.get(gst.TAG_DATE):
return None
try:
date = tags[gst.TAG_DATE][0]
return datetime.date(date.year, date.month, date.day).isoformat()
except ValueError:
return None
def add_musicbrainz_cover_art(track):
if track.album and track.album.musicbrainz_id:
base = "http://coverartarchive.org/release"
images = frozenset(
["{}/{}/front".format(
base,
track.album.musicbrainz_id)])
album = track.album.copy(images=images)
track = track.copy(album=album)
return track
def audio_data_to_track(data):
"""Convert taglist data + our extras to a track."""
tags = data['tags']
album_kwargs = {}
track_kwargs = {}
track_kwargs['composers'] = _artists(tags, gst.TAG_COMPOSER)
track_kwargs['performers'] = _artists(tags, gst.TAG_PERFORMER)
track_kwargs['artists'] = _artists(
tags, gst.TAG_ARTIST, 'musicbrainz-artistid')
album_kwargs['artists'] = _artists(
tags, gst.TAG_ALBUM_ARTIST, 'musicbrainz-albumartistid')
track_kwargs['genre'] = '; '.join(tags.get(gst.TAG_GENRE, []))
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_TITLE, []))
if not track_kwargs['name']:
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_ORGANIZATION, []))
track_kwargs['comment'] = '; '.join(tags.get('comment', []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_LOCATION, []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_COPYRIGHT, []))
track_kwargs['track_no'] = tags.get(gst.TAG_TRACK_NUMBER, [None])[0]
track_kwargs['disc_no'] = tags.get(gst.TAG_ALBUM_VOLUME_NUMBER, [None])[0]
track_kwargs['bitrate'] = tags.get(gst.TAG_BITRATE, [None])[0]
track_kwargs['musicbrainz_id'] = tags.get('musicbrainz-trackid', [None])[0]
album_kwargs['name'] = tags.get(gst.TAG_ALBUM, [None])[0]
album_kwargs['num_tracks'] = tags.get(gst.TAG_TRACK_COUNT, [None])[0]
album_kwargs['num_discs'] = tags.get(gst.TAG_ALBUM_VOLUME_COUNT, [None])[0]
album_kwargs['musicbrainz_id'] = tags.get('musicbrainz-albumid', [None])[0]
track_kwargs['date'] = _date(tags)
track_kwargs['last_modified'] = int(data.get('mtime') or 0)
track_kwargs['length'] = max(
0, (data.get(gst.TAG_DURATION) or 0)) // gst.MSECOND
# Clear out any empty values we found
track_kwargs = {k: v for k, v in track_kwargs.items() if v}
album_kwargs = {k: v for k, v in album_kwargs.items() if v}
track_kwargs['uri'] = data['uri']
track_kwargs['album'] = Album(**album_kwargs)
return Track(**track_kwargs)
else:
return duration // gst.MSECOND

View File

@ -1,10 +1,17 @@
from __future__ import absolute_import, unicode_literals
import datetime
import logging
import numbers
import pygst
pygst.require('0.10')
import gst # noqa
from mopidy import compat
from mopidy.models import Album, Artist, Track
logger = logging.getLogger(__name__)
def calculate_duration(num_samples, sample_rate):
@ -56,3 +63,116 @@ def supported_uri_schemes(uri_schemes):
supported_schemes.add(uri)
return supported_schemes
def _artists(tags, artist_name, artist_id=None):
# Name missing, don't set artist
if not tags.get(artist_name):
return None
# One artist name and id, provide artist with id.
if len(tags[artist_name]) == 1 and artist_id in tags:
return [Artist(name=tags[artist_name][0],
musicbrainz_id=tags[artist_id][0])]
# Multiple artist, provide artists without id.
return [Artist(name=name) for name in tags[artist_name]]
# TODO: split based on "stream" and "track" based conversion? i.e. handle data
# from radios in it's own helper instead?
def convert_tags_to_track(tags):
"""Convert our normalized tags to a track.
:param :class:`dict` tags: dictionary of tag keys with a list of values
:rtype: :class:`mopidy.models.Track`
"""
album_kwargs = {}
track_kwargs = {}
track_kwargs['composers'] = _artists(tags, gst.TAG_COMPOSER)
track_kwargs['performers'] = _artists(tags, gst.TAG_PERFORMER)
track_kwargs['artists'] = _artists(
tags, gst.TAG_ARTIST, 'musicbrainz-artistid')
album_kwargs['artists'] = _artists(
tags, gst.TAG_ALBUM_ARTIST, 'musicbrainz-albumartistid')
track_kwargs['genre'] = '; '.join(tags.get(gst.TAG_GENRE, []))
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_TITLE, []))
if not track_kwargs['name']:
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_ORGANIZATION, []))
track_kwargs['comment'] = '; '.join(tags.get('comment', []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_LOCATION, []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_COPYRIGHT, []))
track_kwargs['track_no'] = tags.get(gst.TAG_TRACK_NUMBER, [None])[0]
track_kwargs['disc_no'] = tags.get(gst.TAG_ALBUM_VOLUME_NUMBER, [None])[0]
track_kwargs['bitrate'] = tags.get(gst.TAG_BITRATE, [None])[0]
track_kwargs['musicbrainz_id'] = tags.get('musicbrainz-trackid', [None])[0]
album_kwargs['name'] = tags.get(gst.TAG_ALBUM, [None])[0]
album_kwargs['num_tracks'] = tags.get(gst.TAG_TRACK_COUNT, [None])[0]
album_kwargs['num_discs'] = tags.get(gst.TAG_ALBUM_VOLUME_COUNT, [None])[0]
album_kwargs['musicbrainz_id'] = tags.get('musicbrainz-albumid', [None])[0]
if tags.get(gst.TAG_DATE) and tags.get(gst.TAG_DATE)[0]:
track_kwargs['date'] = tags[gst.TAG_DATE][0].isoformat()
# Clear out any empty values we found
track_kwargs = {k: v for k, v in track_kwargs.items() if v}
album_kwargs = {k: v for k, v in album_kwargs.items() if v}
# Only bother with album if we have a name to show.
if album_kwargs.get('name'):
track_kwargs['album'] = Album(**album_kwargs)
return Track(**track_kwargs)
def convert_taglist(taglist):
"""Convert a :class:`gst.Taglist` to plain Python types.
Knows how to convert:
- Dates
- Buffers
- Numbers
- Strings
- Booleans
Unknown types will be ignored and debug logged. Tag keys are all strings
defined as part GStreamer under GstTagList_.
.. _GstTagList: http://gstreamer.freedesktop.org/data/doc/gstreamer/\
0.10.36/gstreamer/html/gstreamer-GstTagList.html
:param gst.Taglist taglist: A GStreamer taglist to be converted.
:rtype: dictionary of tag keys with a list of values.
"""
result = {}
# Taglists are not really dicts, hence the lack of .items() and
# explicit use of .keys()
for key in taglist.keys():
result.setdefault(key, [])
values = taglist[key]
if not isinstance(values, list):
values = [values]
for value in values:
if isinstance(value, gst.Date):
try:
date = datetime.date(value.year, value.month, value.day)
result[key].append(date)
except ValueError:
logger.debug('Ignoring invalid date: %r = %r', key, value)
elif isinstance(value, gst.Buffer):
result[key].append(bytes(value))
elif isinstance(value, (basestring, bool, numbers.Number)):
result[key].append(value)
else:
logger.debug('Ignoring unknown data: %r = %r', key, value)
return result

View File

@ -6,13 +6,15 @@ import os
import time
from mopidy import commands, compat, exceptions
from mopidy.audio import scan
from mopidy.audio import scan, utils
from mopidy.local import translator
from mopidy.utils import path
logger = logging.getLogger(__name__)
MIN_DURATION_MS = 100 # Shortest length of track to include.
def _get_library(args, config):
libraries = dict((l.name, l) for l in args.registry['local:library'])
@ -71,9 +73,6 @@ class ScanCommand(commands.Command):
library = _get_library(args, config)
uris_to_update = set()
uris_to_remove = set()
file_mtimes, file_errors = path.find_mtimes(
media_dir, follow=config['local']['scan_follow_symlinks'])
@ -88,14 +87,19 @@ class ScanCommand(commands.Command):
num_tracks = library.load()
logger.info('Checking %d tracks from library.', num_tracks)
uris_to_update = set()
uris_to_remove = set()
uris_in_library = set()
for track in library.begin():
abspath = translator.local_track_uri_to_path(track.uri, media_dir)
mtime = file_mtimes.pop(abspath, None)
mtime = file_mtimes.get(abspath)
if mtime is None:
logger.debug('Missing file %s', track.uri)
uris_to_remove.add(track.uri)
elif mtime > track.last_modified:
uris_to_update.add(track.uri)
uris_in_library.add(track.uri)
logger.info('Removing %d missing tracks.', len(uris_to_remove))
for uri in uris_to_remove:
@ -105,12 +109,11 @@ class ScanCommand(commands.Command):
relpath = os.path.relpath(abspath, media_dir)
uri = translator.path_to_local_track_uri(relpath)
# TODO: move these to a "predicate" check in the finder?
if b'/.' in relpath:
logger.debug('Skipped %s: Hidden directory/file.', uri)
elif relpath.lower().endswith(excluded_file_extensions):
logger.debug('Skipped %s: File extension excluded.', uri)
else:
elif uri not in uris_in_library:
uris_to_update.add(uri)
logger.info(
@ -127,11 +130,18 @@ class ScanCommand(commands.Command):
try:
relpath = translator.local_track_uri_to_path(uri, media_dir)
file_uri = path.path_to_uri(os.path.join(media_dir, relpath))
data = scanner.scan(file_uri)
track = scan.add_musicbrainz_cover_art(
scan.audio_data_to_track(data).copy(uri=uri)).copy(uri=uri)
library.add(track)
logger.debug('Added %s', track.uri)
tags, duration = scanner.scan(file_uri)
if duration < MIN_DURATION_MS:
logger.warning('Failed %s: Track shorter than %dms',
uri, MIN_DURATION_MS)
else:
mtime = file_mtimes.get(os.path.join(media_dir, relpath))
track = utils.convert_tags_to_track(tags).copy(
uri=uri, length=duration, last_modified=mtime)
track = translator.add_musicbrainz_coverart_to_track(track)
# TODO: add tags to call if library supports it.
library.add(track)
logger.debug('Added %s', track.uri)
except exceptions.ScannerError as error:
logger.warning('Failed %s: %s', uri, error)

View File

@ -13,10 +13,19 @@ from mopidy.utils.path import path_to_uri, uri_to_path
M3U_EXTINF_RE = re.compile(r'#EXTINF:(-1|\d+),(.*)')
COVERART_BASE = 'http://coverartarchive.org/release/%s/front'
logger = logging.getLogger(__name__)
def add_musicbrainz_coverart_to_track(track):
if track.album and track.album.musicbrainz_id:
images = [COVERART_BASE % track.album.musicbrainz_id]
album = track.album.copy(images=images)
track = track.copy(album=album)
return track
def local_track_uri_to_file_uri(uri, media_dir):
return path_to_uri(local_track_uri_to_path(uri, media_dir))

View File

@ -8,7 +8,7 @@ import urlparse
import pykka
from mopidy import audio as audio_lib, backend, exceptions
from mopidy.audio import scan
from mopidy.audio import scan, utils
from mopidy.models import Track
logger = logging.getLogger(__name__)
@ -31,7 +31,7 @@ class StreamBackend(pykka.ThreadingActor, backend.Backend):
class StreamLibraryProvider(backend.LibraryProvider):
def __init__(self, backend, timeout, blacklist):
super(StreamLibraryProvider, self).__init__(backend)
self._scanner = scan.Scanner(min_duration=None, timeout=timeout)
self._scanner = scan.Scanner(timeout=timeout)
self._blacklist_re = re.compile(
r'^(%s)$' % '|'.join(fnmatch.translate(u) for u in blacklist))
@ -44,8 +44,9 @@ class StreamLibraryProvider(backend.LibraryProvider):
return [Track(uri=uri)]
try:
data = self._scanner.scan(uri)
track = scan.audio_data_to_track(data)
tags, duration = self._scanner.scan(uri)
track = utils.convert_tags_to_track(tags).copy(
uri=uri, length=duration)
except exceptions.ScannerError as e:
logger.warning('Problem looking up %s: %s', uri, e)
track = Track(uri=uri)

View File

@ -42,7 +42,7 @@ class BaseTest(unittest.TestCase):
audio_class = audio.Audio
def setUp(self):
def setUp(self): # noqa
config = {
'audio': {
'mixer': 'foomixer',
@ -57,7 +57,7 @@ class BaseTest(unittest.TestCase):
self.song_uri = path_to_uri(path_to_data_dir('song1.wav'))
self.audio = self.audio_class.start(config=config, mixer=None).proxy()
def tearDown(self):
def tearDown(self): # noqa
pykka.ActorRegistry.stop_all()
def possibly_trigger_fake_playback_error(self):
@ -135,7 +135,7 @@ class AudioDummyTest(DummyMixin, AudioTest):
@mock.patch.object(audio.AudioListener, 'send')
class AudioEventTest(BaseTest):
def setUp(self):
def setUp(self): # noqa
super(AudioEventTest, self).setUp()
self.audio.enable_sync_handler().get()
@ -292,6 +292,14 @@ class AudioEventTest(BaseTest):
call = mock.call('position_changed', position=2000)
self.assertIn(call, send_mock.call_args_list)
def test_tags_changed_on_playback(self, send_mock):
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.audio.wait_for_state_change().get()
send_mock.assert_any_call('tags_changed', tags=mock.ANY)
# Unlike the other events, having the state changed done is not
# enough to ensure our event is called. So we setup a threading
# event that we can wait for with a timeout while the track playback
@ -330,7 +338,7 @@ class AudioEventTest(BaseTest):
if not event.wait(timeout=1.0):
self.fail('End of stream not reached within deadline')
# Make sure that gapless really works:
self.assertFalse(self.audio.get_current_tags().get())
def test_gapless(self, send_mock):
uris = self.uris[1:]
@ -361,20 +369,74 @@ class AudioEventTest(BaseTest):
if not done.wait(timeout=1.0):
self.fail('EOS not received')
excepted = [
('position_changed', {'position': 0}),
('stream_changed', {'uri': self.uris[0]}),
('state_changed', {'old_state': PlaybackState.STOPPED,
'new_state': PlaybackState.PLAYING,
'target_state': None}),
('position_changed', {'position': 0}),
('stream_changed', {'uri': self.uris[1]}),
('reached_end_of_stream', {})]
self.assertEqual(excepted, events)
# Check that both uris got played
self.assertIn(('stream_changed', {'uri': self.uris[0]}), events)
self.assertIn(('stream_changed', {'uri': self.uris[1]}), events)
# Check that events counts check out.
keys = [k for k, v in events]
self.assertEqual(2, keys.count('stream_changed'))
self.assertEqual(2, keys.count('position_changed'))
self.assertEqual(1, keys.count('state_changed'))
self.assertEqual(1, keys.count('reached_end_of_stream'))
# TODO: test tag states within gaples
def test_current_tags_are_blank_to_begin_with(self, send_mock):
self.assertFalse(self.audio.get_current_tags().get())
def test_current_tags_blank_after_end_of_stream(self, send_mock):
done = threading.Event()
def send(name, **kwargs):
if name == 'reached_end_of_stream':
done.set()
send_mock.side_effect = send
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
self.assertFalse(self.audio.get_current_tags().get())
def test_current_tags_stored(self, send_mock):
done = threading.Event()
tags = []
def callback():
tags.append(self.audio.get_current_tags().get())
def send(name, **kwargs):
if name == 'reached_end_of_stream':
done.set()
send_mock.side_effect = send
self.audio.set_about_to_finish_callback(callback).get()
self.audio.prepare_change()
self.audio.set_uri(self.uris[0])
self.audio.start_playback()
self.possibly_trigger_fake_about_to_finish()
self.audio.wait_for_state_change().get()
if not done.wait(timeout=1.0):
self.fail('EOS not received')
self.assertTrue(tags[0])
# TODO: test that we reset when we expect between songs
class AudioDummyEventTest(DummyMixin, AudioEventTest):
pass
"""Exercise the AudioEventTest against our mock audio classes."""
# TODO: move to mixer tests...
@ -399,7 +461,7 @@ class MixerTest(BaseTest):
class AudioStateTest(unittest.TestCase):
def setUp(self):
def setUp(self): # noqa
self.audio = audio.Audio(config=None, mixer=None)
def test_state_starts_as_stopped(self):
@ -444,7 +506,7 @@ class AudioStateTest(unittest.TestCase):
class AudioBufferingTest(unittest.TestCase):
def setUp(self):
def setUp(self): # noqa
self.audio = audio.Audio(config=None, mixer=None)
self.audio._playbin = mock.Mock(spec=['set_state'])

View File

@ -8,7 +8,7 @@ from mopidy import audio
class AudioListenerTest(unittest.TestCase):
def setUp(self):
def setUp(self): # noqa
self.listener = audio.AudioListener()
def test_on_event_forwards_to_specific_handler(self):
@ -32,3 +32,6 @@ class AudioListenerTest(unittest.TestCase):
def test_listener_has_default_impl_for_position_changed(self):
self.listener.position_changed(None)
def test_listener_has_default_impl_for_tags_changed(self):
self.listener.tags_changed([])

View File

@ -8,290 +8,16 @@ gobject.threads_init()
from mopidy import exceptions
from mopidy.audio import scan
from mopidy.models import Album, Artist, Track
from mopidy.utils import path as path_lib
from tests import path_to_data_dir
class FakeGstDate(object):
def __init__(self, year, month, day):
self.year = year
self.month = month
self.day = day
# TODO: keep ids without name?
class TranslatorTest(unittest.TestCase):
def setUp(self):
self.data = {
'uri': 'uri',
'duration': 4531000000,
'mtime': 1234,
'tags': {
'album': ['album'],
'track-number': [1],
'artist': ['artist'],
'composer': ['composer'],
'performer': ['performer'],
'album-artist': ['albumartist'],
'title': ['track'],
'track-count': [2],
'album-disc-number': [2],
'album-disc-count': [3],
'date': [FakeGstDate(2006, 1, 1,)],
'container-format': ['ID3 tag'],
'genre': ['genre'],
'comment': ['comment'],
'musicbrainz-trackid': ['trackid'],
'musicbrainz-albumid': ['albumid'],
'musicbrainz-artistid': ['artistid'],
'musicbrainz-albumartistid': ['albumartistid'],
'bitrate': [1000],
},
}
artist = Artist(name='artist', musicbrainz_id='artistid')
composer = Artist(name='composer')
performer = Artist(name='performer')
albumartist = Artist(name='albumartist',
musicbrainz_id='albumartistid')
album = Album(name='album', num_tracks=2, num_discs=3,
musicbrainz_id='albumid', artists=[albumartist])
self.track = Track(uri='uri', name='track', date='2006-01-01',
genre='genre', track_no=1, disc_no=2, length=4531,
comment='comment', musicbrainz_id='trackid',
last_modified=1234, album=album, bitrate=1000,
artists=[artist], composers=[composer],
performers=[performer])
def check(self, expected):
actual = scan.audio_data_to_track(self.data)
self.assertEqual(expected, actual)
def check_local(self, expected):
actual = scan.add_musicbrainz_cover_art(
scan.audio_data_to_track(self.data))
self.assertEqual(expected, actual)
def test_track(self):
self.check(self.track)
def test_none_track_length(self):
self.data['duration'] = None
self.check(self.track.copy(length=None))
def test_none_track_last_modified(self):
self.data['mtime'] = None
self.check(self.track.copy(last_modified=None))
def test_missing_track_no(self):
del self.data['tags']['track-number']
self.check(self.track.copy(track_no=None))
def test_multiple_track_no(self):
self.data['tags']['track-number'].append(9)
self.check(self.track)
def test_missing_track_disc_no(self):
del self.data['tags']['album-disc-number']
self.check(self.track.copy(disc_no=None))
def test_multiple_track_disc_no(self):
self.data['tags']['album-disc-number'].append(9)
self.check(self.track)
def test_missing_track_name(self):
del self.data['tags']['title']
self.check(self.track.copy(name=None))
def test_multiple_track_name(self):
self.data['tags']['title'] = ['name1', 'name2']
self.check(self.track.copy(name='name1; name2'))
def test_missing_track_musicbrainz_id(self):
del self.data['tags']['musicbrainz-trackid']
self.check(self.track.copy(musicbrainz_id=None))
def test_multiple_track_musicbrainz_id(self):
self.data['tags']['musicbrainz-trackid'].append('id')
self.check(self.track)
def test_missing_track_bitrate(self):
del self.data['tags']['bitrate']
self.check(self.track.copy(bitrate=None))
def test_multiple_track_bitrate(self):
self.data['tags']['bitrate'].append(1234)
self.check(self.track)
def test_missing_track_genre(self):
del self.data['tags']['genre']
self.check(self.track.copy(genre=None))
def test_multiple_track_genre(self):
self.data['tags']['genre'] = ['genre1', 'genre2']
self.check(self.track.copy(genre='genre1; genre2'))
def test_missing_track_date(self):
del self.data['tags']['date']
self.check(self.track.copy(date=None))
def test_multiple_track_date(self):
self.data['tags']['date'].append(FakeGstDate(2030, 1, 1))
self.check(self.track)
def test_invalid_track_date(self):
self.data['tags']['date'] = [FakeGstDate(65535, 1, 1)]
self.check(self.track.copy(date=None))
def test_missing_track_comment(self):
del self.data['tags']['comment']
self.check(self.track.copy(comment=None))
def test_multiple_track_comment(self):
self.data['tags']['comment'] = ['comment1', 'comment2']
self.check(self.track.copy(comment='comment1; comment2'))
def test_missing_track_artist_name(self):
del self.data['tags']['artist']
self.check(self.track.copy(artists=[]))
def test_multiple_track_artist_name(self):
self.data['tags']['artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
self.check(self.track.copy(artists=artists))
def test_missing_track_artist_musicbrainz_id(self):
del self.data['tags']['musicbrainz-artistid']
artist = list(self.track.artists)[0].copy(musicbrainz_id=None)
self.check(self.track.copy(artists=[artist]))
def test_multiple_track_artist_musicbrainz_id(self):
self.data['tags']['musicbrainz-artistid'].append('id')
self.check(self.track)
def test_missing_track_composer_name(self):
del self.data['tags']['composer']
self.check(self.track.copy(composers=[]))
def test_multiple_track_composer_name(self):
self.data['tags']['composer'] = ['composer1', 'composer2']
composers = [Artist(name='composer1'), Artist(name='composer2')]
self.check(self.track.copy(composers=composers))
def test_missing_track_performer_name(self):
del self.data['tags']['performer']
self.check(self.track.copy(performers=[]))
def test_multiple_track_performe_name(self):
self.data['tags']['performer'] = ['performer1', 'performer2']
performers = [Artist(name='performer1'), Artist(name='performer2')]
self.check(self.track.copy(performers=performers))
def test_missing_album_name(self):
del self.data['tags']['album']
album = self.track.album.copy(name=None)
self.check(self.track.copy(album=album))
def test_multiple_album_name(self):
self.data['tags']['album'].append('album2')
self.check(self.track)
def test_missing_album_musicbrainz_id(self):
del self.data['tags']['musicbrainz-albumid']
album = self.track.album.copy(musicbrainz_id=None,
images=[])
self.check(self.track.copy(album=album))
def test_multiple_album_musicbrainz_id(self):
self.data['tags']['musicbrainz-albumid'].append('id')
self.check(self.track)
def test_album_musicbrainz_id_cover(self):
album = self.track.album.copy(
images=frozenset(
['http://coverartarchive.org/release/albumid/front']))
self.check_local(self.track.copy(album=album))
def test_missing_album_num_tracks(self):
del self.data['tags']['track-count']
album = self.track.album.copy(num_tracks=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_tracks(self):
self.data['tags']['track-count'].append(9)
self.check(self.track)
def test_missing_album_num_discs(self):
del self.data['tags']['album-disc-count']
album = self.track.album.copy(num_discs=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_discs(self):
self.data['tags']['album-disc-count'].append(9)
self.check(self.track)
def test_missing_album_artist_name(self):
del self.data['tags']['album-artist']
album = self.track.album.copy(artists=[])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_name(self):
self.data['tags']['album-artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
album = self.track.album.copy(artists=artists)
self.check(self.track.copy(album=album))
def test_missing_album_artist_musicbrainz_id(self):
del self.data['tags']['musicbrainz-albumartistid']
albumartist = list(self.track.album.artists)[0]
albumartist = albumartist.copy(musicbrainz_id=None)
album = self.track.album.copy(artists=[albumartist])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_musicbrainz_id(self):
self.data['tags']['musicbrainz-albumartistid'].append('id')
self.check(self.track)
def test_stream_organization_track_name(self):
del self.data['tags']['title']
self.data['tags']['organization'] = ['organization']
self.check(self.track.copy(name='organization'))
def test_multiple_organization_track_name(self):
del self.data['tags']['title']
self.data['tags']['organization'] = ['organization1', 'organization2']
self.check(self.track.copy(name='organization1; organization2'))
# TODO: combine all comment types?
def test_stream_location_track_comment(self):
del self.data['tags']['comment']
self.data['tags']['location'] = ['location']
self.check(self.track.copy(comment='location'))
def test_multiple_location_track_comment(self):
del self.data['tags']['comment']
self.data['tags']['location'] = ['location1', 'location2']
self.check(self.track.copy(comment='location1; location2'))
def test_stream_copyright_track_comment(self):
del self.data['tags']['comment']
self.data['tags']['copyright'] = ['copyright']
self.check(self.track.copy(comment='copyright'))
def test_multiple_copyright_track_comment(self):
del self.data['tags']['comment']
self.data['tags']['copyright'] = ['copyright1', 'copyright2']
self.check(self.track.copy(comment='copyright1; copyright2'))
class ScannerTest(unittest.TestCase):
def setUp(self):
def setUp(self): # noqa
self.errors = {}
self.data = {}
self.tags = {}
self.durations = {}
def find(self, path):
media_dir = path_to_data_dir(path)
@ -305,54 +31,46 @@ class ScannerTest(unittest.TestCase):
uri = path_lib.path_to_uri(path)
key = uri[len('file://'):]
try:
self.data[key] = scanner.scan(uri)
tags, duration = scanner.scan(uri)
self.tags[key] = tags
self.durations[key] = duration
except exceptions.ScannerError as error:
self.errors[key] = error
def check(self, name, key, value):
name = path_to_data_dir(name)
self.assertEqual(self.data[name][key], value)
self.assertEqual(self.tags[name][key], value)
def check_tag(self, name, key, value):
name = path_to_data_dir(name)
self.assertEqual(self.data[name]['tags'][key], value)
def test_data_is_set(self):
def test_tags_is_set(self):
self.scan(self.find('scanner/simple'))
self.assert_(self.data)
self.assert_(self.tags)
def test_errors_is_not_set(self):
self.scan(self.find('scanner/simple'))
self.assert_(not self.errors)
def test_uri_is_set(self):
self.scan(self.find('scanner/simple'))
self.check(
'scanner/simple/song1.mp3', 'uri',
'file://%s' % path_to_data_dir('scanner/simple/song1.mp3'))
self.check(
'scanner/simple/song1.ogg', 'uri',
'file://%s' % path_to_data_dir('scanner/simple/song1.ogg'))
def test_duration_is_set(self):
self.scan(self.find('scanner/simple'))
self.check('scanner/simple/song1.mp3', 'duration', 4680000000)
self.check('scanner/simple/song1.ogg', 'duration', 4680000000)
self.assertEqual(
self.durations[path_to_data_dir('scanner/simple/song1.mp3')], 4680)
self.assertEqual(
self.durations[path_to_data_dir('scanner/simple/song1.ogg')], 4680)
def test_artist_is_set(self):
self.scan(self.find('scanner/simple'))
self.check_tag('scanner/simple/song1.mp3', 'artist', ['name'])
self.check_tag('scanner/simple/song1.ogg', 'artist', ['name'])
self.check('scanner/simple/song1.mp3', 'artist', ['name'])
self.check('scanner/simple/song1.ogg', 'artist', ['name'])
def test_album_is_set(self):
self.scan(self.find('scanner/simple'))
self.check_tag('scanner/simple/song1.mp3', 'album', ['albumname'])
self.check_tag('scanner/simple/song1.ogg', 'album', ['albumname'])
self.check('scanner/simple/song1.mp3', 'album', ['albumname'])
self.check('scanner/simple/song1.ogg', 'album', ['albumname'])
def test_track_is_set(self):
self.scan(self.find('scanner/simple'))
self.check_tag('scanner/simple/song1.mp3', 'title', ['trackname'])
self.check_tag('scanner/simple/song1.ogg', 'title', ['trackname'])
self.check('scanner/simple/song1.mp3', 'title', ['trackname'])
self.check('scanner/simple/song1.ogg', 'title', ['trackname'])
def test_nonexistant_dir_does_not_fail(self):
self.scan(self.find('scanner/does-not-exist'))
@ -364,11 +82,13 @@ class ScannerTest(unittest.TestCase):
def test_log_file_that_gst_thinks_is_mpeg_1_is_ignored(self):
self.scan([path_to_data_dir('scanner/example.log')])
self.assert_(self.errors)
self.assertLess(
self.durations[path_to_data_dir('scanner/example.log')], 100)
def test_empty_wav_file_is_ignored(self):
def test_empty_wav_file(self):
self.scan([path_to_data_dir('scanner/empty.wav')])
self.assert_(self.errors)
self.assertEqual(
self.durations[path_to_data_dir('scanner/empty.wav')], 0)
@unittest.SkipTest
def test_song_without_time_is_handeled(self):

246
tests/audio/test_utils.py Normal file
View File

@ -0,0 +1,246 @@
from __future__ import absolute_import, unicode_literals
import datetime
import unittest
from mopidy.audio import utils
from mopidy.models import Album, Artist, Track
# TODO: keep ids without name?
# TODO: current test is trying to test everything at once with a complete tags
# set, instead we might want to try with a minimal one making testing easier.
class TagsToTrackTest(unittest.TestCase):
def setUp(self): # noqa
self.tags = {
'album': ['album'],
'track-number': [1],
'artist': ['artist'],
'composer': ['composer'],
'performer': ['performer'],
'album-artist': ['albumartist'],
'title': ['track'],
'track-count': [2],
'album-disc-number': [2],
'album-disc-count': [3],
'date': [datetime.date(2006, 1, 1,)],
'container-format': ['ID3 tag'],
'genre': ['genre'],
'comment': ['comment'],
'musicbrainz-trackid': ['trackid'],
'musicbrainz-albumid': ['albumid'],
'musicbrainz-artistid': ['artistid'],
'musicbrainz-albumartistid': ['albumartistid'],
'bitrate': [1000],
}
artist = Artist(name='artist', musicbrainz_id='artistid')
composer = Artist(name='composer')
performer = Artist(name='performer')
albumartist = Artist(name='albumartist',
musicbrainz_id='albumartistid')
album = Album(name='album', num_tracks=2, num_discs=3,
musicbrainz_id='albumid', artists=[albumartist])
self.track = Track(name='track', date='2006-01-01',
genre='genre', track_no=1, disc_no=2,
comment='comment', musicbrainz_id='trackid',
album=album, bitrate=1000, artists=[artist],
composers=[composer], performers=[performer])
def check(self, expected):
actual = utils.convert_tags_to_track(self.tags)
self.assertEqual(expected, actual)
def test_track(self):
self.check(self.track)
def test_missing_track_no(self):
del self.tags['track-number']
self.check(self.track.copy(track_no=None))
def test_multiple_track_no(self):
self.tags['track-number'].append(9)
self.check(self.track)
def test_missing_track_disc_no(self):
del self.tags['album-disc-number']
self.check(self.track.copy(disc_no=None))
def test_multiple_track_disc_no(self):
self.tags['album-disc-number'].append(9)
self.check(self.track)
def test_missing_track_name(self):
del self.tags['title']
self.check(self.track.copy(name=None))
def test_multiple_track_name(self):
self.tags['title'] = ['name1', 'name2']
self.check(self.track.copy(name='name1; name2'))
def test_missing_track_musicbrainz_id(self):
del self.tags['musicbrainz-trackid']
self.check(self.track.copy(musicbrainz_id=None))
def test_multiple_track_musicbrainz_id(self):
self.tags['musicbrainz-trackid'].append('id')
self.check(self.track)
def test_missing_track_bitrate(self):
del self.tags['bitrate']
self.check(self.track.copy(bitrate=None))
def test_multiple_track_bitrate(self):
self.tags['bitrate'].append(1234)
self.check(self.track)
def test_missing_track_genre(self):
del self.tags['genre']
self.check(self.track.copy(genre=None))
def test_multiple_track_genre(self):
self.tags['genre'] = ['genre1', 'genre2']
self.check(self.track.copy(genre='genre1; genre2'))
def test_missing_track_date(self):
del self.tags['date']
self.check(self.track.copy(date=None))
def test_multiple_track_date(self):
self.tags['date'].append(datetime.date(2030, 1, 1))
self.check(self.track)
def test_missing_track_comment(self):
del self.tags['comment']
self.check(self.track.copy(comment=None))
def test_multiple_track_comment(self):
self.tags['comment'] = ['comment1', 'comment2']
self.check(self.track.copy(comment='comment1; comment2'))
def test_missing_track_artist_name(self):
del self.tags['artist']
self.check(self.track.copy(artists=[]))
def test_multiple_track_artist_name(self):
self.tags['artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
self.check(self.track.copy(artists=artists))
def test_missing_track_artist_musicbrainz_id(self):
del self.tags['musicbrainz-artistid']
artist = list(self.track.artists)[0].copy(musicbrainz_id=None)
self.check(self.track.copy(artists=[artist]))
def test_multiple_track_artist_musicbrainz_id(self):
self.tags['musicbrainz-artistid'].append('id')
self.check(self.track)
def test_missing_track_composer_name(self):
del self.tags['composer']
self.check(self.track.copy(composers=[]))
def test_multiple_track_composer_name(self):
self.tags['composer'] = ['composer1', 'composer2']
composers = [Artist(name='composer1'), Artist(name='composer2')]
self.check(self.track.copy(composers=composers))
def test_missing_track_performer_name(self):
del self.tags['performer']
self.check(self.track.copy(performers=[]))
def test_multiple_track_performe_name(self):
self.tags['performer'] = ['performer1', 'performer2']
performers = [Artist(name='performer1'), Artist(name='performer2')]
self.check(self.track.copy(performers=performers))
def test_missing_album_name(self):
del self.tags['album']
self.check(self.track.copy(album=None))
def test_multiple_album_name(self):
self.tags['album'].append('album2')
self.check(self.track)
def test_missing_album_musicbrainz_id(self):
del self.tags['musicbrainz-albumid']
album = self.track.album.copy(musicbrainz_id=None,
images=[])
self.check(self.track.copy(album=album))
def test_multiple_album_musicbrainz_id(self):
self.tags['musicbrainz-albumid'].append('id')
self.check(self.track)
def test_missing_album_num_tracks(self):
del self.tags['track-count']
album = self.track.album.copy(num_tracks=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_tracks(self):
self.tags['track-count'].append(9)
self.check(self.track)
def test_missing_album_num_discs(self):
del self.tags['album-disc-count']
album = self.track.album.copy(num_discs=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_discs(self):
self.tags['album-disc-count'].append(9)
self.check(self.track)
def test_missing_album_artist_name(self):
del self.tags['album-artist']
album = self.track.album.copy(artists=[])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_name(self):
self.tags['album-artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
album = self.track.album.copy(artists=artists)
self.check(self.track.copy(album=album))
def test_missing_album_artist_musicbrainz_id(self):
del self.tags['musicbrainz-albumartistid']
albumartist = list(self.track.album.artists)[0]
albumartist = albumartist.copy(musicbrainz_id=None)
album = self.track.album.copy(artists=[albumartist])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_musicbrainz_id(self):
self.tags['musicbrainz-albumartistid'].append('id')
self.check(self.track)
def test_stream_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization']
self.check(self.track.copy(name='organization'))
def test_multiple_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization1', 'organization2']
self.check(self.track.copy(name='organization1; organization2'))
# TODO: combine all comment types?
def test_stream_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location']
self.check(self.track.copy(comment='location'))
def test_multiple_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location1', 'location2']
self.check(self.track.copy(comment='location1; location2'))
def test_stream_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright']
self.check(self.track.copy(comment='copyright'))
def test_multiple_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright1', 'copyright2']
self.check(self.track.copy(comment='copyright1; copyright2'))

View File

@ -6,9 +6,9 @@ import os
import tempfile
import unittest
from mopidy.local.translator import parse_m3u
from mopidy.models import Track
from mopidy.utils.path import path_to_uri
from mopidy.local import translator
from mopidy.models import Album, Track
from mopidy.utils import path
from tests import path_to_data_dir
@ -16,9 +16,9 @@ data_dir = path_to_data_dir('')
song1_path = path_to_data_dir('song1.mp3')
song2_path = path_to_data_dir('song2.mp3')
encoded_path = path_to_data_dir('æøå.mp3')
song1_uri = path_to_uri(song1_path)
song2_uri = path_to_uri(song2_path)
encoded_uri = path_to_uri(encoded_path)
song1_uri = path.path_to_uri(song1_path)
song2_uri = path.path_to_uri(song2_path)
encoded_uri = path.path_to_uri(encoded_path)
song1_track = Track(uri=song1_uri)
song2_track = Track(uri=song2_uri)
encoded_track = Track(uri=encoded_uri)
@ -30,23 +30,26 @@ encoded_ext_track = encoded_track.copy(name='æøå')
# FIXME use mock instead of tempfile.NamedTemporaryFile
class M3UToUriTest(unittest.TestCase):
def parse(self, name):
return translator.parse_m3u(name, data_dir)
def test_empty_file(self):
tracks = parse_m3u(path_to_data_dir('empty.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('empty.m3u'))
self.assertEqual([], tracks)
def test_basic_file(self):
tracks = parse_m3u(path_to_data_dir('one.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('one.m3u'))
self.assertEqual([song1_track], tracks)
def test_file_with_comment(self):
tracks = parse_m3u(path_to_data_dir('comment.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('comment.m3u'))
self.assertEqual([song1_track], tracks)
def test_file_is_relative_to_correct_dir(self):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write('song1.mp3')
try:
tracks = parse_m3u(tmp.name, data_dir)
tracks = self.parse(tmp.name)
self.assertEqual([song1_track], tracks)
finally:
if os.path.exists(tmp.name):
@ -56,7 +59,7 @@ class M3UToUriTest(unittest.TestCase):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(song1_path)
try:
tracks = parse_m3u(tmp.name, data_dir)
tracks = self.parse(tmp.name)
self.assertEqual([song1_track], tracks)
finally:
if os.path.exists(tmp.name):
@ -68,7 +71,7 @@ class M3UToUriTest(unittest.TestCase):
tmp.write('# comment \n')
tmp.write(song2_path)
try:
tracks = parse_m3u(tmp.name, data_dir)
tracks = self.parse(tmp.name)
self.assertEqual([song1_track, song2_track], tracks)
finally:
if os.path.exists(tmp.name):
@ -78,40 +81,53 @@ class M3UToUriTest(unittest.TestCase):
with tempfile.NamedTemporaryFile(delete=False) as tmp:
tmp.write(song1_uri)
try:
tracks = parse_m3u(tmp.name, data_dir)
tracks = self.parse(tmp.name)
self.assertEqual([song1_track], tracks)
finally:
if os.path.exists(tmp.name):
os.remove(tmp.name)
def test_encoding_is_latin1(self):
tracks = parse_m3u(path_to_data_dir('encoding.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('encoding.m3u'))
self.assertEqual([encoded_track], tracks)
def test_open_missing_file(self):
tracks = parse_m3u(path_to_data_dir('non-existant.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('non-existant.m3u'))
self.assertEqual([], tracks)
def test_empty_ext_file(self):
tracks = parse_m3u(path_to_data_dir('empty-ext.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('empty-ext.m3u'))
self.assertEqual([], tracks)
def test_basic_ext_file(self):
tracks = parse_m3u(path_to_data_dir('one-ext.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('one-ext.m3u'))
self.assertEqual([song1_ext_track], tracks)
def test_multi_ext_file(self):
tracks = parse_m3u(path_to_data_dir('two-ext.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('two-ext.m3u'))
self.assertEqual([song1_ext_track, song2_ext_track], tracks)
def test_ext_file_with_comment(self):
tracks = parse_m3u(path_to_data_dir('comment-ext.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('comment-ext.m3u'))
self.assertEqual([song1_ext_track], tracks)
def test_ext_encoding_is_latin1(self):
tracks = parse_m3u(path_to_data_dir('encoding-ext.m3u'), data_dir)
tracks = self.parse(path_to_data_dir('encoding-ext.m3u'))
self.assertEqual([encoded_ext_track], tracks)
class URItoM3UTest(unittest.TestCase):
pass
class AddMusicbrainzCoverartTest(unittest.TestCase):
def test_add_cover_for_album(self):
album = Album(musicbrainz_id='someid')
track = Track(album=album)
expected = album.copy(
images=['http://coverartarchive.org/release/someid/front'])
self.assertEqual(
track.copy(album=expected),
translator.add_musicbrainz_coverart_to_track(track))

0
tests/stream/__init__.py Normal file
View File

View File

@ -0,0 +1,43 @@
from __future__ import absolute_import, unicode_literals
import unittest
import gobject
gobject.threads_init()
import pygst
pygst.require('0.10')
import gst # noqa: pygst magic is needed to import correct gst
import mock
from mopidy.models import Track
from mopidy.stream import actor
from mopidy.utils.path import path_to_uri
from tests import path_to_data_dir
class LibraryProviderTest(unittest.TestCase):
def setUp(self): # noqa: ignore method must be lowercase
self.backend = mock.Mock()
self.backend.uri_schemes = ['file']
self.uri = path_to_uri(path_to_data_dir('song1.wav'))
def test_lookup_ignores_unknown_scheme(self):
library = actor.StreamLibraryProvider(self.backend, 1000, [])
self.assertFalse(library.lookup('http://example.com'))
def test_lookup_respects_blacklist(self):
library = actor.StreamLibraryProvider(self.backend, 100, [self.uri])
self.assertEqual([Track(uri=self.uri)], library.lookup(self.uri))
def test_lookup_respects_blacklist_globbing(self):
blacklist = [path_to_uri(path_to_data_dir('')) + '*']
library = actor.StreamLibraryProvider(self.backend, 100, blacklist)
self.assertEqual([Track(uri=self.uri)], library.lookup(self.uri))
def test_lookup_converts_uri_metadata_to_track(self):
library = actor.StreamLibraryProvider(self.backend, 100, [])
self.assertEqual([Track(length=4406, uri=self.uri)],
library.lookup(self.uri))