Skip to content

Commit bbd7a2f

Browse files
committed
wip: initial message testing
1 parent cef7e77 commit bbd7a2f

File tree

14 files changed

+667
-269
lines changed

14 files changed

+667
-269
lines changed

packages/core/eventStreams.d.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Do not edit:
3+
* This is a compatibility redirect for contexts that do not understand package.json exports field.
4+
*/
5+
declare module "@smithy/core/eventStreams" {
6+
export * from "@smithy/core/dist-types/submodules/eventStreams/index.d";
7+
}

packages/core/eventStreams.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
/**
3+
* Do not edit:
4+
* This is a compatibility redirect for contexts that do not understand package.json exports field.
5+
*/
6+
module.exports = require("./dist-cjs/submodules/eventStreams/index.js");

packages/core/package.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@
6060
"node": "./dist-cjs/submodules/schema/index.js",
6161
"import": "./dist-es/submodules/schema/index.js",
6262
"require": "./dist-cjs/submodules/schema/index.js"
63+
},
64+
"./eventStreams": {
65+
"types": "./dist-types/submodules/eventStreams/index.d.ts",
66+
"module": "./dist-es/submodules/eventStreams/index.js",
67+
"node": "./dist-cjs/submodules/eventStreams/index.js",
68+
"import": "./dist-es/submodules/eventStreams/index.js",
69+
"require": "./dist-cjs/submodules/eventStreams/index.js"
6370
}
6471
},
6572
"author": {
@@ -95,6 +102,8 @@
95102
"files": [
96103
"./cbor.d.ts",
97104
"./cbor.js",
105+
"./eventStreams.d.ts",
106+
"./eventStreams.js",
98107
"./protocols.d.ts",
99108
"./protocols.js",
100109
"./schema.d.ts",
Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
import { FromStringShapeDeserializer, ToStringShapeSerializer } from "@smithy/core/protocols";
2+
import { NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema";
3+
import { HttpResponse } from "@smithy/protocol-http";
4+
import { Schema, SerdeFunctions, ShapeDeserializer } from "@smithy/types";
5+
import { toUtf8 } from "@smithy/util-utf8";
6+
import { Readable } from "node:stream";
7+
import { describe, expect, test as it, vi } from "vitest";
8+
9+
import { EventStreamSerde } from "./EventStreamSerde";
10+
11+
class StructStringDeserializer implements ShapeDeserializer {
12+
private fromString = new FromStringShapeDeserializer({
13+
httpBindings: true,
14+
timestampFormat: { default: 7, useTrait: true },
15+
});
16+
17+
public read(schema: Schema, data: any): any {
18+
const ns = NormalizedSchema.of(schema);
19+
if (ns.isStructSchema()) {
20+
const output = {} as any;
21+
for (const [m, s] of ns.structIterator()) {
22+
output[m] = this.fromString.read(s, data[m]);
23+
}
24+
return output;
25+
}
26+
27+
return this.fromString.read(schema, data);
28+
}
29+
30+
public setSerdeContext(serdeContext: SerdeFunctions): void {}
31+
}
32+
33+
describe(EventStreamSerde.name, () => {
34+
describe("event stream serde", () => {
35+
// this represents elements injected by the HttpProtocol caller.
36+
const impl = {
37+
serializer: new ToStringShapeSerializer({
38+
timestampFormat: { default: 7, useTrait: true },
39+
}),
40+
deserializer: new StructStringDeserializer(),
41+
getEventStreamMarshaller() {
42+
return this.serdeContext.eventStreamMarshaller;
43+
},
44+
serdeContext: {
45+
eventStreamMarshaller: {
46+
serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => {
47+
return Readable.from({
48+
async *[Symbol.asyncIterator]() {
49+
for await (const inputEvent of eventStream) {
50+
yield eventStreamSerializationFn(inputEvent);
51+
}
52+
},
53+
});
54+
}),
55+
deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => {
56+
return {
57+
async *[Symbol.asyncIterator]() {
58+
for await (const outputEvent of body) {
59+
yield eventStreamDeserializationFn(outputEvent);
60+
}
61+
},
62+
};
63+
}),
64+
},
65+
},
66+
getDefaultContentType() {
67+
return "unit/test";
68+
},
69+
};
70+
71+
const eventStreamSerde = new EventStreamSerde({
72+
marshaller: impl.getEventStreamMarshaller(),
73+
serializer: impl.serializer,
74+
deserializer: impl.deserializer,
75+
defaultContentType: impl.getDefaultContentType(),
76+
});
77+
78+
const serializeEventStream = eventStreamSerde.serializeEventStream.bind(eventStreamSerde);
79+
const deserializeEventStream = eventStreamSerde.deserializeEventStream.bind(eventStreamSerde);
80+
81+
const eventStreamUnionSchema = struct(
82+
"ns",
83+
"EventStreamStructure",
84+
{ streaming: 1 },
85+
["A", "B", "C"],
86+
[struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])]
87+
);
88+
89+
const eventStreamContainerSchema = struct(
90+
"ns",
91+
"EventStreamContainer",
92+
0,
93+
["eventStreamMember", "dateMember", "blobMember"],
94+
[eventStreamUnionSchema, SCHEMA.TIMESTAMP_EPOCH_SECONDS, SCHEMA.BLOB]
95+
);
96+
97+
it("serializes event streams", async () => {
98+
const eventStream = {
99+
async *[Symbol.asyncIterator]() {
100+
yield { A: { name: "a" } };
101+
yield { B: { name: "b" } };
102+
yield { C: { name: "c" } };
103+
yield { $unknown: ["D", { name: "d" }] };
104+
},
105+
};
106+
107+
const requestBody = await serializeEventStream({
108+
eventStream,
109+
requestSchema: NormalizedSchema.of(eventStreamContainerSchema),
110+
});
111+
112+
const collect = [];
113+
for await (const chunk of requestBody) {
114+
collect.push(chunk);
115+
}
116+
expect(
117+
collect.map((item) => {
118+
return {
119+
headers: item.headers,
120+
body: toUtf8(item.body).replace(/\s+/g, ""),
121+
};
122+
})
123+
).toEqual([
124+
{
125+
headers: {
126+
":event-type": { type: "string", value: "A" },
127+
":message-type": { type: "string", value: "event" },
128+
":content-type": { type: "string", value: "unit/test" },
129+
},
130+
body: `{"name":"a"}`,
131+
},
132+
{
133+
headers: {
134+
":event-type": { type: "string", value: "B" },
135+
":message-type": { type: "string", value: "event" },
136+
":content-type": { type: "string", value: "unit/test" },
137+
},
138+
body: `{"name":"b"}`,
139+
},
140+
{
141+
headers: {
142+
":event-type": { type: "string", value: "C" },
143+
":message-type": { type: "string", value: "event" },
144+
":content-type": { type: "string", value: "unit/test" },
145+
},
146+
body: `{"name":"c"}`,
147+
},
148+
{
149+
headers: {
150+
":event-type": { type: "string", value: "D" },
151+
":message-type": { type: "string", value: "event" },
152+
":content-type": { type: "string", value: "unit/test" },
153+
},
154+
body: `{"name":"d"}`,
155+
},
156+
]);
157+
});
158+
159+
it("deserializes event streams", async () => {
160+
const response = new HttpResponse({
161+
statusCode: 200,
162+
body: {
163+
async *[Symbol.asyncIterator]() {
164+
yield { A: { headers: {}, body: { name: "a" } } };
165+
yield { B: { headers: {}, body: { name: "b" } } };
166+
yield { C: { headers: {}, body: { name: "c" } } };
167+
yield { D: { headers: {}, body: { name: "d" } } };
168+
},
169+
},
170+
});
171+
172+
const asyncIterable = await deserializeEventStream({
173+
response,
174+
responseSchema: NormalizedSchema.of(eventStreamContainerSchema),
175+
});
176+
177+
const collect = [];
178+
for await (const event of asyncIterable) {
179+
collect.push(event);
180+
}
181+
expect(collect).toEqual([
182+
{ A: { name: `a` } },
183+
{ B: { name: `b` } },
184+
{ C: { name: `c` } },
185+
{ $unknown: { D: { headers: {}, body: { name: "d" } } } },
186+
]);
187+
});
188+
189+
it("serializes event streams containing an initial-request", async () => {
190+
const eventStream = {
191+
async *[Symbol.asyncIterator]() {
192+
yield { A: { name: "a" } };
193+
yield { B: { name: "b" } };
194+
yield { C: { name: "c" } };
195+
yield { $unknown: ["D", { name: "d" }] };
196+
},
197+
};
198+
199+
const requestBody = await serializeEventStream({
200+
eventStream,
201+
requestSchema: NormalizedSchema.of(eventStreamContainerSchema),
202+
initialRequest: {
203+
dateMember: new Date(0),
204+
blobMember: new Uint8Array([0, 1, 2, 3]),
205+
},
206+
});
207+
208+
const collect = [];
209+
for await (const chunk of requestBody) {
210+
collect.push(chunk);
211+
}
212+
expect(
213+
collect.map((item) => {
214+
return {
215+
headers: item.headers,
216+
body: toUtf8(item.body).replace(/\s+/g, ""),
217+
};
218+
})
219+
).toEqual([
220+
{
221+
headers: {
222+
":event-type": { type: "string", value: "initial-request" },
223+
":message-type": { type: "string", value: "event" },
224+
":content-type": { type: "string", value: "unit/test" },
225+
},
226+
body: `{"dateMember":"1970-01-01T00:00:00.000Z","blobMember":{"0":0,"1":1,"2":2,"3":3}}`,
227+
},
228+
{
229+
headers: {
230+
":event-type": { type: "string", value: "A" },
231+
":message-type": { type: "string", value: "event" },
232+
":content-type": { type: "string", value: "unit/test" },
233+
},
234+
body: `{"name":"a"}`,
235+
},
236+
{
237+
headers: {
238+
":event-type": { type: "string", value: "B" },
239+
":message-type": { type: "string", value: "event" },
240+
":content-type": { type: "string", value: "unit/test" },
241+
},
242+
body: `{"name":"b"}`,
243+
},
244+
{
245+
headers: {
246+
":event-type": { type: "string", value: "C" },
247+
":message-type": { type: "string", value: "event" },
248+
":content-type": { type: "string", value: "unit/test" },
249+
},
250+
body: `{"name":"c"}`,
251+
},
252+
{
253+
headers: {
254+
":event-type": { type: "string", value: "D" },
255+
":message-type": { type: "string", value: "event" },
256+
":content-type": { type: "string", value: "unit/test" },
257+
},
258+
body: `{"name":"d"}`,
259+
},
260+
]);
261+
});
262+
263+
it("deserializes event streams containing an initial-response", async () => {
264+
const response = new HttpResponse({
265+
statusCode: 200,
266+
body: {
267+
async *[Symbol.asyncIterator]() {
268+
yield {
269+
"initial-response": {
270+
headers: {},
271+
body: { dateMember: "0", blobMember: "AAECAw==" },
272+
},
273+
};
274+
yield { A: { headers: {}, body: { name: "a" } } };
275+
yield { B: { headers: {}, body: { name: "b" } } };
276+
yield { C: { headers: {}, body: { name: "c" } } };
277+
yield { D: { headers: {}, body: { name: "d" } } };
278+
},
279+
},
280+
});
281+
282+
const initialResponseContainer = {} as any;
283+
284+
const asyncIterable = await deserializeEventStream({
285+
response,
286+
responseSchema: NormalizedSchema.of(eventStreamContainerSchema),
287+
initialResponseContainer,
288+
});
289+
290+
const collect = [];
291+
for await (const event of asyncIterable) {
292+
collect.push(event);
293+
}
294+
expect(collect).toEqual([
295+
{ A: { name: `a` } },
296+
{ B: { name: `b` } },
297+
{ C: { name: `c` } },
298+
{ $unknown: { D: { headers: {}, body: { name: "d" } } } },
299+
]);
300+
expect(initialResponseContainer).toEqual({
301+
dateMember: new Date(0),
302+
blobMember: new Uint8Array([0, 1, 2, 3]),
303+
});
304+
});
305+
});
306+
});

0 commit comments

Comments
 (0)