Skip to content

Commit c911a8d

Browse files
committed
fix(event-stream): handle initial response
1 parent 48f4e43 commit c911a8d

File tree

5 files changed

+42
-15
lines changed

5 files changed

+42
-15
lines changed

packages/eventstream-serde-universal/src/EventStreamMarshaller.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,6 @@ import { Decoder, Encoder, EventStreamMarshaller as IEventStreamMarshaller, Mess
1111
import { getChunkedStream } from "./getChunkedStream";
1212
import { getMessageUnmarshaller } from "./getUnmarshalledStream";
1313

14-
/**
15-
* @internal
16-
*/
17-
export interface EventStreamMarshaller extends IEventStreamMarshaller {}
18-
1914
/**
2015
* @internal
2116
*/
@@ -27,7 +22,7 @@ export interface EventStreamMarshallerOptions {
2722
/**
2823
* @internal
2924
*/
30-
export class EventStreamMarshaller {
25+
export class EventStreamMarshaller implements IEventStreamMarshaller {
3126
private readonly eventStreamCodec: EventStreamCodec;
3227
private readonly utfEncoder: Encoder;
3328

@@ -41,11 +36,9 @@ export class EventStreamMarshaller {
4136
deserializer: (input: Record<string, Message>) => Promise<T>
4237
): AsyncIterable<T> {
4338
const inputStream = getChunkedStream(body);
44-
// @ts-expect-error Type 'SmithyMessageDecoderStream<Record<string, any>>' is not assignable to type 'AsyncIterable<T>'
45-
return new SmithyMessageDecoderStream({
39+
return new SmithyMessageDecoderStream<T>({
4640
messageStream: new MessageDecoderStream({ inputStream, decoder: this.eventStreamCodec }),
47-
// @ts-expect-error Type 'T' is not assignable to type 'Record<string, any>'
48-
deserializer: getMessageUnmarshaller(deserializer, this.utfEncoder),
41+
deserializer: getMessageUnmarshaller<any>(deserializer, this.utfEncoder),
4942
});
5043
}
5144

packages/eventstream-serde-universal/src/getUnmarshalledStream.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ export function getMessageUnmarshaller<T extends Record<string, any>>(
6161
const event = {
6262
[message.headers[":event-type"].value as string]: message,
6363
};
64-
const deserialized = await deserializer(event);
65-
if (deserialized.$unknown) return;
66-
return deserialized;
64+
return deserializer(event);
6765
} else {
6866
throw Error(`Unrecognizable event type: ${message.headers[":event-type"].value}`);
6967
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Peeks the first frame of the async iterable and writes the values into
3+
* the container if it is an initial-response event.
4+
*
5+
* @internal
6+
*
7+
* @param container - write destination for initial-response.
8+
* @param responseIterable - the response event stream.
9+
*/
10+
export async function writeResponse<T>(
11+
container: Record<string, any>,
12+
responseIterable: AsyncIterable<T>
13+
): Promise<AsyncIterable<T>> {
14+
const asyncIterator = responseIterable[Symbol.asyncIterator]();
15+
// todo: handle empty iterator or timeout.
16+
const firstFrame = await asyncIterator.next();
17+
if (firstFrame.value.$unknown?.["initial-response"]) {
18+
console.log("assigned initial response into container", {
19+
initialResponse: firstFrame.value.$unknown["initial-response"],
20+
});
21+
Object.assign(container, firstFrame.value.$unknown["initial-response"]);
22+
return {
23+
[Symbol.asyncIterator]: () => ({
24+
next: asyncIterator.next.bind(asyncIterator),
25+
}),
26+
};
27+
}
28+
return responseIterable;
29+
}
30+
31+
/**
32+
* @internal
33+
*/
34+
export async function writeRequest<T>() {
35+
throw new Error("not implemented");
36+
}

packages/types/src/eventStream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export interface EventStreamMarshallerSerFn<StreamType> {
119119
* @public
120120
*
121121
* An interface which provides functions for serializing and deserializing binary event stream
122-
* to/from corresponsing modeled shape.
122+
* to/from corresponding modeled shape.
123123
*/
124124
export interface EventStreamMarshaller<StreamType = any> {
125125
deserialize: EventStreamMarshallerDeserFn<StreamType>;

smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/integration/EventStreamGenerator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import software.amazon.smithy.utils.SmithyUnstableApi;
4949

5050
/**
51-
* Evnetstream code generator.
51+
* Event stream code generator.
5252
*/
5353
@SmithyUnstableApi
5454
public class EventStreamGenerator {

0 commit comments

Comments
 (0)