diff --git a/projects/python-multipart/fuzz_form.patch b/projects/python-multipart/fuzz_form.patch index 6d0b6421a81a..fc1fd3b43c5e 100644 --- a/projects/python-multipart/fuzz_form.patch +++ b/projects/python-multipart/fuzz_form.patch @@ -1,13 +1,106 @@ diff --git a/fuzz/fuzz_form.py b/fuzz/fuzz_form.py -index 9a3d854..fbc6ad9 100644 +index 9a3d854..2d519ce 100644 --- a/fuzz/fuzz_form.py +++ b/fuzz/fuzz_form.py -@@ -29,7 +29,7 @@ def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None: - - - def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None: +@@ -1,6 +1,5 @@ + import io + import sys +-from unittest.mock import Mock + + import atheris + from helpers import EnhancedDataProvider +@@ -9,40 +8,78 @@ with atheris.instrument_imports(): + from python_multipart.exceptions import FormParserError + from python_multipart.multipart import parse_form + +-on_field = Mock() +-on_file = Mock() ++ ++# Simple no-op callbacks, stateless and safe to share across iterations. ++def _on_field(field) -> None: ++ pass ++ ++ ++def _on_file(file) -> None: ++ file.close() + + + def parse_octet_stream(fdp: EnhancedDataProvider) -> None: + header = {"Content-Type": "application/octet-stream"} +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) + + + def parse_url_encoded(fdp: EnhancedDataProvider) -> None: + header = {"Content-Type": "application/x-url-encoded"} +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) + + + def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None: + header = {"Content-Type": "application/x-www-form-urlencoded"} +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) ++ ++ ++def parse_multipart_raw(fdp: EnhancedDataProvider) -> None: ++ """Fuzz both the boundary value and the raw body to stress-test boundary matching.""" ++ # Boundary: 1-70 bytes, no CR/LF (RFC 2046 constraint kept to avoid ValueError). ++ boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) ++ boundary = fdp.ConsumeBytes(boundary_len) ++ boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" ++ header = {"Content-Type": "multipart/form-data; boundary=" + boundary.decode("latin-1")} ++ body = fdp.ConsumeRandomBytes() ++ parse_form(header, io.BytesIO(body), _on_field, _on_file) + + +-def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None: - boundary = "boundary" -+ boundary = fdp.ConsumeRandomStringOfSize(16) or "boundary" - header = {"Content-Type": f"multipart/form-data; boundary={boundary}"} - body = ( - f"--{boundary}\r\n" +- header = {"Content-Type": f"multipart/form-data; boundary={boundary}"} +- body = ( +- f"--{boundary}\r\n" +- f"Content-Type: multipart/form-data; boundary={boundary}\r\n\r\n" +- f"{fdp.ConsumeRandomString()}\r\n" +- f"--{boundary}--\r\n" +- ) +- parse_form(header, io.BytesIO(body.encode("latin1", errors="ignore")), on_field, on_file) ++def parse_multipart_with_content_length(fdp: EnhancedDataProvider) -> None: ++ """Fuzz Content-Length handling together with multipart parsing.""" ++ boundary = b"boundary" ++ content_length = fdp.ConsumeIntInRange(0, 1024) ++ header = { ++ "Content-Type": "multipart/form-data; boundary=boundary", ++ "Content-Length": str(content_length), ++ } ++ body = fdp.ConsumeRandomBytes() ++ parse_form(header, io.BytesIO(body), _on_field, _on_file) ++ ++ ++def parse_form_urlencoded_chunked(fdp: EnhancedDataProvider) -> None: ++ """Feed URL-encoded body in small chunks to exercise streaming state machine.""" ++ from python_multipart.multipart import create_form_parser ++ ++ num_chunks = fdp.ConsumeIntInRange(1, 8) ++ header = {"Content-Type": "application/x-www-form-urlencoded"} ++ parser = create_form_parser(header, _on_field, _on_file) ++ body = fdp.ConsumeRandomBytes() ++ chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) ++ for i in range(0, len(body), chunk_size): ++ parser.write(body[i : i + chunk_size]) ++ parser.finalize() + + + def TestOneInput(data: bytes) -> None: + fdp = EnhancedDataProvider(data) +- targets = [parse_octet_stream, parse_url_encoded, parse_form_urlencoded, parse_multipart_form_data] ++ targets = [ ++ parse_octet_stream, ++ parse_url_encoded, ++ parse_form_urlencoded, ++ parse_multipart_raw, ++ parse_multipart_with_content_length, ++ parse_form_urlencoded_chunked, ++ ] + target = fdp.PickValueInList(targets) + + try: