Skip to content

Commit cf7176c

Browse files
committed
feat(core/protocols): eventstreams for schema serde
1 parent 1cab1f5 commit cf7176c

File tree

3 files changed

+133
-40
lines changed

3 files changed

+133
-40
lines changed

packages/core/src/submodules/protocols/HttpBindingProtocol.ts

Lines changed: 10 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
8686
if (isStreaming) {
8787
const isEventStream = memberNs.isStructSchema();
8888
if (isEventStream) {
89-
// todo(schema)
89+
// todo(schema): serialize event stream payload (REST).
9090
throw new Error("serialization of event streams is not yet implemented");
9191
} else {
9292
// streaming blob body
@@ -223,12 +223,13 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
223223
}
224224
}
225225

226-
const output: Output = {
227-
$metadata: this.deserializeMetadata(response),
228-
...dataObject,
229-
};
226+
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
227+
if (bytes.byteLength > 0) {
228+
Object.assign(dataObject, await deserializer.read(ns, bytes));
229+
}
230230

231-
return output;
231+
dataObject.$metadata = this.deserializeMetadata(response);
232+
return dataObject;
232233
}
233234

234235
/**
@@ -277,29 +278,9 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
277278
const isEventStream = memberSchema.isStructSchema();
278279
if (isEventStream) {
279280
// streaming event stream (union)
280-
const context = this.serdeContext as unknown as EventStreamSerdeContext;
281-
if (!context.eventStreamMarshaller) {
282-
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
283-
}
284-
const memberSchemas = memberSchema.getMemberSchemas();
285-
dataObject[memberName] = context.eventStreamMarshaller.deserialize(response.body, async (event) => {
286-
const unionMember =
287-
Object.keys(event).find((key) => {
288-
return key !== "__type";
289-
}) ?? "";
290-
if (unionMember in memberSchemas) {
291-
const eventStreamSchema = memberSchemas[unionMember];
292-
return {
293-
[unionMember]: await deserializer.read(eventStreamSchema, event[unionMember].body),
294-
};
295-
} else {
296-
// todo(schema): This union convention is ignored by the event stream marshaller.
297-
// todo(schema): This should be returned to the user instead.
298-
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
299-
return {
300-
$unknown: event,
301-
};
302-
}
281+
dataObject[memberName] = this.deserializeEventStream({
282+
response,
283+
unionSchema: memberSchema,
303284
});
304285
} else {
305286
// streaming blob body

packages/core/src/submodules/protocols/HttpProtocol.ts

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
import { NormalizedSchema } from "@smithy/core/schema";
2+
import { SCHEMA } from "@smithy/core/schema";
23
import { HttpRequest, HttpResponse } from "@smithy/protocol-http";
3-
import {
4+
import type {
45
ClientProtocol,
56
Codec,
67
Endpoint,
78
EndpointBearer,
89
EndpointV2,
10+
EventStreamMarshaller,
11+
EventStreamSerdeContext,
912
HandlerExecutionContext,
1013
HttpRequest as IHttpRequest,
1114
HttpResponse as IHttpResponse,
15+
Message as EventStreamMessage,
16+
MessageHeaders,
1217
MetadataBearer,
1318
OperationSchema,
1419
ResponseMetadata,
@@ -17,6 +22,7 @@ import {
1722
ShapeDeserializer,
1823
ShapeSerializer,
1924
} from "@smithy/types";
25+
import { fromUtf8 } from "@smithy/util-utf8";
2026

2127
/**
2228
* Abstract base for HTTP-based client protocols.
@@ -138,6 +144,78 @@ export abstract class HttpProtocol implements ClientProtocol<IHttpRequest, IHttp
138144
};
139145
}
140146

147+
protected serializeEventStream({ input, unionSchema }: { input: AsyncIterable<any>; unionSchema: NormalizedSchema }) {
148+
const marshaller = this.getEventStreamMarshaller();
149+
const memberSchemas = unionSchema.getMemberSchemas();
150+
151+
return marshaller.serialize(input, (event: any): EventStreamMessage => {
152+
const unionMember =
153+
Object.keys(event).find((key) => {
154+
return key !== "__type";
155+
}) ?? "";
156+
const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT);
157+
158+
this.serializer.write(eventStreamSchema, event);
159+
const messageSerialization = this.serializer.flush();
160+
161+
const body =
162+
typeof messageSerialization === "string"
163+
? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization)
164+
: messageSerialization;
165+
166+
const headers: MessageHeaders = {
167+
":event-type": { type: "string", value: unionMember },
168+
":message-type": { type: "string", value: "event" },
169+
":content-type": { type: "string", value: this.getDefaultContentType() },
170+
};
171+
172+
return {
173+
headers,
174+
body,
175+
};
176+
});
177+
}
178+
179+
protected deserializeEventStream({
180+
response,
181+
unionSchema,
182+
}: {
183+
response: IHttpResponse;
184+
unionSchema: NormalizedSchema;
185+
}): AsyncIterable<{ [key: string]: any; $unknown?: unknown }> {
186+
const marshaller = this.getEventStreamMarshaller();
187+
const memberSchemas = unionSchema.getMemberSchemas();
188+
189+
return marshaller.deserialize(response.body, async (event) => {
190+
const unionMember =
191+
Object.keys(event).find((key) => {
192+
return key !== "__type";
193+
}) ?? "";
194+
if (unionMember in memberSchemas) {
195+
const eventStreamSchema = memberSchemas[unionMember];
196+
return {
197+
[unionMember]: await this.deserializer.read(eventStreamSchema, event[unionMember].body),
198+
};
199+
} else {
200+
// todo(schema): This union convention is ignored by the event stream marshaller.
201+
// todo(schema): This should be returned to the user instead.
202+
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
203+
return {
204+
$unknown: event,
205+
};
206+
}
207+
});
208+
}
209+
210+
/**
211+
* @returns content-type default header value for event stream events and other documents.
212+
*/
213+
protected getDefaultContentType(): string {
214+
throw new Error(
215+
`@smithy/core/protocols - ${this.constructor.name} getDefaultContentType() implementation missing.`
216+
);
217+
}
218+
141219
/**
142220
* For HTTP binding protocols, this method is overridden in {@link HttpBindingProtocol}.
143221
*
@@ -172,4 +250,12 @@ export abstract class HttpProtocol implements ClientProtocol<IHttpRequest, IHttp
172250
// It should remain unused.
173251
return [];
174252
}
253+
254+
protected getEventStreamMarshaller(): EventStreamMarshaller {
255+
const context = this.serdeContext as unknown as EventStreamSerdeContext;
256+
if (!context.eventStreamMarshaller) {
257+
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
258+
}
259+
return context.eventStreamMarshaller;
260+
}
175261
}

packages/core/src/submodules/protocols/RpcProtocol.ts

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,15 @@ export abstract class RpcProtocol extends HttpProtocol {
5656
};
5757

5858
if (input) {
59-
serializer.write(schema, _input);
60-
payload = serializer.flush() as Uint8Array;
59+
const eventStreamMember = this.getEventStreamMember(ns);
60+
61+
if (eventStreamMember) {
62+
// todo(schema): serialize event stream payload (RPC).
63+
throw new Error("serialization of event streams is not yet implemented");
64+
} else {
65+
serializer.write(schema, _input);
66+
payload = serializer.flush() as Uint8Array;
67+
}
6168
}
6269

6370
request.headers = headers;
@@ -93,16 +100,35 @@ export abstract class RpcProtocol extends HttpProtocol {
93100
response.headers[header.toLowerCase()] = value;
94101
}
95102

96-
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
97-
if (bytes.byteLength > 0) {
98-
Object.assign(dataObject, await deserializer.read(ns, bytes));
103+
const eventStreamMember = this.getEventStreamMember(ns);
104+
105+
if (eventStreamMember) {
106+
// todo(schema): assign other dataObject fields from initial-response.
107+
dataObject[eventStreamMember] = this.deserializeEventStream({
108+
response,
109+
unionSchema: ns.getMemberSchema(eventStreamMember),
110+
});
111+
} else {
112+
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
113+
if (bytes.byteLength > 0) {
114+
Object.assign(dataObject, await deserializer.read(ns, bytes));
115+
}
99116
}
100117

101-
const output: Output = {
102-
$metadata: this.deserializeMetadata(response),
103-
...dataObject,
104-
};
118+
dataObject.$metadata = this.deserializeMetadata(response);
119+
return dataObject;
120+
}
121+
122+
protected getDefaultContentType(): string {
123+
return "application/cbor";
124+
}
105125

106-
return output;
126+
protected getEventStreamMember(struct: NormalizedSchema): string {
127+
for (const [memberName, memberSchema] of struct.structIterator()) {
128+
if (memberSchema.isStreaming() && memberSchema.isStructSchema()) {
129+
return memberName;
130+
}
131+
}
132+
return "";
107133
}
108134
}

0 commit comments

Comments
 (0)