@@ -200,6 +200,13 @@ def _classify_error(
if (hasStreaming) {
writer.addStdlibImports("typing", Set.of("Any", "Awaitable"));
writer.addStdlibImport("asyncio");

writer.addImports("smithy_core.aio.eventstream",
Set.of(
"InputEventStream",
"OutputEventStream",
"DuplexEventStream"));
writer.addImport("smithy_core.aio.interfaces.eventstream", "EventReceiver");
writer.write(
"""
async def _input_stream[Input: SerializeableShape, Output: DeserializeableShape](
@@ -218,6 +225,10 @@ def _classify_error(
))
request_context = await request_future
${5C|}
return InputEventStream[Any, Any](
input_stream=publisher,
output_future=awaitable_output,
)
async def _output_stream[Input: SerializeableShape, Output: DeserializeableShape](
self,
@@ -236,6 +247,10 @@ def _classify_error(
)
transport_response = await response_future
${6C|}
return OutputEventStream[Any, Any](
output_stream=receiver,
output=output
)
async def _duplex_stream[Input: SerializeableShape, Output: DeserializeableShape](
self,
@@ -255,15 +270,34 @@ def _classify_error(
response_future=response_future
))
request_context = await request_future
${7C|}
${5C|}
output_future = asyncio.create_task(self._wrap_duplex_output(
response_future, awaitable_output, config, operation_name,
event_deserializer
))
return DuplexEventStream[Any, Any, Any](
input_stream=publisher,
output_future=output_future,
)
async def _wrap_duplex_output(
self,
response_future: Future[$3T],
awaitable_output: Future[Any],
config: $4T,
operation_name: str,
event_deserializer: Callable[[ShapeDeserializer], Any],
) -> tuple[Any, EventReceiver[Any]]:
transport_response = await response_future
${6C|}
return await awaitable_output, receiver
""",
pluginSymbol,
transportRequest,
transportResponse,
configSymbol,
writer.consumer(w -> context.protocolGenerator().wrapInputStream(context, w)),
writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w)),
writer.consumer(w -> context.protocolGenerator().wrapDuplexStream(context, w)));
writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w)));
}
writer.addStdlibImport("typing", "Any");
writer.addStdlibImport("asyncio", "iscoroutine");
@@ -872,7 +906,6 @@ private void generateEventStreamOperation(PythonWriter writer, OperationShape op

if (inputStreamSymbol != null) {
if (outputStreamSymbol != null) {
writer.addImport("smithy_event_stream.aio.interfaces", "DuplexEventStream");
writer.write("""
async def ${operationName:L}(
self,
@@ -922,7 +955,6 @@ raise NotImplementedError()
""", writer.consumer(w -> writeSharedOperationInit(w, operation, input)));
}
} else {
writer.addImport("smithy_event_stream.aio.interfaces", "OutputEventStream");
writer.write("""
async def ${operationName:L}(
self,
@@ -157,6 +157,4 @@ default void generateProtocolTests(GenerationContext context) {}
default void wrapInputStream(GenerationContext context, PythonWriter writer) {}

default void wrapOutputStream(GenerationContext context, PythonWriter writer) {}

default void wrapDuplexStream(GenerationContext context, PythonWriter writer) {}
}
@@ -396,13 +396,12 @@ public void wrapInputStream(GenerationContext context, PythonWriter writer) {
writer.addImport("smithy_json", "JSONCodec");
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
writer.addImport("smithy_core.types", "TimestampFormat");
writer.addImport("aws_event_stream.aio", "AWSInputEventStream");
writer.addImport("aws_event_stream.aio", "AWSEventPublisher");
writer.write(
"""
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
return AWSInputEventStream[Any, Any](
publisher = AWSEventPublisher[Any](
payload_codec=codec,
awaitable_output=awaitable_output,
async_writer=request_context.transport_request.body, # type: ignore
)
""");
@@ -415,39 +414,17 @@ public void wrapOutputStream(GenerationContext context, PythonWriter writer) {
writer.addImport("smithy_json", "JSONCodec");
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
writer.addImport("smithy_core.types", "TimestampFormat");
writer.addImport("aws_event_stream.aio", "AWSOutputEventStream");
writer.addImport("aws_event_stream.aio", "AWSEventReceiver");
writer.write(
"""
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
return AWSOutputEventStream[Any, Any](
receiver = AWSEventReceiver(
payload_codec=codec,
initial_response=output,
async_reader=AsyncBytesReader(
source=AsyncBytesReader(
transport_response.body # type: ignore
),
deserializer=event_deserializer, # type: ignore
)
""");
}

@Override
public void wrapDuplexStream(GenerationContext context, PythonWriter writer) {
writer.addDependency(SmithyPythonDependency.SMITHY_JSON);
writer.addDependency(SmithyPythonDependency.AWS_EVENT_STREAM);
writer.addImport("smithy_json", "JSONCodec");
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
writer.addImport("smithy_core.types", "TimestampFormat");
writer.addImport("aws_event_stream.aio", "AWSDuplexEventStream");
writer.write(
"""
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
return AWSDuplexEventStream[Any, Any, Any](
payload_codec=codec,
async_writer=request_context.transport_request.body, # type: ignore
awaitable_output=awaitable_output,
awaitable_response=response_future,
deserializer=event_deserializer, # type: ignore
)
""");
}
}
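Putting the pieces together: the generated-client template earlier in this diff leaves `${5C|}` and `${6C|}` placeholders, and this protocol's `wrapInputStream`/`wrapOutputStream` hooks fill them in. As an illustration (an assumed reconstruction assembled from the two diffs, not the literal generated file), the rendered tail of `_input_stream` would look roughly like:

```python
# Assumed reconstruction of the rendered `_input_stream` tail; the protocol's
# wrapInputStream snippet expands the ${5C|} placeholder into the publisher setup.
request_context = await request_future
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
publisher = AWSEventPublisher[Any](
    payload_codec=codec,
    async_writer=request_context.transport_request.body,  # type: ignore
)
return InputEventStream[Any, Any](
    input_stream=publisher,
    output_future=awaitable_output,
)
```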
designs/event-streams.md (98 changes: 50 additions & 48 deletions)
@@ -53,6 +53,8 @@ async with publisher:
publisher.send(FooEvent(foo="bar"))
```

Protocol implementations will be responsible for creating publishers.

## Event Receivers

An `AsyncEventReceiver` is used to receive events from a service.
@@ -131,6 +133,8 @@ async for event in reciever:
handle_event(event)
```

Protocol implementations will be responsible for creating receivers.
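Concretely, the AWS JSON protocol in this change builds its receiver as in the sketch below (condensed from the `wrapOutputStream` snippet earlier in the diff; `transport_response` and `event_deserializer` come from the surrounding generated code):

```python
# Condensed sketch of a protocol-constructed receiver: events are read from
# the HTTP response body and decoded with the protocol's JSON payload codec.
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
receiver = AWSEventReceiver(
    payload_codec=codec,
    source=AsyncBytesReader(
        transport_response.body  # type: ignore
    ),
    deserializer=event_deserializer,  # type: ignore
)
```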

### Errors

Event streams may define modeled errors that may be sent over the stream. These
@@ -169,38 +173,30 @@ are handled by the following classes:
* `OutputEventStream` is returned when the operation only has an output stream.

```python
class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):

input_stream: AsyncEventPublisher[I]

_output_stream: AsyncEventReceiver[O] | None = None
_response: R | None = None

@property
def output_stream(self) -> AsyncEventReceiver[O] | None:
return self._output_stream

@output_stream.setter
def output_stream(self, value: AsyncEventReceiver[O]) -> None:
self._output_stream = value

@property
def response(self) -> R | None:
return self._response

@response.setter
def response(self, value: R) -> None:
self._response = value

async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]:
class DuplexEventStream[
IE: SerializeableShape,
OE: DeserializeableShape,
O: DeserializeableShape,
]:

input_stream: EventPublisher[IE]
output_stream: EventReceiver[OE] | None = None
output: O | None = None

def __init__(
self,
*,
input_stream: EventPublisher[IE],
output_future: Future[tuple[O, EventReceiver[OE]]],
) -> None:
self.input_stream = input_stream
self._output_future = output_future

async def await_output(self) -> tuple[O, EventReceiver[OE]]:
...

async def close(self) -> None:
if self.output_stream is None:
_, self.output_stream = await self.await_output()

await self.input_stream.close()
await self.output_stream.close()
...

async def __aenter__(self) -> Self:
return self
@@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco
await self.close()


class InputEventStream[I: SerializableShape, R](Protocol):
class InputEventStream[IE: SerializeableShape, O]:

input_stream: AsyncEventPublisher[I]
input_stream: EventPublisher[IE]
output: O | None = None

_response: R | None = None
def __init__(
self,
*,
input_stream: EventPublisher[IE],
output_future: Future[O],
) -> None:
self.input_stream = input_stream
self._output_future = output_future

@property
def response(self) -> R | None:
return self._response

@response.setter
def response(self, value: R) -> None:
self._response = value

async def await_output(self) -> R:
async def await_output(self) -> O:
...

async def close(self) -> None:
@@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol):
await self.close()


class OutputEventStream[O: DeserializableShape, R](Protocol):
class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]:

output_stream: AsyncEventReceiver[O]

response: R
output_stream: EventReceiver[OE]
output: O

def __init__(self, output_stream: EventReceiver[OE], output: O) -> None:
self.output_stream = output_stream
self.output = output

async def close(self) -> None:
await self.output_stream.close()
@@ -258,7 +257,7 @@ the underlying publisher and/or receiver.

Both `InputEventStream` and `DuplexEventStream` have an `await_output` method
that waits for the initial request to be received, returning that and the output
stream. Their `response` and `output_stream` properties will not be set until
stream. Their `output` and `output_stream` properties will not be set until
then. This is important because clients MUST be able to start sending events to
the service immediately, without waiting for the initial response. This is
critical because there are existing services that require one or more events to
@@ -278,8 +277,8 @@ with await client.input_operation() as stream:
stream.input_stream.send(FooEvent(foo="bar"))
```
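A duplex stream combines both halves. A hypothetical usage sketch follows; the operation, input, and event names are placeholders rather than APIs defined by this design, and it assumes `send` is awaitable:

```python
# Hypothetical duplex usage: send events right away, then wait for the
# initial response and consume the output stream.
async with await client.duplex_operation(DuplexInput()) as stream:
    await stream.input_stream.send(FooEvent(foo="bar"))

    output, receiver = await stream.await_output()
    async for event in receiver:
        handle_event(event)
```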

The `OutputEventStream`'s initial `response` and `output_stream` will never be
`None`, however. Instead, the `ClientProtocol` MUST set values for these when
The `OutputEventStream`'s `output` and `output_stream` will never be `None`,
however. Instead, the `ClientProtocol` MUST set values for these when
constructing the object. This differs from the other stream types because the
lack of an input stream means that the service has nothing to wait on from the
client before sending responses.
@@ -290,6 +289,9 @@ with await client.output_operation() as stream:
handle_event(event)
```

All three output types are centrally located and will be constructed by filling
in the relevant publishers and receivers from the protocol implementation.
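In the generated client (see the template earlier in this diff), that construction amounts to calls along these lines, where `publisher`, `receiver`, `awaitable_output`, `output_future`, and `output` are produced by the protocol hooks and the surrounding generated code:

```python
# How the generated client assembles each wrapper type (adapted from the
# client template in this PR).
input_only = InputEventStream[Any, Any](
    input_stream=publisher,
    output_future=awaitable_output,
)
output_only = OutputEventStream[Any, Any](
    output_stream=receiver,
    output=output,
)
duplex = DuplexEventStream[Any, Any, Any](
    input_stream=publisher,
    output_future=output_future,
)
```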

## Event Structure

Event messages are structurally similar to HTTP messages. They consist of a map
packages/aws-event-stream/pyproject.toml (1 change: 0 additions & 1 deletion)
@@ -6,7 +6,6 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"smithy-core",
"smithy-event-stream",
]

[build-system]