Skip to content

Commit 545783b

Browse files
authored
Fix connection reuse for file-like data payloads (#10915)
1 parent 5fac5f1 commit 545783b

File tree

8 files changed

+1044
-64
lines changed

8 files changed

+1044
-64
lines changed

CHANGES/10325.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
10915.bugfix.rst

CHANGES/10915.bugfix.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Fixed connection reuse for file-like data payloads by ensuring buffer
2+
truncation respects content-length boundaries and preventing premature
3+
connection closure race -- by :user:`bdraco`.

aiohttp/client_reqrep.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,23 @@ def __init__(
304304
def __reset_writer(self, _: object = None) -> None:
305305
self.__writer = None
306306

307+
def _get_content_length(self) -> Optional[int]:
308+
"""Extract and validate Content-Length header value.
309+
310+
Returns parsed Content-Length value or None if not set.
311+
Raises ValueError if header exists but cannot be parsed as an integer.
312+
"""
313+
if hdrs.CONTENT_LENGTH not in self.headers:
314+
return None
315+
316+
content_length_hdr = self.headers[hdrs.CONTENT_LENGTH]
317+
try:
318+
return int(content_length_hdr)
319+
except ValueError:
320+
raise ValueError(
321+
f"Invalid Content-Length header: {content_length_hdr}"
322+
) from None
323+
307324
@property
308325
def skip_auto_headers(self) -> CIMultiDict[None]:
309326
return self._skip_auto_headers or CIMultiDict()
@@ -596,9 +613,37 @@ def update_proxy(
596613
self.proxy_headers = proxy_headers
597614

598615
async def write_bytes(
599-
self, writer: AbstractStreamWriter, conn: "Connection"
616+
self,
617+
writer: AbstractStreamWriter,
618+
conn: "Connection",
619+
content_length: Optional[int],
600620
) -> None:
601-
"""Support coroutines that yields bytes objects."""
621+
"""
622+
Write the request body to the connection stream.
623+
624+
This method handles writing different types of request bodies:
625+
1. Payload objects (using their specialized write_with_length method)
626+
2. Bytes/bytearray objects
627+
3. Iterable body content
628+
629+
Args:
630+
writer: The stream writer to write the body to
631+
conn: The connection being used for this request
632+
content_length: Optional maximum number of bytes to write from the body
633+
(None means write the entire body)
634+
635+
The method properly handles:
636+
- Waiting for 100-Continue responses if required
637+
- Content length constraints for chunked encoding
638+
- Error handling for network issues, cancellation, and other exceptions
639+
- Signaling EOF and timeout management
640+
641+
Raises:
642+
ClientOSError: When there's an OS-level error writing the body
643+
ClientConnectionError: When there's a general connection error
644+
asyncio.CancelledError: When the operation is cancelled
645+
646+
"""
602647
# 100 response
603648
if self._continue is not None:
604649
await writer.drain()
@@ -608,16 +653,30 @@ async def write_bytes(
608653
assert protocol is not None
609654
try:
610655
if isinstance(self.body, payload.Payload):
611-
await self.body.write(writer)
656+
# Specialized handling for Payload objects that know how to write themselves
657+
await self.body.write_with_length(writer, content_length)
612658
else:
659+
# Handle bytes/bytearray by converting to an iterable for consistent handling
613660
if isinstance(self.body, (bytes, bytearray)):
614661
self.body = (self.body,)
615662

616-
for chunk in self.body:
617-
await writer.write(chunk)
663+
if content_length is None:
664+
# Write the entire body without length constraint
665+
for chunk in self.body:
666+
await writer.write(chunk)
667+
else:
668+
# Write with length constraint, respecting content_length limit
669+
# If the body is larger than content_length, we truncate it
670+
remaining_bytes = content_length
671+
for chunk in self.body:
672+
await writer.write(chunk[:remaining_bytes])
673+
remaining_bytes -= len(chunk)
674+
if remaining_bytes <= 0:
675+
break
618676
except OSError as underlying_exc:
619677
reraised_exc = underlying_exc
620678

679+
# Distinguish between timeout and other OS errors for better error reporting
621680
exc_is_not_timeout = underlying_exc.errno is not None or not isinstance(
622681
underlying_exc, asyncio.TimeoutError
623682
)
@@ -629,18 +688,20 @@ async def write_bytes(
629688

630689
set_exception(protocol, reraised_exc, underlying_exc)
631690
except asyncio.CancelledError:
632-
# Body hasn't been fully sent, so connection can't be reused.
691+
# Body hasn't been fully sent, so connection can't be reused
633692
conn.close()
634693
raise
635694
except Exception as underlying_exc:
636695
set_exception(
637696
protocol,
638697
ClientConnectionError(
639-
f"Failed to send bytes into the underlying connection {conn !s}",
698+
"Failed to send bytes into the underlying connection "
699+
f"{conn !s}: {underlying_exc!r}",
640700
),
641701
underlying_exc,
642702
)
643703
else:
704+
# Successfully wrote the body, signal EOF and start response timeout
644705
await writer.write_eof()
645706
protocol.start_timeout()
646707

@@ -705,7 +766,7 @@ async def send(self, conn: "Connection") -> "ClientResponse":
705766
await writer.write_headers(status_line, self.headers)
706767
task: Optional["asyncio.Task[None]"]
707768
if self.body or self._continue is not None or protocol.writing_paused:
708-
coro = self.write_bytes(writer, conn)
769+
coro = self.write_bytes(writer, conn, self._get_content_length())
709770
if sys.version_info >= (3, 12):
710771
# Optimization for Python 3.12, try to write
711772
# bytes immediately to avoid having to schedule

0 commit comments

Comments
 (0)