Skip to content

Commit bd17d3e

Browse files
authored
Merge pull request #436 from smithy-lang/await-crt-initial-http
Remove use of thread locks in crt bindings
2 parents 952dd31 + 3b83add commit bd17d3e

File tree

1 file changed

+113
-88
lines changed
  • packages/smithy-http/src/smithy_http/aio

1 file changed

+113
-88
lines changed

packages/smithy-http/src/smithy_http/aio/crt.py

Lines changed: 113 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@
33
# pyright: reportMissingTypeStubs=false,reportUnknownMemberType=false
44
# flake8: noqa: F811
55
import asyncio
6+
from concurrent.futures import Future as ConcurrentFuture
67
from collections import deque
7-
from collections.abc import AsyncGenerator, AsyncIterable, Awaitable
8-
from concurrent.futures import Future
8+
from collections.abc import AsyncGenerator, AsyncIterable
99
from copy import deepcopy
1010
from io import BytesIO, BufferedIOBase
11-
from threading import Lock
1211
from typing import TYPE_CHECKING, Any
1312

1413

@@ -62,23 +61,100 @@ def _initialize_default_loop(self) -> "crt_io.ClientBootstrap":
6261

6362

6463
class AWSCRTHTTPResponse(http_aio_interfaces.HTTPResponse):
65-
def __init__(self) -> None:
64+
def __init__(self, *, status: int, fields: Fields, body: "CRTResponseBody") -> None:
6665
_assert_crt()
67-
self._stream: crt_http.HttpClientStream | None = None
68-
self._status_code_future: Future[int] = Future()
69-
self._headers_future: Future[Fields] = Future()
70-
self._chunk_futures: list[Future[bytes]] = []
71-
self._received_chunks: list[bytes] = []
72-
self._chunk_lock: Lock = Lock()
73-
74-
def _set_stream(self, stream: "crt_http.HttpClientStream") -> None:
66+
self._status = status
67+
self._fields = fields
68+
self._body = body
69+
70+
@property
71+
def status(self) -> int:
72+
return self._status
73+
74+
@property
75+
def fields(self) -> Fields:
76+
return self._fields
77+
78+
@property
79+
def body(self) -> AsyncIterable[bytes]:
80+
return self.chunks()
81+
82+
@property
83+
def reason(self) -> str | None:
84+
"""Optional string provided by the server explaining the status."""
85+
# TODO: See how CRT exposes reason.
86+
return None
87+
88+
async def chunks(self) -> AsyncGenerator[bytes, None]:
89+
while True:
90+
chunk = await self._body.next()
91+
if chunk:
92+
yield chunk
93+
else:
94+
break
95+
96+
def __repr__(self) -> str:
97+
return (
98+
f"AWSCRTHTTPResponse("
99+
f"status={self.status}, "
100+
f"fields={self.fields!r}, body=...)"
101+
)
102+
103+
104+
class CRTResponseBody:
105+
def __init__(self) -> None:
106+
self._stream: "crt_http.HttpClientStream | None" = None
107+
self._chunk_futures: deque[ConcurrentFuture[bytes]] = deque()
108+
109+
# deque is thread safe and the crt is only going to be writing
110+
# with one thread anyway, so we *shouldn't* need to gate this
111+
# behind a lock. In an ideal world, the CRT would expose
112+
# an interface that better matches python's async.
113+
self._received_chunks: deque[bytes] = deque()
114+
115+
def set_stream(self, stream: "crt_http.HttpClientStream") -> None:
75116
if self._stream is not None:
76117
raise SmithyHTTPException("Stream already set on AWSCRTHTTPResponse object")
77118
self._stream = stream
78119
self._stream.completion_future.add_done_callback(self._on_complete)
79120
self._stream.activate()
80121

