|
| 1 | +import errno |
| 2 | +import os |
| 3 | +import shutil |
| 4 | +import stat |
| 5 | +import tempfile |
| 6 | +import unittest |
| 7 | +import uuid |
| 8 | + |
| 9 | +from fuse import FuseOSError |
| 10 | +from tempfile import mkdtemp |
| 11 | +from unittest.mock import patch, MagicMock |
| 12 | + |
| 13 | +from globaleaks_eph_fs import EphemeralFile, EphemeralOperations, mount_globaleaks_eph_fs, main, unmount_if_mounted |
| 14 | + |
| 15 | +TEST_PATH = str(uuid.uuid4()) |
| 16 | +TEST_DATA = b"Hello, world! This is a test data for writing, seeking and reading operations." |
| 17 | + |
| 18 | +ORIGINAL_SIZE = len(TEST_DATA) |
| 19 | +EXTENDED_SIZE = ORIGINAL_SIZE*2 |
| 20 | +REDUCED_SIZE = ORIGINAL_SIZE//2 |
| 21 | + |
| 22 | +class TestEphemeralFile(unittest.TestCase): |
| 23 | + def setUp(self): |
| 24 | + self.storage_dir = mkdtemp() |
| 25 | + self.ephemeral_file = EphemeralFile(self.storage_dir) |
| 26 | + |
| 27 | + def tearDown(self): |
| 28 | + shutil.rmtree(self.storage_dir) |
| 29 | + |
| 30 | + def test_create_and_write_file(self): |
| 31 | + with self.ephemeral_file.open('w') as file: |
| 32 | + file.write(TEST_DATA) |
| 33 | + |
| 34 | + self.assertTrue(os.path.exists(self.ephemeral_file.filepath)) |
| 35 | + |
| 36 | + def test_encryption_and_decryption(self): |
| 37 | + with self.ephemeral_file.open('w') as file: |
| 38 | + file.write(TEST_DATA) |
| 39 | + |
| 40 | + # Define test cases: each case is a tuple (seek_position, read_size, expected_data) |
| 41 | + seek_tests = [ |
| 42 | + (0, 1, TEST_DATA[:1]), # Seek at the start read 1 byte |
| 43 | + (5, 5, TEST_DATA[5:10]), # Seek forward, read 5 bytes |
| 44 | + (10, 2, TEST_DATA[10:12]), # Seek forward, read 2 bytes |
| 45 | + (0, 3, TEST_DATA[:3]), # Seek backward, read 3 bytes |
| 46 | + ] |
| 47 | + |
| 48 | + # Test forward and backward seeking with different offsets |
| 49 | + with self.ephemeral_file.open('r') as file: |
| 50 | + for seek_pos, read_size, expected in seek_tests: |
| 51 | + file.seek(seek_pos) # Seek to the given position |
| 52 | + self.assertEqual(file.tell(), seek_pos) # Check position after seeking forward |
| 53 | + read_data = file.read(read_size) # Read the specified number of bytes |
| 54 | + self.assertEqual(read_data, expected) # Verify the data matches the expected value |
| 55 | + |
| 56 | + def test_file_cleanup(self): |
| 57 | + path_copy = self.ephemeral_file.filepath |
| 58 | + del self.ephemeral_file |
| 59 | + self.assertFalse(os.path.exists(path_copy)) |
| 60 | + |
| 61 | +class TestEphemeralOperations(unittest.TestCase): |
| 62 | + def setUp(self): |
| 63 | + self.storage_dir = mkdtemp() |
| 64 | + self.fs = EphemeralOperations(self.storage_dir) |
| 65 | + |
| 66 | + # Get current user's UID and GID |
| 67 | + self.current_uid = os.getuid() |
| 68 | + self.current_gid = os.getgid() |
| 69 | + |
| 70 | + def tearDown(self): |
| 71 | + for file in self.fs.files.values(): |
| 72 | + os.remove(file.filepath) |
| 73 | + os.rmdir(self.storage_dir) |
| 74 | + |
| 75 | + def test_create_file(self): |
| 76 | + self.fs.create(TEST_PATH, 0o660) |
| 77 | + self.assertIn(TEST_PATH, self.fs.files) |
| 78 | + |
| 79 | + def test_create_file_with_arbitrary_name(self): |
| 80 | + with self.assertRaises(FuseOSError) as context: |
| 81 | + self.fs.create('/arbitraryname', os.O_RDONLY) |
| 82 | + self.assertEqual(context.exception.errno, errno.ENOENT) |
| 83 | + |
| 84 | + def test_open_existing_file(self): |
| 85 | + self.fs.create(TEST_PATH, 0o660) |
| 86 | + self.fs.open(TEST_PATH, os.O_RDONLY) |
| 87 | + |
| 88 | + def test_write_and_read_file(self): |
| 89 | + self.fs.create(TEST_PATH, 0o660) |
| 90 | + |
| 91 | + self.fs.open(TEST_PATH, os.O_RDWR) |
| 92 | + self.fs.write(TEST_PATH, TEST_DATA, 0, None) |
| 93 | + |
| 94 | + self.fs.release(TEST_PATH, None) |
| 95 | + |
| 96 | + self.fs.open(TEST_PATH, os.O_RDONLY) |
| 97 | + |
| 98 | + read_data = self.fs.read(TEST_PATH, len(TEST_DATA), 0, None) |
| 99 | + |
| 100 | + self.assertEqual(read_data, TEST_DATA) |
| 101 | + |
| 102 | + self.fs.release(TEST_PATH, None) |
| 103 | + |
| 104 | + def test_unlink_file(self): |
| 105 | + self.fs.create(TEST_PATH, 0o660) |
| 106 | + self.assertIn(TEST_PATH, self.fs.files) |
| 107 | + |
| 108 | + self.fs.unlink(TEST_PATH) |
| 109 | + self.assertNotIn(TEST_PATH, self.fs.files) |
| 110 | + |
| 111 | + def test_file_not_found(self): |
| 112 | + with self.assertRaises(FuseOSError) as context: |
| 113 | + self.fs.open('/nonexistentfile', os.O_RDONLY) |
| 114 | + self.assertEqual(context.exception.errno, errno.ENOENT) |
| 115 | + |
| 116 | + def test_getattr_root(self): |
| 117 | + attr = self.fs.getattr('/') |
| 118 | + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFDIR) |
| 119 | + self.assertEqual(attr['st_mode'] & 0o777, 0o750) |
| 120 | + self.assertEqual(attr['st_nlink'], 2) |
| 121 | + |
| 122 | + def test_getattr_file(self): |
| 123 | + self.fs.create(TEST_PATH, mode=0o660) |
| 124 | + |
| 125 | + attr = self.fs.getattr(TEST_PATH) |
| 126 | + |
| 127 | + self.assertEqual(stat.S_IFMT(attr['st_mode']), stat.S_IFREG) |
| 128 | + self.assertEqual(attr['st_mode'] & 0o777, 0o660) |
| 129 | + self.assertEqual(attr['st_size'], 0) |
| 130 | + self.assertEqual(attr['st_nlink'], 1) |
| 131 | + self.assertEqual(attr['st_uid'], os.getuid()) |
| 132 | + self.assertEqual(attr['st_gid'], os.getgid()) |
| 133 | + self.assertIn('st_atime', attr) |
| 134 | + self.assertIn('st_mtime', attr) |
| 135 | + self.assertIn('st_ctime', attr) |
| 136 | + |
| 137 | + def test_getattr_nonexistent(self): |
| 138 | + with self.assertRaises(OSError) as _: |
| 139 | + self.fs.getattr('/nonexistent') |
| 140 | + |
| 141 | + def test_truncate(self): |
| 142 | + self.fs.create(TEST_PATH, 0o660) |
| 143 | + self.fs.write(TEST_PATH, TEST_DATA, 0, None) |
| 144 | + |
| 145 | + self.fs.truncate(TEST_PATH, REDUCED_SIZE, None) |
| 146 | + file_content = self.fs.read(TEST_PATH, ORIGINAL_SIZE, 0, None) |
| 147 | + self.assertEqual(len(file_content), REDUCED_SIZE) |
| 148 | + self.assertEqual(file_content, TEST_DATA[:REDUCED_SIZE]) |
| 149 | + |
| 150 | + def test_extend(self): |
| 151 | + self.fs.create(TEST_PATH, 0o660) |
| 152 | + self.fs.write(TEST_PATH, TEST_DATA, 0, None) |
| 153 | + |
| 154 | + self.fs.truncate(TEST_PATH, EXTENDED_SIZE, None) |
| 155 | + file_content = self.fs.read(TEST_PATH, EXTENDED_SIZE * 2, 0, None) |
| 156 | + self.assertEqual(file_content[:ORIGINAL_SIZE], TEST_DATA) |
| 157 | + self.assertEqual(len(file_content), EXTENDED_SIZE) |
| 158 | + self.assertTrue(all(byte == 0 for byte in file_content[ORIGINAL_SIZE:])) |
| 159 | + |
| 160 | + def test_readdir(self): |
| 161 | + file_names = [] |
| 162 | + for _ in range(3): |
| 163 | + file_names.append(str(uuid.uuid4())) |
| 164 | + self.fs.create(file_names[-1], 0o660) |
| 165 | + |
| 166 | + directory_contents = self.fs.readdir('/', None) |
| 167 | + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[1], file_names[2]}) |
| 168 | + |
| 169 | + self.fs.unlink(file_names[1]) |
| 170 | + directory_contents = self.fs.readdir('/', None) |
| 171 | + self.assertEqual(set(directory_contents), {'.', '..', file_names[0], file_names[2]}) |
| 172 | + |
| 173 | + @patch("os.chmod") |
| 174 | + def test_chmod_success(self, mock_chmod): |
| 175 | + self.fs.create(TEST_PATH, 0o660) |
| 176 | + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o660) |
| 177 | + self.fs.chmod(TEST_PATH, 0o640) |
| 178 | + mock_chmod.assert_called_with(self.fs.files[TEST_PATH].filepath, 0o640) |
| 179 | + |
| 180 | + def test_chmod_file_not_found(self): |
| 181 | + with self.assertRaises(FuseOSError) as context: |
| 182 | + self.fs.chmod("/nonexistent", 0o644) |
| 183 | + self.assertEqual(context.exception.errno, errno.ENOENT) |
| 184 | + |
| 185 | + @patch("os.chown") |
| 186 | + def test_chown_success(self, mock_chown): |
| 187 | + self.fs.create(TEST_PATH, 0o660) |
| 188 | + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) |
| 189 | + mock_chown.assert_called_once_with(self.fs.files[TEST_PATH].filepath, self.current_uid, self.current_gid) |
| 190 | + |
| 191 | + def test_chown_file_not_found(self): |
| 192 | + with self.assertRaises(FuseOSError) as context: |
| 193 | + self.fs.chown("/nonexistent", self.current_uid, self.current_gid) |
| 194 | + self.assertEqual(context.exception.errno, errno.ENOENT) |
| 195 | + |
| 196 | + @patch("os.chown", side_effect=PermissionError) |
| 197 | + def test_chown_permission_error(self, mock_chown): |
| 198 | + self.fs.create(TEST_PATH, 0o660) |
| 199 | + with self.assertRaises(PermissionError): |
| 200 | + self.fs.chown(TEST_PATH, self.current_uid, self.current_gid) |
| 201 | + |
| 202 | + @patch('atexit.register') |
| 203 | + @patch('argparse.ArgumentParser.parse_args') |
| 204 | + @patch('globaleaks_eph_fs.subprocess.run') |
| 205 | + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') |
| 206 | + @patch('globaleaks_eph_fs.FUSE') |
| 207 | + @patch('builtins.print') |
| 208 | + def test_main_mount_with_unspecified_storage_directory(self, mock_print, mock_FUSE, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): |
| 209 | + with tempfile.TemporaryDirectory() as mount_point: |
| 210 | + mock_parse_args.return_value = MagicMock( |
| 211 | + mount_point=mount_point, |
| 212 | + storage_directory=None, |
| 213 | + unmount=False |
| 214 | + ) |
| 215 | + |
| 216 | + original_mount_function = mount_globaleaks_eph_fs |
| 217 | + |
| 218 | + def side_effect_func(mount_point, storage_directory, flag): |
| 219 | + return original_mount_function(mount_point, storage_directory, False) |
| 220 | + |
| 221 | + mock_mount.side_effect = side_effect_func |
| 222 | + |
| 223 | + main() |
| 224 | + |
| 225 | + mock_mount.assert_called_once_with(mount_point, None, True) |
| 226 | + |
| 227 | + mock_atexit_register.assert_called_once_with(unmount_if_mounted, mount_point) |
| 228 | + |
| 229 | + mock_subprocess.assert_called_once_with(['mount'], capture_output=True, text=True) |
| 230 | + |
| 231 | + @patch('atexit.register') |
| 232 | + @patch('argparse.ArgumentParser.parse_args') |
| 233 | + @patch('globaleaks_eph_fs.subprocess.run') |
| 234 | + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') |
| 235 | + @patch('globaleaks_eph_fs.FUSE') |
| 236 | + @patch('builtins.print') |
| 237 | + def test_main_mount_with_specified_storage_directory(self, mock_print, mock_FUSE, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): |
| 238 | + with tempfile.TemporaryDirectory() as mount_point, tempfile.TemporaryDirectory() as storage_directory: |
| 239 | + mock_parse_args.return_value = MagicMock( |
| 240 | + mount_point=mount_point, |
| 241 | + storage_directory=storage_directory |
| 242 | + ) |
| 243 | + |
| 244 | + original_mount_function = mount_globaleaks_eph_fs |
| 245 | + |
| 246 | + def side_effect_func(mount_point, storage_directory, flag): |
| 247 | + return original_mount_function(mount_point, storage_directory, False) |
| 248 | + |
| 249 | + mock_mount.side_effect = side_effect_func |
| 250 | + |
| 251 | + main() |
| 252 | + |
| 253 | + mock_mount.assert_called_once_with(mount_point, storage_directory, True) |
| 254 | + |
| 255 | + mock_atexit_register.assert_called_once_with(unmount_if_mounted, mount_point) |
| 256 | + |
| 257 | + mock_subprocess.assert_called_once_with(['mount'], capture_output=True, text=True) |
| 258 | + |
| 259 | + @patch('atexit.register') |
| 260 | + @patch('argparse.ArgumentParser.parse_args') |
| 261 | + @patch('globaleaks_eph_fs.subprocess.run') |
| 262 | + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') |
| 263 | + @patch('globaleaks_eph_fs.is_mount_point') |
| 264 | + @patch('globaleaks_eph_fs.FUSE') |
| 265 | + @patch('builtins.print') |
| 266 | + def test_main_with_mount_point_check(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): |
| 267 | + with tempfile.TemporaryDirectory() as mount_point: |
| 268 | + mock_parse_args.return_value = MagicMock( |
| 269 | + mount_point=mount_point, |
| 270 | + storage_directory=None |
| 271 | + ) |
| 272 | + |
| 273 | + mock_is_mount_point.return_value = True |
| 274 | + |
| 275 | + original_mount_function = mount_globaleaks_eph_fs |
| 276 | + |
| 277 | + def side_effect_func(mount_point, storage_directory, flag): |
| 278 | + return original_mount_function(mount_point, storage_directory, False) |
| 279 | + |
| 280 | + mock_mount.side_effect = side_effect_func |
| 281 | + |
| 282 | + main() |
| 283 | + |
| 284 | + mock_mount.assert_called_once_with(mount_point, None, True) |
| 285 | + |
| 286 | + mock_atexit_register.assert_called_once_with(unmount_if_mounted, mount_point) |
| 287 | + |
| 288 | + mock_subprocess.assert_called_once_with(['fusermount', '-u', mount_point]) |
| 289 | + |
| 290 | + @patch('atexit.register') |
| 291 | + @patch('argparse.ArgumentParser.parse_args') |
| 292 | + @patch('globaleaks_eph_fs.subprocess.run') |
| 293 | + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') |
| 294 | + @patch('globaleaks_eph_fs.is_mount_point') |
| 295 | + @patch('globaleaks_eph_fs.FUSE') |
| 296 | + @patch('builtins.print') |
| 297 | + def test_main_keyboard_interrupt(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): |
| 298 | + with tempfile.TemporaryDirectory() as mount_point: |
| 299 | + mock_parse_args.return_value = MagicMock( |
| 300 | + mount_point=mount_point, |
| 301 | + storage_directory=None |
| 302 | + ) |
| 303 | + |
| 304 | + mock_is_mount_point.return_value = False |
| 305 | + |
| 306 | + mock_mount.side_effect = KeyboardInterrupt |
| 307 | + |
| 308 | + with self.assertRaises(SystemExit): |
| 309 | + main() |
| 310 | + |
| 311 | + mock_mount.assert_called_once_with(mount_point, None, True) |
| 312 | + |
| 313 | + mock_subprocess.assert_not_called() |
| 314 | + |
| 315 | + @patch('atexit.register') |
| 316 | + @patch('argparse.ArgumentParser.parse_args') |
| 317 | + @patch('globaleaks_eph_fs.subprocess.run') |
| 318 | + @patch('globaleaks_eph_fs.mount_globaleaks_eph_fs') |
| 319 | + @patch('globaleaks_eph_fs.is_mount_point') |
| 320 | + @patch('globaleaks_eph_fs.FUSE') |
| 321 | + @patch('builtins.print') |
| 322 | + def test_main_other_exception(self, mock_print, mock_FUSE, mock_is_mount_point, mock_mount, mock_subprocess, mock_parse_args, mock_atexit_register): |
| 323 | + with tempfile.TemporaryDirectory() as mount_point: |
| 324 | + mock_parse_args.return_value = MagicMock( |
| 325 | + mount_point=mount_point, |
| 326 | + storage_directory=None |
| 327 | + ) |
| 328 | + |
| 329 | + mock_is_mount_point.return_value = False |
| 330 | + |
| 331 | + mock_mount.side_effect = Exception("Some unexpected error") |
| 332 | + |
| 333 | + with self.assertRaises(SystemExit): |
| 334 | + main() |
| 335 | + |
| 336 | + mock_mount.assert_called_once_with(mount_point, None, True) |
| 337 | + |
| 338 | + mock_atexit_register.assert_not_called() |
| 339 | + |
| 340 | + mock_subprocess.assert_not_called() |
| 341 | + |
| 342 | +if __name__ == '__main__': |
| 343 | + unittest.main() |
0 commit comments