Merge pull request #1702 from jodal/fix/m3u-arbitrary-file-access

Fix arbitrary file access in M3U backend
This commit is contained in:
Nick Steel 2018-09-27 22:52:32 +01:00 committed by GitHub
commit 09240dae2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 90 additions and 13 deletions

View File

@ -22,6 +22,12 @@ Feature release.
- Core: Fix crash on `library.lookup(uris=[])`. (Fixes: :issue:`1619`, PR:
:issue:`1620`)
- Core: Define return value of `playlists.delete()` to be a bool, :class:`True`
on success, :class:`False` otherwise. (PR: :issue:`1702`)
- M3U: Ignore all attempts at accessing files outside the
:confval:`m3u/playlist_dir`. (Partly fixes: :issue:`1659`, PR: :issue:`1702`)
- File: Change default ordering to show directories first, then files. (PR:
:issue:`1595`)

View File

@ -362,10 +362,16 @@ class PlaylistsProvider(object):
"""
Delete playlist identified by the URI.
Returns :class:`True` if deleted, :class:`False` otherwise.
*MUST be implemented by subclass.*
:param uri: URI of the playlist to delete
:type uri: string
:rtype: :class:`bool`
.. versionchanged:: 2.2
Return type defined.
"""
raise NotImplementedError

View File

@ -180,22 +180,35 @@ class PlaylistsController(object):
If the URI doesn't match the URI schemes handled by the current
backends, nothing happens.
Returns :class:`True` if deleted, :class:`False` otherwise.
:param uri: URI of the playlist to delete
:type uri: string
:rtype: :class:`bool`
.. versionchanged:: 2.2
Return type defined.
"""
validation.check_uri(uri)
uri_scheme = urllib.parse.urlparse(uri).scheme
backend = self.backends.with_playlists.get(uri_scheme, None)
if not backend:
return None # TODO: error reporting to user
return False
success = False
with _backend_error_handling(backend):
backend.playlists.delete(uri).get()
# TODO: error detection and reporting to user
success = backend.playlists.delete(uri).get()
if success is None:
# Return type was defined in Mopidy 2.2. Assume everything went
# well if the backend doesn't report otherwise.
success = True
if success:
listener.CoreListener.send('playlist_deleted', uri=uri)
# TODO: return value?
return success
def filter(self, criteria=None, **kwargs):
"""

View File

@ -9,6 +9,7 @@ import os
import tempfile
from mopidy import backend
from mopidy.internal import path
from . import Extension, translator
@ -89,13 +90,22 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
def delete(self, uri):
path = translator.uri_to_path(uri)
if not self._is_in_basedir(path):
logger.debug('Ignoring path outside playlist dir: %s', uri)
return False
try:
os.remove(self._abspath(path))
except EnvironmentError as e:
log_environment_error('Error deleting playlist %s' % uri, e)
return False
else:
return True
def get_items(self, uri):
path = translator.uri_to_path(uri)
if not self._is_in_basedir(path):
logger.debug('Ignoring path outside playlist dir: %s', uri)
return None
try:
with self._open(path, 'r') as fp:
items = translator.load_items(fp, self._base_dir)
@ -106,6 +116,9 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
def lookup(self, uri):
path = translator.uri_to_path(uri)
if not self._is_in_basedir(path):
logger.debug('Ignoring path outside playlist dir: %s', uri)
return None
try:
with self._open(path, 'r') as fp:
items = translator.load_items(fp, self._base_dir)
@ -120,6 +133,10 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
def save(self, playlist):
path = translator.uri_to_path(playlist.uri)
if not self._is_in_basedir(path):
logger.debug(
'Ignoring path outside playlist dir: %s', playlist.uri)
return None
name = translator.name_from_path(path)
try:
with self._open(path, 'w') as fp:
@ -137,6 +154,11 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
def _abspath(self, path):
return os.path.join(self._playlists_dir, path)
def _is_in_basedir(self, local_path):
if not os.path.isabs(local_path):
local_path = os.path.join(self._playlists_dir, local_path)
return path.is_path_inside_base_dir(local_path, self._playlists_dir)
def _open(self, path, mode='r'):
if path.endswith(b'.m3u8'):
encoding = 'utf-8'
@ -144,6 +166,10 @@ class M3UPlaylistsProvider(backend.PlaylistsProvider):
encoding = self._default_encoding
if not os.path.isabs(path):
path = os.path.join(self._playlists_dir, path)
if not self._is_in_basedir(path):
raise Exception(
'Path (%s) is not inside playlist_dir (%s)'
% (path, self._playlists_dir))
if 'w' in mode:
return replace(path, mode, encoding=encoding, errors='replace')
else:

View File

