diff --git a/CHANGELOG.md b/CHANGELOG.md index 13135d8..9492829 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,10 +8,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +## Added + +- Added `RestrictedFilestore` to limit the file access of the `NativeFilestore` to a specific + directory. + ## Fixed - Correction for `InvalidDestinationId` exception arguments in destination handler. -- Destination handler now only checks entity ID values when checking inserted packets +- Destination handler now only checks entity ID values when checking inserted packets. +- Source handler used an incorrect check if the file exists without the virtual filestore. # [v0.4.0] 2024-11-08 diff --git a/src/cfdppy/__init__.py b/src/cfdppy/__init__.py index 234edd5..27941f7 100644 --- a/src/cfdppy/__init__.py +++ b/src/cfdppy/__init__.py @@ -5,7 +5,7 @@ from spacepackets.cfdp import TransactionId from .defs import CfdpIndication, CfdpState -from .filestore import HostFilestore +from .filestore import HostFilestore, VirtualFilestore from .handler.common import PacketDestination, get_packet_destination from .mib import ( IndicationCfg, @@ -14,6 +14,7 @@ RemoteEntityCfgTable, ) from .request import PutRequest +from .restricted_filestore import RestrictedFilestore from .user import CfdpUserBase __all__ = [ @@ -27,6 +28,8 @@ "PutRequest", "RemoteEntityCfg", "RemoteEntityCfgTable", + "RestrictedFilestore", "TransactionId", + "VirtualFilestore", "get_packet_destination", ] diff --git a/src/cfdppy/filestore.py b/src/cfdppy/filestore.py index 950bee1..ea007b6 100644 --- a/src/cfdppy/filestore.py +++ b/src/cfdppy/filestore.py @@ -7,6 +7,7 @@ import os import platform import shutil +import subprocess from typing import TYPE_CHECKING, BinaryIO from crcmod.predefined import PredefinedCrc @@ -310,24 +311,28 @@ def list_directory( :param recursive: :return: """ + if not dir_name.exists() or not dir_name.is_dir(): + _LOGGER.warning(f"{dir_name} does not exist or is not a directory") + return FilestoreResponseStatusCode.NOT_PERFORMED + + if platform.system() == "Linux" or platform.system() == "Darwin": + cmd = ["ls", "-al"] + elif platform.system() == "Windows": + cmd = ["dir"] + else: + _LOGGER.warning(f"Unknown OS {platform.system()}, do not know how to list directory") + return FilestoreResponseStatusCode.NOT_PERFORMED + open_flag = "a" if target_file.exists() else "w" with open(target_file, open_flag) as of: - if platform.system() == "Linux" or platform.system() == "Darwin": - cmd = "ls -al" - elif platform.system() == "Windows": - cmd = "dir" - else: - _LOGGER.warning( - f"Unknown OS {platform.system()}, do not know how to list directory" - ) - return FilestoreResponseStatusCode.NOT_PERFORMED of.write(f"Contents of directory {dir_name} generated with '{cmd}':\n") - curr_path = os.getcwd() - os.chdir(dir_name) - os.system( # noqa S605 TODO this is dangerous as the user has control over the target_file - f'{cmd} >> "{target_file}"' - ) - os.chdir(curr_path) + try: + # cmd is not modified by user input and dir_name has been checked above + result = subprocess.run(cmd, check=True, capture_output=True, cwd=dir_name) # noqa S603 + except (subprocess.CalledProcessError, OSError) as e: + _LOGGER.error(f"Failed to list directory {dir_name}: {e}") + return FilestoreResponseStatusCode.NOT_PERFORMED + of.write(result.stdout.decode()) return FilestoreResponseStatusCode.SUCCESS def _verify_checksum(self, checksum_type: ChecksumType) -> None: diff --git a/src/cfdppy/handler/source.py b/src/cfdppy/handler/source.py index 883d893..5290c95 100644 --- a/src/cfdppy/handler/source.py +++ b/src/cfdppy/handler/source.py @@ -573,7 +573,7 @@ def _prepare_file_params(self) -> None: self._params.fp.metadata_only = True else: assert self._put_req.source_file is not None - if not self._put_req.source_file.exists(): + if not self.user.vfs.file_exists(self._put_req.source_file): # TODO: Handle this exception in the handler, reset CFDP state machine raise SourceFileDoesNotExist(self._put_req.source_file) file_size = self.user.vfs.file_size(self._put_req.source_file) diff --git a/src/cfdppy/restricted_filestore.py b/src/cfdppy/restricted_filestore.py new file mode 100644 index 0000000..43d0465 --- /dev/null +++ b/src/cfdppy/restricted_filestore.py @@ -0,0 +1,149 @@ +"""Wrapper to restrict filestore access to a specific directory. + +This class will limit the filestore access to a specific directory. +All relative paths will be relative to this directory. +All absolute paths will be converted to subpaths of the restricted path e.g. + /tmp/file.txt -> /restricted_path/tmp/file.txt + +This is not a security feature but a convenience feature to limit filestore +access to a specific directory. +""" + +from __future__ import annotations # Python 3.9 compatibility for | syntax + +from typing import TYPE_CHECKING + +from cfdppy.filestore import NativeFilestore + +if TYPE_CHECKING: + from pathlib import Path + + from spacepackets.cfdp import ChecksumType, FilestoreResponseStatusCode + + +class RestrictedFilestore(NativeFilestore): + """Wrapper to restrict filestore access to a specific directory.""" + + def __init__(self, restricted_path: Path): + """Create a new RestrictedFilestore instance. + + The path is used to restrict all paths as relative to this path. + Absolute paths will be converted to subpaths of the restricted path + keeping the original path structure. + + :param restricted_path: Path to restrict the filestore to + """ + super().__init__() + self.restricted_path = restricted_path + + def __make_local(self, file: Path) -> Path: + """Make file paths subfolders of the restricted path. + + :param file: File to make relative to the restricted path + :return: New Path + """ + if not file.is_relative_to(self.restricted_path): + if file.is_absolute(): + return self.restricted_path.joinpath(file.relative_to(file.anchor)) + return self.restricted_path.joinpath(file) + return file + + def read_data(self, file: Path, offset: int | None, read_len: int | None = None) -> bytes: + """Read data from file.""" + return super().read_data(self.__make_local(file), offset, read_len) + + def is_directory(self, path: Path) -> bool: + """Check if path is a directory.""" + return super().is_directory(self.__make_local(path)) + + def filename_from_full_path(self, path: Path) -> str | None: + """Get filename from full path.""" + return super().filename_from_full_path(self.__make_local(path)) + + def file_exists(self, path: Path) -> bool: + """Check if file exists.""" + return super().file_exists(self.__make_local(path)) + + def truncate_file(self, file: Path) -> None: + """Truncate file.""" + return super().truncate_file(self.__make_local(file)) + + def file_size(self, file: Path) -> int: + """Get file size.""" + return super().file_size(self.__make_local(file)) + + def write_data(self, file: Path, data: bytes, offset: int | None) -> None: + """Write data to file.""" + return super().write_data(self.__make_local(file), data, offset) + + def create_file(self, file: Path) -> FilestoreResponseStatusCode: + """Create file.""" + return super().create_file(self.__make_local(file)) + + def delete_file(self, file: Path) -> FilestoreResponseStatusCode: + """Delete file.""" + return super().delete_file(self.__make_local(file)) + + def rename_file(self, _old_file: Path, _new_file: Path) -> FilestoreResponseStatusCode: + """Rename file.""" + return super().rename_file(self.__make_local(_old_file), self.__make_local(_new_file)) + + def replace_file(self, _replaced_file: Path, _source_file: Path) -> FilestoreResponseStatusCode: + """Replace file.""" + return super().replace_file( + self.__make_local(_replaced_file), self.__make_local(_source_file) + ) + + def create_directory(self, _dir_name: Path) -> FilestoreResponseStatusCode: + """Create directory.""" + return super().create_directory(self.__make_local(_dir_name)) + + def remove_directory( + self, dir_name: Path, recursive: bool = False + ) -> FilestoreResponseStatusCode: + """Remove directory.""" + return super().remove_directory(dir_name=self.__make_local(dir_name), recursive=recursive) + + def list_directory( + self, _dir_name: Path, _file_name: Path, _recursive: bool = False + ) -> FilestoreResponseStatusCode: + """List directory contents.""" + return super().list_directory( + self.__make_local(_dir_name), self.__make_local(_file_name), _recursive + ) + + def calculate_checksum( + self, + checksum_type: ChecksumType, + file_path: Path, + size_to_verify: int, + segment_len: int = 4096, + ) -> bytes: + """Calculate checksum of file. + + :param checksum_type: Type of checksum + :param file_path: Path to file + :param size_to_verify: Size to check in bytes + :param segment_len: Length of segments to calculate checksum for + :return: checksum as bytes + """ + return super().calculate_checksum( + checksum_type, self.__make_local(file_path), size_to_verify, segment_len + ) + + def verify_checksum( + self, + checksum: bytes, + checksum_type: ChecksumType, + file_path: Path, + size_to_verify: int, + segment_len: int = 4096, + ) -> bool: + """Verify checksum of file.""" + return super().verify_checksum( + checksum=checksum, + checksum_type=checksum_type, + file_path=self.__make_local(file_path), + size_to_verify=size_to_verify, + segment_len=segment_len, + ) diff --git a/tests/test_filestore.py b/tests/test_filestore.py index adc19dd..24d5d3e 100644 --- a/tests/test_filestore.py +++ b/tests/test_filestore.py @@ -1,9 +1,11 @@ import os.path +import shutil import struct import tempfile from pathlib import Path +from unittest import TestCase -from pyfakefs.fake_filesystem_unittest import TestCase +from spacepackets.cfdp import ChecksumType from cfdppy.crc import calc_modular_checksum from cfdppy.filestore import FilestoreResult, NativeFilestore @@ -31,8 +33,7 @@ class TestCfdpHostFilestore(TestCase): def setUp(self): - self.setUpPyfakefs() - self.temp_dir = tempfile.gettempdir() + self.temp_dir = tempfile.mkdtemp() self.test_file_name_0 = Path(f"{self.temp_dir}/cfdp_unittest0.txt") self.test_file_name_1 = Path(f"{self.temp_dir}/cfdp_unittest1.txt") self.test_dir_name_0 = Path(f"{self.temp_dir}/cfdp_test_folder0") @@ -40,7 +41,7 @@ def setUp(self): self.test_list_dir_name = Path(f"{self.temp_dir}/list-dir-test.txt") self.filestore = NativeFilestore() - self.file_path = Path(f"{tempfile.gettempdir()}/crc_file") + self.file_path = Path(f"{self.temp_dir}/crc_file") with open(self.file_path, "wb") as file: file.write(EXAMPLE_DATA_CFDP) # Kind of re-writing the modular checksum impl here which we are trying to test, but the @@ -63,6 +64,9 @@ def setUp(self): self.expected_checksum_for_example = struct.pack("!I", full_sum) + def tearDown(self): + shutil.rmtree(self.temp_dir) + def test_creation(self): res = self.filestore.create_file(self.test_file_name_0) self.assertTrue(res == FilestoreResult.CREATE_SUCCESS) @@ -140,6 +144,11 @@ def test_list_dir(self): def test_modular_checksum(self): self.assertEqual(calc_modular_checksum(self.file_path), self.expected_checksum_for_example) - def tearDown(self): - if self.file_path.exists(): - os.remove(self.file_path) + def test_zero_length_checksum(self): + with self.assertRaises(ValueError): + self.filestore.calculate_checksum( + checksum_type=ChecksumType.CRC_32, + file_path=self.file_path, + size_to_verify=10, + segment_len=0, + ) diff --git a/tests/test_restricted_filestore.py b/tests/test_restricted_filestore.py new file mode 100644 index 0000000..b546518 --- /dev/null +++ b/tests/test_restricted_filestore.py @@ -0,0 +1,153 @@ +import tempfile +from pathlib import Path +from unittest import TestCase + +from crcmod.predefined import PredefinedCrc +from spacepackets.cfdp import ChecksumType, FilestoreResponseStatusCode + +from cfdppy import RestrictedFilestore +from cfdppy.crc import calc_modular_checksum + + +class TestFileSystem(TestCase): + def test_handle_files(self): + with tempfile.TemporaryDirectory() as tempdir: + filestore = RestrictedFilestore(restricted_path=Path(tempdir)) + file_path = Path(tempdir) / "test_file.txt" + file_path.write_text("test") + self.assertTrue(filestore.file_exists(file_path)) + self.assertEqual(4, filestore.file_size(file_path)) + self.assertEqual("test_file.txt", filestore.filename_from_full_path(file_path)) + self.assertFalse(filestore.is_directory(file_path)) + self.assertEqual(b"test", filestore.read_data(file_path, offset=None)) + filestore.truncate_file(file_path) + filestore.write_data(file_path, data=b"new", offset=None) + self.assertEqual("new", file_path.read_text()) + filestore.truncate_file(file_path) + self.assertEqual("", file_path.read_text()) + + # Test create new file + result = filestore.create_file(Path("new_file.txt")) + self.assertEqual(result, FilestoreResponseStatusCode.SUCCESS) + self.assertTrue(Path(tempdir).joinpath("new_file.txt").exists()) + self.assertEqual(0, filestore.file_size(Path("new_file.txt"))) + # Rename + result = filestore.rename_file(Path("new_file.txt"), Path("renamed_file.txt")) + self.assertEqual(result, FilestoreResponseStatusCode.RENAME_SUCCESS) + self.assertTrue(Path(tempdir).joinpath("renamed_file.txt").exists()) + # Replace + result = filestore.replace_file(Path("renamed_file.txt"), Path("test_file.txt")) + self.assertEqual(FilestoreResponseStatusCode.REPLACE_SUCCESS, result) + # Delete + result = filestore.delete_file(Path("renamed_file.txt")) + self.assertEqual(result, FilestoreResponseStatusCode.DELETE_SUCCESS) + self.assertFalse(Path(tempdir).joinpath("renamed_file.txt").exists()) + + def test_handle_directories(self): + with tempfile.TemporaryDirectory() as tempdir: + filestore = RestrictedFilestore(restricted_path=Path(tempdir)) + dir_path = Path(tempdir) / "test_dir" + dir_path.mkdir() + self.assertTrue(filestore.is_directory(Path("test_dir"))) + self.assertEqual( + FilestoreResponseStatusCode.REMOVE_DIR_SUCCESS, + filestore.remove_directory(Path("test_dir"), recursive=False), + ) + + self.assertEqual( + FilestoreResponseStatusCode.CREATE_DIR_SUCCESS, + filestore.create_directory(Path("new_dir")), + ) + self.assertTrue(Path(tempdir).joinpath("new_dir").exists()) + self.assertEqual( + FilestoreResponseStatusCode.REMOVE_DIR_SUCCESS, + filestore.remove_directory(Path("new_dir")), + ) + self.assertFalse(Path(tempdir).joinpath("new_dir").exists()) + # Test list directory + + filestore.create_directory(Path("test_dir")) + file_path = Path(tempdir).joinpath("test_dir").joinpath("should_be_in_list.txt") + file_path.write_text(data="test") + self.assertEqual( + FilestoreResponseStatusCode.SUCCESS, + filestore.list_directory(Path("test_dir"), Path("test_list.txt")), + ) + data = filestore.read_data(Path("test_list.txt"), offset=None) + self.assertIn("should_be_in_list.txt", data.decode()) + + def test_absolute(self): + with tempfile.TemporaryDirectory() as tempdir: + filestore = RestrictedFilestore(restricted_path=Path(tempdir)) + with self.assertRaises(FileNotFoundError): + filestore.read_data(Path(__file__), offset=None) + + def test_checksum(self): + with tempfile.TemporaryDirectory() as tempdir: + filestore = RestrictedFilestore(restricted_path=Path(tempdir)) + filestore.create_file(Path("test_file.txt")) + filestore.write_data(Path("test_file.txt"), data=b"test", offset=None) + + crc = PredefinedCrc(crc_name="crc32") + checksum = crc.new(b"test").digest() + + self.assertTrue( + filestore.verify_checksum( + checksum=checksum, + checksum_type=ChecksumType.CRC_32, + file_path=Path("test_file.txt"), + size_to_verify=10, + ) + ) + self.assertTrue( + filestore.verify_checksum( + checksum=checksum, + checksum_type=ChecksumType.CRC_32, + file_path=Path(tempdir).joinpath("test_file.txt"), + size_to_verify=10, + ) + ) + self.assertEqual( + checksum, + filestore.calculate_checksum( + checksum_type=ChecksumType.CRC_32, + file_path=Path("test_file.txt"), + size_to_verify=10, + ), + ) + self.assertFalse( + filestore.verify_checksum( + checksum=b"NotRight", + checksum_type=ChecksumType.CRC_32, + file_path=Path("test_file.txt"), + size_to_verify=10, + ) + ) + + # No existing file + with self.assertRaises(FileNotFoundError): + filestore.calculate_checksum( + checksum_type=ChecksumType.CRC_32, + file_path=Path("no_file.txt"), + size_to_verify=10, + ) + + # No checksum + self.assertEqual( + bytes([0x0] * 4), + filestore.calculate_checksum( + checksum_type=ChecksumType.NULL_CHECKSUM, + file_path=Path("test_file.txt"), + size_to_verify=10, + ), + ) + modular_checksum = calc_modular_checksum(Path(tempdir).joinpath("test_file.txt")) + + self.assertTrue( + filestore.verify_checksum( + checksum=modular_checksum, + checksum_type=ChecksumType.MODULAR, + file_path=Path("test_file.txt"), + size_to_verify=10, + ) + )