Skip to content

Commit 8aa0921

Browse files
committed
feat(core/protocols): eventstreams for schema serde
1 parent 1cab1f5 commit 8aa0921

File tree

9 files changed

+233
-55
lines changed

9 files changed

+233
-55
lines changed

.changeset/shy-hornets-care.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/core": minor
3+
---
4+
5+
schema serde eventstreams implementation

packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
4343
): Promise<IHttpRequest> {
4444
const request = await super.serializeRequest(operationSchema, input, context);
4545
Object.assign(request.headers, {
46-
"content-type": "application/cbor",
46+
"content-type": this.getDefaultContentType(),
4747
"smithy-protocol": "rpc-v2-cbor",
48-
accept: "application/cbor",
48+
accept: this.getDefaultContentType(),
4949
});
5050
if (deref(operationSchema.input) === "unit") {
5151
delete request.body;
@@ -113,4 +113,8 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
113113

114114
throw exception;
115115
}
116+
117+
protected getDefaultContentType(): string {
118+
return "application/cbor";
119+
}
116120
}

packages/core/src/submodules/protocols/HttpBindingProtocol.ts

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { HttpRequest } from "@smithy/protocol-http";
44
import {
55
Endpoint,
66
EndpointBearer,
7-
EventStreamSerdeContext,
87
HandlerExecutionContext,
98
HttpRequest as IHttpRequest,
109
HttpResponse as IHttpResponse,
@@ -86,8 +85,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
8685
if (isStreaming) {
8786
const isEventStream = memberNs.isStructSchema();
8887
if (isEventStream) {
89-
// todo(schema)
90-
throw new Error("serialization of event streams is not yet implemented");
88+
if (input[memberName]) {
89+
payload = this.serializeEventStream({
90+
eventStream: input[memberName],
91+
unionSchema: memberNs,
92+
});
93+
}
9194
} else {
9295
// streaming blob body
9396
payload = inputMemberValue;
@@ -132,6 +135,9 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
132135
if (hasNonHttpBindingMember && input) {
133136
serializer.write(schema, input);
134137
payload = serializer.flush() as Uint8Array;
138+
139+
// Due to Smithy validation, we can assume that the members with no HTTP
140+
// bindings DO NOT contain an event stream.
135141
}
136142

137143
request.headers = headers;
@@ -221,14 +227,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
221227
dataObject[member] = dataFromBody[member];
222228
}
223229
}
230+
// Due to Smithy validation, we can assume that the members with no HTTP
231+
// bindings DO NOT contain an event stream.
224232
}
225233

226-
const output: Output = {
227-
$metadata: this.deserializeMetadata(response),
228-
...dataObject,
229-
};
230-
231-
return output;
234+
dataObject.$metadata = this.deserializeMetadata(response);
235+
return dataObject;
232236
}
233237

