|
1 | 1 | diff --git a/fuzz/fuzz_form.py b/fuzz/fuzz_form.py |
2 | | -index 9a3d854..fbc6ad9 100644 |
| 2 | +index 9a3d854..2d519ce 100644 |
3 | 3 | --- a/fuzz/fuzz_form.py |
4 | 4 | +++ b/fuzz/fuzz_form.py |
5 | | -@@ -29,7 +29,7 @@ def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None: |
6 | | - |
7 | | - |
8 | | - def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None: |
| 5 | +@@ -1,6 +1,5 @@ |
| 6 | + import io |
| 7 | + import sys |
| 8 | +-from unittest.mock import Mock |
| 9 | + |
| 10 | + import atheris |
| 11 | + from helpers import EnhancedDataProvider |
| 12 | +@@ -9,40 +8,78 @@ with atheris.instrument_imports(): |
| 13 | + from python_multipart.exceptions import FormParserError |
| 14 | + from python_multipart.multipart import parse_form |
| 15 | + |
| 16 | +-on_field = Mock() |
| 17 | +-on_file = Mock() |
| 18 | ++ |
| 19 | ++# Simple no-op callbacks, stateless and safe to share across iterations. |
| 20 | ++def _on_field(field) -> None: |
| 21 | ++ pass |
| 22 | ++ |
| 23 | ++ |
| 24 | ++def _on_file(file) -> None: |
| 25 | ++ file.close() |
| 26 | + |
| 27 | + |
| 28 | + def parse_octet_stream(fdp: EnhancedDataProvider) -> None: |
| 29 | + header = {"Content-Type": "application/octet-stream"} |
| 30 | +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) |
| 31 | ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) |
| 32 | + |
| 33 | + |
| 34 | + def parse_url_encoded(fdp: EnhancedDataProvider) -> None: |
| 35 | + header = {"Content-Type": "application/x-url-encoded"} |
| 36 | +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) |
| 37 | ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) |
| 38 | + |
| 39 | + |
| 40 | + def parse_form_urlencoded(fdp: EnhancedDataProvider) -> None: |
| 41 | + header = {"Content-Type": "application/x-www-form-urlencoded"} |
| 42 | +- parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), on_field, on_file) |
| 43 | ++ parse_form(header, io.BytesIO(fdp.ConsumeRandomBytes()), _on_field, _on_file) |
| 44 | ++ |
| 45 | ++ |
| 46 | ++def parse_multipart_raw(fdp: EnhancedDataProvider) -> None: |
| 47 | ++ """Fuzz both the boundary value and the raw body to stress-test boundary matching.""" |
| 48 | ++ # Boundary: 1-70 bytes, no CR/LF (RFC 2046 constraint kept to avoid ValueError). |
| 49 | ++ boundary_len = fdp.ConsumeIntInRange(1, max(1, min(70, fdp.remaining_bytes() // 2))) |
| 50 | ++ boundary = fdp.ConsumeBytes(boundary_len) |
| 51 | ++ boundary = boundary.replace(b"\r", b"-").replace(b"\n", b"-").rstrip(b" \t") or b"B" |
| 52 | ++ header = {"Content-Type": "multipart/form-data; boundary=" + boundary.decode("latin-1")} |
| 53 | ++ body = fdp.ConsumeRandomBytes() |
| 54 | ++ parse_form(header, io.BytesIO(body), _on_field, _on_file) |
| 55 | + |
| 56 | + |
| 57 | +-def parse_multipart_form_data(fdp: EnhancedDataProvider) -> None: |
9 | 58 | - boundary = "boundary" |
10 | | -+ boundary = fdp.ConsumeRandomStringOfSize(16) or "boundary" |
11 | | - header = {"Content-Type": f"multipart/form-data; boundary={boundary}"} |
12 | | - body = ( |
13 | | - f"--{boundary}\r\n" |
| 59 | +- header = {"Content-Type": f"multipart/form-data; boundary={boundary}"} |
| 60 | +- body = ( |
| 61 | +- f"--{boundary}\r\n" |
| 62 | +- f"Content-Type: multipart/form-data; boundary={boundary}\r\n\r\n" |
| 63 | +- f"{fdp.ConsumeRandomString()}\r\n" |
| 64 | +- f"--{boundary}--\r\n" |
| 65 | +- ) |
| 66 | +- parse_form(header, io.BytesIO(body.encode("latin1", errors="ignore")), on_field, on_file) |
| 67 | ++def parse_multipart_with_content_length(fdp: EnhancedDataProvider) -> None: |
| 68 | ++ """Fuzz Content-Length handling together with multipart parsing.""" |
| 69 | ++ boundary = b"boundary" |
| 70 | ++ content_length = fdp.ConsumeIntInRange(0, 1024) |
| 71 | ++ header = { |
| 72 | ++ "Content-Type": "multipart/form-data; boundary=boundary", |
| 73 | ++ "Content-Length": str(content_length), |
| 74 | ++ } |
| 75 | ++ body = fdp.ConsumeRandomBytes() |
| 76 | ++ parse_form(header, io.BytesIO(body), _on_field, _on_file) |
| 77 | ++ |
| 78 | ++ |
| 79 | ++def parse_form_urlencoded_chunked(fdp: EnhancedDataProvider) -> None: |
| 80 | ++ """Feed URL-encoded body in small chunks to exercise streaming state machine.""" |
| 81 | ++ from python_multipart.multipart import create_form_parser |
| 82 | ++ |
| 83 | ++ num_chunks = fdp.ConsumeIntInRange(1, 8) |
| 84 | ++ header = {"Content-Type": "application/x-www-form-urlencoded"} |
| 85 | ++ parser = create_form_parser(header, _on_field, _on_file) |
| 86 | ++ body = fdp.ConsumeRandomBytes() |
| 87 | ++ chunk_size = max(1, (len(body) + num_chunks - 1) // num_chunks) |
| 88 | ++ for i in range(0, len(body), chunk_size): |
| 89 | ++ parser.write(body[i : i + chunk_size]) |
| 90 | ++ parser.finalize() |
| 91 | + |
| 92 | + |
| 93 | + def TestOneInput(data: bytes) -> None: |
| 94 | + fdp = EnhancedDataProvider(data) |
| 95 | +- targets = [parse_octet_stream, parse_url_encoded, parse_form_urlencoded, parse_multipart_form_data] |
| 96 | ++ targets = [ |
| 97 | ++ parse_octet_stream, |
| 98 | ++ parse_url_encoded, |
| 99 | ++ parse_form_urlencoded, |
| 100 | ++ parse_multipart_raw, |
| 101 | ++ parse_multipart_with_content_length, |
| 102 | ++ parse_form_urlencoded_chunked, |
| 103 | ++ ] |
| 104 | + target = fdp.PickValueInList(targets) |
| 105 | + |
| 106 | + try: |
0 commit comments