audio: Move tags to track conversion to audio utils

This commit is contained in:
Thomas Adamcik 2014-12-18 23:36:56 +01:00
parent 983148a9a4
commit 9be788b129
6 changed files with 312 additions and 300 deletions

View File

@ -8,7 +8,6 @@ import gst # noqa
from mopidy import exceptions
from mopidy.audio import utils
from mopidy.models import Album, Artist, Track
from mopidy.utils import encoding
@ -110,58 +109,3 @@ class Scanner(object):
return None
else:
return duration // gst.MSECOND
def _artists(tags, artist_name, artist_id=None):
# Name missing, don't set artist
if not tags.get(artist_name):
return None
# One artist name and id, provide artist with id.
if len(tags[artist_name]) == 1 and artist_id in tags:
return [Artist(name=tags[artist_name][0],
musicbrainz_id=tags[artist_id][0])]
# Multiple artist, provide artists without id.
return [Artist(name=name) for name in tags[artist_name]]
def tags_to_track(tags):
album_kwargs = {}
track_kwargs = {}
track_kwargs['composers'] = _artists(tags, gst.TAG_COMPOSER)
track_kwargs['performers'] = _artists(tags, gst.TAG_PERFORMER)
track_kwargs['artists'] = _artists(
tags, gst.TAG_ARTIST, 'musicbrainz-artistid')
album_kwargs['artists'] = _artists(
tags, gst.TAG_ALBUM_ARTIST, 'musicbrainz-albumartistid')
track_kwargs['genre'] = '; '.join(tags.get(gst.TAG_GENRE, []))
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_TITLE, []))
if not track_kwargs['name']:
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_ORGANIZATION, []))
track_kwargs['comment'] = '; '.join(tags.get('comment', []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_LOCATION, []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_COPYRIGHT, []))
track_kwargs['track_no'] = tags.get(gst.TAG_TRACK_NUMBER, [None])[0]
track_kwargs['disc_no'] = tags.get(gst.TAG_ALBUM_VOLUME_NUMBER, [None])[0]
track_kwargs['bitrate'] = tags.get(gst.TAG_BITRATE, [None])[0]
track_kwargs['musicbrainz_id'] = tags.get('musicbrainz-trackid', [None])[0]
album_kwargs['name'] = tags.get(gst.TAG_ALBUM, [None])[0]
album_kwargs['num_tracks'] = tags.get(gst.TAG_TRACK_COUNT, [None])[0]
album_kwargs['num_discs'] = tags.get(gst.TAG_ALBUM_VOLUME_COUNT, [None])[0]
album_kwargs['musicbrainz_id'] = tags.get('musicbrainz-albumid', [None])[0]
if tags.get(gst.TAG_DATE) and tags.get(gst.TAG_DATE)[0]:
track_kwargs['date'] = tags[gst.TAG_DATE][0].isoformat()
# Clear out any empty values we found
track_kwargs = {k: v for k, v in track_kwargs.items() if v}
album_kwargs = {k: v for k, v in album_kwargs.items() if v}
track_kwargs['album'] = Album(**album_kwargs)
return Track(**track_kwargs)

245
mopidy/audio/test_utils.py Normal file
View File

