From 82f5b376da73c06c4023572c27be60117e300eb5 Mon Sep 17 00:00:00 2001 From: Stein Magnus Jodal Date: Thu, 1 Nov 2012 14:03:09 +0100 Subject: [PATCH] Validate the stored playlist file paths --- mopidy/backends/local/stored_playlists.py | 73 +++++++++++++++++------ 1 file changed, 54 insertions(+), 19 deletions(-) diff --git a/mopidy/backends/local/stored_playlists.py b/mopidy/backends/local/stored_playlists.py index 3d488655..9e5b72c6 100644 --- a/mopidy/backends/local/stored_playlists.py +++ b/mopidy/backends/local/stored_playlists.py @@ -19,16 +19,14 @@ logger = logging.getLogger(u'mopidy.backends.local') class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider): def __init__(self, *args, **kwargs): super(LocalStoredPlaylistsProvider, self).__init__(*args, **kwargs) - self._folder = settings.LOCAL_PLAYLIST_PATH + self._path = settings.LOCAL_PLAYLIST_PATH self.refresh() def create(self, name): name = self._slugify(name) - file_path = os.path.join(self._folder, name + '.m3u') - uri = path.path_to_uri(file_path) + uri = path.path_to_uri(self._get_m3u_path(name)) playlist = Playlist(uri=uri, name=name) - self.save(playlist) - return playlist + return self.save(playlist) def delete(self, uri): playlist = self.lookup(uri) @@ -36,7 +34,7 @@ class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider): return self._playlists.remove(playlist) - self._delete_m3u(playlist) + self._delete_m3u(playlist.uri) def lookup(self, uri): for playlist in self._playlists: @@ -44,21 +42,24 @@ class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider): return playlist def refresh(self): + logger.info('Loading playlists from %s', self._path) + playlists = [] - logger.info('Loading playlists from %s', self._folder) - - for m3u in glob.glob(os.path.join(self._folder, '*.m3u')): + for m3u in glob.glob(os.path.join(self._path, '*.m3u')): uri = path.path_to_uri(m3u) name = os.path.splitext(os.path.basename(m3u))[0] + tracks = [] for track_uri in parse_m3u(m3u, settings.LOCAL_MUSIC_PATH): try: + # TODO We must use core.library.lookup() to support tracks + # from other backends tracks.append(self.backend.library.lookup(track_uri)) except LookupError as ex: logger.error('Playlist item could not be added: %s', ex) - playlist = Playlist(uri=uri, name=name, tracks=tracks) + playlist = Playlist(uri=uri, name=name, tracks=tracks) playlists.append(playlist) self.playlists = playlists @@ -69,11 +70,8 @@ class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider): old_playlist = self.lookup(playlist.uri) if old_playlist and playlist.name != old_playlist.name: - new_name = self._slugify(playlist.name) - src = os.path.join(self._folder, old_playlist.name + '.m3u') - dst = os.path.join(self._folder, new_name + '.m3u') - shutil.move(src, dst) - playlist = playlist.copy(uri=path.path_to_uri(dst), name=new_name) + playlist = playlist.copy(name=self._slugify(playlist.name)) + playlist = self._rename_m3u(playlist) self._save_m3u(playlist) @@ -85,21 +83,58 @@ class LocalStoredPlaylistsProvider(base.BaseStoredPlaylistsProvider): return playlist + def _get_m3u_path(self, name): + name = self._slugify(name) + file_path = os.path.join(self._path, name + '.m3u') + self._validate_file_path(file_path) + return file_path + def _save_m3u(self, playlist): - file_path = playlist.uri[len('file://'):] + file_path = path.uri_to_path(playlist.uri) + self._validate_file_path(file_path) with open(file_path, 'w') as file_handle: for track in playlist.tracks: if track.uri.startswith('file://'): - uri = track.uri[len('file://'):] + uri = path.uri_to_path(track.uri) else: uri = track.uri file_handle.write(uri + '\n') - def _delete_m3u(self, playlist): - file_path = playlist.uri[len('file://'):] + def _delete_m3u(self, uri): + file_path = path.uri_to_path(uri) + self._validate_file_path(file_path) if os.path.exists(file_path): os.remove(file_path) + def _rename_m3u(self, playlist): + src_file_path = path.uri_to_path(playlist.uri) + self._validate_file_path(src_file_path) + + dst_file_path = self._get_m3u_path(playlist.name) + self._validate_file_path(dst_file_path) + + shutil.move(src_file_path, dst_file_path) + + return playlist.copy(uri=path.path_to_uri(dst_file_path)) + + def _validate_file_path(self, file_path): + assert not file_path.endswith(os.sep), ( + 'File path %s cannot end with a path separator' % file_path) + + # Expand symlinks + real_base_path = os.path.realpath(self._path) + real_file_path = os.path.realpath(file_path) + + # Use dir of file for prefix comparision, so we don't accept + # /tmp/foo.m3u as being inside /tmp/foo, simply because they have a + # common prefix, /tmp/foo, which matches the base path, /tmp/foo. + real_dir_path = os.path.dirname(real_file_path) + + # Check if dir of file is the base path or a subdir + common_prefix = os.path.commonprefix([real_base_path, real_dir_path]) + assert common_prefix == real_base_path, ( + 'File path %s must be in %s' % (real_file_path, real_base_path)) + def _slugify(self, value): """ Converts to lowercase, removes non-word characters (alphanumerics and