# encoding: utf-8
"""Tests for the helpers in ``mopidy.internal.path``."""

from __future__ import absolute_import, unicode_literals

import os
import shutil
import tempfile
import unittest

import pytest

from mopidy import compat, exceptions
from mopidy.internal import path
from mopidy.internal.gi import GLib

import tests


class GetOrCreateDirTest(unittest.TestCase):
    """Exercise ``path.get_or_create_dir`` inside a scratch directory."""

    def setUp(self):  # noqa: N802
        self.parent = tempfile.mkdtemp()

    def tearDown(self):  # noqa: N802
        if os.path.isdir(self.parent):
            shutil.rmtree(self.parent)

    def test_creating_dir(self):
        dir_path = os.path.join(self.parent, b'test')
        self.assertFalse(os.path.exists(dir_path))
        created = path.get_or_create_dir(dir_path)
        self.assertTrue(os.path.exists(dir_path))
        self.assertTrue(os.path.isdir(dir_path))
        self.assertEqual(created, dir_path)

    def test_creating_nested_dirs(self):
        level2_dir = os.path.join(self.parent, b'test')
        level3_dir = os.path.join(self.parent, b'test', b'test')
        self.assertFalse(os.path.exists(level2_dir))
        self.assertFalse(os.path.exists(level3_dir))
        created = path.get_or_create_dir(level3_dir)
        self.assertTrue(os.path.exists(level2_dir))
        self.assertTrue(os.path.isdir(level2_dir))
        self.assertTrue(os.path.exists(level3_dir))
        self.assertTrue(os.path.isdir(level3_dir))
        self.assertEqual(created, level3_dir)

    def test_creating_existing_dir(self):
        created = path.get_or_create_dir(self.parent)
        self.assertTrue(os.path.exists(self.parent))
        self.assertTrue(os.path.isdir(self.parent))
        self.assertEqual(created, self.parent)

    def test_create_dir_with_name_of_existing_file_throws_oserror(self):
        conflicting_file = os.path.join(self.parent, b'test')
        open(conflicting_file, 'w').close()
        dir_path = os.path.join(self.parent, b'test')
        with self.assertRaises(OSError):
            path.get_or_create_dir(dir_path)

    def test_create_dir_with_unicode(self):
        # Convert outside the assertRaises block: UnicodeDecodeError is a
        # ValueError subclass, so a failing conversion would otherwise make
        # the test pass without ever calling the function under test.
        dir_path = compat.text_type(os.path.join(self.parent, b'test'))
        with self.assertRaises(ValueError):
            path.get_or_create_dir(dir_path)

    def test_create_dir_with_none(self):
        with self.assertRaises(ValueError):
            path.get_or_create_dir(None)


class GetOrCreateFileTest(unittest.TestCase):
    """Exercise ``path.get_or_create_file`` inside a scratch directory."""

    def setUp(self):  # noqa: N802
        self.parent = tempfile.mkdtemp()

    def tearDown(self):  # noqa: N802
        if os.path.isdir(self.parent):
            shutil.rmtree(self.parent)

    def test_creating_file(self):
        file_path = os.path.join(self.parent, b'test')
        self.assertFalse(os.path.exists(file_path))
        created = path.get_or_create_file(file_path)
        self.assertTrue(os.path.exists(file_path))
        self.assertTrue(os.path.isfile(file_path))
        self.assertEqual(created, file_path)

    def test_creating_nested_file(self):
        level2_dir = os.path.join(self.parent, b'test')
        file_path = os.path.join(self.parent, b'test', b'test')
        self.assertFalse(os.path.exists(level2_dir))
        self.assertFalse(os.path.exists(file_path))
        created = path.get_or_create_file(file_path)
        self.assertTrue(os.path.exists(level2_dir))
        self.assertTrue(os.path.isdir(level2_dir))
        self.assertTrue(os.path.exists(file_path))
        self.assertTrue(os.path.isfile(file_path))
        self.assertEqual(created, file_path)

    def test_creating_existing_file(self):
        file_path = os.path.join(self.parent, b'test')
        path.get_or_create_file(file_path)
        created = path.get_or_create_file(file_path)
        self.assertTrue(os.path.exists(file_path))
        self.assertTrue(os.path.isfile(file_path))
        self.assertEqual(created, file_path)

    def test_create_file_with_name_of_existing_dir_throws_ioerror(self):
        conflicting_dir = os.path.join(self.parent)
        with self.assertRaises(IOError):
            path.get_or_create_file(conflicting_dir)

    def test_create_dir_with_unicode_filename_throws_value_error(self):
        # Convert outside the assertRaises block so that only the call under
        # test can satisfy the ValueError expectation.
        file_path = compat.text_type(os.path.join(self.parent, b'test'))
        with self.assertRaises(ValueError):
            path.get_or_create_file(file_path)

    def test_create_file_with_none_filename_throws_value_error(self):
        with self.assertRaises(ValueError):
            path.get_or_create_file(None)

    def test_create_dir_without_mkdir(self):
        file_path = os.path.join(self.parent, b'foo', b'bar')
        with self.assertRaises(IOError):
            path.get_or_create_file(file_path, mkdir=False)

    def test_create_dir_with_bytes_content(self):
        file_path = os.path.join(self.parent, b'test')
        created = path.get_or_create_file(file_path, content=b'foobar')
        # Read back in binary mode since the expectation is a bytestring.
        with open(created, 'rb') as fh:
            self.assertEqual(fh.read(), b'foobar')

    def test_create_dir_with_unicode_content(self):
        file_path = os.path.join(self.parent, b'test')
        created = path.get_or_create_file(file_path, content='foobaræøå')
        # The expected bytes are the UTF-8 encoding of the unicode content.
        with open(created, 'rb') as fh:
            self.assertEqual(fh.read(), b'foobar\xc3\xa6\xc3\xb8\xc3\xa5')


