Skip to content

Commit cb9b844

Browse files
fix(asm): fix receive wrapper for asgi [backport 2.6] (#8701)
- Fix receive wrapper for asynchronous requests with more than one message received. - Add a test_streaming (only for FastAPI) to cover the fix - Add one other possible large request body for threat blocking tests on request body ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. - [x] If change touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. ## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) (cherry picked from commit 28a03cc)
1 parent 9f72de7 commit cb9b844

File tree

4 files changed

+49
-4
lines changed

4 files changed

+49
-4
lines changed

ddtrace/appsec/_handlers.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ async def _on_asgi_request_parse_body(receive, headers):
9595
data_received = await receive()
9696
body = data_received.get("body", b"")
9797

98-
async def receive():
99-
return data_received
98+
async def receive_wrapped(once=[True]):
99+
if once[0]:
100+
once[0] = False
101+
return data_received
102+
return await receive()
100103

101104
content_type = headers.get("content-type") or headers.get("Content-Type")
102105
try:
@@ -111,9 +114,9 @@ async def receive():
111114
req_body = None
112115
else:
113116
req_body = parse_form_multipart(body.decode(), headers) or None
114-
return receive, req_body
117+
return receive_wrapped, req_body
115118
except BaseException:
116-
return receive, None
119+
return receive_wrapped, None
117120

118121
return receive, None
119122

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
ASM: This fix resolves an issue where the asgi middleware could crash with a RuntimeError "Unexpected message received".

tests/appsec/contrib_appsec/fastapi_app/app.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
12
from typing import Optional
23

34
from fastapi import FastAPI
45
from fastapi import Request
56
from fastapi.responses import HTMLResponse
67
from fastapi.responses import JSONResponse
8+
from fastapi.responses import StreamingResponse
79
from pydantic import BaseModel
810

911

@@ -83,4 +85,13 @@ async def new_service(service_name: str, request: Request): # noqa: B008
8385
ddtrace.Pin.override(app, service=service_name, tracer=ddtrace.tracer)
8486
return HTMLResponse(service_name, 200)
8587

88+
async def slow_numbers(minimum, maximum):
89+
for number in range(minimum, maximum):
90+
yield "%d" % number
91+
await asyncio.sleep(0.25)
92+
93+
@app.get("/stream/")
94+
async def stream():
95+
return StreamingResponse(slow_numbers(0, 10), media_type="text/html")
96+
8697
return app

tests/appsec/contrib_appsec/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -660,13 +660,20 @@ def test_request_suspicious_request_block_match_response_headers(
660660
assert get_tag(http.STATUS_CODE) == "200"
661661
assert get_tag(APPSEC.JSON) is None
662662

663+
LARGE_BODY = {
664+
f"key_{i}": {f"key_{i}_{j}": {f"key_{i}_{j}_{k}": f"value_{i}_{j}_{k}" for k in range(4)} for j in range(4)}
665+
for i in range(254)
666+
}
667+
LARGE_BODY["attack"] = "yqrweytqwreasldhkuqwgervflnmlnli"
668+
663669
@pytest.mark.parametrize("asm_enabled", [True, False])
664670
@pytest.mark.parametrize(
665671
("body", "content_type", "blocked"),
666672
[
667673
# json body must be blocked
668674
('{"attack": "yqrweytqwreasldhkuqwgervflnmlnli"}', "application/json", "tst-037-003"),
669675
('{"attack": "yqrweytqwreasldhkuqwgervflnmlnli"}', "text/json", "tst-037-003"),
676+
(json.dumps(LARGE_BODY), "text/json", "tst-037-003"),
670677
# xml body must be blocked
671678
(
672679
'<?xml version="1.0" encoding="UTF-8"?><attack>yqrweytqwreasldhkuqwgervflnmlnli</attack>',
@@ -687,6 +694,7 @@ def test_request_suspicious_request_block_match_response_headers(
687694
# other values must not be blocked
688695
('{"attack": "zqrweytqwreasldhkuqxgervflnmlnli"}', "application/json", False),
689696
],
697+
ids=["json", "text_json", "json_large", "xml", "form", "form_multipart", "text", "no_attack"],
690698
)
691699
def test_request_suspicious_request_block_match_request_body(
692700
self, interface: Interface, get_tag, asm_enabled, root_span, body, content_type, blocked
@@ -1001,6 +1009,25 @@ def test_global_callback_list_length(self, interface):
10011009
# only two global callbacks are expected for API Security and Nested Events
10021010
assert len(_asm_request_context.GLOBAL_CALLBACKS.get(_asm_request_context._CONTEXT_CALL, [])) == 2
10031011

1012+
@pytest.mark.parametrize("asm_enabled", [True, False])
1013+
@pytest.mark.parametrize("metastruct", [True, False])
1014+
def test_stream_response(
1015+
self,
1016+
interface: Interface,
1017+
get_tag,
1018+
asm_enabled,
1019+
metastruct,
1020+
root_span,
1021+
):
1022+
if interface.name != "fastapi":
1023+
raise pytest.skip("only fastapi tests have support for stream response")
1024+
with override_global_config(
1025+
dict(_asm_enabled=asm_enabled, _use_metastruct_for_triggers=metastruct)
1026+
), override_env(dict(DD_APPSEC_RULES=rules.RULES_SRB)):
1027+
self.update_tracer(interface)
1028+
response = interface.client.get("/stream/")
1029+
assert self.body(response) == "0123456789"
1030+
10041031

10051032
@contextmanager
10061033
def test_tracer():

0 commit comments

Comments
 (0)