@@ -337,6 +337,7 @@ async def _consume_body_async(
337337 ) -> None :
338338 async for chunk in source :
339339 dest .write (chunk )
340+ dest .end_stream ()
340341
341342 def __deepcopy__ (self , memo : Any ) -> "AWSCRTHTTPClient" :
342343 return AWSCRTHTTPClient (
@@ -354,6 +355,7 @@ def __init__(self) -> None:
354355 # will be much more efficient than a list.
355356 self ._chunks : deque [bytes ] = deque ()
356357 self ._closed = False
358+ self ._done = False
357359
358360 def read (self , size : int | None = - 1 ) -> bytes :
359361 if self ._closed :
@@ -363,21 +365,24 @@ def read(self, size: int | None = -1) -> bytes:
363365 # When the CRT recieves this, it'll try again later.
364366 raise BlockingIOError ("read" )
365367
366- chunk = self ._chunks .popleft ()
367- if size is None or size < 1 :
368- # We could compile all the chunks here instead of just returning
369- # the one, BUT the CRT will keep calling read until empty bytes
370- # are returned. So it's actually better to just return one chunk
371- # since combining them would have some potentially bad memory
372- # usage issues.
373- return chunk
374- else :
375- result = chunk [:size ]
376- remainder = chunk [size :]
368+ # We could compile all the chunks here instead of just returning
369+ # the one, BUT the CRT will keep calling read until empty bytes
370+ # are returned. So it's actually better to just return one chunk
371+ # since combining them would have some potentially bad memory
372+ # usage issues.
373+ result = self ._chunks .popleft ()
374+ if size is not None and size > 0 :
375+ remainder = result [size :]
376+ result = result [:size ]
377377 if remainder :
378378 self ._chunks .appendleft (remainder )
379379 return result
380380
381+ if self ._done and len (self ._chunks ) == 0 :
382+ self .close ()
383+
384+ return result
385+
381386 def read1 (self , size : int = - 1 ) -> bytes :
382387 return self .read (size )
383388
@@ -410,6 +415,11 @@ def closed(self) -> bool:
410415
411416 def close (self ) -> None :
412417 self ._closed = True
418+ self ._done = True
413419
414420 # Clear out the remaining chunks so that they don't sit around in memory.
415421 self ._chunks .clear ()
422+
423+ def end_stream (self ) -> None :
424+ """End the stream, letting any remaining chunks be read before it is closed."""
425+ self ._done = True
0 commit comments