Skip to content

Commit 7abe2ce

Browse files
authored
Merge branch 'develop' into convert-docs-to-rst
2 parents fb551e2 + 072adf4 commit 7abe2ce

File tree

35 files changed

+674
-613
lines changed

35 files changed

+674
-613
lines changed

codegen/core/src/main/java/software/amazon/smithy/python/codegen/ClientGenerator.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,13 @@ def _classify_error(
196196
if (hasStreaming) {
197197
writer.addStdlibImports("typing", Set.of("Any", "Awaitable"));
198198
writer.addStdlibImport("asyncio");
199+
200+
writer.addImports("smithy_core.aio.eventstream",
201+
Set.of(
202+
"InputEventStream",
203+
"OutputEventStream",
204+
"DuplexEventStream"));
205+
writer.addImport("smithy_core.aio.interfaces.eventstream", "EventReceiver");
199206
writer.write(
200207
"""
201208
async def _input_stream[Input: SerializeableShape, Output: DeserializeableShape](
@@ -214,6 +221,10 @@ def _classify_error(
214221
))
215222
request_context = await request_future
216223
${5C|}
224+
return InputEventStream[Any, Any](
225+
input_stream=publisher,
226+
output_future=awaitable_output,
227+
)
217228
218229
async def _output_stream[Input: SerializeableShape, Output: DeserializeableShape](
219230
self,
@@ -232,6 +243,10 @@ def _classify_error(
232243
)
233244
transport_response = await response_future
234245
${6C|}
246+
return OutputEventStream[Any, Any](
247+
output_stream=receiver,
248+
output=output
249+
)
235250
236251
async def _duplex_stream[Input: SerializeableShape, Output: DeserializeableShape](
237252
self,
@@ -251,15 +266,34 @@ def _classify_error(
251266
response_future=response_future
252267
))
253268
request_context = await request_future
254-
${7C|}
269+
${5C|}
270+
output_future = asyncio.create_task(self._wrap_duplex_output(
271+
response_future, awaitable_output, config, operation_name,
272+
event_deserializer
273+
))
274+
return DuplexEventStream[Any, Any, Any](
275+
input_stream=publisher,
276+
output_future=output_future,
277+
)
278+
279+
async def _wrap_duplex_output(
280+
self,
281+
response_future: Future[$3T],
282+
awaitable_output: Future[Any],
283+
config: $4T,
284+
operation_name: str,
285+
event_deserializer: Callable[[ShapeDeserializer], Any],
286+
) -> tuple[Any, EventReceiver[Any]]:
287+
transport_response = await response_future
288+
${6C|}
289+
return await awaitable_output, receiver
255290
""",
256291
pluginSymbol,
257292
transportRequest,
258293
transportResponse,
259294
configSymbol,
260295
writer.consumer(w -> context.protocolGenerator().wrapInputStream(context, w)),
261-
writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w)),
262-
writer.consumer(w -> context.protocolGenerator().wrapDuplexStream(context, w)));
296+
writer.consumer(w -> context.protocolGenerator().wrapOutputStream(context, w)));
263297
}
264298
writer.addStdlibImport("typing", "Any");
265299
writer.addStdlibImport("asyncio", "iscoroutine");
@@ -282,9 +316,9 @@ def _classify_error(
282316
request_future, response_future,
283317
)
284318
except Exception as e:
285-
if request_future is not None and not request_future.done:
319+
if request_future is not None and not request_future.done():
286320
request_future.set_exception($4T(e))
287-
if response_future is not None and not response_future.done:
321+
if response_future is not None and not response_future.done():
288322
response_future.set_exception($4T(e))
289323
290324
# Make sure every exception that we throw is an instance of $4T so
@@ -872,7 +906,6 @@ private void generateEventStreamOperation(PythonWriter writer, OperationShape op
872906

873907
if (inputStreamSymbol != null) {
874908
if (outputStreamSymbol != null) {
875-
writer.addImport("smithy_event_stream.aio.interfaces", "DuplexEventStream");
876909
writer.write("""
877910
async def ${operationName:L}(
878911
self,
@@ -922,7 +955,6 @@ raise NotImplementedError()
922955
""", writer.consumer(w -> writeSharedOperationInit(w, operation, input)));
923956
}
924957
} else {
925-
writer.addImport("smithy_event_stream.aio.interfaces", "OutputEventStream");
926958
writer.write("""
927959
async def ${operationName:L}(
928960
self,

codegen/core/src/main/java/software/amazon/smithy/python/codegen/generators/ProtocolGenerator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,4 @@ default void generateProtocolTests(GenerationContext context) {}
157157
default void wrapInputStream(GenerationContext context, PythonWriter writer) {}
158158

159159
default void wrapOutputStream(GenerationContext context, PythonWriter writer) {}
160-
161-
default void wrapDuplexStream(GenerationContext context, PythonWriter writer) {}
162160
}

codegen/core/src/main/java/software/amazon/smithy/python/codegen/integrations/RestJsonProtocolGenerator.java

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,12 @@ public void wrapInputStream(GenerationContext context, PythonWriter writer) {
396396
writer.addImport("smithy_json", "JSONCodec");
397397
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
398398
writer.addImport("smithy_core.types", "TimestampFormat");
399-
writer.addImport("aws_event_stream.aio", "AWSInputEventStream");
399+
writer.addImport("aws_event_stream.aio", "AWSEventPublisher");
400400
writer.write(
401401
"""
402402
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
403-
return AWSInputEventStream[Any, Any](
403+
publisher = AWSEventPublisher[Any](
404404
payload_codec=codec,
405-
awaitable_output=awaitable_output,
406405
async_writer=request_context.transport_request.body, # type: ignore
407406
)
408407
""");
@@ -415,39 +414,17 @@ public void wrapOutputStream(GenerationContext context, PythonWriter writer) {
415414
writer.addImport("smithy_json", "JSONCodec");
416415
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
417416
writer.addImport("smithy_core.types", "TimestampFormat");
418-
writer.addImport("aws_event_stream.aio", "AWSOutputEventStream");
417+
writer.addImport("aws_event_stream.aio", "AWSEventReceiver");
419418
writer.write(
420419
"""
421420
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
422-
return AWSOutputEventStream[Any, Any](
421+
receiver = AWSEventReceiver(
423422
payload_codec=codec,
424-
initial_response=output,
425-
async_reader=AsyncBytesReader(
423+
source=AsyncBytesReader(
426424
transport_response.body # type: ignore
427425
),
428426
deserializer=event_deserializer, # type: ignore
429427
)
430428
""");
431429
}
432-
433-
@Override
434-
public void wrapDuplexStream(GenerationContext context, PythonWriter writer) {
435-
writer.addDependency(SmithyPythonDependency.SMITHY_JSON);
436-
writer.addDependency(SmithyPythonDependency.AWS_EVENT_STREAM);
437-
writer.addImport("smithy_json", "JSONCodec");
438-
writer.addImport("smithy_core.aio.types", "AsyncBytesReader");
439-
writer.addImport("smithy_core.types", "TimestampFormat");
440-
writer.addImport("aws_event_stream.aio", "AWSDuplexEventStream");
441-
writer.write(
442-
"""
443-
codec = JSONCodec(default_timestamp_format=TimestampFormat.EPOCH_SECONDS)
444-
return AWSDuplexEventStream[Any, Any, Any](
445-
payload_codec=codec,
446-
async_writer=request_context.transport_request.body, # type: ignore
447-
awaitable_output=awaitable_output,
448-
awaitable_response=response_future,
449-
deserializer=event_deserializer, # type: ignore
450-
)
451-
""");
452-
}
453430
}

codegen/gradle/libs.versions.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[versions]
2-
junit5 = "5.12.0"
2+
junit5 = "5.12.1"
33
smithy = "1.55.0"
44
test-logger-plugin = "4.0.0"
55
spotbugs = "6.0.22"
66
spotless = "7.0.2"
77
smithy-gradle-plugins = "1.2.0"
8-
dep-analysis = "2.11.0"
8+
dep-analysis = "2.12.0"
99
jsoup = "1.19.1"
1010
commonmark = "0.15.2"
1111

designs/event-streams.md

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ async with publisher:
5353
publisher.send(FooEvent(foo="bar"))
5454
```
5555

56+
Protocol implementations will be responsible for creating publishers.
57+
5658
## Event Receivers
5759

5860
An `AsyncEventReceiver` is used to receive events from a service.
@@ -131,6 +133,8 @@ async for event in reciever:
131133
handle_event(event)
132134
```
133135

136+
Protocol implementations will be responsible for creating receivers.
137+
134138
### Errors
135139

136140
Event streams may define modeled errors that may be sent over the stream. These
@@ -169,38 +173,30 @@ are handled by the following classes:
169173
* `OutputEventStream` is returned when the operation only has an output stream.
170174

171175
```python
172-
class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protocol):
173-
174-
input_stream: AsyncEventPublisher[I]
175-
176-
_output_stream: AsyncEventReceiver[O] | None = None
177-
_response: R | None = None
178-
179-
@property
180-
def output_stream(self) -> AsyncEventReceiver[O] | None:
181-
return self._output_stream
182-
183-
@output_stream.setter
184-
def output_stream(self, value: AsyncEventReceiver[O]) -> None:
185-
self._output_stream = value
186-
187-
@property
188-
def response(self) -> R | None:
189-
return self._response
190-
191-
@response.setter
192-
def response(self, value: R) -> None:
193-
self._response = value
194-
195-
async def await_output(self) -> tuple[R, AsyncEventReceiver[O]]:
176+
class DuplexEventStream[
177+
IE: SerializeableShape,
178+
OE: DeserializeableShape,
179+
O: DeserializeableShape,
180+
]:
181+
182+
input_stream: EventPublisher[IE]
183+
output_stream: EventReceiver[OE] | None = None
184+
output: O | None = None
185+
186+
def __init__(
187+
self,
188+
*,
189+
input_stream: EventPublisher[IE],
190+
output_future: Future[tuple[O, EventReceiver[OE]]],
191+
) -> None:
192+
self.input_stream = input_stream
193+
self._output_future = output_future
194+
195+
async def await_output(self) -> tuple[O, EventReceiver[OE]]:
196196
...
197197

198198
async def close(self) -> None:
199-
if self.output_stream is None:
200-
_, self.output_stream = await self.await_output()
201-
202-
await self.input_stream.close()
203-
await self.output_stream.close()
199+
...
204200

205201
async def __aenter__(self) -> Self:
206202
return self
@@ -209,21 +205,21 @@ class DuplexEventStream[I: SerializableShape, O: DeserializableShape, R](Protoco
209205
await self.close()
210206

211207

212-
class InputEventStream[I: SerializableShape, R](Protocol):
208+
class InputEventStream[IE: SerializeableShape, O]:
213209

214-
input_stream: AsyncEventPublisher[I]
210+
input_stream: EventPublisher[IE]
211+
output: O | None = None
215212

216-
_response: R | None = None
213+
def __init__(
214+
self,
215+
*,
216+
input_stream: EventPublisher[IE],
217+
output_future: Future[O],
218+
) -> None:
219+
self.input_stream = input_stream
220+
self._output_future = output_future
217221

218-
@property
219-
def response(self) -> R | None:
220-
return self._response
221-
222-
@response.setter
223-
def response(self, value: R) -> None:
224-
self._response = value
225-
226-
async def await_output(self) -> R:
222+
async def await_output(self) -> O:
227223
...
228224

229225
async def close(self) -> None:
@@ -236,11 +232,14 @@ class InputEventStream[I: SerializableShape, R](Protocol):
236232
await self.close()
237233

238234

239-
class OutputEventStream[O: DeserializableShape, R](Protocol):
235+
class OutputEventStream[OE: DeserializeableShape, O: DeserializeableShape]:
240236

241-
output_stream: AsyncEventReceiver[O]
242-
243-
response: R
237+
output_stream: EventReceiver[OE]
238+
output: O
239+
240+
def __init__(self, output_stream: EventReceiver[OE], output: O) -> None:
241+
self.output_stream = output_stream
242+
self.output = output
244243

245244
async def close(self) -> None:
246245
await self.output_stream.close()
@@ -258,7 +257,7 @@ the underlying publisher and/or receiver.
258257

259258
Both `InputEventStream` and `DuplexEventStream` have an `await_output` method
260259
that waits for the initial request to be received, returning that and the output
261-
stream. Their `response` and `output_stream` properties will not be set until
260+
stream. Their `output` and `output_stream` properties will not be set until
262261
then. This is important because clients MUST be able to start sending events to
263262
the service immediately, without waiting for the initial response. This is
264263
critical because there are existing services that require one or more events to
@@ -278,8 +277,8 @@ with await client.input_operation() as stream:
278277
stream.input_stream.send(FooEvent(foo="bar"))
279278
```
280279

281-
The `OutputEventStream`'s initial `response` and `output_stream` will never be
282-
`None`, however. Instead, the `ClientProtocol` MUST set values for these when
280+
The `OutputEventStream`'s `output` and `output_stream` will never be `None`,
281+
however. Instead, the `ClientProtocol` MUST set values for these when
283282
constructing the object. This differs from the other stream types because the
284283
lack of an input stream means that the service has nothing to wait on from the
285284
client before sending responses.
@@ -290,6 +289,9 @@ with await client.output_operation() as stream:
290289
handle_event(event)
291290
```
292291

292+
All three output types are centrally located and will be constructed by filling
293+
in the relevant publishers and receivers from the protocol implementation.
294+
293295
## Event Structure
294296

295297
Event messages are structurally similar to HTTP messages. They consist of a map

packages/aws-event-stream/pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ readme = "README.md"
66
requires-python = ">=3.12"
77
dependencies = [
88
"smithy-core",
9-
"smithy-event-stream",
109
]
1110

1211
[build-system]

0 commit comments

Comments
 (0)