diff --git a/app/api/v2/handlers/payload_api.py b/app/api/v2/handlers/payload_api.py index 1b034a332..9d16cc513 100644 --- a/app/api/v2/handlers/payload_api.py +++ b/app/api/v2/handlers/payload_api.py @@ -12,6 +12,20 @@ from app.api.v2.schemas.payload_schemas import PayloadQuerySchema, PayloadSchema, PayloadCreateRequestSchema, \ PayloadDeleteRequestSchema +# Extensions that could be executed server-side and must never be accepted as +# uploaded payloads (CWE-94 / Remote Code Execution mitigation). +_BLOCKED_EXTENSIONS = frozenset({'.py', '.pyc', '.pyo', '.so', '.dll'}) + + +def _validate_payload_extension(filename: str) -> None: + """Raise HTTPBadRequest if the file extension is on the server-side executable blocklist.""" + # Normalize the filename so the extension check reflects how it will be stored + # on filesystems like Windows that ignore trailing dots and spaces. + normalized = filename.rstrip(". ") + ext = os.path.splitext(normalized)[1].lower() + if ext in _BLOCKED_EXTENSIONS: + raise web.HTTPBadRequest(reason=f"File type {ext!r} is not allowed as a payload") + class PayloadApi(BaseApi): def __init__(self, services): @@ -73,6 +87,9 @@ async def post_payloads(self, request: web.Request): # Sanitize the file name to prevent directory traversal sanitized_filename = self.sanitize_filename(file_field.filename) + # Block server-side executable file types (CWE-94 RCE mitigation) + _validate_payload_extension(sanitized_filename) + # Generate the file name and path file_name, file_path = await self.__generate_file_name_and_path(sanitized_filename) diff --git a/tests/security/test_payload_extension_blocking.py b/tests/security/test_payload_extension_blocking.py new file mode 100644 index 000000000..b09850059 --- /dev/null +++ b/tests/security/test_payload_extension_blocking.py @@ -0,0 +1,53 @@ +""" +Security regression tests for payload upload extension blocking (CWE-94 fix). + +Verifies that _validate_payload_extension() correctly rejects server-side +executable file types (.py, .pyc, .pyo, .so, .dll). +""" +import pytest +from aiohttp import web + +from app.api.v2.handlers.payload_api import _validate_payload_extension + + +class TestPayloadExtensionBlocking: + + @pytest.mark.parametrize( + "filename", + [ + "malicious.py", + "malicious.pyc", + "malicious.pyo", + "exploit.so", + "evil.dll", + "malicious.PY", + "malicious.Py", + # Files with a safe-looking extension followed by a blocked one + "legit.txt.py", + ], + ) + def test_blocked_filenames_rejected(self, filename): + with pytest.raises(web.HTTPBadRequest): + _validate_payload_extension(filename) + + def test_exe_allowed(self): + """Agent binaries (.exe) must still be uploadable.""" + _validate_payload_extension('sandcat.exe') # should not raise + + def test_elf_allowed(self): + _validate_payload_extension('sandcat-linux') # no extension — should not raise + + def test_zip_allowed(self): + _validate_payload_extension('payloads.zip') + + def test_go_allowed(self): + _validate_payload_extension('manx.go') + + def test_blocked_extensions_set(self): + """Verify the constant is a frozenset and contains at least the known dangerous types.""" + from app.api.v2.handlers.payload_api import _BLOCKED_EXTENSIONS as _BLK + assert isinstance(_BLK, frozenset), "_BLOCKED_EXTENSIONS must be a frozenset" + expected_minimum = frozenset({'.py', '.pyc', '.pyo', '.so', '.dll'}) + assert _BLK.issuperset(expected_minimum), ( + f"_BLOCKED_EXTENSIONS missing required extensions: {expected_minimum - _BLK}" + )