class GetUnixSocketPathTest(unittest.TestCase):
    """Exercise ``path.get_unix_socket_path``."""

    def test_correctly_matched_socket_path(self):
        self.assertEqual(
            path.get_unix_socket_path('unix:/tmp/mopidy.socket'),
            '/tmp/mopidy.socket'
        )

    def test_correctly_no_match_socket_path(self):
        self.assertIsNone(path.get_unix_socket_path('127.0.0.1'))


class PathToFileURITest(unittest.TestCase):
    """Exercise ``path.path_to_uri`` with ASCII and non-ASCII paths."""

    def test_simple_path(self):
        result = path.path_to_uri('/etc/fstab')
        self.assertEqual(result, 'file:///etc/fstab')

    def test_space_in_path(self):
        result = path.path_to_uri('/tmp/test this')
        self.assertEqual(result, 'file:///tmp/test%20this')

    def test_unicode_in_path(self):
        result = path.path_to_uri('/tmp/æøå')
        self.assertEqual(result, 'file:///tmp/%C3%A6%C3%B8%C3%A5')

    def test_utf8_in_path(self):
        result = path.path_to_uri('/tmp/æøå'.encode('utf-8'))
        self.assertEqual(result, 'file:///tmp/%C3%A6%C3%B8%C3%A5')

    def test_latin1_in_path(self):
        result = path.path_to_uri('/tmp/æøå'.encode('latin-1'))
        self.assertEqual(result, 'file:///tmp/%E6%F8%E5')


class UriToPathTest(unittest.TestCase):
    """Exercise ``path.uri_to_path``; results are compared as bytestrings."""

    def test_simple_uri(self):
        result = path.uri_to_path('file:///etc/fstab')
        self.assertEqual(result, '/etc/fstab'.encode('utf-8'))

    def test_space_in_uri(self):
        result = path.uri_to_path('file:///tmp/test%20this')
        self.assertEqual(result, '/tmp/test this'.encode('utf-8'))

    def test_unicode_in_uri(self):
        result = path.uri_to_path('file:///tmp/%C3%A6%C3%B8%C3%A5')
        self.assertEqual(result, '/tmp/æøå'.encode('utf-8'))

    def test_latin1_in_uri(self):
        result = path.uri_to_path('file:///tmp/%E6%F8%E5')
        self.assertEqual(result, '/tmp/æøå'.encode('latin-1'))


class SplitPathTest(unittest.TestCase):
    """Exercise ``path.split_path``."""

    def test_empty_path(self):
        self.assertEqual([], path.split_path(''))

    def test_single_dir(self):
        self.assertEqual(['foo'], path.split_path('foo'))

    def test_dirs(self):
        self.assertEqual(['foo', 'bar', 'baz'], path.split_path('foo/bar/baz'))

    def test_initial_slash_is_ignored(self):
        self.assertEqual(
            ['foo', 'bar', 'baz'], path.split_path('/foo/bar/baz'))

    def test_only_slash(self):
        self.assertEqual([], path.split_path('/'))


