Skip to content

Commit 99323d2

Browse files
Write CRT data asynchronously
This updates the CRT bindings to write data to its ByteIO body asynchronously rather than buffering it all into memory when making a call. This should enable us to support event streaming, and also prevent us from buffering a huge amount of data into memory, even for synchronous bodies of uncertain size.
1 parent a4be928 commit 99323d2

File tree

1 file changed

+26
-2
lines changed
  • python-packages/smithy-http/smithy_http/aio

1 file changed

+26
-2
lines changed

python-packages/smithy-http/smithy_http/aio/crt.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
HAS_CRT = False # type: ignore
2626

2727
from smithy_core import interfaces as core_interfaces
28+
from smithy_core.aio.types import AsyncBytesReader
2829
from smithy_core.exceptions import MissingDependencyException
2930

3031
from .. import Field, Fields
@@ -293,12 +294,35 @@ async def _marshal_request(
293294

294295
path = self._render_path(request.destination)
295296
headers = crt_http.HttpHeaders(headers_list)
296-
body = BytesIO(await request.consume_body_async())
297+
298+
body = request.body
299+
if isinstance(body, bytes | bytearray):
300+
# If the body is already directly in memory, wrap in in a BytesIO to hand
301+
# off to CRT.
302+
crt_body = BytesIO(body)
303+
else:
304+
# If the body is async, or potentially very large, start up a task to read
305+
# it into the BytesIO object that CRT needs. By using asyncio.create_task
306+
# we'll start the coroutine without having to explicitly await it.
307+
crt_body = BytesIO()
308+
if not isinstance(body, AsyncIterable):
309+
# If the body isn't already an async iterable, wrap it in one. Objects
310+
# with read methods will be read in chunks so as not to exhaust memory.
311+
body = AsyncBytesReader(body)
312+
asyncio.create_task(self._consume_body_async(body, crt_body))
297313

298314
crt_request = crt_http.HttpRequest(
299315
method=request.method,
300316
path=path,
301317
headers=headers,
302-
body_stream=body,
318+
body_stream=crt_body,
303319
)
304320
return crt_request
321+
322+
async def _consume_body_async(
323+
self, source: AsyncIterable[bytes], dest: BytesIO
324+
) -> None:
325+
async for chunk in source:
326+
dest.write(chunk)
327+
# Should we call close here? Or will that make the crt unable to read the last
328+
# chunk?

0 commit comments

Comments
 (0)