Skip to content

Commit 5c043c2

Browse files
committed
Merge branch 'develop' into codegen_sigv4_auth
2 parents 45cce24 + 4579c2b commit 5c043c2

File tree

14 files changed

+3171
-9
lines changed

14 files changed

+3171
-9
lines changed

designs/event-streams.md

Lines changed: 325 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,325 @@
1+
# Event Streams
2+
3+
Event streams represent a behavioral difference in Smithy operations. Most
4+
operations work philosophically like functions in python - you provide some
5+
parameters once, and get results once. Event streams, on the other hand,
6+
represent a continual exchange of data which may be flow in one direction
7+
or in both directions (a.k.a. a "bidirectional" or "duplex" stream).
8+
9+
To facilitate these different usage scenarios, the return type event stream
10+
operations are altered to provide customers with persistent stream objects
11+
that they can write or read to.
12+
13+
## Event Publishers
14+
15+
An `AsyncEventPublisher` is used to send events to a service.
16+
17+
```python
18+
class AsyncEventPublisher[E: SerializableShape](Protocol):
19+
async def send(self, event: E) -> None:
20+
...
21+
22+
async def close(self) -> None:
23+
pass
24+
25+
async def __aenter__(self) -> Self:
26+
return self
27+
28+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
29+
await self.close()
30+
```
31+
32+
Publishers expose a `send` method that takes an event class which implements
33+
`SerializableShape`. It then passes that shape to an internal `ShapeSerializer`
34+
and sends it over the connection. (Note that these `ShapeSerializer`s and
35+
connection types are internal, and so are not part of the interface shown
36+
above.)
37+
38+
The `ShapeSerializer`s work in exactly the same way as they do for other use
39+
cases. They are ultimately driven by each `SerializableShape`'s `serialize`
40+
method.
41+
42+
Publishers also expose a few Python standard methods. `close` can be used to
43+
clean up any long-running resources, such as an HTTP connection or open file
44+
handle. The async context manager magic methods are also supported, and by
45+
default they just serve to automatically call `close` on exit. It is important
46+
however that implementations of `AsyncEventPublisher` MUST NOT require
47+
`__aenter__` or any other method to be called prior to `send`. These publishers
48+
are intended to be immediately useful and so any setup SHOULD take place while
49+
constructing them in the `ClientProtocol`.
50+
51+
```python
52+
async with publisher:
53+
publisher.send(FooEvent(foo="bar"))
54+
```
55+
56+
## Event Receivers
57+
58+
An `AsyncEventReceiver` is used to receive events from a service.
59+
60+
```python
61+
class AsyncEventReceiver[E: DeserializableShape](Protocol):
62+
63+
async def receive(self) -> E | None:
64+
...
65+
66+
async def close(self) -> None:
67+
pass
68+
69+
async def __anext__(self) -> E:
70+
result = await self.receive()
71+
if result is None:
72+
await self.close()
73+
raise StopAsyncIteration
74+
return result
75+
76+
def __aiter__(self) -> Self:
77+
return self
78+
79+
async def __enter__(self) -> Self:
80+
return self
81+
82+
async def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any):
83+
await self.close()
84+
```
85+
86+
Similar to publishers, these expose a single method that MUST be implemented.
87+
The `receive` method receives a single event from among the different declared
88+
event types. These events are read from the connection and then deserialized
89+
with `ShapeDeserializer`s.
90+
91+
The `ShapeDeserializer`s work in mostly the same way as they do for other use
92+
cases. They are ultimately driven by each `DeserializableShape`'s `deserialize`
93+
method. Since the shape on the wire might be one of several types, a
94+
`TypeRegistry` SHOULD be used to access the correct event shape. Protocols MUST
95+
have some sort of discriminator on the wire that can be used to match the wire
96+
event to the ID of the shape it represents.
97+
98+
Receivers also expose a few standard Python methods. `close` can be used to
99+
clean up any long-running resources, such as an HTTP connection or open file
100+
handle. The async context manager magic methods are also supported, and by
101+
default they just serve to autoatically call `close` on exit. It is important
102+
however that implementations of `AsyncEventReceiver` MUST NOT require
103+
`__aenter__` or any other method to be called prior to `receive`. These
104+
receivers are intended to be immediately useful and so any setup SHOULD take
105+
place while constructing them.
106+
107+
`AsyncEventReceiver` additionally implements the async iterable methods, which
108+
is the standard way of interacting with async streams in Python. These methods
109+
are fully implemented by the `AsyncEventReceiver` class, so any implementations
110+
that inherit from it do not need to do anything. `close` is automatically called
111+
when no more events are available.
112+
113+
```python
114+
def handle_event(event: ExampleEventStream):
115+
# Events are a union, so you must check which kind was received
116+
match event:
117+
case FooEvent:
118+
print(event.foo)
119+
case _:
120+
print(f"Unkown event: {event}")
121+
122+
123+
# Usage via directly calling `receive`
124+
async with receiver_a:
125+
if (event := await receiver_a.receive()) is not None:
126+
handle_event(event)
127+
128+
129+
# Usage via iterator
130+
async for event in reciever:
131+
handle_event(event)
132+
```
133+
134+
### Errors
135+
136+
Event streams may define modeled errors that may be sent over the stream. These
137+
errors are deserialized in exactly the same way that other response shapes are.
138+
Modeled error classes implement the same `SerializeableShape` and
139+
`DeserializeableShape` interfaces that normal shapes do.
140+
141+
Event stream protocols may also define a way to send an error that is
142+
structured, but not part of the model. These could, for example, represent an
143+
unknown error on the service side that would result in a 500-level error in a
144+
standard HTTP request lifecycle. These errors MUST be parsed by receiver
145+
implementations into a generic exception class.
146+
147+
All errors received over the stream MUST be raised by the receiver. All errors
148+
are considered terminal, so the receiver MUST close any open resources after
149+
receiving an error.
150+
151+
### Unknown and Malformed Events
152+
153+
If a receiver encounters an unknown event, it MUST treat it as an error and
154+
raise an exception. If an identifier was able to be parsed from the event, it
155+
MUST be included in the exception message. Like any other error, receiving an
156+
unknown event is considered to be terminal, so the receiver MUST close any open
157+
resources after receiving it.
158+
159+
## Operation Return Types
160+
161+
An event stream operation may stream events to the service, from the service, or
162+
both. Each of these cases deserves to be handled separately, and so each has a
163+
different return type that encapsulates a publisher and/or receiver. These cases
164+
are handled by the following classes:
165+
166+
* `DuplexEventStream` is returned when the operation has both input and output
167+
streams.
168+
* `InputEventStream` is returned when the operation only has an input stream.
169+
* `OutputEventStream` is returned when the operation only has an output stream.
170+
171+
```python
172+
class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):
173+
174+
input_stream: AsyncEventPublisher[I]
175+
176+
_output_stream: AsyncEventReceiver[O] | None = None
177+
_response: R | None = None
178+
179+
@property
180+
def output_stream(self) -> AsyncEventReceiver[O] | None:
181+
return self._output_stream
182+
183+
@output_stream.setter
184+
def output_stream(self, value: AsyncEventReceiver[O]) -> None:
185+
self._output_stream = value
186+
187+
@property
188+
def response(self) -> R | None:
189+
return self._response
190+
191+
@response.setter
192+
def response(self, value: R) -> None:
193+
self._response = value
194+
195+
async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]:
196+
...
197+
198+
async def close(self) -> None:
199+
if self.output_stream is None:
200+
_, self.output_stream = await self.await_output()
201+
202+
await self.input_stream.close()
203+
await self.output_stream.close()
204+
205+
async def __aenter__(self) -> Self:
206+
return self
207+
208+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
209+
await self.close()
210+
211+
212+
class InputEventStream[I: SerializableShape, R](Protocol):
213+
214+
input_stream: AsyncEventPublisher[I]
215+
216+
_response: R | None = None
217+
218+
@property
219+
def response(self) -> R | None:
220+
return self._response
221+
222+
@response.setter
223+
def response(self, value: R) -> None:
224+
self._response = value
225+
226+
async def await_output(self) -> R:
227+
...
228+
229+
async def close(self) -> None:
230+
await self.input_stream.close()
231+
232+
async def __aenter__(self) -> Self:
233+
return self
234+
235+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
236+
await self.close()
237+
238+
239+
class OutputEventStream[O: DeserializableShape, R](Protocol):
240+
241+
output_stream: AsyncEventReceiver[O]
242+
243+
response: R
244+
245+
async def close(self) -> None:
246+
await self.output_stream.close()
247+
248+
async def __aenter__(self) -> Self:
249+
return self
250+
251+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
252+
await self.close()
253+
```
254+
255+
All three classes share certain functionality. They all implement `close` and
256+
the async context manager magic methods. By default these just call close on
257+
the underlying publisher and/or receiver.
258+
259+
Both `InputEventStream` and `DuplexEventStream` have an `await_output` method
260+
that waits for the initial request to be received, returning that and the output
261+
stream. Their `response` and `output_stream` properties will not be set until
262+
then. This is important because clients MUST be able to start sending events to
263+
the service immediately, without waiting for the initial response. This is
264+
critical because there are existing services that require one or more events to
265+
be sent before they start sending responses.
266+
267+
```python
268+
with await client.duplex_operation(DuplexInput(spam="eggs")) as stream:
269+
stream.input_stream.send(FooEvent(foo="bar"))
270+
271+
initial, output_stream = await stream.await_output()
272+
273+
for event in output_stream:
274+
handle_event(event)
275+
276+
277+
with await client.input_operation() as stream:
278+
stream.input_stream.send(FooEvent(foo="bar"))
279+
```
280+
281+
The `OutputEventStream`'s initial `response` and `output_stream` will never be
282+
`None`, however. Instead, the `ClientProtocol` MUST set values for these when
283+
constructing the object. This differs from the other stream types because the
284+
lack of an input stream means that the service has nothing to wait on from the
285+
client before sending responses.
286+
287+
```python
288+
with await client.output_operation() as stream:
289+
for event in output_stream:
290+
handle_event(event)
291+
```
292+
293+
## Event Structure
294+
295+
Event messages are structurally similar to HTTP messages. They consist of a map
296+
of headers alongside a binary payload. Unlike HTTP messages, headers can be one
297+
of a number of different types.
298+
299+
```python
300+
type HEADER_VALUE = bool | int | bytes | str | datetime.datetime
301+
302+
class EventMessage(Protocol):
303+
headers: Mapping[str, HEADER_VALUE]
304+
payload: bytes
305+
```
306+
307+
This structure MUST NOT be exposed as the response type for a receiver or input
308+
type for a publisher. It MAY be exposed for modification in a similar way to how
309+
HTTP messages are exposed during the request pipeline. In particular, it SHOULD
310+
be exposed for the purposes of signing.
311+
312+
## FAQ
313+
314+
### Why aren't the event streams one class?
315+
316+
Forcing the three event stream variants into one class makes typing a mess. When
317+
they're separate, they can be paramaterized on their event union without having
318+
to lean on `Any`. It also doesn't expose properties that will always be `None`
319+
and doesn't force properties that will never be `None` to be declared optional.
320+
321+
### How are events signed?
322+
323+
The signer interface will need to be updated to expose a `sign_event` method
324+
similar to the current `sign` method, but which takes an `EventMessage` instead
325+
of an `HTTPRequest` or other request type.

