Skip to content

Commit 0c16102

Browse files
authored
[PR #10915/545783b backport][3.12] Fix connection reuse for file-like data payloads (#10931)
1 parent 10f0cf8 commit 0c16102

File tree

8 files changed

+1060
-84
lines changed

8 files changed

+1060
-84
lines changed

CHANGES/10325.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
10915.bugfix.rst

CHANGES/10915.bugfix.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed connection reuse for file-like data payloads by ensuring buffer
2+
truncation respects content-length boundaries and preventing premature
3+
connection closure race -- by :user:`bdraco`.

aiohttp/client_reqrep.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,23 @@ def __init__(
370370
def __reset_writer(self, _: object = None) -> None:
371371
self.__writer = None
372372

373+
def _get_content_length(self) -> Optional[int]:
374+
"""Extract and validate Content-Length header value.
375+
376+
Returns parsed Content-Length value or None if not set.
377+
Raises ValueError if header exists but cannot be parsed as an integer.
378+
"""
379+
if hdrs.CONTENT_LENGTH not in self.headers:
380+
return None
381+
382+
content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
383+
try:
384+
return int(content_length_hdr)
385+
except ValueError:
386+
raise ValueError(
387+
f"Invalid Content-Length header: {content_length_hdr}"
388+
) from None
389+
373390
@property
374391
def skip_auto_headers(self) -> CIMultiDict[None]:
375392
return self._skip_auto_headers or CIMultiDict()
@@ -659,9 +676,37 @@ def update_proxy(
659676
self.proxy_headers = proxy_headers
660677

661678
async def write_bytes(
662-
self, writer: AbstractStreamWriter, conn: "Connection"
679+
self,
680+
writer: AbstractStreamWriter,
681+
conn: "Connection",
682+
content_length: Optional[int],
663683
) -> None:
664-
"""Support coroutines that yields bytes objects."""
684+
"""
685+
Write the request body to the connection stream.
686+
687+
This method handles writing different types of request bodies:
688+
1. Payload objects (using their specialized write_with_length method)
689+
2. Bytes/bytearray objects
690+
3. Iterable body content
691+
692+
Args:
693+
writer: The stream writer to write the body to
694+
conn: The connection being used for this request
695+
content_length: Optional maximum number of bytes to write from the body
696+
(None means write the entire body)
697+
698+
The method properly handles:
699+
- Waiting for 100-Continue responses if required
700+
- Content length constraints for chunked encoding
701+
- Error handling for network issues, cancellation, and other exceptions
702+
- Signaling EOF and timeout management
703+
704+
Raises:
705+
ClientOSError: When there's an OS-level error writing the body
706+
ClientConnectionError: When there's a general connection error
707+
asyncio.CancelledError: When the operation is cancelled
708+
709+
"""
665710
# 100 response
666711
if self._continue is not None:
667712
await writer.drain()
@@ -671,16 +716,30 @@ async def write_bytes(
671716
assert protocol is not None
672717
try:
673718
if isinstance(self.body, payload.Payload):
674-
await self.body.write(writer)
719+
# Specialized handling for Payload objects that know how to write themselves
720+
await self.body.write_with_length(writer, content_length)
675721
else:
722+
# Handle bytes/bytearray by converting to an iterable for consistent handling
676723
if isinstance(self.body, (bytes, bytearray)):
677724
self.body = (self.body,)
678725

679-
for chunk in self.body:
680-
await writer.write(chunk)
726+
if content_length is None:
727+
# Write the entire body without length constraint
728+
for chunk in self.body:
729+
await writer.write(chunk)
730+
else:
731+
# Write with length constraint, respecting content_length limit
732+
# If the body is larger than content_length, we truncate it
733+
remaining_bytes = content_length
734+
for chunk in self.body:
735+
await writer.write(chunk[:remaining_bytes])
736+
remaining_bytes -= len(chunk)
737+
if remaining_bytes <= 0:
738+
break
681739
except OSError as underlying_exc:
682740
reraised_exc = underlying_exc
683741

742+
# Distinguish between timeout and other OS errors for better error reporting
684743
exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
685744
underlying_exc, asyncio.TimeoutError
686745
)
@@ -692,18 +751,20 @@ async def write_bytes(
692751

693752
set_exception(protocol, reraised_exc, underlying_exc)
694753
except asyncio.CancelledError:
695-
# Body hasn't been fully sent, so connection can't be reused.
754+
# Body hasn't been fully sent, so connection can't be reused
696755
conn.close()
697756
raise
698757
except Exception as underlying_exc:
699758
set_exception(
700759
protocol,
701760
ClientConnectionError(
702-
f"Failed to send bytes into the underlying connection {conn !s}",
761+
"Failed to send bytes into the underlying connection "
762+
f"{conn !s}: {underlying_exc!r}",
703763
),
704764
underlying_exc,
705765
)
706766
else:
767+
# Successfully wrote the body, signal EOF and start response timeout
707768
await writer.write_eof()
708769
protocol.start_timeout()
709770

@@ -768,7 +829,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":
768829
await writer.write_headers(status_line, self.headers)
769830
task: Optional["asyncio.Task[None]"]
770831
if self.body or self._continue is not None or protocol.writing_paused:
771-
coro = self.write_bytes(writer, conn)
832+
coro = self.write_bytes(writer, conn, self._get_content_length())
772833
if sys.version_info >= (3, 12):
773834
# Optimization for Python 3.12, try to write
774835
# bytes immediately to avoid having to schedule

0 commit comments

Comments
 (0)