Skip to content

Commit f6bfd66

Browse files
committed
handle eventHeader and eventPayload traits
1 parent 812b849 commit f6bfd66

File tree

5 files changed

+167
-62
lines changed

5 files changed

+167
-62
lines changed

packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { cbor, CborCodec, dateToTag } from "@smithy/core/cbor";
2-
import { NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema";
2+
import { NormalizedSchema, SCHEMA, sim,struct } from "@smithy/core/schema";
33
import { EventStreamMarshaller } from "@smithy/eventstream-serde-node";
44
import { HttpResponse } from "@smithy/protocol-http";
55
import { Message as EventMessage } from "@smithy/types";
@@ -42,9 +42,27 @@ describe(EventStreamSerde.name, () => {
4242
"ns",
4343
"EventStreamStructure",
4444
{ streaming: 1 },
45-
["A", "B", "C"],
45+
["A", "B", "C", "Payload", "CustomHeaders"],
4646
// D is omitted to represent an unknown event.
47-
[struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])]
47+
[
48+
struct("ns", "A", 0, ["name"], [0]),
49+
struct("ns", "B", 0, ["name"], [0]),
50+
struct("ns", "C", 0, ["name"], [0]),
51+
struct(
52+
"ns",
53+
"Payload",
54+
0,
55+
["payload"],
56+
[sim("ns", "StreamingBlobPayload", SCHEMA.STREAMING_BLOB, { eventPayload: 1 })]
57+
),
58+
struct(
59+
"ns",
60+
"CustomHeaders",
61+
0,
62+
["header1", "header2"],
63+
[sim("ns", "EventHeader", 0, { eventHeader: 1 }), sim("ns", "EventHeader", 0, { eventHeader: 1 })]
64+
),
65+
]
4866
);
4967

5068
const eventStreamContainerSchema = struct(
@@ -68,6 +86,8 @@ describe(EventStreamSerde.name, () => {
6886
yield { B: { name: "b" } };
6987
yield { C: { name: "c" } };
7088
yield { $unknown: ["D", { name: "d" }] };
89+
yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } };
90+
yield { CustomHeaders: { header1: "h1", header2: "h2" } };
7191
},
7292
};
7393

@@ -104,6 +124,24 @@ describe(EventStreamSerde.name, () => {
104124
},
105125
body: { name: "d" },
106126
},
127+
{
128+
headers: {
129+
":event-type": { type: "string", value: "Payload" },
130+
":message-type": { type: "string", value: "event" },
131+
":content-type": { type: "string", value: "application/octet-stream" },
132+
},
133+
body: new Uint8Array([0, 1, 2, 3, 4, 5, 6]),
134+
},
135+
{
136+
headers: {
137+
":event-type": { type: "string", value: "CustomHeaders" },
138+
":message-type": { type: "string", value: "event" },
139+
":content-type": { type: "string", value: "application/cbor" },
140+
header1: { type: "string", value: "h1" },
141+
header2: { type: "string", value: "h2" },
142+
},
143+
body: {},
144+
},
107145
];
108146