234238
/**
@@ -276,33 +280,13 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
276280
if (isStreaming) {
277281
const isEventStream = memberSchema.isStructSchema();
278282
if (isEventStream) {
279-
// streaming event stream (union)
280-
const context = this.serdeContext as unknown as EventStreamSerdeContext;
281-
if (!context.eventStreamMarshaller) {
282-
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
283-
}
284-
const memberSchemas = memberSchema.getMemberSchemas();
285-
dataObject[memberName] = context.eventStreamMarshaller.deserialize(response.body, async (event) => {
286-
const unionMember =
287-
Object.keys(event).find((key) => {
288-
return key !== "__type";
289-
}) ?? "";
290-
if (unionMember in memberSchemas) {
291-
const eventStreamSchema = memberSchemas[unionMember];
292-
return {
293-
[unionMember]: await deserializer.read(eventStreamSchema, event[unionMember].body),
294-
};
295-
} else {
296-
// todo(schema): This union convention is ignored by the event stream marshaller.
297-
// todo(schema): This should be returned to the user instead.
298-
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
299-
return {
300-
$unknown: event,
301-
};
302-
}
283+
// event stream (union)
284+
dataObject[memberName] = this.deserializeEventStream({
285+
response,
286+
unionSchema: memberSchema,
303287
});
304288
} else {
305-
// streaming blob body
289+
// data stream (blob)
306290
dataObject[memberName] = sdkStreamMixin(response.body);
307291
}
308292
} else if (response.body) {

packages/core/src/submodules/protocols/HttpProtocol.ts

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
import { NormalizedSchema } from "@smithy/core/schema";
2+
import { SCHEMA } from "@smithy/core/schema";
23
import { HttpRequest, HttpResponse } from "@smithy/protocol-http";
34
import {
45
ClientProtocol,
56
Codec,
67
Endpoint,
78
EndpointBearer,
89
EndpointV2,
10+
EventStreamMarshaller,
11+
EventStreamSerdeContext,
912
HandlerExecutionContext,
1013
HttpRequest as IHttpRequest,
1114
HttpResponse as IHttpResponse,
15+
Message as EventStreamMessage,
16+
MessageHeaders,
17+
MessageHeaderValue,
1218
MetadataBearer,
1319
OperationSchema,
1420
ResponseMetadata,
@@ -17,6 +23,7 @@ import {
1723
ShapeDeserializer,
1824
ShapeSerializer,
1925
} from "@smithy/types";
26+
import { fromUtf8 } from "@smithy/util-utf8";
2027

2128
/**
2229
* Abstract base for HTTP-based client protocols.
@@ -138,6 +145,114 @@ export abstract class HttpProtocol implements ClientProtocol<IHttpRequest, IHttp
138145
};
139146
}
140147

148+
protected serializeEventStream({
149+
eventStream,
150+
unionSchema,
151+
}: {
152+
eventStream: AsyncIterable<any>;
153+
unionSchema: NormalizedSchema;
154+
}) {
155+
const marshaller = this.getEventStreamMarshaller();
156+
const memberSchemas = unionSchema.getMemberSchemas();
157+
158+
return marshaller.serialize(eventStream, (event: any): EventStreamMessage => {
159+
const unionMember =
160+
Object.keys(event).find((key) => {
161+
return key !== "__type";
162+
}) ?? "";
163+
const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT);
164+
165+
this.serializer.write(eventStreamSchema, event);
166+
const messageSerialization = this.serializer.flush();
167+
168+
const body =
169+
typeof messageSerialization === "string"
170+
? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization)
171+
: messageSerialization;
172+
173+
const headers: MessageHeaders = {
174+
":event-type": { type: "string", value: unionMember },
175+
":message-type": { type: "string", value: "event" },
176+
":content-type": { type: "string", value: this.getDefaultContentType() },
177+
};
178+
179+
// additional trait-annotated event headers.
180+
for (const [memberName, memberSchema] of eventStreamSchema.structIterator()) {
181+
const isHeader = !!memberSchema.getMergedTraits().eventHeader;
182+
if (!isHeader) {
183+
continue;
184+
}
185+
const value = event[memberName];
186+
let type = "binary" as MessageHeaderValue["type"];
187+
if (memberSchema.isNumericSchema()) {
188+
if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) {
189+
type = "integer";
190+
} else {
191+
type = "long";
192+
}
193+
} else if (memberSchema.isTimestampSchema()) {
194+
type = "timestamp";
195+
} else if (memberSchema.isStringSchema()) {
196+
type = "string";
197+
} else if (memberSchema.isBooleanSchema()) {
198+
type = "boolean";
199+
}
200+
201+
if (isHeader && value != undefined) {
202+
headers[memberName] = {
203+
type,
204+
value,
205+
};
206+
}
207+
}
208+
209+
return {
210+
headers,
211+
body,
212+
};
213+
});
214+
}
215+
216+
protected deserializeEventStream({
217+
response,
218+
unionSchema,
219+
}: {
220+
response: IHttpResponse;
221+
unionSchema: NormalizedSchema;
222+
}): AsyncIterable<{ [key: string]: any; $unknown?: unknown }> {
223+
const marshaller = this.getEventStreamMarshaller();
224+
const memberSchemas = unionSchema.getMemberSchemas();
225+
226+
return marshaller.deserialize(response.body, async (event) => {
227+
const unionMember =
228+
Object.keys(event).find((key) => {
229+
return key !== "__type";
230+
}) ?? "";
231+
if (unionMember in memberSchemas) {
232+
const eventStreamSchema = memberSchemas[unionMember];
233+
return {
234+
[unionMember]: await this.deserializer.read(eventStreamSchema, event[unionMember].body),
235+
};
236+
} else {
237+
// todo(schema): This union convention is ignored by the event stream marshaller.
238+
// todo(schema): This should be returned to the user instead.
239+
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
240+
return {
241+
$unknown: event,
242+
};
243+
}
244+
});
245+
}
246+
247+
/**
248+
* @returns content-type default header value for event stream events and other documents.
249+
*/
250+
protected getDefaultContentType(): string {
251+
throw new Error(
252+
`@smithy/core/protocols - ${this.constructor.name} getDefaultContentType() implementation missing.`
253+
);
254+
}
255+
141256
/**
142257
* For HTTP binding protocols, this method is overridden in {@link HttpBindingProtocol}.
143258
*
@@ -172,4 +287,12 @@ export abstract class HttpProtocol implements ClientProtocol<IHttpRequest, IHttp
172287
// It should remain unused.
173288
return [];
174289
}
290+
291+
protected getEventStreamMarshaller(): EventStreamMarshaller {
292+
const context = this.serdeContext as unknown as EventStreamSerdeContext;
293+
if (!context.eventStreamMarshaller) {
294+
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
295+
}
296+
return context.eventStreamMarshaller;
297+
}
175298
}

packages/core/src/submodules/protocols/RpcProtocol.ts

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { NormalizedSchema, SCHEMA } from "@smithy/core/schema";
22
import { HttpRequest } from "@smithy/protocol-http";
3-
import {
3+
import type {
44
Endpoint,
55
EndpointBearer,
66
HandlerExecutionContext,
@@ -56,8 +56,19 @@ export abstract class RpcProtocol extends HttpProtocol {
5656
};
5757

5858
if (input) {
59-
serializer.write(schema, _input);
60-
payload = serializer.flush() as Uint8Array;
59+
const eventStreamMember = ns.getEventStreamMember();
60+
61+
if (eventStreamMember) {
62+
if (_input[eventStreamMember]) {
63+
payload = this.serializeEventStream({
64+
eventStream: _input[eventStreamMember],
65+
unionSchema: ns.getMemberSchema(eventStreamMember),
66+
});
67+
}
68+
} else {
69+
serializer.write(schema, _input);
70+
payload = serializer.flush() as Uint8Array;
71+
}
6172
}
6273

6374
request.headers = headers;
@@ -93,16 +104,21 @@ export abstract class RpcProtocol extends HttpProtocol {
93104
response.headers[header.toLowerCase()] = value;
94105
}
95106

96-
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
97-
if (bytes.byteLength > 0) {
98-
Object.assign(dataObject, await deserializer.read(ns, bytes));
107+
const eventStreamMember = ns.getEventStreamMember();
108+
if (eventStreamMember) {
109+
// todo(schema): assign other dataObject fields from initial-response.
110+
dataObject[eventStreamMember] = this.deserializeEventStream({
111+
response,
112+
unionSchema: ns.getMemberSchema(eventStreamMember),
113+
});
114+
} else {
115+
const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions);
116+
if (bytes.byteLength > 0) {
117+
Object.assign(dataObject, await deserializer.read(ns, bytes));
118+
}
99119
}
100120

101-
const output: Output = {
102-
$metadata: this.deserializeMetadata(response),
103-
...dataObject,
104-
};
105-
106-
return output;
121+
dataObject.$metadata = this.deserializeMetadata(response);
122+
return dataObject;
107123
}
108124
}

packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,4 +239,39 @@ describe(NormalizedSchema.name, () => {
239239
}
240240
});
241241
});
242+
243+
describe("event stream detection", () => {
244+
it("should retrieve the event stream member", () => {
245+
const schema = struct(
246+
"ns",
247+
"StructureWithEventStream",
248+
0,
249+
["A", "B", "C", "D", "EventStream"],
250+
[0, 0, 0, 0, struct("ns", "Union", { streaming: 1 }, [], [])]
251+
);
252+
const ns = NormalizedSchema.of(schema);
253+
254+
expect(ns.getEventStreamMember()).toEqual("EventStream");
255+
});
256+
257+
it("should return empty string if no event stream member is present", () => {
258+
const schema = struct(
259+
"ns",
260+
"StructureWithEventStream",
261+
0,
262+
["A", "B", "C", "D", "EventStream"],
263+
[0, 0, 0, 0, struct("ns", "Union", 0, [], [])]
264+
);
265+
const ns = NormalizedSchema.of(schema);
266+
267+
expect(ns.getEventStreamMember()).toEqual("");
268+
});
269+
270+
it("should not throw an exception if the NormalizedSchema is not a structure", () => {
271+
const schema = 0;
272+
const ns = NormalizedSchema.of(schema);
273+
274+
expect(ns.getEventStreamMember()).toEqual("");
275+
});
276+
});
242277
});

0 commit comments

Comments
 (0)