class ExpandPathTest(unittest.TestCase):
    """Exercise ``path.expand_path`` against the real environment."""
    # TODO: test via mocks?

    def test_empty_path(self):
        self.assertEqual(os.path.abspath(b'.'), path.expand_path(b''))

    def test_absolute_path(self):
        self.assertEqual(b'/tmp/foo', path.expand_path(b'/tmp/foo'))

    def test_home_dir_expansion(self):
        self.assertEqual(
            os.path.expanduser(b'~/foo'), path.expand_path(b'~/foo'))

    def test_abspath(self):
        self.assertEqual(os.path.abspath(b'./foo'), path.expand_path(b'./foo'))

    def test_xdg_subsititution(self):
        self.assertEqual(
            GLib.get_user_data_dir() + b'/foo',
            path.expand_path(b'$XDG_DATA_DIR/foo'))

    def test_xdg_subsititution_unknown(self):
        self.assertIsNone(
            path.expand_path(b'/tmp/$XDG_INVALID_DIR/foo'))


class FindMTimesTest(unittest.TestCase):
    """Exercise ``path.find_mtimes`` on trees built in a scratch directory."""

    maxDiff = None

    def setUp(self):  # noqa: N802
        self.tmpdir = tempfile.mkdtemp(b'.mopidy-tests')

    def tearDown(self):  # noqa: N802
        # Some tests chmod entries to 0, so removal is best-effort.
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def mkdir(self, *args):
        """Create a directory at ``args`` joined below the tmpdir."""
        name = os.path.join(self.tmpdir, *[bytes(a) for a in args])
        os.mkdir(name)
        return name

    def touch(self, *args):
        """Create an empty file at ``args`` joined below the tmpdir."""
        name = os.path.join(self.tmpdir, *[bytes(a) for a in args])
        open(name, 'w').close()
        return name

    def test_names_are_bytestrings(self):
        """We shouldn't be mixing in unicode for paths."""
        result, errors = path.find_mtimes(tests.path_to_data_dir(''))
        # list(...) instead of .keys() + .keys(): same names, but does not
        # depend on dict.keys() returning a list.
        for name in list(result) + list(errors):
            self.assertEqual(name, tests.IsA(bytes))

    def test_nonexistent_dir(self):
        """Non existent search roots are an error"""
        missing = os.path.join(self.tmpdir, 'does-not-exist')
        result, errors = path.find_mtimes(missing)
        self.assertEqual(result, {})
        self.assertEqual(errors, {missing: tests.IsA(exceptions.FindError)})

    def test_empty_dir(self):
        """Empty directories should not show up in results"""
        self.mkdir('empty')

        result, errors = path.find_mtimes(self.tmpdir)
        self.assertEqual(result, {})
        self.assertEqual(errors, {})

    def test_file_as_the_root(self):
        """Specifying a file as the root should just return the file"""
        single = self.touch('single')

        result, errors = path.find_mtimes(single)
        self.assertEqual(result, {single: tests.any_int})
        self.assertEqual(errors, {})

    def test_nested_directories(self):
        """Searching nested directories should find all files"""

        # Setup foo/bar and baz directories
        self.mkdir('foo')
        self.mkdir('foo', 'bar')
        self.mkdir('baz')

        # Touch foo/file foo/bar/file and baz/file
        foo_file = self.touch('foo', 'file')
        foo_bar_file = self.touch('foo', 'bar', 'file')
        baz_file = self.touch('baz', 'file')

        result, errors = path.find_mtimes(self.tmpdir)
        self.assertEqual(result, {foo_file: tests.any_int,
                                  foo_bar_file: tests.any_int,
                                  baz_file: tests.any_int})
        self.assertEqual(errors, {})

    def test_missing_permission_to_file(self):
        """Missing permissions to a file is not a search error"""
        target = self.touch('no-permission')
        os.chmod(target, 0)

        result, errors = path.find_mtimes(self.tmpdir)
        self.assertEqual({target: tests.any_int}, result)
        self.assertEqual({}, errors)

    def test_missing_permission_to_directory(self):
        """Missing permissions to a directory is an error"""
        directory = self.mkdir('no-permission')
        os.chmod(directory, 0)

        result, errors = path.find_mtimes(self.tmpdir)
        self.assertEqual({}, result)
        self.assertEqual({directory: tests.IsA(exceptions.FindError)}, errors)

    def test_symlinks_are_ignored(self):
        """By default symlinks should be treated as an error"""
        target = self.touch('target')
        link = os.path.join(self.tmpdir, 'link')
        os.symlink(target, link)

        result, errors = path.find_mtimes(self.tmpdir)
        self.assertEqual(result, {target: tests.any_int})
        self.assertEqual(errors, {link: tests.IsA(exceptions.FindError)})

    def test_symlink_to_file_as_root_is_followed(self):
        """Passing a symlink as the root should be followed when follow=True"""
        target = self.touch('target')
        link = os.path.join(self.tmpdir, 'link')
        os.symlink(target, link)

        result, errors = path.find_mtimes(link, follow=True)
        self.assertEqual({link: tests.any_int}, result)
        self.assertEqual({}, errors)

    def test_symlink_to_directory_is_followed(self):
        # TODO: not implemented yet; this placeholder always passes.
        pass

    def test_symlink_pointing_at_itself_fails(self):
        """Symlink pointing at itself should give an OS error"""
        link = os.path.join(self.tmpdir, 'link')
        os.symlink(link, link)

        result, errors = path.find_mtimes(link, follow=True)
        self.assertEqual({}, result)
        self.assertEqual({link: tests.IsA(exceptions.FindError)}, errors)

    def test_symlink_pointing_at_parent_fails(self):
        """We should detect a loop via the parent and give up on the branch"""
        os.symlink(self.tmpdir, os.path.join(self.tmpdir, 'link'))

        result, errors = path.find_mtimes(self.tmpdir, follow=True)
        self.assertEqual({}, result)
        self.assertEqual(1, len(errors))
        # list(...) so indexing does not depend on values() returning a list.
        self.assertEqual(tests.IsA(Exception), list(errors.values())[0])

    def test_indirect_symlink_loop(self):
        """More indirect loops should also be detected"""
        # Setup tmpdir/directory/loop where loop points to tmpdir
        directory = os.path.join(self.tmpdir, b'directory')
        loop = os.path.join(directory, b'loop')

        os.mkdir(directory)
        os.symlink(self.tmpdir, loop)

        result, errors = path.find_mtimes(self.tmpdir, follow=True)
        self.assertEqual({}, result)
        self.assertEqual({loop: tests.IsA(Exception)}, errors)

    def test_symlink_branches_are_not_excluded(self):
        """Using symlinks to make a file show up multiple times should work"""
        self.mkdir('directory')
        target = self.touch('directory', 'target')
        link1 = os.path.join(self.tmpdir, b'link1')
        link2 = os.path.join(self.tmpdir, b'link2')

        os.symlink(target, link1)
        os.symlink(target, link2)

        expected = {target: tests.any_int,
                    link1: tests.any_int,
                    link2: tests.any_int}

        result, errors = path.find_mtimes(self.tmpdir, follow=True)
        self.assertEqual(expected, result)
        self.assertEqual({}, errors)

    def test_gives_mtime_in_milliseconds(self):
        fname = self.touch('foobar')

        # Set atime=1 and mtime=3.14159265 seconds; 3141 is expected back.
        os.utime(fname, (1, 3.14159265))

        result, errors = path.find_mtimes(fname)

        self.assertEqual(len(result), 1)
        mtime, = result.values()
        self.assertEqual(mtime, 3141)
        self.assertEqual(errors, {})