109147
/**
@@ -188,6 +226,8 @@ describe(EventStreamSerde.name, () => {
188226
yield { B: { name: "b" } };
189227
yield { C: { name: "c" } };
190228
yield { D: { name: "d" } };
229+
yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } };
230+
yield { CustomHeaders: { header1: "h1", header2: "h2" } };
191231
},
192232
};
193233

@@ -230,6 +270,8 @@ describe(EventStreamSerde.name, () => {
230270
{ C: { name: `c` } },
231271
// todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events.
232272
// $unknownEvent,
273+
{ Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } },
274+
{ CustomHeaders: { header1: "h1", header2: "h2" } },
233275
]);
234276
});
235277

@@ -265,6 +307,8 @@ describe(EventStreamSerde.name, () => {
265307
{ C: { name: `c` } },
266308
// todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events.
267309
// $unknownEvent,
310+
{ Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } },
311+
{ CustomHeaders: { header1: "h1", header2: "h2" } },
268312
]);
269313

270314
expect(initialResponseContainer).toEqual({

packages/core/src/submodules/eventStreams/EventStreamSerde.ts

Lines changed: 96 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -109,65 +109,19 @@ export class EventStreamSerde {
109109
Object.keys(event).find((key) => {
110110
return key !== "__type";
111111
}) ?? "";
112-
const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT);
113-
114-
let messageSerialization: string | Uint8Array;
115-
let eventType = unionMember;
116-
117-
if (eventStreamSchema.isStructSchema()) {
118-
serializer.write(eventStreamSchema, event[unionMember]);
119-
messageSerialization = serializer.flush();
120-
} else {
121-
// $unknown member
122-
const [type, value] = event[unionMember];
123-
eventType = type;
124-
serializer.write(NormalizedSchema.of(SCHEMA.DOCUMENT), value);
125-
messageSerialization = serializer.flush();
126-
}
127-
128-
const body =
129-
typeof messageSerialization === "string"
130-
? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization)
131-
: messageSerialization;
112+
const { additionalHeaders, body, eventType, explicitPayloadContentType } = this.writeEventBody(
113+
unionMember,
114+
unionSchema,
115+
event
116+
);
132117

133118
const headers: MessageHeaders = {
134119
":event-type": { type: "string", value: eventType },
135120
":message-type": { type: "string", value: "event" },
136-
":content-type": { type: "string", value: defaultContentType },
121+
":content-type": { type: "string", value: explicitPayloadContentType ?? defaultContentType },
122+
...additionalHeaders,
137123
};
138124

139-
// additional trait-annotated event headers.
140-
if (eventStreamSchema.isStructSchema()) {
141-
for (const [memberName, memberSchema] of eventStreamSchema.structIterator()) {
142-
const isHeader = !!memberSchema.getMergedTraits().eventHeader;
143-
if (!isHeader) {
144-
continue;
145-
}
146-
const value = event[memberName];
147-
let type = "binary" as MessageHeaderValue["type"];
148-
if (memberSchema.isNumericSchema()) {
149-
if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) {
150-
type = "integer";
151-
} else {
152-
type = "long";
153-
}
154-
} else if (memberSchema.isTimestampSchema()) {
155-
type = "timestamp";
156-
} else if (memberSchema.isStringSchema()) {
157-
type = "string";
158-
} else if (memberSchema.isBooleanSchema()) {
159-
type = "boolean";
160-
}
161-
162-
if (isHeader && value != undefined) {
163-
headers[memberName] = {
164-
type,
165-
value,
166-
};
167-
}
168-
}
169-
}
170-
171125
return {
172126
headers,
173127
body,
@@ -263,4 +217,93 @@ export class EventStreamSerde {
263217
},
264218
};
265219
}
220+
221+
/**
222+
* @param unionMember - member name within the structure that contains an event stream union.
223+
* @param unionSchema - schema of the union.
224+
* @param event
225+
*
226+
* @returns the event body (bytes) and event type (string).
227+
*/
228+
private writeEventBody(unionMember: string, unionSchema: NormalizedSchema, event: any) {
229+
const serializer = this.serializer;
230+
let eventType = unionMember;
231+
let explicitPayloadMember = null as null | string;
232+
let explicitPayloadContentType: undefined | string;
233+
234+
const isKnownSchema = unionSchema.hasMemberSchema(unionMember);
235+
const additionalHeaders: MessageHeaders = {};
236+
237+
if (!isKnownSchema) {
238+
// $unknown member
239+
const [type, value] = event[unionMember];
240+
eventType = type;
241+
serializer.write(SCHEMA.DOCUMENT, value);
242+
} else {
243+
const eventSchema = unionSchema.getMemberSchema(unionMember);
244+
245+
if (eventSchema.isStructSchema()) {
246+
for (const [memberName, memberSchema] of eventSchema.structIterator()) {
247+
const { eventHeader, eventPayload } = memberSchema.getMergedTraits();
248+
249+
if (eventPayload) {
250+
explicitPayloadMember = memberName;
251+
break;
252+
} else if (eventHeader) {
253+
const value = event[unionMember][memberName];
254+
let type = "binary" as MessageHeaderValue["type"];
255+
if (memberSchema.isNumericSchema()) {
256+
if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) {
257+
type = "integer";
258+
} else {
259+
type = "long";
260+
}
261+
} else if (memberSchema.isTimestampSchema()) {
262+
type = "timestamp";
263+
} else if (memberSchema.isStringSchema()) {
264+
type = "string";
265+
} else if (memberSchema.isBooleanSchema()) {
266+
type = "boolean";
267+
}
268+
269+
if (value != null) {
270+
additionalHeaders[memberName] = {
271+
type,
272+
value,
273+
};
274+
delete event[unionMember][memberName];
275+
}
276+
}
277+
}
278+
279+
if (explicitPayloadMember !== null) {
280+
const payloadSchema = eventSchema.getMemberSchema(explicitPayloadMember);
281+
if (payloadSchema.isBlobSchema()) {
282+
explicitPayloadContentType = "application/octet-stream";
283+
} else if (payloadSchema.isStringSchema()) {
284+
explicitPayloadContentType = "text/plain";
285+
}
286+
serializer.write(payloadSchema, event[unionMember][explicitPayloadMember]);
287+
} else {
288+
serializer.write(eventSchema, event[unionMember]);
289+
}
290+
} else {
291+
throw new Error("@smithy/core/eventStreams - non-struct member not supported in event stream union.");
292+
}
293+
}
294+
295+
const messageSerialization: string | Uint8Array = serializer.flush();
296+
297+
const body =
298+
typeof messageSerialization === "string"
299+
? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization)
300+
: messageSerialization;
301+
302+
return {
303+
body,
304+
eventType,
305+
explicitPayloadContentType,
306+
additionalHeaders,
307+
};
308+
}
266309
}

packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,15 @@ describe(NormalizedSchema.name, () => {
173173
});
174174
});
175175

176+
it("can identify whether a member exists", () => {
177+
expect(ns.hasMemberSchema("list")).toBe(true);
178+
expect(ns.hasMemberSchema("map")).toBe(true);
179+
expect(ns.hasMemberSchema("struct")).toBe(true);
180+
expect(ns.hasMemberSchema("a")).toBe(false);
181+
expect(ns.hasMemberSchema("b")).toBe(false);
182+
expect(ns.hasMemberSchema("c")).toBe(false);
183+
});
184+
176185
describe("iteration", () => {
177186
it("iterates over member schemas", () => {
178187
const iteration = Array.from(ns.structIterator());

packages/core/src/submodules/schema/schemas/NormalizedSchema.ts

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ export class NormalizedSchema implements INormalizedSchema {
3636
* @param ref - a polymorphic SchemaRef to be dereferenced/normalized.
3737
* @param memberName - optional memberName if this NormalizedSchema should be considered a member schema.
3838
*/
39-
public constructor(
40-
private readonly ref: SchemaRef,
41-
private memberName?: string
42-
) {
39+
public constructor(private readonly ref: SchemaRef, private memberName?: string) {
4340
const traitStack = [] as SchemaTraits[];
4441
let _ref = ref;
4542
let schema = ref;
@@ -373,6 +370,18 @@ export class NormalizedSchema implements INormalizedSchema {
373370
throw new Error(`@smithy/core/schema - the schema ${this.getName(true)} does not have a value member.`);
374371
}
375372

373+
/**
374+
* @param member - to query.
375+
* @returns whether there is a memberSchema with the given member name. False if not a structure (or union).
376+
*/
377+
public hasMemberSchema(member: string): boolean {
378+
if (this.isStructSchema()) {
379+
const struct = this.getSchema() as StructureSchema;
380+
return member in struct.members;
381+
}
382+
return false;
383+
}
384+
376385
/**
377386
* @returns the NormalizedSchema for the given member name. The returned instance will return true for `isMemberSchema()`
378387
* and will have the member name given.

smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ final class SchemaTraitFilterIndex implements KnowledgeIndex {
7474
ErrorTrait.ID, // set by the ServiceException runtime classes.
7575
RequiresLengthTrait.ID, // unhandled
7676

77-
EventHeaderTrait.ID,
78-
EventPayloadTrait.ID,
77+
EventHeaderTrait.ID, // @smithy/core/EventStreams::EventStreamSerde
78+
EventPayloadTrait.ID, // @smithy/core/EventStreams::EventStreamSerde
7979

8080
// afaict, HttpErrorTrait is ignored by the client. The discriminator selects the error structure
8181
// but the actual HTTP response status code is used with no particular comparison

0 commit comments

Comments
 (0)