Skip to content

Commit bbe7cd2

Browse files
authored
[PR #11017/1c01726 backport][3.13] Support Reusable Request Bodies and Improve Payload Handling (#11019)
1 parent 6cc455c commit bbe7cd2

21 files changed

+2864
-154
lines changed

CHANGES/11017.feature.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Added support for reusable request bodies to enable retries, redirects, and digest authentication -- by :user:`bdraco` and :user:`GLGDLY`.
2+
3+
Most payloads can now be safely reused multiple times, fixing long-standing issues where POST requests with form data or file uploads would fail on redirects with errors like "Form data has been processed already" or "I/O operation on closed file". This also enables digest authentication to work with request bodies and allows retry mechanisms to resend requests without consuming the payload. Note that payloads derived from async iterables may still not be reusable in some cases.

CHANGES/5530.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11017.feature.rst

CHANGES/5577.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11017.feature.rst

CHANGES/9201.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
11017.feature.rst

CONTRIBUTORS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ Frederik Gladhorn
136136
Frederik Peter Aalund
137137
Gabriel Tremblay
138138
Gang Ji
139+
Gary Leung
139140
Gary Wilson Jr.
140141
Gennady Andreyev
141142
Georges Dubus

aiohttp/client.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -792,6 +792,8 @@ async def _connect_and_send_request(
792792
redirects += 1
793793
history.append(resp)
794794
if max_redirects and redirects >= max_redirects:
795+
if req._body is not None:
796+
await req._body.close()
795797
resp.close()
796798
raise TooManyRedirects(
797799
history[0].request_info, tuple(history)
@@ -823,13 +825,18 @@ async def _connect_and_send_request(
823825
r_url, encoded=not self._requote_redirect_url
824826
)
825827
except ValueError as e:
828+
if req._body is not None:
829+
await req._body.close()
830+
resp.close()
826831
raise InvalidUrlRedirectClientError(
827832
r_url,
828833
"Server attempted redirecting to a location that does not look like a URL",
829834
) from e
830835

831836
scheme = parsed_redirect_url.scheme
832837
if scheme not in HTTP_AND_EMPTY_SCHEMA_SET:
838+
if req._body is not None:
839+
await req._body.close()
833840
resp.close()
834841
raise NonHttpUrlRedirectClientError(r_url)
835842
elif not scheme:
@@ -838,6 +845,9 @@ async def _connect_and_send_request(
838845
try:
839846
redirect_origin = parsed_redirect_url.origin()
840847
except ValueError as origin_val_err:
848+
if req._body is not None:
849+
await req._body.close()
850+
resp.close()
841851
raise InvalidUrlRedirectClientError(
842852
parsed_redirect_url,
843853
"Invalid redirect URL origin",
@@ -854,6 +864,8 @@ async def _connect_and_send_request(
854864

855865
break
856866

867+
if req._body is not None:
868+
await req._body.close()
857869
# check response status
858870
if raise_for_status is None:
859871
raise_for_status = self._raise_for_status

aiohttp/client_middleware_digest_auth.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from .client_exceptions import ClientError
3030
from .client_middlewares import ClientHandlerType
3131
from .client_reqrep import ClientRequest, ClientResponse
32+
from .payload import Payload
3233

3334

3435
class DigestAuthChallenge(TypedDict, total=False):
@@ -192,7 +193,7 @@ def __init__(
192193
self._nonce_count = 0
193194
self._challenge: DigestAuthChallenge = {}
194195

195-
def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str:
196+
async def _encode(self, method: str, url: URL, body: Union[bytes, Payload]) -> str:
196197
"""
197198
Build digest authorization header for the current challenge.
198199
@@ -207,6 +208,7 @@ def _encode(self, method: str, url: URL, body: Union[bytes, str]) -> str:
207208
Raises:
208209
ClientError: If the challenge is missing required parameters or
209210
contains unsupported values
211+
210212
"""
211213
challenge = self._challenge
212214
if "realm" not in challenge:
@@ -272,11 +274,11 @@ def KD(s: bytes, d: bytes) -> bytes:
272274
A1 = b":".join((self._login_bytes, realm_bytes, self._password_bytes))
273275
A2 = f"{method.upper()}:{path}".encode()
274276
if qop == "auth-int":
275-
if isinstance(body, str):
276-
entity_str = body.encode("utf-8", errors="replace")
277+
if isinstance(body, bytes): # will always be empty bytes unless Payload
278+
entity_bytes = body
277279
else:
278-
entity_str = body
279-
entity_hash = H(entity_str)
280+
entity_bytes = await body.as_bytes() # Get bytes from Payload
281+
entity_hash = H(entity_bytes)
280282
A2 = b":".join((A2, entity_hash))
281283

282284
HA1 = H(A1)
@@ -398,7 +400,7 @@ async def __call__(
398400
for retry_count in range(2):
399401
# Apply authorization header if we have a challenge (on second attempt)
400402
if retry_count > 0:
401-
request.headers[hdrs.AUTHORIZATION] = self._encode(
403+
request.headers[hdrs.AUTHORIZATION] = await self._encode(
402404
request.method, request.url, request.body
403405
)
404406

aiohttp/client_reqrep.py

Lines changed: 158 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,25 @@ def _is_expected_content_type(
252252
return expected_content_type in response_content_type
253253

254254

255+
def _warn_if_unclosed_payload(payload: payload.Payload, stacklevel: int = 2) -> None:
256+
"""Warn if the payload is not closed.
257+
258+
Callers must check that the body is a Payload before calling this method.
259+
260+
Args:
261+
payload: The payload to check
262+
stacklevel: Stack level for the warning (default 2 for direct callers)
263+
"""
264+
if not payload.autoclose and not payload.consumed:
265+
warnings.warn(
266+
"The previous request body contains unclosed resources. "
267+
"Use await request.update_body() instead of setting request.body "
268+
"directly to properly close resources and avoid leaks.",
269+
ResourceWarning,
270+
stacklevel=stacklevel,
271+
)
272+
273+
255274
class ClientRequest:
256275
GET_METHODS = {
257276
hdrs.METH_GET,
@@ -268,7 +287,7 @@ class ClientRequest:
268287
}
269288

270289
# Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
271-
body: Any = b""
290+
_body: Union[None, payload.Payload] = None
272291
auth = None
273292
response = None
274293

@@ -439,6 +458,36 @@ def host(self) -> str:
439458
def port(self) -> Optional[int]:
440459
return self.url.port
441460

461+
@property
462+
def body(self) -> Union[bytes, payload.Payload]:
463+
"""Request body."""
464+
# empty body is represented as bytes for backwards compatibility
465+
return self._body or b""
466+
467+
@body.setter
468+
def body(self, value: Any) -> None:
469+
"""Set request body with warning for non-autoclose payloads.
470+
471+
WARNING: This setter must be called from within an event loop and is not
472+
thread-safe. Setting body outside of an event loop may raise RuntimeError
473+
when closing file-based payloads.
474+
475+
DEPRECATED: Direct assignment to body is deprecated and will be removed
476+
in a future version. Use await update_body() instead for proper resource
477+
management.
478+
"""
479+
# Close existing payload if present
480+
if self._body is not None:
481+
# Warn if the payload needs manual closing
482+
# stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
483+
_warn_if_unclosed_payload(self._body, stacklevel=3)
484+
# NOTE: In the future, when we remove sync close support,
485+
# this setter will need to be removed and only the async
486+
# update_body() method will be available. For now, we call
487+
# _close() for backwards compatibility.
488+
self._body._close()
489+
self._update_body(value)
490+
442491
@property
443492
def request_info(self) -> RequestInfo:
444493
headers: CIMultiDictProxy[str] = CIMultiDictProxy(self.headers)
@@ -590,9 +639,12 @@ def update_transfer_encoding(self) -> None:
590639
)
591640

592641
self.headers[hdrs.TRANSFER_ENCODING] = "chunked"
593-
else:
594-
if hdrs.CONTENT_LENGTH not in self.headers:
595-
self.headers[hdrs.CONTENT_LENGTH] = str(len(self.body))
642+
elif (
643+
self._body is not None
644+
and hdrs.CONTENT_LENGTH not in self.headers
645+
and (size := self._body.size) is not None
646+
):
647+
self.headers[hdrs.CONTENT_LENGTH] = str(size)
596648

597649
def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> None:
598650
"""Set basic auth."""
@@ -610,37 +662,120 @@ def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> Non
610662

611663
self.headers[hdrs.AUTHORIZATION] = auth.encode()
612664

613-
def update_body_from_data(self, body: Any) -> None:
665+
def update_body_from_data(self, body: Any, _stacklevel: int = 3) -> None:
666+
"""Update request body from data."""
667+
if self._body is not None:
668+
_warn_if_unclosed_payload(self._body, stacklevel=_stacklevel)
669+
614670
if body is None:
671+
self._body = None
615672
return
616673

617674
# FormData
618-
if isinstance(body, FormData):
619-
body = body()
675+
maybe_payload = body() if isinstance(body, FormData) else body
620676

621677
try:
622-
body = payload.PAYLOAD_REGISTRY.get(body, disposition=None)
678+
body_payload = payload.PAYLOAD_REGISTRY.get(maybe_payload, disposition=None)
623679
except payload.LookupError:
624-
body = FormData(body)()
625-
626-
self.body = body
680+
body_payload = FormData(maybe_payload)() # type: ignore[arg-type]
627681

682+
self._body = body_payload
628683
# enable chunked encoding if needed
629684
if not self.chunked and hdrs.CONTENT_LENGTH not in self.headers:
630-
if (size := body.size) is not None:
685+
if (size := body_payload.size) is not None:
631686
self.headers[hdrs.CONTENT_LENGTH] = str(size)
632687
else:
633688
self.chunked = True
634689

635690
# copy payload headers
636-
assert body.headers
691+
assert body_payload.headers
637692
headers = self.headers
638693
skip_headers = self._skip_auto_headers
639-
for key, value in body.headers.items():
694+
for key, value in body_payload.headers.items():
640695
if key in headers or (skip_headers is not None and key in skip_headers):
641696
continue
642697
headers[key] = value
643698

699+
def _update_body(self, body: Any) -> None:
700+
"""Update request body after it has already been set."""
701+
# Remove existing Content-Length header since body is changing
702+
if hdrs.CONTENT_LENGTH in self.headers:
703+
del self.headers[hdrs.CONTENT_LENGTH]
704+
705+
# Remove existing Transfer-Encoding header to avoid conflicts
706+
if self.chunked and hdrs.TRANSFER_ENCODING in self.headers:
707+
del self.headers[hdrs.TRANSFER_ENCODING]
708+
709+
# Now update the body using the existing method
710+
# Called from _update_body, add 1 to stacklevel from caller
711+
self.update_body_from_data(body, _stacklevel=4)
712+
713+
# Update transfer encoding headers if needed (same logic as __init__)
714+
if body is not None or self.method not in self.GET_METHODS:
715+
self.update_transfer_encoding()
716+
717+
async def update_body(self, body: Any) -> None:
718+
"""
719+
Update request body and close previous payload if needed.
720+
721+
This method safely updates the request body by first closing any existing
722+
payload to prevent resource leaks, then setting the new body.
723+
724+
IMPORTANT: Always use this method instead of setting request.body directly.
725+
Direct assignment to request.body will leak resources if the previous body
726+
contains file handles, streams, or other resources that need cleanup.
727+
728+
Args:
729+
body: The new body content. Can be:
730+
- bytes/bytearray: Raw binary data
731+
- str: Text data (will be encoded using charset from Content-Type)
732+
- FormData: Form data that will be encoded as multipart/form-data
733+
- Payload: A pre-configured payload object
734+
- AsyncIterable: An async iterable of bytes chunks
735+
- File-like object: Will be read and sent as binary data
736+
- None: Clears the body
737+
738+
Usage:
739+
# CORRECT: Use update_body
740+
await request.update_body(b"new request data")
741+
742+
# WRONG: Don't set body directly
743+
# request.body = b"new request data" # This will leak resources!
744+
745+
# Update with form data
746+
form_data = FormData()
747+
form_data.add_field('field', 'value')
748+
await request.update_body(form_data)
749+
750+
# Clear body
751+
await request.update_body(None)
752+
753+
Note:
754+
This method is async because it may need to close file handles or
755+
other resources associated with the previous payload. Always await
756+
this method to ensure proper cleanup.
757+
758+
Warning:
759+
Setting request.body directly is highly discouraged and can lead to:
760+
- Resource leaks (unclosed file handles, streams)
761+
- Memory leaks (unreleased buffers)
762+
- Unexpected behavior with streaming payloads
763+
764+
It is not recommended to change the payload type in middleware. If the
765+
body was already set (e.g., as bytes), it's best to keep the same type
766+
rather than converting it (e.g., to str) as this may result in unexpected
767+
behavior.
768+
769+
See Also:
770+
- update_body_from_data: Synchronous body update without cleanup
771+
- body property: Direct body access (STRONGLY DISCOURAGED)
772+
773+
"""
774+
# Close existing payload if it exists and needs closing
775+
if self._body is not None:
776+
await self._body.close()
777+
self._update_body(body)
778+
644779
def update_expect_continue(self, expect: bool = False) -> None:
645780
if expect:
646781
self.headers[hdrs.EXPECT] = "100-continue"
@@ -717,27 +852,14 @@ async def write_bytes(
717852
protocol = conn.protocol
718853
assert protocol is not None
719854
try:
720-
if isinstance(self.body, payload.Payload):
721-
# Specialized handling for Payload objects that know how to write themselves
722-
await self.body.write_with_length(writer, content_length)
723-
else:
724-
# Handle bytes/bytearray by converting to an iterable for consistent handling
725-
if isinstance(self.body, (bytes, bytearray)):
726-
self.body = (self.body,)
727-
728-
if content_length is None:
729-
# Write the entire body without length constraint
730-
for chunk in self.body:
731-
await writer.write(chunk)
732-
else:
733-
# Write with length constraint, respecting content_length limit
734-
# If the body is larger than content_length, we truncate it
735-
remaining_bytes = content_length
736-
for chunk in self.body:
737-
await writer.write(chunk[:remaining_bytes])
738-
remaining_bytes -= len(chunk)
739-
if remaining_bytes <= 0:
740-
break
855+
# This should be a rare case, but
856+
# self._body can be set to None while
857+
# the task is being started or we wait above
858+
# for the 100-continue response.
859+
# The more likely case is we have an empty
860+
# payload, but 100-continue is still expected.
861+
if self._body is not None:
862+
await self._body.write_with_length(writer, content_length)
741863
except OSError as underlying_exc:
742864
reraised_exc = underlying_exc
743865

@@ -833,7 +955,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":
833955
await writer.write_headers(status_line, self.headers)
834956

835957
task: Optional["asyncio.Task[None]"]
836-
if self.body or self._continue is not None or protocol.writing_paused:
958+
if self._body or self._continue is not None or protocol.writing_paused:
837959
coro = self.write_bytes(writer, conn, self._get_content_length())
838960
if sys.version_info >= (3, 12):
839961
# Optimization for Python 3.12, try to write

0 commit comments

Comments
 (0)