@ -0,0 +1,245 @@
from __future__ import absolute_import, unicode_literals
import datetime
import unittest
from mopidy.audio import utils
from mopidy.models import Album, Artist, Track
# TODO: keep ids without name?
class TagsToTrackTest(unittest.TestCase):
def setUp(self): # noqa
self.tags = {
'album': ['album'],
'track-number': [1],
'artist': ['artist'],
'composer': ['composer'],
'performer': ['performer'],
'album-artist': ['albumartist'],
'title': ['track'],
'track-count': [2],
'album-disc-number': [2],
'album-disc-count': [3],
'date': [datetime.date(2006, 1, 1,)],
'container-format': ['ID3 tag'],
'genre': ['genre'],
'comment': ['comment'],
'musicbrainz-trackid': ['trackid'],
'musicbrainz-albumid': ['albumid'],
'musicbrainz-artistid': ['artistid'],
'musicbrainz-albumartistid': ['albumartistid'],
'bitrate': [1000],
}
artist = Artist(name='artist', musicbrainz_id='artistid')
composer = Artist(name='composer')
performer = Artist(name='performer')
albumartist = Artist(name='albumartist',
musicbrainz_id='albumartistid')
album = Album(name='album', num_tracks=2, num_discs=3,
musicbrainz_id='albumid', artists=[albumartist])
self.track = Track(name='track', date='2006-01-01',
genre='genre', track_no=1, disc_no=2,
comment='comment', musicbrainz_id='trackid',
album=album, bitrate=1000, artists=[artist],
composers=[composer], performers=[performer])
def check(self, expected):
actual = utils.convert_tags_to_track(self.tags)
self.assertEqual(expected, actual)
def test_track(self):
self.check(self.track)
def test_missing_track_no(self):
del self.tags['track-number']
self.check(self.track.copy(track_no=None))
def test_multiple_track_no(self):
self.tags['track-number'].append(9)
self.check(self.track)
def test_missing_track_disc_no(self):
del self.tags['album-disc-number']
self.check(self.track.copy(disc_no=None))
def test_multiple_track_disc_no(self):
self.tags['album-disc-number'].append(9)
self.check(self.track)
def test_missing_track_name(self):
del self.tags['title']
self.check(self.track.copy(name=None))
def test_multiple_track_name(self):
self.tags['title'] = ['name1', 'name2']
self.check(self.track.copy(name='name1; name2'))
def test_missing_track_musicbrainz_id(self):
del self.tags['musicbrainz-trackid']
self.check(self.track.copy(musicbrainz_id=None))
def test_multiple_track_musicbrainz_id(self):
self.tags['musicbrainz-trackid'].append('id')
self.check(self.track)
def test_missing_track_bitrate(self):
del self.tags['bitrate']
self.check(self.track.copy(bitrate=None))
def test_multiple_track_bitrate(self):
self.tags['bitrate'].append(1234)
self.check(self.track)
def test_missing_track_genre(self):
del self.tags['genre']
self.check(self.track.copy(genre=None))
def test_multiple_track_genre(self):
self.tags['genre'] = ['genre1', 'genre2']
self.check(self.track.copy(genre='genre1; genre2'))
def test_missing_track_date(self):
del self.tags['date']
self.check(self.track.copy(date=None))
def test_multiple_track_date(self):
self.tags['date'].append(datetime.date(2030, 1, 1))
self.check(self.track)
def test_missing_track_comment(self):
del self.tags['comment']
self.check(self.track.copy(comment=None))
def test_multiple_track_comment(self):
self.tags['comment'] = ['comment1', 'comment2']
self.check(self.track.copy(comment='comment1; comment2'))
def test_missing_track_artist_name(self):
del self.tags['artist']
self.check(self.track.copy(artists=[]))
def test_multiple_track_artist_name(self):
self.tags['artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
self.check(self.track.copy(artists=artists))
def test_missing_track_artist_musicbrainz_id(self):
del self.tags['musicbrainz-artistid']
artist = list(self.track.artists)[0].copy(musicbrainz_id=None)
self.check(self.track.copy(artists=[artist]))
def test_multiple_track_artist_musicbrainz_id(self):
self.tags['musicbrainz-artistid'].append('id')
self.check(self.track)
def test_missing_track_composer_name(self):
del self.tags['composer']
self.check(self.track.copy(composers=[]))
def test_multiple_track_composer_name(self):
self.tags['composer'] = ['composer1', 'composer2']
composers = [Artist(name='composer1'), Artist(name='composer2')]
self.check(self.track.copy(composers=composers))
def test_missing_track_performer_name(self):
del self.tags['performer']
self.check(self.track.copy(performers=[]))
def test_multiple_track_performe_name(self):
self.tags['performer'] = ['performer1', 'performer2']
performers = [Artist(name='performer1'), Artist(name='performer2')]
self.check(self.track.copy(performers=performers))
def test_missing_album_name(self):
del self.tags['album']
album = self.track.album.copy(name=None)
self.check(self.track.copy(album=album))
def test_multiple_album_name(self):
self.tags['album'].append('album2')
self.check(self.track)
def test_missing_album_musicbrainz_id(self):
del self.tags['musicbrainz-albumid']
album = self.track.album.copy(musicbrainz_id=None,
images=[])
self.check(self.track.copy(album=album))
def test_multiple_album_musicbrainz_id(self):
self.tags['musicbrainz-albumid'].append('id')
self.check(self.track)
def test_missing_album_num_tracks(self):
del self.tags['track-count']
album = self.track.album.copy(num_tracks=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_tracks(self):
self.tags['track-count'].append(9)
self.check(self.track)
def test_missing_album_num_discs(self):
del self.tags['album-disc-count']
album = self.track.album.copy(num_discs=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_discs(self):
self.tags['album-disc-count'].append(9)
self.check(self.track)
def test_missing_album_artist_name(self):
del self.tags['album-artist']
album = self.track.album.copy(artists=[])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_name(self):
self.tags['album-artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
album = self.track.album.copy(artists=artists)
self.check(self.track.copy(album=album))
def test_missing_album_artist_musicbrainz_id(self):
del self.tags['musicbrainz-albumartistid']
albumartist = list(self.track.album.artists)[0]
albumartist = albumartist.copy(musicbrainz_id=None)
album = self.track.album.copy(artists=[albumartist])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_musicbrainz_id(self):
self.tags['musicbrainz-albumartistid'].append('id')
self.check(self.track)
def test_stream_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization']
self.check(self.track.copy(name='organization'))
def test_multiple_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization1', 'organization2']
self.check(self.track.copy(name='organization1; organization2'))
# TODO: combine all comment types?
def test_stream_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location']
self.check(self.track.copy(comment='location'))
def test_multiple_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location1', 'location2']
self.check(self.track.copy(comment='location1; location2'))
def test_stream_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright']
self.check(self.track.copy(comment='copyright'))
def test_multiple_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright1', 'copyright2']
self.check(self.track.copy(comment='copyright1; copyright2'))

View File

@ -9,6 +9,7 @@ pygst.require('0.10')
import gst # noqa
from mopidy import compat
from mopidy.models import Album, Artist, Track
logger = logging.getLogger(__name__)
@ -64,6 +65,68 @@ def supported_uri_schemes(uri_schemes):
return supported_schemes
def _artists(tags, artist_name, artist_id=None):
# Name missing, don't set artist
if not tags.get(artist_name):
return None
# One artist name and id, provide artist with id.
if len(tags[artist_name]) == 1 and artist_id in tags:
return [Artist(name=tags[artist_name][0],
musicbrainz_id=tags[artist_id][0])]
# Multiple artist, provide artists without id.
return [Artist(name=name) for name in tags[artist_name]]
# TODO: split based on "stream" and "track" based conversion? i.e. handle data
# from radios in it's own helper instead?
def convert_tags_to_track(tags):
"""Convert our normalized tags to a track.
:param :class:`dict` tags: dictionary of tag keys with a list of values
:rtype: :class:`mopidy.models.Track`
"""
album_kwargs = {}
track_kwargs = {}
track_kwargs['composers'] = _artists(tags, gst.TAG_COMPOSER)
track_kwargs['performers'] = _artists(tags, gst.TAG_PERFORMER)
track_kwargs['artists'] = _artists(
tags, gst.TAG_ARTIST, 'musicbrainz-artistid')
album_kwargs['artists'] = _artists(
tags, gst.TAG_ALBUM_ARTIST, 'musicbrainz-albumartistid')
track_kwargs['genre'] = '; '.join(tags.get(gst.TAG_GENRE, []))
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_TITLE, []))
if not track_kwargs['name']:
track_kwargs['name'] = '; '.join(tags.get(gst.TAG_ORGANIZATION, []))
track_kwargs['comment'] = '; '.join(tags.get('comment', []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_LOCATION, []))
if not track_kwargs['comment']:
track_kwargs['comment'] = '; '.join(tags.get(gst.TAG_COPYRIGHT, []))
track_kwargs['track_no'] = tags.get(gst.TAG_TRACK_NUMBER, [None])[0]
track_kwargs['disc_no'] = tags.get(gst.TAG_ALBUM_VOLUME_NUMBER, [None])[0]
track_kwargs['bitrate'] = tags.get(gst.TAG_BITRATE, [None])[0]
track_kwargs['musicbrainz_id'] = tags.get('musicbrainz-trackid', [None])[0]
album_kwargs['name'] = tags.get(gst.TAG_ALBUM, [None])[0]
album_kwargs['num_tracks'] = tags.get(gst.TAG_TRACK_COUNT, [None])[0]
album_kwargs['num_discs'] = tags.get(gst.TAG_ALBUM_VOLUME_COUNT, [None])[0]
album_kwargs['musicbrainz_id'] = tags.get('musicbrainz-albumid', [None])[0]
if tags.get(gst.TAG_DATE) and tags.get(gst.TAG_DATE)[0]:
track_kwargs['date'] = tags[gst.TAG_DATE][0].isoformat()
# Clear out any empty values we found
track_kwargs = {k: v for k, v in track_kwargs.items() if v}
album_kwargs = {k: v for k, v in album_kwargs.items() if v}
track_kwargs['album'] = Album(**album_kwargs)
return Track(**track_kwargs)
def convert_taglist(taglist):
"""Convert a :class:`gst.Taglist` to plain python types.

View File

@ -6,7 +6,7 @@ import os
import time
from mopidy import commands, compat, exceptions
from mopidy.audio import scan
from mopidy.audio import scan, utils
from mopidy.local import translator
from mopidy.utils import path
@ -135,7 +135,7 @@ class ScanCommand(commands.Command):
else:
# TODO: reuse mtime from above...
mtime = os.path.getmtime(os.path.join(media_dir, relpath))
track = scan.tags_to_track(tags).copy(
track = utils.convert_tags_to_track(tags).copy(
uri=uri, length=duration, last_modified=mtime)
track = translator.add_musicbrainz_coverart_to_track(track)
library.add(track)

View File

@ -8,7 +8,7 @@ import urlparse
import pykka
from mopidy import audio as audio_lib, backend, exceptions
from mopidy.audio import scan
from mopidy.audio import scan, utils
from mopidy.models import Track
logger = logging.getLogger(__name__)
@ -45,7 +45,7 @@ class StreamLibraryProvider(backend.LibraryProvider):
try:
tags, duration = self._scanner.scan(uri)
track = scan.tags_to_track(tags).copy(uri=uri, length=duration)
track = utils.tags_to_track(tags).copy(uri=uri, length=duration)
except exceptions.ScannerError as e:
logger.warning('Problem looking up %s: %s', uri, e)
track = Track(uri=uri)

View File

@ -1,6 +1,5 @@
from __future__ import absolute_import, unicode_literals
import datetime
import os
import unittest
@ -9,250 +8,11 @@ gobject.threads_init()
from mopidy import exceptions
from mopidy.audio import scan
from mopidy.models import Album, Artist, Track
from mopidy.utils import path as path_lib
from tests import path_to_data_dir
# TODO: keep ids without name?
class TagsToTrackTest(unittest.TestCase):
def setUp(self): # noqa
self.tags = {
'album': ['album'],
'track-number': [1],
'artist': ['artist'],
'composer': ['composer'],
'performer': ['performer'],
'album-artist': ['albumartist'],
'title': ['track'],
'track-count': [2],
'album-disc-number': [2],
'album-disc-count': [3],
'date': [datetime.date(2006, 1, 1,)],
'container-format': ['ID3 tag'],
'genre': ['genre'],
'comment': ['comment'],
'musicbrainz-trackid': ['trackid'],
'musicbrainz-albumid': ['albumid'],
'musicbrainz-artistid': ['artistid'],
'musicbrainz-albumartistid': ['albumartistid'],
'bitrate': [1000],
}
artist = Artist(name='artist', musicbrainz_id='artistid')
composer = Artist(name='composer')
performer = Artist(name='performer')
albumartist = Artist(name='albumartist',
musicbrainz_id='albumartistid')
album = Album(name='album', num_tracks=2, num_discs=3,
musicbrainz_id='albumid', artists=[albumartist])
self.track = Track(name='track', date='2006-01-01',
genre='genre', track_no=1, disc_no=2,
comment='comment', musicbrainz_id='trackid',
album=album, bitrate=1000, artists=[artist],
composers=[composer], performers=[performer])
def check(self, expected):
actual = scan.tags_to_track(self.tags)
self.assertEqual(expected, actual)
def test_track(self):
self.check(self.track)
def test_missing_track_no(self):
del self.tags['track-number']
self.check(self.track.copy(track_no=None))
def test_multiple_track_no(self):
self.tags['track-number'].append(9)
self.check(self.track)
def test_missing_track_disc_no(self):
del self.tags['album-disc-number']
self.check(self.track.copy(disc_no=None))
def test_multiple_track_disc_no(self):
self.tags['album-disc-number'].append(9)
self.check(self.track)
def test_missing_track_name(self):
del self.tags['title']
self.check(self.track.copy(name=None))
def test_multiple_track_name(self):
self.tags['title'] = ['name1', 'name2']
self.check(self.track.copy(name='name1; name2'))
def test_missing_track_musicbrainz_id(self):
del self.tags['musicbrainz-trackid']
self.check(self.track.copy(musicbrainz_id=None))
def test_multiple_track_musicbrainz_id(self):
self.tags['musicbrainz-trackid'].append('id')
self.check(self.track)
def test_missing_track_bitrate(self):
del self.tags['bitrate']
self.check(self.track.copy(bitrate=None))
def test_multiple_track_bitrate(self):
self.tags['bitrate'].append(1234)
self.check(self.track)
def test_missing_track_genre(self):
del self.tags['genre']
self.check(self.track.copy(genre=None))
def test_multiple_track_genre(self):
self.tags['genre'] = ['genre1', 'genre2']
self.check(self.track.copy(genre='genre1; genre2'))
def test_missing_track_date(self):
del self.tags['date']
self.check(self.track.copy(date=None))
def test_multiple_track_date(self):
self.tags['date'].append(datetime.date(2030, 1, 1))
self.check(self.track)
def test_missing_track_comment(self):
del self.tags['comment']
self.check(self.track.copy(comment=None))
def test_multiple_track_comment(self):
self.tags['comment'] = ['comment1', 'comment2']
self.check(self.track.copy(comment='comment1; comment2'))
def test_missing_track_artist_name(self):
del self.tags['artist']
self.check(self.track.copy(artists=[]))
def test_multiple_track_artist_name(self):
self.tags['artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
self.check(self.track.copy(artists=artists))
def test_missing_track_artist_musicbrainz_id(self):
del self.tags['musicbrainz-artistid']
artist = list(self.track.artists)[0].copy(musicbrainz_id=None)
self.check(self.track.copy(artists=[artist]))
def test_multiple_track_artist_musicbrainz_id(self):
self.tags['musicbrainz-artistid'].append('id')
self.check(self.track)
def test_missing_track_composer_name(self):
del self.tags['composer']
self.check(self.track.copy(composers=[]))
def test_multiple_track_composer_name(self):
self.tags['composer'] = ['composer1', 'composer2']
composers = [Artist(name='composer1'), Artist(name='composer2')]
self.check(self.track.copy(composers=composers))
def test_missing_track_performer_name(self):
del self.tags['performer']
self.check(self.track.copy(performers=[]))
def test_multiple_track_performe_name(self):
self.tags['performer'] = ['performer1', 'performer2']
performers = [Artist(name='performer1'), Artist(name='performer2')]
self.check(self.track.copy(performers=performers))
def test_missing_album_name(self):
del self.tags['album']
album = self.track.album.copy(name=None)
self.check(self.track.copy(album=album))
def test_multiple_album_name(self):
self.tags['album'].append('album2')
self.check(self.track)
def test_missing_album_musicbrainz_id(self):
del self.tags['musicbrainz-albumid']
album = self.track.album.copy(musicbrainz_id=None,
images=[])
self.check(self.track.copy(album=album))
def test_multiple_album_musicbrainz_id(self):
self.tags['musicbrainz-albumid'].append('id')
self.check(self.track)
def test_missing_album_num_tracks(self):
del self.tags['track-count']
album = self.track.album.copy(num_tracks=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_tracks(self):
self.tags['track-count'].append(9)
self.check(self.track)
def test_missing_album_num_discs(self):
del self.tags['album-disc-count']
album = self.track.album.copy(num_discs=None)
self.check(self.track.copy(album=album))
def test_multiple_album_num_discs(self):
self.tags['album-disc-count'].append(9)
self.check(self.track)
def test_missing_album_artist_name(self):
del self.tags['album-artist']
album = self.track.album.copy(artists=[])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_name(self):
self.tags['album-artist'] = ['name1', 'name2']
artists = [Artist(name='name1'), Artist(name='name2')]
album = self.track.album.copy(artists=artists)
self.check(self.track.copy(album=album))
def test_missing_album_artist_musicbrainz_id(self):
del self.tags['musicbrainz-albumartistid']
albumartist = list(self.track.album.artists)[0]
albumartist = albumartist.copy(musicbrainz_id=None)
album = self.track.album.copy(artists=[albumartist])
self.check(self.track.copy(album=album))
def test_multiple_album_artist_musicbrainz_id(self):
self.tags['musicbrainz-albumartistid'].append('id')
self.check(self.track)
def test_stream_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization']
self.check(self.track.copy(name='organization'))
def test_multiple_organization_track_name(self):
del self.tags['title']
self.tags['organization'] = ['organization1', 'organization2']
self.check(self.track.copy(name='organization1; organization2'))
# TODO: combine all comment types?
def test_stream_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location']
self.check(self.track.copy(comment='location'))
def test_multiple_location_track_comment(self):
del self.tags['comment']
self.tags['location'] = ['location1', 'location2']
self.check(self.track.copy(comment='location1; location2'))
def test_stream_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright']
self.check(self.track.copy(comment='copyright'))
def test_multiple_copyright_track_comment(self):
del self.tags['comment']
self.tags['copyright'] = ['copyright1', 'copyright2']
self.check(self.track.copy(comment='copyright1; copyright2'))
class ScannerTest(unittest.TestCase):
def setUp(self): # noqa
self.errors = {}