Skip to content

Commit 48c4786

Browse files
Centralize event stream wrappers
1 parent c11f1de commit 48c4786

File tree

18 files changed

+339
-551
lines changed

18 files changed

+339
-551
lines changed

designs/event-streams.md

Lines changed: 47 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ async with publisher:
5353
publisher.send(FooEvent(foo="bar"))
5454
```
5555

56+
Protocol implementations will be responsible for creating publishers.
57+
5658
## Event Receivers
5759

5860
An `AsyncEventReceiver` is used to receive events from a service.
@@ -131,6 +133,8 @@ async for event in reciever:
131133
handle_event(event)
132134
```
133135

136+
Protocol implementations will be responsible for creating receivers.
137+
134138
### Errors
135139

136140
Event streams may define modeled errors that may be sent over the stream. These
@@ -169,38 +173,30 @@ are handled by the following classes:
169173
* `OutputEventStream` is returned when the operation only has an output stream.
170174

171175
```python
172-
class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):
173-
174-
input_stream: AsyncEventPublisher[I]
175-
176-
_output_stream: AsyncEventReceiver[O] | None = None
177-
_response: R | None = None
178-
179-
@property
180-
def output_stream(self) -> AsyncEventReceiver[O] | None:
181-
return self._output_stream
182-
183-
@output_stream.setter
184-
def output_stream(self, value: AsyncEventReceiver[O]) -> None:
185-
self._output_stream = value
186-
187-
@property
188-
def response(self) -> R | None:
189-
return self._response
190-
191-
@response.setter
192-
def response(self, value: R) -> None:
193-
self._response = value
194-
195-
async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]:
176+
class DuplexEventStream[
177+
IE: SerializeableShape,
178+
OE: DeserializeableShape,
179+
O: DeserializeableShape,
180+
]:
181+
182+
input_stream: EventPublisher[IE]
183+
output_stream: EventReceiver[OE] | None = None
184+
response: O | None = None
185+
186+
def __init__(
187+
self,
188+
*,
189+
input_stream: EventPublisher[IE],
190+
output_future: Future[tuple[O, EventReceiver[OE]]],
191+
) -> None:
192+
self.input_stream = input_stream
193+
self._output_future = output_future
194+
195+
async def await_output(self) -> tuple[O, EventReceiver[OE]]:
196196
...
197197

198198
async def close(self) -> None:
199-
if self.output_stream is None:
200-
_, self.output_stream = await self.await_output()
201-
202-
await self.input_stream.close()
203-
await self.output_stream.close()
199+
...
204200

205201
async def __aenter__(self) -> Self:
206202
return self
@@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco
209205
await self.close()
210206

211207

212-
class InputEventStream[I: SerializableShape, R](Protocol):
208+
class InputEventStream[IE: SerializeableShape, O]:
213209

214-
input_stream: AsyncEventPublisher[I]
210+
input_stream: EventPublisher[IE]
211+
response: O | None = None
215212

216-
_response: R | None = None
213+
def __init__(
214+
self,
215+
*,
216+
input_stream: EventPublisher[IE],
217+
output_future: Future[O],
218+
) -> None:
219+
self.input_stream = input_stream
220+
self._output_future = output_future
217221

218-
@property
219-
def response(self) -> R | None:
220-
return self._response
221-
222-
@response.setter
223-
def response(self, value: R) -> None:
224-
self._response = value
225-
226-
async def await_output(self) -> R:
222+
async def await_output(self) -> O:
227223
...
228224

229225
async def close(self) -> None:
@@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol):
236232
await self.close()
237233

238234

239-
class OutputEventStream[O: DeserializableShape, R](Protocol):
235+
class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]:
240236

241-
output_stream: AsyncEventReceiver[O]
242-
243-
response: R
237+
output_stream: EventReceiver[OE]
238+
response: O
239+
240+
def __init__(self, output_stream: EventReceiver[OE], output: O) -> None:
241+
self.output_stream = output_stream
242+
self.response = output
244243

245244
async def close(self) -> None:
246245
await self.output_stream.close()
@@ -290,6 +289,9 @@ with await client.output_operation() as stream:
290289
handle_event(event)
291290
```
292291

292+
All three output types are centrally located and will be constructed by filling
293+
in the relevant publishers and receivers from the protocol implementation.
294+
293295
## Event Structure
294296

295297
Event messages are structurally similar to HTTP messages. They consist of a map

packages/aws-event-stream/pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
88
"smithy-core",
9-
"smithy-event-stream",
109
]
1110

1211
[build-system]

packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
54
from collections.abc import Callable
65

7-
from smithy_core.aio.interfaces import AsyncByteStream
86
from smithy_core.codecs import Codec
97
from smithy_core.deserializers import (
10-
DeserializeableShape,
118
ShapeDeserializer,
129
SpecificShapeDeserializer,
1310
)
1411
from smithy_core.schemas import Schema
1512
from smithy_core.shapes import ShapeType
1613
from smithy_core.utils import expect_type
17-
from smithy_event_stream.aio.interfaces import AsyncEventReceiver
1814

