@@ -190,6 +190,25 @@ class ConnectionKey(NamedTuple):
190
190
proxy_headers_hash : Optional [int ] # hash(CIMultiDict)
191
191
192
192
193
+ def _warn_if_unclosed_payload (payload : payload .Payload , stacklevel : int = 2 ) -> None :
194
+ """Warn if the payload is not closed.
195
+
196
+ Callers must check that the body is a Payload before calling this method.
197
+
198
+ Args:
199
+ payload: The payload to check
200
+ stacklevel: Stack level for the warning (default 2 for direct callers)
201
+ """
202
+ if not payload .autoclose and not payload .consumed :
203
+ warnings .warn (
204
+ "The previous request body contains unclosed resources. "
205
+ "Use await request.update_body() instead of setting request.body "
206
+ "directly to properly close resources and avoid leaks." ,
207
+ ResourceWarning ,
208
+ stacklevel = stacklevel ,
209
+ )
210
+
211
+
193
212
class ClientRequest :
194
213
GET_METHODS = {
195
214
hdrs .METH_GET ,
@@ -206,7 +225,7 @@ class ClientRequest:
206
225
}
207
226
208
227
# Type of body depends on PAYLOAD_REGISTRY, which is dynamic.
209
- body : Any = b""
228
+ _body : Union [ None , payload . Payload ] = None
210
229
auth = None
211
230
response = None
212
231
@@ -373,6 +392,36 @@ def host(self) -> str:
373
392
def port (self ) -> Optional [int ]:
374
393
return self .url .port
375
394
395
+ @property
396
+ def body (self ) -> Union [bytes , payload .Payload ]:
397
+ """Request body."""
398
+ # empty body is represented as bytes for backwards compatibility
399
+ return self ._body or b""
400
+
401
+ @body .setter
402
+ def body (self , value : Any ) -> None :
403
+ """Set request body with warning for non-autoclose payloads.
404
+
405
+ WARNING: This setter must be called from within an event loop and is not
406
+ thread-safe. Setting body outside of an event loop may raise RuntimeError
407
+ when closing file-based payloads.
408
+
409
+ DEPRECATED: Direct assignment to body is deprecated and will be removed
410
+ in a future version. Use await update_body() instead for proper resource
411
+ management.
412
+ """
413
+ # Close existing payload if present
414
+ if self ._body is not None :
415
+ # Warn if the payload needs manual closing
416
+ # stacklevel=3: user code -> body setter -> _warn_if_unclosed_payload
417
+ _warn_if_unclosed_payload (self ._body , stacklevel = 3 )
418
+ # NOTE: In the future, when we remove sync close support,
419
+ # this setter will need to be removed and only the async
420
+ # update_body() method will be available. For now, we call
421
+ # _close() for backwards compatibility.
422
+ self ._body ._close ()
423
+ self ._update_body (value )
424
+
376
425
@property
377
426
def request_info (self ) -> RequestInfo :
378
427
headers : CIMultiDictProxy [str ] = CIMultiDictProxy (self .headers )
@@ -522,9 +571,12 @@ def update_transfer_encoding(self) -> None:
522
571
)
523
572
524
573
self .headers [hdrs .TRANSFER_ENCODING ] = "chunked"
525
- else :
526
- if hdrs .CONTENT_LENGTH not in self .headers :
527
- self .headers [hdrs .CONTENT_LENGTH ] = str (len (self .body ))
574
+ elif (
575
+ self ._body is not None
576
+ and hdrs .CONTENT_LENGTH not in self .headers
577
+ and (size := self ._body .size ) is not None
578
+ ):
579
+ self .headers [hdrs .CONTENT_LENGTH ] = str (size )
528
580
529
581
def update_auth (self , auth : Optional [BasicAuth ], trust_env : bool = False ) -> None :
530
582
"""Set basic auth."""
@@ -542,42 +594,125 @@ def update_auth(self, auth: Optional[BasicAuth], trust_env: bool = False) -> Non
542
594
543
595
self .headers [hdrs .AUTHORIZATION ] = auth .encode ()
544
596
545
- def update_body_from_data (self , body : Any ) -> None :
597
+ def update_body_from_data (self , body : Any , _stacklevel : int = 3 ) -> None :
598
+ """Update request body from data."""
599
+ if self ._body is not None :
600
+ _warn_if_unclosed_payload (self ._body , stacklevel = _stacklevel )
601
+
546
602
if body is None :
603
+ self ._body = None
547
604
return
548
605
549
606
# FormData
550
- if isinstance (body , FormData ):
551
- body = body ()
607
+ maybe_payload = body () if isinstance (body , FormData ) else body
552
608
553
609
try :
554
- body = payload .PAYLOAD_REGISTRY .get (body , disposition = None )
610
+ body_payload = payload .PAYLOAD_REGISTRY .get (maybe_payload , disposition = None )
555
611
except payload .LookupError :
556
- boundary = None
612
+ boundary : Optional [ str ] = None
557
613
if CONTENT_TYPE in self .headers :
558
614
boundary = parse_mimetype (self .headers [CONTENT_TYPE ]).parameters .get (
559
615
"boundary"
560
616
)
561
- body = FormData (body , boundary = boundary )()
562
-
563
- self .body = body
617
+ body_payload = FormData (maybe_payload , boundary = boundary )() # type: ignore[arg-type]
564
618
619
+ self ._body = body_payload
565
620
# enable chunked encoding if needed
566
621
if not self .chunked and hdrs .CONTENT_LENGTH not in self .headers :
567
- if (size := body .size ) is not None :
622
+ if (size := body_payload .size ) is not None :
568
623
self .headers [hdrs .CONTENT_LENGTH ] = str (size )
569
624
else :
570
625
self .chunked = True
571
626
572
627
# copy payload headers
573
- assert body .headers
628
+ assert body_payload .headers
574
629
headers = self .headers
575
630
skip_headers = self ._skip_auto_headers
576
- for key , value in body .headers .items ():
631
+ for key , value in body_payload .headers .items ():
577
632
if key in headers or (skip_headers is not None and key in skip_headers ):
578
633
continue
579
634
headers [key ] = value
580
635
636
+ def _update_body (self , body : Any ) -> None :
637
+ """Update request body after its already been set."""
638
+ # Remove existing Content-Length header since body is changing
639
+ if hdrs .CONTENT_LENGTH in self .headers :
640
+ del self .headers [hdrs .CONTENT_LENGTH ]
641
+
642
+ # Remove existing Transfer-Encoding header to avoid conflicts
643
+ if self .chunked and hdrs .TRANSFER_ENCODING in self .headers :
644
+ del self .headers [hdrs .TRANSFER_ENCODING ]
645
+
646
+ # Now update the body using the existing method
647
+ # Called from _update_body, add 1 to stacklevel from caller
648
+ self .update_body_from_data (body , _stacklevel = 4 )
649
+
650
+ # Update transfer encoding headers if needed (same logic as __init__)
651
+ if body is not None or self .method not in self .GET_METHODS :
652
+ self .update_transfer_encoding ()
653
+
654
+ async def update_body (self , body : Any ) -> None :
655
+ """
656
+ Update request body and close previous payload if needed.
657
+
658
+ This method safely updates the request body by first closing any existing
659
+ payload to prevent resource leaks, then setting the new body.
660
+
661
+ IMPORTANT: Always use this method instead of setting request.body directly.
662
+ Direct assignment to request.body will leak resources if the previous body
663
+ contains file handles, streams, or other resources that need cleanup.
664
+
665
+ Args:
666
+ body: The new body content. Can be:
667
+ - bytes/bytearray: Raw binary data
668
+ - str: Text data (will be encoded using charset from Content-Type)
669
+ - FormData: Form data that will be encoded as multipart/form-data
670
+ - Payload: A pre-configured payload object
671
+ - AsyncIterable: An async iterable of bytes chunks
672
+ - File-like object: Will be read and sent as binary data
673
+ - None: Clears the body
674
+
675
+ Usage:
676
+ # CORRECT: Use update_body
677
+ await request.update_body(b"new request data")
678
+
679
+ # WRONG: Don't set body directly
680
+ # request.body = b"new request data" # This will leak resources!
681
+
682
+ # Update with form data
683
+ form_data = FormData()
684
+ form_data.add_field('field', 'value')
685
+ await request.update_body(form_data)
686
+
687
+ # Clear body
688
+ await request.update_body(None)
689
+
690
+ Note:
691
+ This method is async because it may need to close file handles or
692
+ other resources associated with the previous payload. Always await
693
+ this method to ensure proper cleanup.
694
+
695
+ Warning:
696
+ Setting request.body directly is highly discouraged and can lead to:
697
+ - Resource leaks (unclosed file handles, streams)
698
+ - Memory leaks (unreleased buffers)
699
+ - Unexpected behavior with streaming payloads
700
+
701
+ It is not recommended to change the payload type in middleware. If the
702
+ body was already set (e.g., as bytes), it's best to keep the same type
703
+ rather than converting it (e.g., to str) as this may result in unexpected
704
+ behavior.
705
+
706
+ See Also:
707
+ - update_body_from_data: Synchronous body update without cleanup
708
+ - body property: Direct body access (STRONGLY DISCOURAGED)
709
+
710
+ """
711
+ # Close existing payload if it exists and needs closing
712
+ if self ._body is not None :
713
+ await self ._body .close ()
714
+ self ._update_body (body )
715
+
581
716
def update_expect_continue (self , expect : bool = False ) -> None :
582
717
if expect :
583
718
self .headers [hdrs .EXPECT ] = "100-continue"
@@ -654,27 +789,14 @@ async def write_bytes(
654
789
protocol = conn .protocol
655
790
assert protocol is not None
656
791
try :
657
- if isinstance (self .body , payload .Payload ):
658
- # Specialized handling for Payload objects that know how to write themselves
659
- await self .body .write_with_length (writer , content_length )
660
- else :
661
- # Handle bytes/bytearray by converting to an iterable for consistent handling
662
- if isinstance (self .body , (bytes , bytearray )):
663
- self .body = (self .body ,)
664
-
665
- if content_length is None :
666
- # Write the entire body without length constraint
667
- for chunk in self .body :
668
- await writer .write (chunk )
669
- else :
670
- # Write with length constraint, respecting content_length limit
671
- # If the body is larger than content_length, we truncate it
672
- remaining_bytes = content_length
673
- for chunk in self .body :
674
- await writer .write (chunk [:remaining_bytes ])
675
- remaining_bytes -= len (chunk )
676
- if remaining_bytes <= 0 :
677
- break
792
+ # This should be a rare case but the
793
+ # self._body can be set to None while
794
+ # the task is being started or we wait above
795
+ # for the 100-continue response.
796
+ # The more likely case is we have an empty
797
+ # payload, but 100-continue is still expected.
798
+ if self ._body is not None :
799
+ await self ._body .write_with_length (writer , content_length )
678
800
except OSError as underlying_exc :
679
801
reraised_exc = underlying_exc
680
802
@@ -770,7 +892,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":
770
892
await writer .write_headers (status_line , self .headers )
771
893
772
894
task : Optional ["asyncio.Task[None]" ]
773
- if self .body or self ._continue is not None or protocol .writing_paused :
895
+ if self ._body or self ._continue is not None or protocol .writing_paused :
774
896
coro = self .write_bytes (writer , conn , self ._get_content_length ())
775
897
if sys .version_info >= (3 , 12 ):
776
898
# Optimization for Python 3.12, try to write
0 commit comments