Skip to content

Commit a039242

Browse files
Add EventStream interfaces
This adds in the EventStream interfaces that operations will use as their return types.
1 parent 882603b commit a039242

File tree

1 file changed

+177
-0
lines changed
  • python-packages/smithy-event-stream/smithy_event_stream/aio

1 file changed

+177
-0
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import Any, Protocol, Self
4+
5+
from smithy_core.deserializers import DeserializeableShape
6+
from smithy_core.serializers import SerializeableShape
7+
8+
9+
class InputEventStream[E: SerializeableShape](Protocol):
10+
"""Asynchronously sends events to a service.
11+
12+
This may be used as a context manager to ensure the stream is closed before exiting.
13+
"""
14+
15+
async def send(self, event: E) -> None:
16+
"""Sends an event to the service.
17+
18+
:param event: The event to send.
19+
"""
20+
...
21+
22+
def close(self) -> None:
23+
"""Closes the event stream."""
24+
...
25+
26+
def __enter__(self) -> Self:
27+
return self
28+
29+
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any):
30+
self.close()
31+
32+
33+
class OutputEventStream[E: DeserializeableShape](Protocol):
34+
"""Asynchronously receives events from a service.
35+
36+
Events may be recived via the ``receive`` method or by using this class as
37+
an async iterable.
38+
39+
This may also be used as a context manager to ensure the stream is closed before
40+
exiting.
41+
"""
42+
43+
async def receive(self) -> E | None:
44+
"""Receive a single event from the service.
45+
46+
:returns: An event or None. None indicates that no more events will be sent by
47+
the service.
48+
"""
49+
...
50+
51+
def close(self) -> None:
52+
"""Closes the event stream."""
53+
...
54+
55+
async def __anext__(self) -> E:
56+
result = await self.receive()
57+
if result is None:
58+
self.close()
59+
raise StopAsyncIteration
60+
return result
61+
62+
def __aiter__(self) -> Self:
63+
return self
64+
65+
def __enter__(self) -> Self:
66+
return self
67+
68+
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any):
69+
self.close()
70+
71+
72+
class EventStream[I: InputEventStream[Any] | None, O: OutputEventStream[Any] | None, R](
73+
Protocol
74+
):
75+
"""A unidirectional or bidirectional event stream.
76+
77+
To ensure that streams are closed upon exiting, this class may be used as an async
78+
context manager.
79+
80+
.. code-block:: python
81+
82+
async def main():
83+
client = ChatClient()
84+
input = StreamMessagesInput(chat_room="aws-python-sdk", username="hunter7")
85+
86+
async with client.stream_messages(input=input) as stream:
87+
stream.input_stream.send(MessageStreamMessage("Chat logger starting up."))
88+
response_handler = handle_output(stream)
89+
stream.input_stream.send(MessageStreamMessage("Chat logger active."))
90+
await response_handler
91+
92+
async def handle_output(stream: EventStream) -> None:
93+
_, output_stream = await stream.await_output()
94+
async for event in output_stream:
95+
match event:
96+
case MessageStreamMessage():
97+
print(event.value)
98+
case MessageStreamShutdown():
99+
return
100+
case _:
101+
stream.input_stream.send(
102+
MessageStreamMessage("Unknown message type received. Shutting down.")
103+
)
104+
return
105+
"""
106+
107+
input_stream: I
108+
"""An event stream that sends events to the service.
109+
110+
This value will be None if the operation has no input stream.
111+
"""
112+
113+
output_stream: O | None = None
114+
"""An event stream that receives events from the service.
115+
116+
This value may be None until ``await_output`` has been called.
117+
118+
This value will also be None if the operation has no output stream.
119+
"""
120+
121+
response: R | None = None
122+
"""The initial response from the service.
123+
124+
This value may be None until ``await_output`` has been called.
125+
126+
This may include context necessary to interpret output events or prepare
127+
input events. It will always be available before any events.
128+
"""
129+
130+
async def await_output(self) -> tuple[R, O]:
131+
"""Await the operation's output.
132+
133+
The EventStream will be returned as soon as the input stream is ready to
134+
receive events, which may be before the intitial response has been received
135+
and the service is ready to send events.
136+
137+
Awaiting this method will wait until the initial response was received and the
138+
service is ready to send events. The initial response and output stream will
139+
be returned by this operation and also cached in ``response`` and
140+
``output_stream``, respectively.
141+
142+
The default implementation of this method performs the caching behavior,
143+
delegating to the abstract ``_await_output`` method to actually retrieve the
144+
initial response and output stream.
145+
146+
:returns: A tuple containing the initial response and output stream. If the
147+
operation has no output stream, the second value will be None.
148+
"""
149+
if self.response is not None:
150+
self.response, self.output_stream = await self._await_output()
151+
152+
return self._response, self._output_stream # type: ignore
153+
154+
async def _await_output(self) -> tuple[R, O]:
155+
"""Await the operation's output without caching.
156+
157+
This method is meant to be used with the default implementation of await_output.
158+
It should return the output directly without caching.
159+
"""
160+
...
161+
162+
async def close(self) -> None:
163+
"""Closes the event stream.
164+
165+
This closes both the input and output streams.
166+
"""
167+
if self.output_stream is None:
168+
_, self.output_stream = await self.await_output()
169+
170+
if self.output_stream is not None:
171+
self.output_stream.close()
172+
173+
async def __aenter__(self) -> Self:
174+
return self
175+
176+
async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
177+
await self.close()

0 commit comments

Comments
 (0)