Skip to content

Commit 7618832

Browse files
Buffer async inputs to the crt
The "clever" workaround for the CRT requiring a sync stream doesn't work. This effectively reverts the CRT binding to its previous state of buffering everything immediately. Unfortunately, this means that the input stream must be closed before anything can be read, which is not a tenable state for the future. The ideal resolution is for the CRT to support reading from an async stream; we might have to contribute that support ourselves. There may be other options, like using futures... or sleep... which might be necessary. But please no.
1 parent b2735a6 commit 7618832

File tree

1 file changed

+11
-25
lines changed
  • python-packages/smithy-http/smithy_http/aio

1 file changed

+11
-25
lines changed

python-packages/smithy-http/smithy_http/aio/crt.py

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
HAS_CRT = False # type: ignore
2626

2727
from smithy_core import interfaces as core_interfaces
28-
from smithy_core.aio.types import AsyncBytesReader
28+
from smithy_core.aio import interfaces as core_aio_interfaces
2929
from smithy_core.exceptions import MissingDependencyException
3030

3131
from .. import Field, Fields
@@ -188,7 +188,6 @@ def __init__(
188188
self._tls_ctx = crt_io.ClientTlsContext(crt_io.TlsContextOptions())
189189
self._socket_options = crt_io.SocketOptions()
190190
self._connections: ConnectionPoolDict = {}
191-
self._async_reads: set[asyncio.Task[Any]] = set()
192191

193192
async def send(
194193
self,
@@ -301,23 +300,14 @@ async def _marshal_request(
301300
# If the body is already directly in memory, wrap in a BytesIO to hand
302301
# off to CRT.
303302
crt_body = BytesIO(body)
303+
elif not self._is_sync_stream(body):
304+
# If the body is an async stream.... read it all into memory. This is
305+
# very unfortunate, but necessary because the CRT doesn't currently
306+
# have the capability to read async. We will likely have to implement
307+
# this capability into it ourselves, or implement a thread-based wrapper.
308+
crt_body = BytesIO(await request.consume_body_async())
304309
else:
305-
# If the body is async, or potentially very large, start up a task to read
306-
# it into the BytesIO object that CRT needs. By using asyncio.create_task
307-
# we'll start the coroutine without having to explicitly await it.
308-
crt_body = BytesIO()
309-
if not isinstance(body, AsyncIterable):
310-
# If the body isn't already an async iterable, wrap it in one. Objects
311-
# with read methods will be read in chunks so as not to exhaust memory.
312-
body = AsyncBytesReader(body)
313-
314-
# Start the read task in the background.
315-
read_task = asyncio.create_task(self._consume_body_async(body, crt_body))
316-
317-
# Keep track of the read task so that it doesn't get garbage collected,
318-
# and stop tracking it once it's done.
319-
self._async_reads.add(read_task)
320-
read_task.add_done_callback(self._async_reads.discard)
310+
crt_body = body
321311

322312
crt_request = crt_http.HttpRequest(
323313
method=request.method,
@@ -327,10 +317,6 @@ async def _marshal_request(
327317
)
328318
return crt_request
329319

330-
async def _consume_body_async(
331-
self, source: AsyncIterable[bytes], dest: BytesIO
332-
) -> None:
333-
async for chunk in source:
334-
dest.write(chunk)
335-
# Should we call close here? Or will that make the crt unable to read the last
336-
# chunk?
320+
def _is_sync_stream(self, body: core_aio_interfaces.StreamingBlob):
321+
read = getattr(body, "read")
322+
return read is not None and not asyncio.iscoroutinefunction(read)

0 commit comments

Comments
 (0)