Skip to content

Commit c6b60fd

Browse files
Centralize event stream wrappers
1 parent a32bf5c commit c6b60fd

File tree

7 files changed

+349
-549
lines changed

7 files changed

+349
-549
lines changed

packages/aws-event-stream/src/aws_event_stream/_private/deserializers.py

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
54
from collections.abc import Callable
65

7-
from smithy_core.aio.interfaces import AsyncByteStream
86
from smithy_core.codecs import Codec
97
from smithy_core.deserializers import (
10-
DeserializeableShape,
118
ShapeDeserializer,
129
SpecificShapeDeserializer,
1310
)
1411
from smithy_core.schemas import Schema
1512
from smithy_core.shapes import ShapeType
1613
from smithy_core.utils import expect_type
17-
from smithy_core.aio.interfaces.eventstream import AsyncEventReceiver
1814

1915
from ..events import HEADERS_DICT, Event
2016
from ..exceptions import EventError, UnmodeledEventError
@@ -28,40 +24,6 @@
2824
INITIAL_MESSAGE_TYPES = (INITIAL_REQUEST_EVENT_TYPE, INITIAL_RESPONSE_EVENT_TYPE)
2925

3026

31-
class AWSAsyncEventReceiver[E: DeserializeableShape](AsyncEventReceiver[E]):
32-
def __init__(
33-
self,
34-
payload_codec: Codec,
35-
source: AsyncByteStream,
36-
deserializer: Callable[[ShapeDeserializer], E],
37-
is_client_mode: bool = True,
38-
) -> None:
39-
self._payload_codec = payload_codec
40-
self._source = source
41-
self._is_client_mode = is_client_mode
42-
self._deserializer = deserializer
43-
44-
async def receive(self) -> E | None:
45-
event = await Event.decode_async(self._source)
46-
if event is None:
47-
return None
48-
49-
deserializer = EventDeserializer(
50-
event=event,
51-
payload_codec=self._payload_codec,
52-
is_client_mode=self._is_client_mode,
53-
)
54-
result = self._deserializer(deserializer)
55-
if isinstance(getattr(result, "value"), Exception):
56-
raise result.value # type: ignore
57-
return result
58-
59-
async def close(self) -> None:
60-
if (close := getattr(self._source, "close", None)) is not None:
61-
if asyncio.iscoroutine(result := close()):
62-
await result
63-
64-
6527
class EventDeserializer(SpecificShapeDeserializer):
6628
def __init__(
6729
self, event: Event, payload_codec: Codec, is_client_mode: bool = True

packages/aws-event-stream/src/aws_event_stream/_private/serializers.py

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,19 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
import asyncio
43
import datetime
5-
from collections.abc import Callable, Iterator
4+
from collections.abc import Iterator
65
from contextlib import contextmanager
76
from io import BytesIO
87
from typing import Never
98

10-
from smithy_core.aio.interfaces import AsyncWriter
119
from smithy_core.codecs import Codec
12-
from smithy_core.exceptions import ExpectationNotMetException
1310
from smithy_core.schemas import Schema
1411
from smithy_core.serializers import (
1512
InterceptingSerializer,
16-
SerializeableShape,
1713
ShapeSerializer,
1814
SpecificShapeSerializer,
1915
)
2016
from smithy_core.shapes import ShapeType
21-
from smithy_core.aio.interfaces.eventstream import AsyncEventPublisher
2217

2318
from ..events import EventMessage, HEADER_VALUE, Short, Byte, Long
2419
from ..exceptions import InvalidHeaderValue
@@ -33,41 +28,6 @@
3328
_DEFAULT_BLOB_CONTENT_TYPE = "application/octet-stream"
3429

3530

36-
type Signer = Callable[[EventMessage], EventMessage]
37-
"""A function that takes an event message and signs it, and returns it signed."""
38-
39-
40-
class AWSAsyncEventPublisher[E: SerializeableShape](AsyncEventPublisher[E]):
41-
def __init__(
42-
self,
43-
payload_codec: Codec,
44-
async_writer: AsyncWriter,
45-
signer: Signer | None = None,
46-
is_client_mode: bool = True,
47-
):
48-
self._writer = async_writer
49-
self._signer = signer
50-
self._serializer = EventSerializer(
51-
payload_codec=payload_codec, is_client_mode=is_client_mode
52-
)
53-
54-
async def send(self, event: E) -> None:
55-
event.serialize(self._serializer)
56-
result = self._serializer.get_result()
57-
if result is None:
58-
raise ExpectationNotMetException(
59-
"Expected an event message to be serialized, but was None."
60-
)
61-
if self._signer is not None:
62-
result = self._signer(result)
63-
await self._writer.write(result.encode())
64-
65-
async def close(self) -> None:
66-
if (close := getattr(self._writer, "close", None)) is not None:
67-
if asyncio.iscoroutine(result := close()):
68-
await result
69-
70-
7131
class EventSerializer(SpecificShapeSerializer):
7232
def __init__(
7333
self,

0 commit comments

Comments
 (0)