Skip to content

Commit 36461d4

Browse files
committed
feat(core/protocols): eventstreams for schema serde
1 parent 1cab1f5 commit 36461d4

File tree

10 files changed

+403
-52
lines changed

10 files changed

+403
-52
lines changed

.changeset/shy-hornets-care.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@smithy/core": minor
3+
---
4+
5+
schema serde eventstreams implementation

packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
4343
): Promise<IHttpRequest> {
4444
const request = await super.serializeRequest(operationSchema, input, context);
4545
Object.assign(request.headers, {
46-
"content-type": "application/cbor",
46+
"content-type": this.getDefaultContentType(),
4747
"smithy-protocol": "rpc-v2-cbor",
48-
accept: "application/cbor",
48+
accept: this.getDefaultContentType(),
4949
});
5050
if (deref(operationSchema.input) === "unit") {
5151
delete request.body;
@@ -113,4 +113,8 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol {
113113

114114
throw exception;
115115
}
116+
117+
protected getDefaultContentType(): string {
118+
return "application/cbor";
119+
}
116120
}

packages/core/src/submodules/protocols/HttpBindingProtocol.ts

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { HttpRequest } from "@smithy/protocol-http";
44
import {
55
Endpoint,
66
EndpointBearer,
7-
EventStreamSerdeContext,
87
HandlerExecutionContext,
98
HttpRequest as IHttpRequest,
109
HttpResponse as IHttpResponse,
@@ -86,8 +85,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
8685
if (isStreaming) {
8786
const isEventStream = memberNs.isStructSchema();
8887
if (isEventStream) {
89-
// todo(schema)
90-
throw new Error("serialization of event streams is not yet implemented");
88+
if (input[memberName]) {
89+
payload = this.serializeEventStream({
90+
eventStream: input[memberName],
91+
unionSchema: memberNs,
92+
});
93+
}
9194
} else {
9295
// streaming blob body
9396
payload = inputMemberValue;
@@ -132,6 +135,9 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
132135
if (hasNonHttpBindingMember && input) {
133136
serializer.write(schema, input);
134137
payload = serializer.flush() as Uint8Array;
138+
139+
// Due to Smithy validation, we can assume that the members with no HTTP
140+
// bindings DO NOT contain an event stream.
135141
}
136142

137143
request.headers = headers;
@@ -221,14 +227,12 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
221227
dataObject[member] = dataFromBody[member];
222228
}
223229
}
230+
// Due to Smithy validation, we can assume that the members with no HTTP
231+
// bindings DO NOT contain an event stream.
224232
}
225233

226-
const output: Output = {
227-
$metadata: this.deserializeMetadata(response),
228-
...dataObject,
229-
};
230-
231-
return output;
234+
dataObject.$metadata = this.deserializeMetadata(response);
235+
return dataObject;
232236
}
233237