@ -110,7 +110,9 @@ def dump_items(items, fp):
print(item.uri, file=fp)
def playlist(path, items=[], mtime=None):
def playlist(path, items=None, mtime=None):
if items is None:
items = []
return models.Playlist(
uri=path_to_uri(path),
name=name_from_path(path),

View File

@ -141,26 +141,30 @@ class PlaylistTest(BasePlaylistsTest):
self.assertFalse(self.sp2.create.called)
def test_delete_selects_the_dummy1_backend(self):
self.core.playlists.delete('dummy1:a')
success = self.core.playlists.delete('dummy1:a')
self.assertTrue(success)
self.sp1.delete.assert_called_once_with('dummy1:a')
self.assertFalse(self.sp2.delete.called)
def test_delete_selects_the_dummy2_backend(self):
self.core.playlists.delete('dummy2:a')
success = self.core.playlists.delete('dummy2:a')
self.assertTrue(success)
self.assertFalse(self.sp1.delete.called)
self.sp2.delete.assert_called_once_with('dummy2:a')
def test_delete_with_unknown_uri_scheme_does_nothing(self):
self.core.playlists.delete('unknown:a')
success = self.core.playlists.delete('unknown:a')
self.assertFalse(success)
self.assertFalse(self.sp1.delete.called)
self.assertFalse(self.sp2.delete.called)
def test_delete_ignores_backend_without_playlist_support(self):
self.core.playlists.delete('dummy3:a')
success = self.core.playlists.delete('dummy3:a')
self.assertFalse(success)
self.assertFalse(self.sp1.delete.called)
self.assertFalse(self.sp2.delete.called)
@ -377,7 +381,7 @@ class DeleteBadBackendsTest(MockBackendCorePlaylistsBase):
def test_backend_raises_exception(self, logger):
self.playlists.delete.return_value.get.side_effect = Exception
self.assertIsNone(self.core.playlists.delete('dummy:/1'))
self.assertFalse(self.core.playlists.delete('dummy:/1'))
logger.exception.assert_called_with(mock.ANY, 'DummyBackend')

View File

@ -94,9 +94,15 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
self.assertEqual(uri, playlist.uri)
self.assertTrue(os.path.exists(path))
self.core.playlists.delete(playlist.uri)
success = self.core.playlists.delete(playlist.uri)
self.assertTrue(success)
self.assertFalse(os.path.exists(path))
def test_delete_on_path_outside_playlist_dir_returns_none(self):
success = self.core.playlists.delete('m3u:///etc/passwd')
self.assertFalse(success)
def test_playlist_contents_is_written_to_disk(self):
track = Track(uri=generate_song(1))
playlist = self.core.playlists.create('test')
@ -122,7 +128,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_latin1_playlist_contents_is_written_to_disk(self):
track = Track(uri=generate_song(1), name='Test\x9f', length=60000)
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
playlist = self.core.playlists.save(playlist.replace(tracks=[track]))
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path, 'rb') as f:
@ -132,7 +138,7 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
def test_utf8_playlist_contents_is_replaced_and_written_to_disk(self):
track = Track(uri=generate_song(1), name='Test\u07b4', length=60000)
playlist = self.core.playlists.create('test')
playlist = self.core.playlists.save(playlist.copy(tracks=[track]))
playlist = self.core.playlists.save(playlist.replace(tracks=[track]))
path = os.path.join(self.playlists_dir, b'test.m3u')
with open(path, 'rb') as f:
@ -216,6 +222,11 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
self.assertEqual(original_playlist, looked_up_playlist)
def test_lookup_on_path_outside_playlist_dir_returns_none(self):
result = self.core.playlists.lookup('m3u:///etc/passwd')
self.assertIsNone(result)
def test_refresh(self):
playlist = self.core.playlists.create('test')
self.assertEqual(playlist, self.core.playlists.lookup(playlist.uri))
@ -251,6 +262,10 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
path = os.path.join(self.playlists_dir, b'test.m3u')
self.assertTrue(os.path.exists(path))
def test_save_on_path_outside_playlist_dir_returns_none(self):
result = self.core.playlists.save(Playlist(uri='m3u:///tmp/test.m3u'))
self.assertIsNone(result)
def test_playlist_with_unknown_track(self):
track = Track(uri='file:///dev/null')
playlist = self.core.playlists.create('test')
@ -330,6 +345,11 @@ class M3UPlaylistsProviderTest(unittest.TestCase):
self.assertIsNone(item_refs)
def test_get_items_from_file_outside_playlist_dir_returns_none(self):
item_refs = self.core.playlists.get_items('m3u:///etc/passwd')
self.assertIsNone(item_refs)
class M3UPlaylistsProviderBaseDirectoryTest(M3UPlaylistsProviderTest):