1915
from ..events import HEADERS_DICT, Event
2016
from ..exceptions import EventError, UnmodeledEventError
@@ -28,59 +24,6 @@
2824
INITIAL_MESSAGE_TYPES = (INITIAL_REQUEST_EVENT_TYPE, INITIAL_RESPONSE_EVENT_TYPE)
2925

3026

31-
class AWSAsyncEventReceiver[E: DeserializeableShape](AsyncEventReceiver[E]):
32-
def __init__(
33-
self,
34-
payload_codec: Codec,
35-
source: AsyncByteStream,
36-
deserializer: Callable[[ShapeDeserializer], E],
37-
is_client_mode: bool = True,
38-
) -> None:
39-
self._payload_codec = payload_codec
40-
self._source = source
41-
self._is_client_mode = is_client_mode
42-
self._deserializer = deserializer
43-
self._closed = False
44-
45-
async def receive(self) -> E | None:
46-
if self._closed:
47-
return None
48-
49-
try:
50-
event = await Event.decode_async(self._source)
51-
except Exception as e:
52-
await self.close()
53-
if not isinstance(e, EventError):
54-
raise IOError("Failed to read from stream.") from e
55-
raise
56-
57-
if event is None:
58-
return None
59-
60-
deserializer = EventDeserializer(
61-
event=event,
62-
payload_codec=self._payload_codec,
63-
is_client_mode=self._is_client_mode,
64-
)
65-
result = self._deserializer(deserializer)
66-
if isinstance(getattr(result, "value"), Exception):
67-
raise result.value # type: ignore
68-
return result
69-
70-
async def close(self) -> None:
71-
if self._closed:
72-
return
73-
self._closed = True
74-
75-
if (close := getattr(self._source, "close", None)) is not None:
76-
if asyncio.iscoroutine(result := close()):
77-
await result
78-
79-
@property
80-
def closed(self) -> bool:
81-
return self._closed
82-
83-
8427
class EventDeserializer(SpecificShapeDeserializer):
8528
def __init__(
8629
self, event: Event, payload_codec: Codec, is_client_mode: bool = True

packages/aws-event-stream/src/aws_event_stream/_private/serializers.py

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
5-
from collections.abc import Callable, Iterator
4+
from collections.abc import Iterator
65
from contextlib import contextmanager
76
from io import BytesIO
87
from typing import Never
98

10-
from smithy_core.aio.interfaces import AsyncWriter
119
from smithy_core.codecs import Codec
12-
from smithy_core.exceptions import ExpectationNotMetException
1310
from smithy_core.schemas import Schema
1411
from smithy_core.serializers import (
1512
InterceptingSerializer,
16-
SerializeableShape,
1713
ShapeSerializer,
1814
SpecificShapeSerializer,
1915
)
2016
from smithy_core.shapes import ShapeType
21-
from smithy_event_stream.aio.interfaces import AsyncEventPublisher
2217

2318
from ..events import EventMessage, HEADER_VALUE, Short, Byte, Long
2419
from ..exceptions import InvalidHeaderValue
@@ -33,58 +28,6 @@
3328
_DEFAULT_BLOB_CONTENT_TYPE = "application/octet-stream"
3429

3530

36-
type Signer = Callable[[EventMessage], EventMessage]
37-
"""A function that takes an event message and signs it, and returns it signed."""
38-
39-
40-
class AWSAsyncEventPublisher[E: SerializeableShape](AsyncEventPublisher[E]):
41-
def __init__(
42-
self,
43-
payload_codec: Codec,
44-
async_writer: AsyncWriter,
45-
signer: Signer | None = None,
46-
is_client_mode: bool = True,
47-
):
48-
self._writer = async_writer
49-
self._signer = signer
50-
self._serializer = EventSerializer(
51-
payload_codec=payload_codec, is_client_mode=is_client_mode
52-
)
53-
self._closed = False
54-
55-
async def send(self, event: E) -> None:
56-
if self._closed:
57-
raise IOError("Attempted to write to closed stream.")
58-
event.serialize(self._serializer)
59-
result = self._serializer.get_result()
60-
if result is None:
61-
raise ExpectationNotMetException(
62-
"Expected an event message to be serialized, but was None."
63-
)
64-
if self._signer is not None:
65-
result = self._signer(result)
66-
67-
encoded_result = result.encode()
68-
try:
69-
await self._writer.write(encoded_result)
70-
except Exception as e:
71-
await self.close()
72-
raise IOError("Failed to write to stream.") from e
73-
74-
async def close(self) -> None:
75-
if self._closed:
76-
return
77-
self._closed = True
78-
79-
if (close := getattr(self._writer, "close", None)) is not None:
80-
if asyncio.iscoroutine(result := close()):
81-
await result
82-
83-
@property
84-
def closed(self) -> bool:
85-
return self._closed
86-
87-
8831
class EventSerializer(SpecificShapeSerializer):
8932
def __init__(
9033
self,

0 commit comments

Comments
 (0)