Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 102 additions & 9 deletions projects/python-multipart/fuzz_form.patch
Original file line number Diff line number Diff line change
@@ -1,13 +1,106 @@
diff --git a/fuzz/fuzz_form.py b/fuzz/fuzz_form.py
index 9a3d854..fbc6ad9 100644
index 9a3d854..2d519ce 100644
--- a/fuzz/fuzz_form.py
+++ b/fuzz/fuzz_form.py
@@ -29,7 +29,7 @@ def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None:


def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None:
@@ -1,6 +1,5 @@
import io
import sys
-from unittest.mock import Mock

import atheris
from helpers import EnhancedDataProvider
@@ -9,40 +8,78 @@ with atheris.instrument_imports():
from python_multipart.exceptions import FormParserError
from python_multipart.multipart import parse_form

-on_field = Mock()
-on_file = Mock()
+
+# Simple no-op callbacks, stateless and safe to share across iterations.
+def _on_field(field) -> None:
+ pass
+
+
+def _on_file(file) -> None:
+ file.close()


def parse_octet_stream(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/octet-stream"}
- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
+ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file)


def parse_url_encoded(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/x-url-encoded"}
- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
+ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file)


def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None:
header = {"Content-Type": "application/x-www-form-urlencoded"}
- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file)
+ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file)
+
+
+def parse_multipart_raw(fdp: EnhancedDataProvider) -> None:
+ """Fuzz both the boundary value and the raw body to stress-test boundary matching."""
+ # Boundary: 1-70 bytes, no CR/LF (RFC 2046 constraint kept to avoid ValueError).
+ boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2)))
+ boundary = fdp.ConsumeBytes(boundary_len)
+ boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B"
+ header = {"Content-Type": "multipart/form-data; boundary=" + boundary.decode("latin-1")}
+ body = fdp.ConsumeRandomBytes()
+ parse_form(header, io.BytesIO(body), _on_field, _on_file)


-def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None:
- boundary = "boundary"
+ boundary = fdp.ConsumeRandomStringOfSize(16) or "boundary"
header = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
body = (
f"--{boundary}\r\n"
- header = {"Content-Type": f"multipart/form-data; boundary={boundary}"}
- body = (
- f"--{boundary}\r\n"
- f"Content-Type: multipart/form-data; boundary={boundary}\r\n\r\n"
- f"{fdp.ConsumeRandomString()}\r\n"
- f"--{boundary}--\r\n"
- )
- parse_form(header, io.BytesIO(body.encode("latin1", errors="ignore")), on_field, on_file)
+def parse_multipart_with_content_length(fdp: EnhancedDataProvider) -> None:
+ """Fuzz Content-Length handling together with multipart parsing."""
+ boundary = b"boundary"
+ content_length = fdp.ConsumeIntInRange(0, 1024)
+ header = {
+ "Content-Type": "multipart/form-data; boundary=boundary",
+ "Content-Length": str(content_length),
+ }
+ body = fdp.ConsumeRandomBytes()
+ parse_form(header, io.BytesIO(body), _on_field, _on_file)
+
+
+def parse_form_urlencoded_chunked(fdp: EnhancedDataProvider) -> None:
+ """Feed URL-encoded body in small chunks to exercise streaming state machine."""
+ from python_multipart.multipart import create_form_parser
+
+ num_chunks = fdp.ConsumeIntInRange(1, 8)
+ header = {"Content-Type": "application/x-www-form-urlencoded"}
+ parser = create_form_parser(header, _on_field, _on_file)
+ body = fdp.ConsumeRandomBytes()
+ chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks)
+ for i in range(0, len(body), chunk_size):
+ parser.write(body[i : i + chunk_size])
+ parser.finalize()


def TestOneInput(data: bytes) -> None:
fdp = EnhancedDataProvider(data)
- targets = [parse_octet_stream, parse_url_encoded, parse_form_urlencoded, parse_multipart_form_data]
+ targets = [
+ parse_octet_stream,
+ parse_url_encoded,
+ parse_form_urlencoded,
+ parse_multipart_raw,
+ parse_multipart_with_content_length,
+ parse_form_urlencoded_chunked,
+ ]
target = fdp.PickValueInList(targets)

try: