Skip to content

Commit e7cec4c

Browse files
fix(asm): fix receive wrapper for asgi [backport 2.7] (#8700)
- Fix receive wrapper for asynchronous requests with more than one message received. - Add a test_streaming (only for FastAPI) to cover the fix - Add one other possible large request body for threat blocking tests on request body ## Checklist - [x] Change(s) are motivated and described in the PR description - [x] Testing strategy is described if automated tests are not included in the PR - [x] Risks are described (performance impact, potential for breakage, maintainability) - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed or label `changelog/no-changelog` is set - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)) - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] If this PR changes the public interface, I've notified `@DataDog/apm-tees`. - [x] If change touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. ## Reviewer Checklist - [x] Title is accurate - [x] All changes are related to the pull request's stated goal - [x] Description motivates each change - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - [x] Testing strategy adequately addresses listed risks - [x] Change is maintainable (easy to change, telemetry, documentation) - [x] Release note makes sense to a user of the library - [x] Author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) (cherry picked from commit 28a03cc)
1 parent 52c0632 commit e7cec4c

File tree

4 files changed

+49
-4
lines changed

4 files changed

+49
-4
lines changed

ddtrace/appsec/_handlers.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,11 @@ async def _on_asgi_request_parse_body(receive, headers):
9595
data_received = await receive()
9696
body = data_received.get("body", b"")
9797

98-
async def receive():
99-
return data_received
98+
async def receive_wrapped(once=[True]):
99+
if once[0]:
100+
once[0] = False
101+
return data_received
102+
return await receive()
100103

101104
content_type = headers.get("content-type") or headers.get("Content-Type")
102105
try:
@@ -111,9 +114,9 @@ async def receive():
111114
req_body = None
112115
else:
113116
req_body = parse_form_multipart(body.decode(), headers) or None
114-
return receive, req_body
117+
return receive_wrapped, req_body
115118
except BaseException:
116-
return receive, None
119+
return receive_wrapped, None
117120

118121
return receive, None
119122

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
ASM: This fix resolves an issue where the asgi middleware could crash with a RuntimeError "Unexpected message received".

tests/appsec/contrib_appsec/fastapi_app/app.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
12
from typing import Optional
23

34
from fastapi import FastAPI
45
from fastapi import Request
56
from fastapi.responses import HTMLResponse
67
from fastapi.responses import JSONResponse
8+
from fastapi.responses import StreamingResponse
79
from pydantic import BaseModel
810

911

@@ -83,4 +85,13 @@ async def new_service(service_name: str, request: Request): # noqa: B008
8385
ddtrace.Pin.override(app, service=service_name, tracer=ddtrace.tracer)
8486
return HTMLResponse(service_name, 200)
8587

88+
async def slow_numbers(minimum, maximum):
89+
for number in range(minimum, maximum):
90+
yield "%d" % number
91+
await asyncio.sleep(0.25)
92+
93+
@app.get("/stream/")
94+
async def stream():
95+
return StreamingResponse(slow_numbers(0, 10), media_type="text/html")
96+
8697
return app

tests/appsec/contrib_appsec/utils.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,13 +659,20 @@ def test_request_suspicious_request_block_match_response_headers(
659659
assert get_tag(http.STATUS_CODE) == "200"
660660
assert get_tag(APPSEC.JSON) is None
661661

662+
LARGE_BODY = {
663+
f"key_{i}": {f"key_{i}_{j}": {f"key_{i}_{j}_{k}": f"value_{i}_{j}_{k}" for k in range(4)} for j in range(4)}
664+
for i in range(254)
665+
}
666+
LARGE_BODY["attack"] = "yqrweytqwreasldhkuqwgervflnmlnli"
667+
662668
@pytest.mark.parametrize("asm_enabled", [True, False])
663669
@pytest.mark.parametrize(
664670
("body", "content_type", "blocked"),
665671
[
666672
# json body must be blocked
667673
('{"attack": "yqrweytqwreasldhkuqwgervflnmlnli"}', "application/json", "tst-037-003"),
668674
('{"attack": "yqrweytqwreasldhkuqwgervflnmlnli"}', "text/json", "tst-037-003"),
675+
(json.dumps(LARGE_BODY), "text/json", "tst-037-003"),
669676
# xml body must be blocked
670677
(
671678
'<?xml version="1.0" encoding="UTF-8"?><attack>yqrweytqwreasldhkuqwgervflnmlnli</attack>',
@@ -686,6 +693,7 @@ def test_request_suspicious_request_block_match_response_headers(
686693
# other values must not be blocked
687694
('{"attack": "zqrweytqwreasldhkuqxgervflnmlnli"}', "application/json", False),
688695
],
696+
ids=["json", "text_json", "json_large", "xml", "form", "form_multipart", "text", "no_attack"],
689697
)
690698
def test_request_suspicious_request_block_match_request_body(
691699
self, interface: Interface, get_tag, asm_enabled, root_span, body, content_type, blocked
@@ -1000,6 +1008,25 @@ def test_global_callback_list_length(self, interface):
10001008
# only two global callbacks are expected for API Security and Nested Events
10011009
assert len(_asm_request_context.GLOBAL_CALLBACKS.get(_asm_request_context._CONTEXT_CALL, [])) == 2
10021010

1011+
@pytest.mark.parametrize("asm_enabled", [True, False])
1012+
@pytest.mark.parametrize("metastruct", [True, False])
1013+
def test_stream_response(
1014+
self,
1015+
interface: Interface,
1016+
get_tag,
1017+
asm_enabled,
1018+
metastruct,
1019+
root_span,
1020+
):
1021+
if interface.name != "fastapi":
1022+
raise pytest.skip("only fastapi tests have support for stream response")
1023+
with override_global_config(
1024+
dict(_asm_enabled=asm_enabled, _use_metastruct_for_triggers=metastruct)
1025+
), override_env(dict(DD_APPSEC_RULES=rules.RULES_SRB)):
1026+
self.update_tracer(interface)
1027+
response = interface.client.get("/stream/")
1028+
assert self.body(response) == "0123456789"
1029+
10031030

10041031
@contextmanager
10051032
def test_tracer():

0 commit comments

Comments
 (0)