class TestIsPathInsideBaseDir(object):
    """pytest-style checks of ``path.is_path_inside_base_dir``."""

    def test_when_inside(self):
        assert path.is_path_inside_base_dir(
            '/æ/øå'.encode('utf-8'),
            '/æ'.encode('utf-8'))

    def test_when_outside(self):
        assert not path.is_path_inside_base_dir(
            '/æ/øå'.encode('utf-8'),
            '/ø'.encode('utf-8'))

    def test_byte_inside_str_fails(self):
        with pytest.raises(ValueError):
            path.is_path_inside_base_dir('/æ/øå'.encode('utf-8'), '/æ')

    def test_str_inside_byte_fails(self):
        with pytest.raises(ValueError):
            path.is_path_inside_base_dir('/æ/øå', '/æ'.encode('utf-8'))

    def test_str_inside_str_fails(self):
        with pytest.raises(ValueError):
            path.is_path_inside_base_dir('/æ/øå', '/æ')


# TODO: kill this in favour of just os.path.getmtime + mocks
class MtimeTest(unittest.TestCase):
    """Exercise ``path.mtime`` and its fake-time hooks."""

    def tearDown(self):  # noqa: N802
        # Undo any fake time a test installed so it cannot leak onwards.
        path.mtime.undo_fake()

    def test_mtime_of_current_dir(self):
        mtime_dir = int(os.stat('.').st_mtime)
        self.assertEqual(mtime_dir, path.mtime('.'))

    def test_fake_time_is_returned(self):
        path.mtime.set_fake_time(123456)
        self.assertEqual(path.mtime('.'), 123456)