designs/serialization.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,9 @@ class ShapeSerializer(Protocol):
317317
def write_document(self, schema: "Schema", value: "Document") -> None:
318318
...
319319

320+
def write_data_stream(self, schema: "Schema", value: StreamingBlob) -> None:
321+
raise NotImplementedError()
322+
320323

321324
@runtime_checkable
322325
class MapSerializer(Protocol):
@@ -531,6 +534,9 @@ class ShapeDeserializer(Protocol):
531534
def read_timestamp(self, schema: "Schema") -> datetime.datetime:
532535
...
533536

537+
def read_data_stream(self, schema: "Schema") -> StreamingBlob:
538+
raise NotImplementedError()
539+
534540

535541
@runtime_checkable
536542
class DeserializeableShape(Protocol):

packages/smithy-aws-core/src/smithy_aws_core/interceptors/user_agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,5 @@ def _crt_version(self) -> list[UserAgentComponent]:
6868
import awscrt
6969

7070
return [UserAgentComponent("md", "awscrt", awscrt.__version__)]
71-
except AttributeError:
71+
except (ImportError, AttributeError):
7272
return []

packages/smithy-core/src/smithy_core/deserializers.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@
33
from decimal import Decimal
44
from typing import TYPE_CHECKING, Never, Protocol, Self, runtime_checkable
55