81-
def _on_headers(
122+
def on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
123+
# TODO: update back pressure window once CRT supports it
124+
if self._chunk_futures:
125+
future = self._chunk_futures.popleft()
126+
future.set_result(chunk)
127+
else:
128+
self._received_chunks.append(chunk)
129+
130+
async def next(self) -> bytes:
131+
if self._stream is None:
132+
raise SmithyHTTPException("Stream not set")
133+
134+
# TODO: update backpressure window once CRT supports it
135+
if self._received_chunks:
136+
return self._received_chunks.popleft()
137+
elif self._stream.completion_future.done():
138+
return b""
139+
else:
140+
future = ConcurrentFuture[bytes]()
141+
self._chunk_futures.append(future)
142+
return await asyncio.wrap_future(future)
143+
144+
def _on_complete(
145+
self, completion_future: ConcurrentFuture[int]
146+
) -> None: # pragma: crt-callback
147+
for future in self._chunk_futures:
148+
future.set_result(b"")
149+
self._chunk_futures.clear()
150+
151+
152+
class CRTResponseFactory:
153+
def __init__(self, body: CRTResponseBody) -> None:
154+
self._body = body
155+
self._response_future = ConcurrentFuture[AWSCRTHTTPResponse]()
156+
157+
def on_response(
82158
self, status_code: int, headers: list[tuple[str, str]], **kwargs: Any
83159
) -> None: # pragma: crt-callback
84160
fields = Fields()
@@ -91,76 +167,24 @@ def _on_headers(
91167
values=[header_val],
92168
kind=FieldPosition.HEADER,
93169
)
94-
self._status_code_future.set_result(status_code)
95-
self._headers_future.set_result(fields)
96-
97-
def _on_body(self, chunk: bytes, **kwargs: Any) -> None: # pragma: crt-callback
98-
with self._chunk_lock:
99-
# TODO: update back pressure window once CRT supports it
100-
if self._chunk_futures:
101-
future = self._chunk_futures.pop(0)
102-
future.set_result(chunk)
103-
else:
104-
self._received_chunks.append(chunk)
105-
106-
def _get_chunk_future(self) -> Future[bytes]:
107-
if self._stream is None:
108-
raise SmithyHTTPException("Stream not set")
109-
with self._chunk_lock:
110-
future: Future[bytes] = Future()
111-
# TODO: update backpressure window once CRT supports it
112-
if self._received_chunks:
113-
chunk = self._received_chunks.pop(0)
114-
future.set_result(chunk)
115-
elif self._stream.completion_future.done():
116-
future.set_result(b"")
117-
else:
118-
self._chunk_futures.append(future)
119-
return future
120-
121-
def _on_complete(
122-
self, completion_future: Future[int]
123-
) -> None: # pragma: crt-callback
124-
with self._chunk_lock:
125-
if self._chunk_futures:
126-
future = self._chunk_futures.pop(0)
127-
future.set_result(b"")
128-
129-
@property
130-
def body(self) -> AsyncIterable[bytes]:
131-
return self.chunks()
132-
133-
@property
134-
def status(self) -> int:
135-
"""The 3 digit response status code (1xx, 2xx, 3xx, 4xx, 5xx)."""
136-
return self._status_code_future.result()
137170

138-
@property
139-
def fields(self) -> Fields:
140-
"""List of HTTP header fields."""
141-
if self._stream is None:
142-
raise SmithyHTTPException("Stream not set")
143-
if not self._headers_future.done():
144-
raise SmithyHTTPException("Headers not received yet")
145-
return self._headers_future.result()
171+
self._response_future.set_result(
172+
AWSCRTHTTPResponse(
173+
status=status_code,
174+
fields=fields,
175+
body=self._body,
176+
)
177+
)
146178

147-
@property
148-
def reason(self) -> str | None:
149-
"""Optional string provided by the server explaining the status."""
150-
# TODO: See how CRT exposes reason.
151-
return None
179+
async def await_response(self) -> AWSCRTHTTPResponse:
180+
return await asyncio.wrap_future(self._response_future)
152181

153-
def get_chunk(self) -> Awaitable[bytes]:
154-
future = self._get_chunk_future()
155-
return asyncio.wrap_future(future)
182+
def set_done_callback(self, stream: "crt_http.HttpClientStream") -> None:
183+
stream.completion_future.add_done_callback(self._cancel)
156184

157-
async def chunks(self) -> AsyncGenerator[bytes, None]:
158-
while True:
159-
chunk = await self.get_chunk()
160-
if chunk:
161-
yield chunk
162-
else:
163-
break
185+
def _cancel(self, completion_future: ConcurrentFuture[int | Exception]) -> None:
186+
if not self._response_future.done():
187+
self._response_future.cancel()
164188

165189

166190
ConnectionPoolKey = tuple[str, str, int | None]
@@ -209,20 +233,21 @@ async def send(
209233
"""
210234
crt_request = await self._marshal_request(request)
211235
connection = await self._get_connection(request.destination)
212-
crt_response = AWSCRTHTTPResponse()
236+
response_body = CRTResponseBody()
237+
response_factory = CRTResponseFactory(response_body)
213238
crt_stream = connection.request(
214239
crt_request,
215-
crt_response._on_headers, # pyright: ignore[reportPrivateUsage]
216-
crt_response._on_body, # pyright: ignore[reportPrivateUsage]
240+
response_factory.on_response,
241+
response_body.on_body,
217242
)
218-
crt_response._set_stream(crt_stream) # pyright: ignore[reportPrivateUsage]
219-
return crt_response
243+
response_factory.set_done_callback(crt_stream)
244+
response_body.set_stream(crt_stream)
245+
return await response_factory.await_response()
220246

221247
async def _create_connection(
222248
self, url: core_interfaces.URI
223249
) -> "crt_http.HttpClientConnection":
224-
"""Builds and validates connection to ``url``, returns it as
225-
``asyncio.Future``"""
250+
"""Builds and validates connection to ``url``"""
226251
connect_future = self._build_new_connection(url)
227252
connection = await asyncio.wrap_future(connect_future)
228253
self._validate_connection(connection)
@@ -242,7 +267,7 @@ async def _get_connection(
242267

243268
def _build_new_connection(
244269
self, url: core_interfaces.URI
245-
) -> Future["crt_http.HttpClientConnection"]:
270+
) -> ConcurrentFuture["crt_http.HttpClientConnection"]:
246271
if url.scheme == "http":
247272
port = self._HTTP_PORT
248273
tls_connection_options = None
@@ -259,7 +284,7 @@ def _build_new_connection(
259284
if url.port is not None:
260285
port = url.port
261286

262-
connect_future: Future[crt_http.HttpClientConnection] = (
287+
connect_future: ConcurrentFuture[crt_http.HttpClientConnection] = (
263288
crt_http.HttpClientConnection.new(
264289
bootstrap=self._client_bootstrap,
265290
host_name=url.host,

0 commit comments

Comments
 (0)