Skip to content

Commit 7cec4ee

Browse files
Create high-level AWS event streams
This updates the high-level event stream interfaces and creates AWS implementations of them. The `EventStream` protocol was split into three protocols: `DuplexEventStream`, `InputEventStream`, and `OutputEventStream`. These three classes encompass the three different configurations that clients can expect, and each are typed with their particular use-case in mind. This lets the type declarations be more concise and accurate. Before, it could be extremely ambiguous from a typing perspective what you were getting. The old `InputEventStream` and `OutputEventStream` classes were renamed to `AsyncEventPublisher` and `AsyncEventReceiver`, respectively. This is a more accurate description of what they do, particularly as they can be used for a service implementation as well. In the AWS implementation, some changes needed to be made. Notably the `Event` class had to get a `decode_async` method to be able to read from an async stream. Then the calling of that method had to be pulled out of the deserializer so that both sync and async clients can use it. Test cases were updated to also test the async method. Tests for the event stream classes will come in the form of protocol tests later on.
1 parent fda945a commit 7cec4ee

File tree

11 files changed

+569
-54
lines changed

11 files changed

+569
-54
lines changed

python-packages/aws-event-stream/aws_event_stream/_private/deserializers.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@
33
import datetime
44
from collections.abc import Callable
55

6+
from smithy_core.aio.interfaces import AsyncByteStream, AsyncCloseable
67
from smithy_core.codecs import Codec
7-
from smithy_core.deserializers import ShapeDeserializer, SpecificShapeDeserializer
8-
from smithy_core.interfaces import BytesReader
8+
from smithy_core.deserializers import (
9+
DeserializeableShape,
10+
ShapeDeserializer,
11+
SpecificShapeDeserializer,
12+
)
913
from smithy_core.schemas import Schema
1014
from smithy_core.utils import expect_type
15+
from smithy_event_stream.aio.interfaces import AsyncEventReceiver
1116

1217
from ..events import HEADERS_DICT, Event
1318
from ..exceptions import EventError, UnmodeledEventError
@@ -17,11 +22,38 @@
1722
INITIAL_MESSAGE_TYPES = (INITIAL_REQUEST_EVENT_TYPE, INITIAL_RESPONSE_EVENT_TYPE)
1823

1924

20-
class EventDeserializer(SpecificShapeDeserializer):
25+
class AWSAsyncEventReceiver[E: DeserializeableShape](AsyncEventReceiver[E]):
2126
def __init__(
22-
self, source: BytesReader, payload_codec: Codec, is_client_mode: bool = True
27+
self,
28+
payload_codec: Codec,
29+
source: AsyncByteStream,
30+
deserializer: Callable[[ShapeDeserializer], E],
31+
is_client_mode: bool = True,
2332
) -> None:
33+
self._payload_codec = payload_codec
2434
self._source = source
35+
self._is_client_mode = is_client_mode
36+
self._deserializer = deserializer
37+
38+
async def receive(self) -> E | None:
39+
event = await Event.decode_async(self._source)
40+
deserializer = EventDeserializer(
41+
event=event,
42+
payload_codec=self._payload_codec,
43+
is_client_mode=self._is_client_mode,
44+
)
45+
return self._deserializer(deserializer)
46+
47+
async def close(self) -> None:
48+
if isinstance(self._source, AsyncCloseable):
49+
await self._source.close()
50+
51+
52+
class EventDeserializer(SpecificShapeDeserializer):
53+
def __init__(
54+
self, event: Event, payload_codec: Codec, is_client_mode: bool = True
55+
) -> None:
56+
self._event = event
2557
self._payload_codec = payload_codec
2658
self._is_client_mode = is_client_mode
2759

@@ -30,13 +62,12 @@ def read_struct(
3062
schema: Schema,
3163
consumer: Callable[[Schema, ShapeDeserializer], None],
3264
) -> None:
33-
event = Event.decode(self._source)
34-
headers = event.message.headers
65+
headers = self._event.message.headers
3566

3667
payload_deserializer = None
37-
if event.message.payload:
68+
if self._event.message.payload:
3869
payload_deserializer = self._payload_codec.create_deserializer(
39-
event.message.payload
70+
self._event.message.payload
4071
)
4172

4273
message_deserializer = EventMessageDeserializer(headers, payload_deserializer)
@@ -61,7 +92,7 @@ def read_struct(
6192
expect_type(str, headers[":error-message"]),
6293
)
6394
case _:
64-
raise EventError(f"Unknown event structure: {event}")
95+
raise EventError(f"Unknown event structure: {self._event}")
6596

6697

6798
class EventMessageDeserializer(SpecificShapeDeserializer):

python-packages/aws-event-stream/aws_event_stream/_private/serializers.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33
import datetime
4-
from collections.abc import Iterator
4+
from collections.abc import Callable, Iterator
55
from contextlib import contextmanager
66
from io import BytesIO
77
from typing import Never
88

9+
from smithy_core.aio.interfaces import AsyncCloseable, AsyncWriter
910
from smithy_core.codecs import Codec
11+
from smithy_core.exceptions import ExpectationNotMetException
1012
from smithy_core.schemas import Schema
1113
from smithy_core.serializers import (
1214
InterceptingSerializer,
15+
SerializeableShape,
1316
ShapeSerializer,
1417
SpecificShapeSerializer,
1518
)
1619
from smithy_core.shapes import ShapeType
1720
from smithy_core.utils import expect_type
21+
from smithy_event_stream.aio.interfaces import AsyncEventPublisher
1822

1923
from ..events import EventHeaderEncoder, EventMessage
2024
from ..exceptions import InvalidHeaderValue
@@ -30,6 +34,40 @@
3034
_DEFAULT_BLOB_CONTENT_TYPE = "application/octet-stream"
3135

3236

37+
type Signer = Callable[[EventMessage], EventMessage]
38+
"""A function that takes an event message and signs it, and returns it signed."""
39+
40+
41+
class AWSAsyncEventPublisher[E: SerializeableShape](AsyncEventPublisher[E]):
42+
def __init__(
43+
self,
44+
payload_codec: Codec,
45+
async_writer: AsyncWriter,
46+
signer: Signer | None = None,
47+
is_client_mode: bool = True,
48+
):
49+
self._writer = async_writer
50+
self._signer = signer
51+
self._serializer = EventSerializer(
52+
payload_codec=payload_codec, is_client_mode=is_client_mode
53+
)
54+
55+
async def send(self, event: E) -> None:
56+
event.serialize(self._serializer)
57+
result = self._serializer.get_result()
58+
if result is None:
59+
raise ExpectationNotMetException(
60+
"Expected an event message to be serialized, but was None."
61+
)
62+
if self._signer is not None:
63+
result = self._signer(result)
64+
await self._writer.write(result.encode())
65+
66+
async def close(self) -> None:
67+
if isinstance(self._writer, AsyncCloseable):
68+
await self._writer.close()
69+
70+
3371
class EventSerializer(SpecificShapeSerializer):
3472
def __init__(
3573
self,

0 commit comments

Comments
 (0)