234238
/**
@@ -276,33 +280,13 @@ export abstract class HttpBindingProtocol extends HttpProtocol {
276280
if (isStreaming) {
277281
const isEventStream = memberSchema.isStructSchema();
278282
if (isEventStream) {
279-
// streaming event stream (union)
280-
const context = this.serdeContext as unknown as EventStreamSerdeContext;
281-
if (!context.eventStreamMarshaller) {
282-
throw new Error("@smithy/core - HttpProtocol: eventStreamMarshaller missing in serdeContext.");
283-
}
284-
const memberSchemas = memberSchema.getMemberSchemas();
285-
dataObject[memberName] = context.eventStreamMarshaller.deserialize(response.body, async (event) => {
286-
const unionMember =
287-
Object.keys(event).find((key) => {
288-
return key !== "__type";
289-
}) ?? "";
290-
if (unionMember in memberSchemas) {
291-
const eventStreamSchema = memberSchemas[unionMember];
292-
return {
293-
[unionMember]: await deserializer.read(eventStreamSchema, event[unionMember].body),
294-
};
295-
} else {
296-
// todo(schema): This union convention is ignored by the event stream marshaller.
297-
// todo(schema): This should be returned to the user instead.
298-
// see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts
299-
return {
300-
$unknown: event,
301-
};
302-
}
283+
// event stream (union)
284+
dataObject[memberName] = this.deserializeEventStream({
285+
response,
286+
unionSchema: memberSchema,
303287
});
304288
} else {
305-
// streaming blob body
289+
// data stream (blob)
306290
dataObject[memberName] = sdkStreamMixin(response.body);
307291
}
308292
} else if (response.body) {

packages/core/src/submodules/protocols/HttpProtocol.spec.ts

Lines changed: 151 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import { map, SCHEMA, struct } from "@smithy/core/schema";
2+
import { HttpResponse } from "@smithy/protocol-http";
23
import { HandlerExecutionContext, HttpResponse as IHttpResponse, Schema, SerdeFunctions } from "@smithy/types";
3-
import { describe, expect, test as it } from "vitest";
4+
import { toUtf8 } from "@smithy/util-utf8";
5+
import { Readable } from "node:stream";
6+
import { describe, expect, test as it, vi } from "vitest";
47

8+
import { NormalizedSchema } from "../schema";
59
import { HttpProtocol } from "./HttpProtocol";
610
import { FromStringShapeDeserializer } from "./serde/FromStringShapeDeserializer";
11+
import { ToStringShapeSerializer } from "./serde/ToStringShapeSerializer";
712

813
describe(HttpProtocol.name, () => {
914
it("ignores http bindings (only HttpBindingProtocol uses them)", async () => {
@@ -49,4 +54,149 @@ describe(HttpProtocol.name, () => {
4954
// headers were ignored
5055
});
5156
});
57+
58+
describe("event stream serde", () => {
59+
const impl = {
60+
serializer: new ToStringShapeSerializer({
61+
timestampFormat: { default: 7, useTrait: true },
62+
}),
63+
deserializer: new FromStringShapeDeserializer({
64+
httpBindings: true,
65+
timestampFormat: { default: 7, useTrait: true },
66+
}),
67+
getEventStreamMarshaller() {
68+
return this.serdeContext.eventStreamMarshaller;
69+
},
70+
serdeContext: {
71+
eventStreamMarshaller: {
72+
serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => {
73+
return Readable.from({
74+
async *[Symbol.asyncIterator]() {
75+
for await (const inputEvent of eventStream) {
76+
yield eventStreamSerializationFn(inputEvent);
77+
}
78+
},
79+
});
80+
}),
81+
deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => {
82+
return {
83+
async *[Symbol.asyncIterator]() {
84+
for await (const outputEvent of body) {
85+
yield eventStreamDeserializationFn(outputEvent);
86+
}
87+
},
88+
};
89+
}),
90+
},
91+
},
92+
getDefaultContentType() {
93+
return "unit/test";
94+
},
95+
};
96+
97+
const serializeEventStream = (HttpProtocol.prototype as any).serializeEventStream.bind(impl);
98+
const deserializeEventStream = (HttpProtocol.prototype as any).deserializeEventStream.bind(impl);
99+
100+
const eventStreamUnionSchema = struct(
101+
"ns",
102+
"EventStreamStructure",
103+
{ streaming: 1 },
104+
["A", "B", "C"],
105+
[struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])]
106+
);
107+
108+
it("serializes event streams", async () => {
109+
const eventStream = {
110+
async *[Symbol.asyncIterator]() {
111+
yield { A: { name: "a" } };
112+
yield { B: { name: "b" } };
113+
yield { C: { name: "c" } };
114+
yield { $unknown: ["D", { name: "d" }] };
115+
},
116+
};
117+
const unionSchema = NormalizedSchema.of(eventStreamUnionSchema);
118+
119+
const requestBody = serializeEventStream({
120+
eventStream,
121+
unionSchema,
122+
});
123+
124+
const collect = [];
125+
for await (const chunk of requestBody) {
126+
collect.push(chunk);
127+
}
128+
expect(
129+
collect.map((item) => {
130+
return {
131+
headers: item.headers,
132+
body: toUtf8(item.body).replace(/\s+/g, ""),
133+
};
134+
})
135+
).toEqual([
136+
{
137+
headers: {
138+
":event-type": { type: "string", value: "A" },
139+
":message-type": { type: "string", value: "event" },
140+
":content-type": { type: "string", value: "unit/test" },
141+
},
142+
body: `{"name":"a"}`,
143+
},
144+
{
145+
headers: {
146+
":event-type": { type: "string", value: "B" },
147+
":message-type": { type: "string", value: "event" },
148+
":content-type": { type: "string", value: "unit/test" },
149+
},
150+
body: `{"name":"b"}`,
151+
},
152+
{
153+
headers: {
154+
":event-type": { type: "string", value: "C" },
155+
":message-type": { type: "string", value: "event" },
156+
":content-type": { type: "string", value: "unit/test" },
157+
},
158+
body: `{"name":"c"}`,
159+
},
160+
{
161+
headers: {
162+
":event-type": { type: "string", value: "D" },
163+
":message-type": { type: "string", value: "event" },
164+
":content-type": { type: "string", value: "unit/test" },
165+
},
166+
body: `{"name":"d"}`,
167+
},
168+
]);
169+
});
170+
171+
it("deserializes event streams", async () => {
172+
const response = new HttpResponse({
173+
statusCode: 200,
174+
body: {
175+
async *[Symbol.asyncIterator]() {
176+
yield { A: { headers: {}, body: { name: "a" } } };
177+
yield { B: { headers: {}, body: { name: "b" } } };
178+
yield { C: { headers: {}, body: { name: "c" } } };
179+
yield { D: { headers: {}, body: { name: "d" } } };
180+
},
181+
},
182+
});
183+
const unionSchema = NormalizedSchema.of(eventStreamUnionSchema);
184+
185+
const asyncIterable = deserializeEventStream({
186+
response,
187+
unionSchema,
188+
});
189+
190+
const collect = [];
191+
for await (const event of asyncIterable) {
192+
collect.push(event);
193+
}
194+
expect(collect).toEqual([
195+
{ A: { name: "a" } },
196+
{ B: { name: "b" } },
197+
{ C: { name: "c" } },
198+
{ $unknown: { D: { headers: {}, body: { name: "d" } } } },
199+
]);
200+
});
201+
});
52202
});

0 commit comments

Comments
 (0)