6-
from .exceptions import SmithyException
6+
from .exceptions import SmithyException, UnsupportedStreamException
77

88
if TYPE_CHECKING:
99
from .documents import Document
1010
from .schemas import Schema
11+
from .aio.interfaces import StreamingBlob as _Stream
1112

1213

1314
@runtime_checkable
@@ -171,6 +172,22 @@ def read_timestamp(self, schema: "Schema") -> datetime.datetime:
171172
"""
172173
...
173174

175+
def read_data_stream(self, schema: "Schema") -> "_Stream":
176+
"""Read a data stream from the underlying data.
177+
178+
The data itself MUST NOT be read by this method. The value returned is intended
179+
to be read later by the consumer. In an HTTP implementation, for example, this
180+
would directly return the HTTP body stream. The stream MAY be wrapped to provide
181+
a more consistent interface or to avoid exposing implementation details.
182+
183+
Data streams are only supported at the top-level input and output for
184+
operations.
185+
186+
:param schema: The shape's schema.
187+
:returns: A data stream derived from the underlying data.
188+
"""
189+
raise UnsupportedStreamException()
190+
174191

175192
class SpecificShapeDeserializer(ShapeDeserializer):
176193
"""Expects to deserialize a specific kind of shape, failing if other shapes are
@@ -247,6 +264,9 @@ def read_document(self, schema: "Schema") -> "Document":
247264
def read_timestamp(self, schema: "Schema") -> datetime.datetime:
248265
self._invalid_state(schema)
249266

267+
def read_data_stream(self, schema: "Schema") -> "_Stream":
268+
self._invalid_state(schema)
269+
250270

251271
@runtime_checkable
252272
class DeserializeableShape(Protocol):

0 commit comments

Comments
 (0)