From cef7e775c2fe0611629ae6c1e78a77d88d2aa6db Mon Sep 17 00:00:00 2001 From: George Fu Date: Tue, 26 Aug 2025 14:32:56 -0400 Subject: [PATCH 1/5] feat(core/schema): event stream initial messages --- .changeset/calm-trees-teach.md | 5 + .../core/src/submodules/cbor/CborCodec.ts | 6 +- .../cbor/SmithyRpcV2CborProtocol.spec.ts | 94 ++++++++++++++++++- .../cbor/SmithyRpcV2CborProtocol.ts | 53 ++++++++--- 4 files changed, 139 insertions(+), 19 deletions(-) create mode 100644 .changeset/calm-trees-teach.md diff --git a/.changeset/calm-trees-teach.md b/.changeset/calm-trees-teach.md new file mode 100644 index 00000000000..4cafa391ef0 --- /dev/null +++ b/.changeset/calm-trees-teach.md @@ -0,0 +1,5 @@ +--- +"@smithy/core": patch +--- + +CBOR protocol error handling fallbacks diff --git a/packages/core/src/submodules/cbor/CborCodec.ts b/packages/core/src/submodules/cbor/CborCodec.ts index 0bf6130e978..a3d1f911cb2 100644 --- a/packages/core/src/submodules/cbor/CborCodec.ts +++ b/packages/core/src/submodules/cbor/CborCodec.ts @@ -139,7 +139,11 @@ export class CborShapeDeserializer implements ShapeDeserializer { return this.readValue(schema, data); } - private readValue(_schema: Schema, value: any): any { + /** + * Public because it's called by the protocol implementation to deserialize errors. + * @internal + */ + public readValue(_schema: Schema, value: any): any { const ns = NormalizedSchema.of(_schema); if (ns.isTimestampSchema() && typeof value === "number") { diff --git a/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.spec.ts b/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.spec.ts index 2763e682196..8906feeed96 100644 --- a/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.spec.ts +++ b/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.spec.ts @@ -1,7 +1,7 @@ -import { list, map, SCHEMA, struct } from "@smithy/core/schema"; +import { error, list, map, op, SCHEMA, struct, TypeRegistry } from "@smithy/core/schema"; import { HttpRequest, HttpResponse } from "@smithy/protocol-http"; -import type { SchemaRef } from "@smithy/types"; -import { describe, expect, test as it } from "vitest"; +import { ResponseMetadata, RetryableTrait, SchemaRef } from "@smithy/types"; +import { beforeEach, describe, expect, test as it } from "vitest"; import { cbor } from "./cbor"; import { dateToTag } from "./parseCborBody"; @@ -273,4 +273,92 @@ describe(SmithyRpcV2CborProtocol.name, () => { }); } }); + + describe("error handling", () => { + const protocol = new SmithyRpcV2CborProtocol({ defaultNamespace: "ns" }); + + const operation = op( + "ns", + "OperationWithModeledException", + {}, + struct("ns", "Input", 0, [], []), + struct("ns", "Output", 0, [], []) + ); + + const errorResponse = new HttpResponse({ + statusCode: 400, + headers: {}, + body: cbor.serialize({ + __type: "ns#ModeledException", + modeledProperty: "oh no", + }), + }); + + const serdeContext = {}; + + class ServiceBaseException extends Error { + public readonly $fault: "client" | "server" = "client"; + public $response?: HttpResponse; + public $retryable?: RetryableTrait; + public $metadata: ResponseMetadata = { + httpStatusCode: 400, + }; + } + + class ModeledExceptionCtor extends ServiceBaseException { + public modeledProperty: string = ""; + } + + beforeEach(() => { + TypeRegistry.for("ns").destroy(); + }); + + it("should throw the schema error ctor if one exists", async () => { + // this is for modeled exceptions. + + TypeRegistry.for("ns").register( + "ns#ModeledException", + error("ns", "ModeledException", 0, ["modeledProperty"], [0], ModeledExceptionCtor) + ); + TypeRegistry.for("ns").register( + "smithy.ts.sdk.synthetic.ns#BaseServiceException", + error("smithy.ts.sdk.synthetic.ns", "BaseServiceException", 0, [], [], ServiceBaseException) + ); + + try { + await protocol.deserializeResponse(operation, serdeContext as any, errorResponse); + } catch (e) { + expect(e).toBeInstanceOf(ModeledExceptionCtor); + expect((e as ModeledExceptionCtor).modeledProperty).toEqual("oh no"); + expect(e).toBeInstanceOf(ServiceBaseException); + } + expect.assertions(3); + }); + + it("should throw a base error if available in the namespace, when no error schema is modeled", async () => { + // this is the expected fallback case for all generated clients. + + TypeRegistry.for("ns").register( + "smithy.ts.sdk.synthetic.ns#BaseServiceException", + error("smithy.ts.sdk.synthetic.ns", "BaseServiceException", 0, [], [], ServiceBaseException) + ); + + try { + await protocol.deserializeResponse(operation, serdeContext as any, errorResponse); + } catch (e) { + expect(e).toBeInstanceOf(ServiceBaseException); + } + expect.assertions(1); + }); + + it("should fall back to a generic JS Error as a last resort", async () => { + // this shouldn't happen, but in case the type registry is mutated incorrectly. + try { + await protocol.deserializeResponse(operation, serdeContext as any, errorResponse); + } catch (e) { + expect(e).toBeInstanceOf(Error); + } + expect.assertions(1); + }); + }); }); diff --git a/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts b/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts index 712b4dea080..63fe378e10f 100644 --- a/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts +++ b/packages/core/src/submodules/cbor/SmithyRpcV2CborProtocol.ts @@ -1,5 +1,5 @@ import { RpcProtocol } from "@smithy/core/protocols"; -import { deref, ErrorSchema, OperationSchema, TypeRegistry } from "@smithy/core/schema"; +import { deref, ErrorSchema, NormalizedSchema, OperationSchema, TypeRegistry } from "@smithy/core/schema"; import type { EndpointBearer, HandlerExecutionContext, @@ -87,31 +87,54 @@ export class SmithyRpcV2CborProtocol extends RpcProtocol { dataObject: any, metadata: ResponseMetadata ): Promise { - const error = loadSmithyRpcV2CborErrorCode(response, dataObject) ?? "Unknown"; + const errorName = loadSmithyRpcV2CborErrorCode(response, dataObject) ?? "Unknown"; let namespace = this.options.defaultNamespace; - if (error.includes("#")) { - [namespace] = error.split("#"); + if (errorName.includes("#")) { + [namespace] = errorName.split("#"); } + const errorMetadata = { + $metadata: metadata, + $response: response, + $fault: response.statusCode <= 500 ? ("client" as const) : ("server" as const), + }; + const registry = TypeRegistry.for(namespace); - const errorSchema: ErrorSchema = registry.getSchema(error) as ErrorSchema; - if (!errorSchema) { - // TODO(schema) throw client base exception using the dataObject. - throw new Error("schema not found for " + error); + let errorSchema: ErrorSchema; + try { + errorSchema = registry.getSchema(errorName) as ErrorSchema; + } catch (e) { + if (dataObject.Message) { + dataObject.message = dataObject.Message; + } + const baseExceptionSchema = TypeRegistry.for("smithy.ts.sdk.synthetic." + namespace).getBaseException(); + if (baseExceptionSchema) { + const ErrorCtor = baseExceptionSchema.ctor; + throw Object.assign(new ErrorCtor({ name: errorName }), errorMetadata, dataObject); + } + throw Object.assign(new Error(errorName), errorMetadata, dataObject); } + const ns = NormalizedSchema.of(errorSchema); const message = dataObject.message ?? dataObject.Message ?? "Unknown"; const exception = new errorSchema.ctor(message); - Object.assign(exception, { - $metadata: metadata, - $response: response, - message, - ...dataObject, - }); - throw exception; + const output = {} as any; + for (const [name, member] of ns.structIterator()) { + output[name] = this.deserializer.readValue(member, dataObject[name]); + } + + throw Object.assign( + exception, + errorMetadata, + { + $fault: ns.getMergedTraits().error, + message, + }, + output + ); } protected getDefaultContentType(): string { From bbd7a2f59e0ce23727724e6670437eae4815234a Mon Sep 17 00:00:00 2001 From: George Fu Date: Tue, 26 Aug 2025 16:53:18 -0400 Subject: [PATCH 2/5] wip: initial message testing --- packages/core/eventStreams.d.ts | 7 + packages/core/eventStreams.js | 6 + packages/core/package.json | 9 + .../eventStreams/EventStreamSerde.spec.ts | 306 ++++++++++++++++++ .../eventStreams/EventStreamSerde.ts | 262 +++++++++++++++ .../core/src/submodules/eventStreams/index.ts | 1 + .../protocols/HttpBindingProtocol.ts | 14 +- .../submodules/protocols/HttpProtocol.spec.ts | 153 +-------- .../src/submodules/protocols/HttpProtocol.ts | 149 +++------ .../src/submodules/protocols/RpcProtocol.ts | 19 +- .../core/src/submodules/protocols/index.ts | 1 + packages/core/tsconfig.cjs.json | 3 +- packages/core/tsconfig.es.json | 3 +- packages/core/tsconfig.types.json | 3 +- 14 files changed, 667 insertions(+), 269 deletions(-) create mode 100644 packages/core/eventStreams.d.ts create mode 100644 packages/core/eventStreams.js create mode 100644 packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts create mode 100644 packages/core/src/submodules/eventStreams/EventStreamSerde.ts create mode 100644 packages/core/src/submodules/eventStreams/index.ts diff --git a/packages/core/eventStreams.d.ts b/packages/core/eventStreams.d.ts new file mode 100644 index 00000000000..88f25af8bc7 --- /dev/null +++ b/packages/core/eventStreams.d.ts @@ -0,0 +1,7 @@ +/** + * Do not edit: + * This is a compatibility redirect for contexts that do not understand package.json exports field. + */ +declare module "@smithy/core/eventStreams" { + export * from "@smithy/core/dist-types/submodules/eventStreams/index.d"; +} diff --git a/packages/core/eventStreams.js b/packages/core/eventStreams.js new file mode 100644 index 00000000000..f3a53171333 --- /dev/null +++ b/packages/core/eventStreams.js @@ -0,0 +1,6 @@ + +/** + * Do not edit: + * This is a compatibility redirect for contexts that do not understand package.json exports field. + */ +module.exports = require("./dist-cjs/submodules/eventStreams/index.js"); diff --git a/packages/core/package.json b/packages/core/package.json index 2e38a382325..f1d9a8a4146 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -60,6 +60,13 @@ "node": "./dist-cjs/submodules/schema/index.js", "import": "./dist-es/submodules/schema/index.js", "require": "./dist-cjs/submodules/schema/index.js" + }, + "./eventStreams": { + "types": "./dist-types/submodules/eventStreams/index.d.ts", + "module": "./dist-es/submodules/eventStreams/index.js", + "node": "./dist-cjs/submodules/eventStreams/index.js", + "import": "./dist-es/submodules/eventStreams/index.js", + "require": "./dist-cjs/submodules/eventStreams/index.js" } }, "author": { @@ -95,6 +102,8 @@ "files": [ "./cbor.d.ts", "./cbor.js", + "./eventStreams.d.ts", + "./eventStreams.js", "./protocols.d.ts", "./protocols.js", "./schema.d.ts", diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts new file mode 100644 index 00000000000..2ee30336df8 --- /dev/null +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts @@ -0,0 +1,306 @@ +import { FromStringShapeDeserializer, ToStringShapeSerializer } from "@smithy/core/protocols"; +import { NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema"; +import { HttpResponse } from "@smithy/protocol-http"; +import { Schema, SerdeFunctions, ShapeDeserializer } from "@smithy/types"; +import { toUtf8 } from "@smithy/util-utf8"; +import { Readable } from "node:stream"; +import { describe, expect, test as it, vi } from "vitest"; + +import { EventStreamSerde } from "./EventStreamSerde"; + +class StructStringDeserializer implements ShapeDeserializer { + private fromString = new FromStringShapeDeserializer({ + httpBindings: true, + timestampFormat: { default: 7, useTrait: true }, + }); + + public read(schema: Schema, data: any): any { + const ns = NormalizedSchema.of(schema); + if (ns.isStructSchema()) { + const output = {} as any; + for (const [m, s] of ns.structIterator()) { + output[m] = this.fromString.read(s, data[m]); + } + return output; + } + + return this.fromString.read(schema, data); + } + + public setSerdeContext(serdeContext: SerdeFunctions): void {} +} + +describe(EventStreamSerde.name, () => { + describe("event stream serde", () => { + // this represents elements injected by the HttpProtocol caller. + const impl = { + serializer: new ToStringShapeSerializer({ + timestampFormat: { default: 7, useTrait: true }, + }), + deserializer: new StructStringDeserializer(), + getEventStreamMarshaller() { + return this.serdeContext.eventStreamMarshaller; + }, + serdeContext: { + eventStreamMarshaller: { + serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => { + return Readable.from({ + async *[Symbol.asyncIterator]() { + for await (const inputEvent of eventStream) { + yield eventStreamSerializationFn(inputEvent); + } + }, + }); + }), + deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => { + return { + async *[Symbol.asyncIterator]() { + for await (const outputEvent of body) { + yield eventStreamDeserializationFn(outputEvent); + } + }, + }; + }), + }, + }, + getDefaultContentType() { + return "unit/test"; + }, + }; + + const eventStreamSerde = new EventStreamSerde({ + marshaller: impl.getEventStreamMarshaller(), + serializer: impl.serializer, + deserializer: impl.deserializer, + defaultContentType: impl.getDefaultContentType(), + }); + + const serializeEventStream = eventStreamSerde.serializeEventStream.bind(eventStreamSerde); + const deserializeEventStream = eventStreamSerde.deserializeEventStream.bind(eventStreamSerde); + + const eventStreamUnionSchema = struct( + "ns", + "EventStreamStructure", + { streaming: 1 }, + ["A", "B", "C"], + [struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])] + ); + + const eventStreamContainerSchema = struct( + "ns", + "EventStreamContainer", + 0, + ["eventStreamMember", "dateMember", "blobMember"], + [eventStreamUnionSchema, SCHEMA.TIMESTAMP_EPOCH_SECONDS, SCHEMA.BLOB] + ); + + it("serializes event streams", async () => { + const eventStream = { + async *[Symbol.asyncIterator]() { + yield { A: { name: "a" } }; + yield { B: { name: "b" } }; + yield { C: { name: "c" } }; + yield { $unknown: ["D", { name: "d" }] }; + }, + }; + + const requestBody = await serializeEventStream({ + eventStream, + requestSchema: NormalizedSchema.of(eventStreamContainerSchema), + }); + + const collect = []; + for await (const chunk of requestBody) { + collect.push(chunk); + } + expect( + collect.map((item) => { + return { + headers: item.headers, + body: toUtf8(item.body).replace(/\s+/g, ""), + }; + }) + ).toEqual([ + { + headers: { + ":event-type": { type: "string", value: "A" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"a"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "B" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"b"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "C" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"c"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "D" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"d"}`, + }, + ]); + }); + + it("deserializes event streams", async () => { + const response = new HttpResponse({ + statusCode: 200, + body: { + async *[Symbol.asyncIterator]() { + yield { A: { headers: {}, body: { name: "a" } } }; + yield { B: { headers: {}, body: { name: "b" } } }; + yield { C: { headers: {}, body: { name: "c" } } }; + yield { D: { headers: {}, body: { name: "d" } } }; + }, + }, + }); + + const asyncIterable = await deserializeEventStream({ + response, + responseSchema: NormalizedSchema.of(eventStreamContainerSchema), + }); + + const collect = []; + for await (const event of asyncIterable) { + collect.push(event); + } + expect(collect).toEqual([ + { A: { name: `a` } }, + { B: { name: `b` } }, + { C: { name: `c` } }, + { $unknown: { D: { headers: {}, body: { name: "d" } } } }, + ]); + }); + + it("serializes event streams containing an initial-request", async () => { + const eventStream = { + async *[Symbol.asyncIterator]() { + yield { A: { name: "a" } }; + yield { B: { name: "b" } }; + yield { C: { name: "c" } }; + yield { $unknown: ["D", { name: "d" }] }; + }, + }; + + const requestBody = await serializeEventStream({ + eventStream, + requestSchema: NormalizedSchema.of(eventStreamContainerSchema), + initialRequest: { + dateMember: new Date(0), + blobMember: new Uint8Array([0, 1, 2, 3]), + }, + }); + + const collect = []; + for await (const chunk of requestBody) { + collect.push(chunk); + } + expect( + collect.map((item) => { + return { + headers: item.headers, + body: toUtf8(item.body).replace(/\s+/g, ""), + }; + }) + ).toEqual([ + { + headers: { + ":event-type": { type: "string", value: "initial-request" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"dateMember":"1970-01-01T00:00:00.000Z","blobMember":{"0":0,"1":1,"2":2,"3":3}}`, + }, + { + headers: { + ":event-type": { type: "string", value: "A" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"a"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "B" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"b"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "C" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"c"}`, + }, + { + headers: { + ":event-type": { type: "string", value: "D" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "unit/test" }, + }, + body: `{"name":"d"}`, + }, + ]); + }); + + it("deserializes event streams containing an initial-response", async () => { + const response = new HttpResponse({ + statusCode: 200, + body: { + async *[Symbol.asyncIterator]() { + yield { + "initial-response": { + headers: {}, + body: { dateMember: "0", blobMember: "AAECAw==" }, + }, + }; + yield { A: { headers: {}, body: { name: "a" } } }; + yield { B: { headers: {}, body: { name: "b" } } }; + yield { C: { headers: {}, body: { name: "c" } } }; + yield { D: { headers: {}, body: { name: "d" } } }; + }, + }, + }); + + const initialResponseContainer = {} as any; + + const asyncIterable = await deserializeEventStream({ + response, + responseSchema: NormalizedSchema.of(eventStreamContainerSchema), + initialResponseContainer, + }); + + const collect = []; + for await (const event of asyncIterable) { + collect.push(event); + } + expect(collect).toEqual([ + { A: { name: `a` } }, + { B: { name: `b` } }, + { C: { name: `c` } }, + { $unknown: { D: { headers: {}, body: { name: "d" } } } }, + ]); + expect(initialResponseContainer).toEqual({ + dateMember: new Date(0), + blobMember: new Uint8Array([0, 1, 2, 3]), + }); + }); + }); +}); diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts new file mode 100644 index 00000000000..0e4da74aef5 --- /dev/null +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts @@ -0,0 +1,262 @@ +import { NormalizedSchema, SCHEMA } from "@smithy/core/schema"; +import type { + EventStreamMarshaller, + HttpRequest as IHttpRequest, + HttpResponse as IHttpResponse, + Message as EventStreamMessage, + MessageHeaders, + MessageHeaderValue, + SerdeFunctions, + ShapeDeserializer, + ShapeSerializer, +} from "@smithy/types"; +import { fromUtf8 } from "@smithy/util-utf8"; + +/** + * Separated module for async mixin of EventStream serde capability. + * + * @alpha + */ +export class EventStreamSerde { + private readonly marshaller: EventStreamMarshaller; + private readonly serializer: ShapeSerializer; + private readonly deserializer: ShapeDeserializer; + private readonly serdeContext?: SerdeFunctions; + private readonly defaultContentType: string; + + public constructor({ + marshaller, + serializer, + deserializer, + serdeContext, + defaultContentType, + }: { + marshaller: EventStreamMarshaller; + serializer: ShapeSerializer; + deserializer: ShapeDeserializer; + serdeContext?: SerdeFunctions; + defaultContentType: string; + }) { + this.marshaller = marshaller; + this.serializer = serializer; + this.deserializer = deserializer; + this.serdeContext = serdeContext; + this.defaultContentType = defaultContentType; + } + + /** + * @param eventStream - the iterable provided by the caller. + * @param requestSchema - the schema of the event stream container (struct). + * @param [initialRequest] - only provided if the initial-request is part of the event stream (RPC). + * + * @returns a stream suitable for the HTTP body of a request. + */ + public async serializeEventStream({ + eventStream, + requestSchema, + initialRequest, + }: { + eventStream: AsyncIterable; + requestSchema: NormalizedSchema; + initialRequest?: any; + }): Promise { + const marshaller = this.marshaller; + const eventStreamMember = requestSchema.getEventStreamMember(); + const unionSchema = requestSchema.getMemberSchema(eventStreamMember); + const memberSchemas = unionSchema.getMemberSchemas(); + + const serializer = this.serializer; + const defaultContentType = this.defaultContentType; + + const initialRequestMarker = Symbol("initialRequestMarker"); + + const eventStreamIterable: AsyncIterable = { + async *[Symbol.asyncIterator]() { + if (initialRequest) { + const headers: MessageHeaders = { + ":event-type": { type: "string", value: "initial-request" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: defaultContentType }, + }; + serializer.write(requestSchema, initialRequest); + const body = serializer.flush(); + yield { + [initialRequestMarker]: true, + headers, + body, + }; + } + + for await (const page of eventStream) { + yield page; + } + }, + }; + + return marshaller.serialize(eventStreamIterable, (event: any): EventStreamMessage => { + if (event[initialRequestMarker]) { + return { + headers: event.headers, + body: event.body, + }; + } + + const unionMember = + Object.keys(event).find((key) => { + return key !== "__type"; + }) ?? ""; + const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT); + + let messageSerialization: string | Uint8Array; + let eventType = unionMember; + + if (eventStreamSchema.isStructSchema()) { + serializer.write(eventStreamSchema, event[unionMember]); + messageSerialization = serializer.flush(); + } else { + // $unknown member + const [type, value] = event[unionMember]; + eventType = type; + serializer.write(NormalizedSchema.of(SCHEMA.DOCUMENT), value); + messageSerialization = serializer.flush(); + } + + const body = + typeof messageSerialization === "string" + ? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization) + : messageSerialization; + + const headers: MessageHeaders = { + ":event-type": { type: "string", value: eventType }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: defaultContentType }, + }; + + // additional trait-annotated event headers. + if (eventStreamSchema.isStructSchema()) { + for (const [memberName, memberSchema] of eventStreamSchema.structIterator()) { + const isHeader = !!memberSchema.getMergedTraits().eventHeader; + if (!isHeader) { + continue; + } + const value = event[memberName]; + let type = "binary" as MessageHeaderValue["type"]; + if (memberSchema.isNumericSchema()) { + if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) { + type = "integer"; + } else { + type = "long"; + } + } else if (memberSchema.isTimestampSchema()) { + type = "timestamp"; + } else if (memberSchema.isStringSchema()) { + type = "string"; + } else if (memberSchema.isBooleanSchema()) { + type = "boolean"; + } + + if (isHeader && value != undefined) { + headers[memberName] = { + type, + value, + }; + } + } + } + + return { + headers, + body, + }; + }); + } + + /** + * @param response - http response from which to read the event stream. + * @param unionSchema - schema of the event stream container (struct). + * @param [initialResponseContainer] - provided and written to only if the initial response is part of the event stream (RPC). + * + * @returns the asyncIterable of the event stream. + */ + public async deserializeEventStream({ + response, + responseSchema, + initialResponseContainer, + }: { + response: IHttpResponse; + responseSchema: NormalizedSchema; + initialResponseContainer?: any; + }): Promise> { + const marshaller = this.marshaller; + const eventStreamMember = responseSchema.getEventStreamMember(); + const unionSchema = responseSchema.getMemberSchema(eventStreamMember); + const memberSchemas = unionSchema.getMemberSchemas(); + + const initialResponseMarker = Symbol("initialResponseMarker"); + + const asyncIterable = marshaller.deserialize(response.body, async (event) => { + const unionMember = + Object.keys(event).find((key) => { + return key !== "__type"; + }) ?? ""; + + if (unionMember === "initial-response") { + const dataObject = await this.deserializer.read(responseSchema, event[unionMember].body); + delete dataObject[eventStreamMember]; + return { + [initialResponseMarker]: true, + ...dataObject, + }; + } else if (unionMember in memberSchemas) { + const eventStreamSchema = memberSchemas[unionMember]; + return { + [unionMember]: await this.deserializer.read(eventStreamSchema, event[unionMember].body), + }; + } else { + // todo(schema): This union convention is ignored by the event stream marshaller. + // todo(schema): This should be returned to the user instead. + // see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts + return { + $unknown: event, + }; + } + }); + + const asyncIterator = asyncIterable[Symbol.asyncIterator](); + const firstEvent = await asyncIterator.next(); + + if (firstEvent.done) { + return asyncIterable; + } + + if (firstEvent.value?.[initialResponseMarker]) { + // if the initial response is part of the event stream, we assume + // that the response schema was provided because RpcProtocols are the only ones + // that act in this way. + if (!responseSchema) { + throw new Error( + "@smithy::core/protocols - initial-response event encountered in event stream but no response schema given." + ); + } + + for (const [key, value] of Object.entries(firstEvent.value)) { + initialResponseContainer[key] = value; + } + } + + return { + async *[Symbol.asyncIterator]() { + if (!firstEvent?.value?.[initialResponseMarker]) { + yield firstEvent.value; + } + while (true) { + const { done, value } = await asyncIterator.next(); + if (done) { + break; + } + yield value; + } + }, + }; + } +} diff --git a/packages/core/src/submodules/eventStreams/index.ts b/packages/core/src/submodules/eventStreams/index.ts new file mode 100644 index 00000000000..849f48ccfda --- /dev/null +++ b/packages/core/src/submodules/eventStreams/index.ts @@ -0,0 +1 @@ +export * from "./EventStreamSerde"; diff --git a/packages/core/src/submodules/protocols/HttpBindingProtocol.ts b/packages/core/src/submodules/protocols/HttpBindingProtocol.ts index 5f720df99eb..0feddbf7745 100644 --- a/packages/core/src/submodules/protocols/HttpBindingProtocol.ts +++ b/packages/core/src/submodules/protocols/HttpBindingProtocol.ts @@ -85,14 +85,17 @@ export abstract class HttpBindingProtocol extends HttpProtocol { if (isStreaming) { const isEventStream = memberNs.isStructSchema(); if (isEventStream) { + // event stream (union) + // initial-request is handled by other HTTP bindings. + // no additional handling is needed here. if (input[memberName]) { - payload = this.serializeEventStream({ + payload = await this.serializeEventStream({ eventStream: input[memberName], - unionSchema: memberNs, + requestSchema: ns, }); } } else { - // streaming blob body + // data stream (blob) payload = inputMemberValue; } } else { @@ -281,9 +284,10 @@ export abstract class HttpBindingProtocol extends HttpProtocol { const isEventStream = memberSchema.isStructSchema(); if (isEventStream) { // event stream (union) - dataObject[memberName] = this.deserializeEventStream({ + // initial-response is handled by other HTTP bindings. + dataObject[memberName] = await this.deserializeEventStream({ response, - unionSchema: memberSchema, + responseSchema: ns, }); } else { // data stream (blob) diff --git a/packages/core/src/submodules/protocols/HttpProtocol.spec.ts b/packages/core/src/submodules/protocols/HttpProtocol.spec.ts index 12375128350..bf84daf1978 100644 --- a/packages/core/src/submodules/protocols/HttpProtocol.spec.ts +++ b/packages/core/src/submodules/protocols/HttpProtocol.spec.ts @@ -1,13 +1,9 @@ -import { map, NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema"; -import { HttpResponse } from "@smithy/protocol-http"; +import { map, SCHEMA, struct } from "@smithy/core/schema"; import type { HandlerExecutionContext, HttpResponse as IHttpResponse, Schema, SerdeFunctions } from "@smithy/types"; -import { toUtf8 } from "@smithy/util-utf8"; -import { Readable } from "node:stream"; -import { describe, expect, test as it, vi } from "vitest"; +import { describe, expect, test as it } from "vitest"; import { HttpProtocol } from "./HttpProtocol"; import { FromStringShapeDeserializer } from "./serde/FromStringShapeDeserializer"; -import { ToStringShapeSerializer } from "./serde/ToStringShapeSerializer"; describe(HttpProtocol.name, () => { it("ignores http bindings (only HttpBindingProtocol uses them)", async () => { @@ -53,149 +49,4 @@ describe(HttpProtocol.name, () => { // headers were ignored }); }); - - describe("event stream serde", () => { - const impl = { - serializer: new ToStringShapeSerializer({ - timestampFormat: { default: 7, useTrait: true }, - }), - deserializer: new FromStringShapeDeserializer({ - httpBindings: true, - timestampFormat: { default: 7, useTrait: true }, - }), - getEventStreamMarshaller() { - return this.serdeContext.eventStreamMarshaller; - }, - serdeContext: { - eventStreamMarshaller: { - serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => { - return Readable.from({ - async *[Symbol.asyncIterator]() { - for await (const inputEvent of eventStream) { - yield eventStreamSerializationFn(inputEvent); - } - }, - }); - }), - deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => { - return { - async *[Symbol.asyncIterator]() { - for await (const outputEvent of body) { - yield eventStreamDeserializationFn(outputEvent); - } - }, - }; - }), - }, - }, - getDefaultContentType() { - return "unit/test"; - }, - }; - - const serializeEventStream = (HttpProtocol.prototype as any).serializeEventStream.bind(impl); - const deserializeEventStream = (HttpProtocol.prototype as any).deserializeEventStream.bind(impl); - - const eventStreamUnionSchema = struct( - "ns", - "EventStreamStructure", - { streaming: 1 }, - ["A", "B", "C"], - [struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])] - ); - - it("serializes event streams", async () => { - const eventStream = { - async *[Symbol.asyncIterator]() { - yield { A: { name: "a" } }; - yield { B: { name: "b" } }; - yield { C: { name: "c" } }; - yield { $unknown: ["D", { name: "d" }] }; - }, - }; - const unionSchema = NormalizedSchema.of(eventStreamUnionSchema); - - const requestBody = serializeEventStream({ - eventStream, - unionSchema, - }); - - const collect = []; - for await (const chunk of requestBody) { - collect.push(chunk); - } - expect( - collect.map((item) => { - return { - headers: item.headers, - body: toUtf8(item.body).replace(/\s+/g, ""), - }; - }) - ).toEqual([ - { - headers: { - ":event-type": { type: "string", value: "A" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"a"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "B" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"b"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "C" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"c"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "D" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"d"}`, - }, - ]); - }); - - it("deserializes event streams", async () => { - const response = new HttpResponse({ - statusCode: 200, - body: { - async *[Symbol.asyncIterator]() { - yield { A: { headers: {}, body: { name: "a" } } }; - yield { B: { headers: {}, body: { name: "b" } } }; - yield { C: { headers: {}, body: { name: "c" } } }; - yield { D: { headers: {}, body: { name: "d" } } }; - }, - }, - }); - const unionSchema = NormalizedSchema.of(eventStreamUnionSchema); - - const asyncIterable = deserializeEventStream({ - response, - unionSchema, - }); - - const collect = []; - for await (const event of asyncIterable) { - collect.push(event); - } - expect(collect).toEqual([ - { A: { name: "a" } }, - { B: { name: "b" } }, - { C: { name: "c" } }, - { $unknown: { D: { headers: {}, body: { name: "d" } } } }, - ]); - }); - }); }); diff --git a/packages/core/src/submodules/protocols/HttpProtocol.ts b/packages/core/src/submodules/protocols/HttpProtocol.ts index da207781d3c..eddb9717734 100644 --- a/packages/core/src/submodules/protocols/HttpProtocol.ts +++ b/packages/core/src/submodules/protocols/HttpProtocol.ts @@ -1,5 +1,5 @@ +import type { EventStreamSerde } from "@smithy/core/eventStreams"; import { NormalizedSchema } from "@smithy/core/schema"; -import { SCHEMA } from "@smithy/core/schema"; import { HttpRequest, HttpResponse } from "@smithy/protocol-http"; import type { ClientProtocol, @@ -12,9 +12,6 @@ import type { HandlerExecutionContext, HttpRequest as IHttpRequest, HttpResponse as IHttpResponse, - Message as EventStreamMessage, - MessageHeaders, - MessageHeaderValue, MetadataBearer, OperationSchema, ResponseMetadata, @@ -23,7 +20,6 @@ import type { ShapeDeserializer, ShapeSerializer, } from "@smithy/types"; -import { fromUtf8 } from "@smithy/util-utf8"; /** * Abstract base for HTTP-based client protocols. @@ -146,121 +142,64 @@ export abstract class HttpProtocol implements ClientProtocol; - unionSchema: NormalizedSchema; - }): IHttpRequest["body"] { - const marshaller = this.getEventStreamMarshaller(); - const memberSchemas = unionSchema.getMemberSchemas(); - - return marshaller.serialize(eventStream, (event: any): EventStreamMessage => { - const unionMember = - Object.keys(event).find((key) => { - return key !== "__type"; - }) ?? ""; - const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT); - - let messageSerialization: string | Uint8Array; - let eventType = unionMember; - - if (eventStreamSchema.isStructSchema()) { - this.serializer.write(eventStreamSchema, event[unionMember]); - messageSerialization = this.serializer.flush(); - } else { - // $unknown member - const [type, value] = event[unionMember]; - eventType = type; - this.serializer.write(NormalizedSchema.of(SCHEMA.DOCUMENT), value); - messageSerialization = this.serializer.flush(); - } - - const body = - typeof messageSerialization === "string" - ? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization) - : messageSerialization; - - const headers: MessageHeaders = { - ":event-type": { type: "string", value: eventType }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: this.getDefaultContentType() }, - }; - - // additional trait-annotated event headers. - if (eventStreamSchema.isStructSchema()) { - for (const [memberName, memberSchema] of eventStreamSchema.structIterator()) { - const isHeader = !!memberSchema.getMergedTraits().eventHeader; - if (!isHeader) { - continue; - } - const value = event[memberName]; - let type = "binary" as MessageHeaderValue["type"]; - if (memberSchema.isNumericSchema()) { - if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) { - type = "integer"; - } else { - type = "long"; - } - } else if (memberSchema.isTimestampSchema()) { - type = "timestamp"; - } else if (memberSchema.isStringSchema()) { - type = "string"; - } else if (memberSchema.isBooleanSchema()) { - type = "boolean"; - } - - if (isHeader && value != undefined) { - headers[memberName] = { - type, - value, - }; - } - } - } - - return { - headers, - body, - }; + requestSchema: NormalizedSchema; + initialRequest?: any; + }): Promise { + const eventStreamSerde = await this.loadEventStreamCapability(); + return eventStreamSerde.serializeEventStream({ + eventStream, + requestSchema, + initialRequest, }); } /** + * @param response - http response from which to read the event stream. + * @param unionSchema - schema of the event stream container (struct). + * @param [initialResponseContainer] - provided and written to only if the initial response is part of the event stream (RPC). + * * @returns the asyncIterable of the event stream. */ - protected deserializeEventStream({ + protected async deserializeEventStream({ response, - unionSchema, + responseSchema, + initialResponseContainer, }: { response: IHttpResponse; - unionSchema: NormalizedSchema; - }): AsyncIterable<{ [key: string]: any; $unknown?: unknown }> { - const marshaller = this.getEventStreamMarshaller(); - const memberSchemas = unionSchema.getMemberSchemas(); - - return marshaller.deserialize(response.body, async (event) => { - const unionMember = - Object.keys(event).find((key) => { - return key !== "__type"; - }) ?? ""; + responseSchema: NormalizedSchema; + initialResponseContainer?: any; + }): Promise> { + const eventStreamSerde = await this.loadEventStreamCapability(); + return eventStreamSerde.deserializeEventStream({ + response, + responseSchema, + initialResponseContainer, + }); + } - if (unionMember in memberSchemas) { - const eventStreamSchema = memberSchemas[unionMember]; - return { - [unionMember]: await this.deserializer.read(eventStreamSchema, event[unionMember].body), - }; - } else { - // todo(schema): This union convention is ignored by the event stream marshaller. - // todo(schema): This should be returned to the user instead. - // see "if (deserialized.$unknown) return;" in getUnmarshalledStream.ts - return { - $unknown: event, - }; - } + /** + * Loads eventStream capability async (for chunking). + */ + protected async loadEventStreamCapability(): Promise { + const { EventStreamSerde } = await import("@smithy/core/eventStreams"); + return new EventStreamSerde({ + marshaller: this.getEventStreamMarshaller(), + serializer: this.serializer, + deserializer: this.deserializer, + serdeContext: this.serdeContext, + defaultContentType: this.getDefaultContentType(), }); } diff --git a/packages/core/src/submodules/protocols/RpcProtocol.ts b/packages/core/src/submodules/protocols/RpcProtocol.ts index d421536d90f..c5153558c0f 100644 --- a/packages/core/src/submodules/protocols/RpcProtocol.ts +++ b/packages/core/src/submodules/protocols/RpcProtocol.ts @@ -60,9 +60,18 @@ export abstract class RpcProtocol extends HttpProtocol { if (eventStreamMember) { if (_input[eventStreamMember]) { - payload = this.serializeEventStream({ + const initialRequest = {} as any; + for (const [memberName, memberSchema] of ns.structIterator()) { + if (memberName !== eventStreamMember && _input[memberName]) { + serializer.write(memberSchema, _input[memberName]); + initialRequest[memberName] = serializer.flush(); + } + } + + payload = await this.serializeEventStream({ eventStream: _input[eventStreamMember], - unionSchema: ns.getMemberSchema(eventStreamMember), + requestSchema: ns, + initialRequest, }); } } else { @@ -106,10 +115,10 @@ export abstract class RpcProtocol extends HttpProtocol { const eventStreamMember = ns.getEventStreamMember(); if (eventStreamMember) { - // todo(schema): assign other dataObject fields from initial-response. - dataObject[eventStreamMember] = this.deserializeEventStream({ + dataObject[eventStreamMember] = await this.deserializeEventStream({ response, - unionSchema: ns.getMemberSchema(eventStreamMember), + responseSchema: ns, + initialResponseContainer: dataObject, }); } else { const bytes: Uint8Array = await collectBody(response.body, context as SerdeFunctions); diff --git a/packages/core/src/submodules/protocols/index.ts b/packages/core/src/submodules/protocols/index.ts index 33a857e25ea..f0a88dc6215 100644 --- a/packages/core/src/submodules/protocols/index.ts +++ b/packages/core/src/submodules/protocols/index.ts @@ -1,6 +1,7 @@ export * from "./collect-stream-body"; export * from "./extended-encode-uri-component"; export * from "./HttpBindingProtocol"; +export * from "./HttpProtocol"; export * from "./RpcProtocol"; export * from "./requestBuilder"; export * from "./resolve-path"; diff --git a/packages/core/tsconfig.cjs.json b/packages/core/tsconfig.cjs.json index 5804ea19cf1..a59d097b9b4 100644 --- a/packages/core/tsconfig.cjs.json +++ b/packages/core/tsconfig.cjs.json @@ -7,7 +7,8 @@ "@smithy/core/cbor": ["./src/submodules/cbor/index.ts"], "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], - "@smithy/core/schema": ["./src/submodules/schema/index.ts"] + "@smithy/core/schema": ["./src/submodules/schema/index.ts"], + "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] } }, "extends": "../../tsconfig.cjs.json", diff --git a/packages/core/tsconfig.es.json b/packages/core/tsconfig.es.json index 6cb3631b45e..6e3f93fcbbc 100644 --- a/packages/core/tsconfig.es.json +++ b/packages/core/tsconfig.es.json @@ -8,7 +8,8 @@ "@smithy/core/cbor": ["./src/submodules/cbor/index.ts"], "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], - "@smithy/core/schema": ["./src/submodules/schema/index.ts"] + "@smithy/core/schema": ["./src/submodules/schema/index.ts"], + "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] } }, "extends": "../../tsconfig.es.json", diff --git a/packages/core/tsconfig.types.json b/packages/core/tsconfig.types.json index 42ce68798f6..4b20a12fb3f 100644 --- a/packages/core/tsconfig.types.json +++ b/packages/core/tsconfig.types.json @@ -7,7 +7,8 @@ "@smithy/core/cbor": ["./src/submodules/cbor/index.ts"], "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], - "@smithy/core/schema": ["./src/submodules/schema/index.ts"] + "@smithy/core/schema": ["./src/submodules/schema/index.ts"], + "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] } }, "extends": "../../tsconfig.types.json", From 812b849669af016a310e0a8f67ca560404adb995 Mon Sep 17 00:00:00 2001 From: George Fu Date: Wed, 27 Aug 2025 12:57:33 -0400 Subject: [PATCH 3/5] feat(core/eventStreams): test with eventStreamMarshaller --- .changeset/calm-trees-teach.md | 2 +- packages/core/package.json | 1 + .../eventStreams/EventStreamSerde.spec.ts | 385 ++++++++---------- .../eventStreams/EventStreamSerde.ts | 8 +- yarn.lock | 3 +- 5 files changed, 188 insertions(+), 211 deletions(-) diff --git a/.changeset/calm-trees-teach.md b/.changeset/calm-trees-teach.md index 4cafa391ef0..a4992e9b5d4 100644 --- a/.changeset/calm-trees-teach.md +++ b/.changeset/calm-trees-teach.md @@ -1,5 +1,5 @@ --- -"@smithy/core": patch +"@smithy/core": minor --- CBOR protocol error handling fallbacks diff --git a/packages/core/package.json b/packages/core/package.json index f1d9a8a4146..1e884bf67db 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -119,6 +119,7 @@ "directory": "packages/core" }, "devDependencies": { + "@smithy/eventstream-serde-node": "workspace:^", "@types/node": "^18.11.9", "concurrently": "7.0.0", "downlevel-dts": "0.10.1", diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts index 2ee30336df8..e38cc41b23a 100644 --- a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts @@ -1,70 +1,33 @@ -import { FromStringShapeDeserializer, ToStringShapeSerializer } from "@smithy/core/protocols"; +import { cbor, CborCodec, dateToTag } from "@smithy/core/cbor"; import { NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema"; +import { EventStreamMarshaller } from "@smithy/eventstream-serde-node"; import { HttpResponse } from "@smithy/protocol-http"; -import { Schema, SerdeFunctions, ShapeDeserializer } from "@smithy/types"; -import { toUtf8 } from "@smithy/util-utf8"; -import { Readable } from "node:stream"; -import { describe, expect, test as it, vi } from "vitest"; +import { Message as EventMessage } from "@smithy/types"; +import { fromUtf8, toUtf8 } from "@smithy/util-utf8"; +import { describe, expect, test as it } from "vitest"; import { EventStreamSerde } from "./EventStreamSerde"; -class StructStringDeserializer implements ShapeDeserializer { - private fromString = new FromStringShapeDeserializer({ - httpBindings: true, - timestampFormat: { default: 7, useTrait: true }, - }); - - public read(schema: Schema, data: any): any { - const ns = NormalizedSchema.of(schema); - if (ns.isStructSchema()) { - const output = {} as any; - for (const [m, s] of ns.structIterator()) { - output[m] = this.fromString.read(s, data[m]); - } - return output; - } - - return this.fromString.read(schema, data); - } - - public setSerdeContext(serdeContext: SerdeFunctions): void {} -} - describe(EventStreamSerde.name, () => { describe("event stream serde", () => { + const cborCodec = new CborCodec(); + // this represents elements injected by the HttpProtocol caller. + // we use the real event stream marshaller (universal) here to get an accurate integration test. const impl = { - serializer: new ToStringShapeSerializer({ - timestampFormat: { default: 7, useTrait: true }, - }), - deserializer: new StructStringDeserializer(), + serializer: cborCodec.createSerializer(), + deserializer: cborCodec.createDeserializer(), getEventStreamMarshaller() { return this.serdeContext.eventStreamMarshaller; }, serdeContext: { - eventStreamMarshaller: { - serialize: vi.fn().mockImplementation((eventStream, eventStreamSerializationFn) => { - return Readable.from({ - async *[Symbol.asyncIterator]() { - for await (const inputEvent of eventStream) { - yield eventStreamSerializationFn(inputEvent); - } - }, - }); - }), - deserialize: vi.fn().mockImplementation((body, eventStreamDeserializationFn) => { - return { - async *[Symbol.asyncIterator]() { - for await (const outputEvent of body) { - yield eventStreamDeserializationFn(outputEvent); - } - }, - }; - }), - }, + eventStreamMarshaller: new EventStreamMarshaller({ + utf8Encoder: toUtf8, + utf8Decoder: fromUtf8, + }), }, getDefaultContentType() { - return "unit/test"; + return "application/cbor"; }, }; @@ -75,14 +38,12 @@ describe(EventStreamSerde.name, () => { defaultContentType: impl.getDefaultContentType(), }); - const serializeEventStream = eventStreamSerde.serializeEventStream.bind(eventStreamSerde); - const deserializeEventStream = eventStreamSerde.deserializeEventStream.bind(eventStreamSerde); - const eventStreamUnionSchema = struct( "ns", "EventStreamStructure", { streaming: 1 }, ["A", "B", "C"], + // D is omitted to represent an unknown event. [struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])] ); @@ -90,12 +51,18 @@ describe(EventStreamSerde.name, () => { "ns", "EventStreamContainer", 0, + // here the non-eventstream members form an initial-request + // or initial-response when present. ["eventStreamMember", "dateMember", "blobMember"], [eventStreamUnionSchema, SCHEMA.TIMESTAMP_EPOCH_SECONDS, SCHEMA.BLOB] ); - it("serializes event streams", async () => { - const eventStream = { + describe("serialization", () => { + async function messageDeserializer(event: Record): Promise { + return event; + } + + const eventStreamCallerInput = { async *[Symbol.asyncIterator]() { yield { A: { name: "a" } }; yield { B: { name: "b" } }; @@ -104,202 +71,206 @@ describe(EventStreamSerde.name, () => { }, }; - const requestBody = await serializeEventStream({ - eventStream, - requestSchema: NormalizedSchema.of(eventStreamContainerSchema), - }); - - const collect = []; - for await (const chunk of requestBody) { - collect.push(chunk); - } - expect( - collect.map((item) => { - return { - headers: item.headers, - body: toUtf8(item.body).replace(/\s+/g, ""), - }; - }) - ).toEqual([ + const canonicalEvents = [ { headers: { ":event-type": { type: "string", value: "A" }, ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, + ":content-type": { type: "string", value: "application/cbor" }, }, - body: `{"name":"a"}`, + body: { name: "a" }, }, { headers: { ":event-type": { type: "string", value: "B" }, ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, + ":content-type": { type: "string", value: "application/cbor" }, }, - body: `{"name":"b"}`, + body: { name: "b" }, }, { headers: { ":event-type": { type: "string", value: "C" }, ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, + ":content-type": { type: "string", value: "application/cbor" }, }, - body: `{"name":"c"}`, + body: { name: "c" }, }, { headers: { ":event-type": { type: "string", value: "D" }, ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, + ":content-type": { type: "string", value: "application/cbor" }, }, - body: `{"name":"d"}`, + body: { name: "d" }, }, - ]); - }); + ]; - it("deserializes event streams", async () => { - const response = new HttpResponse({ - statusCode: 200, - body: { - async *[Symbol.asyncIterator]() { - yield { A: { headers: {}, body: { name: "a" } } }; - yield { B: { headers: {}, body: { name: "b" } } }; - yield { C: { headers: {}, body: { name: "c" } } }; - yield { D: { headers: {}, body: { name: "d" } } }; - }, - }, + /** + * Takes an outgoing request requestBody of event streams, + * collects it, and maps to the canonical object form. + */ + async function collectTranslate(requestBody: any) { + const reparsed = impl.getEventStreamMarshaller().deserialize(requestBody, messageDeserializer); + + const collect = []; + for await (const chunk of reparsed) { + collect.push(chunk); + } + + return collect.map((item) => { + const object = Object.values(item)[0] as any; + return { + headers: object.headers, + body: cbor.deserialize(object.body), + }; + }); + } + + it("serializes event streams", async () => { + const requestBody = await eventStreamSerde.serializeEventStream({ + eventStream: eventStreamCallerInput, + requestSchema: NormalizedSchema.of(eventStreamContainerSchema), + }); + + expect(await collectTranslate(requestBody)).toEqual(canonicalEvents); }); - const asyncIterable = await deserializeEventStream({ - response, - responseSchema: NormalizedSchema.of(eventStreamContainerSchema), + it("serializes event streams containing an initial-request", async () => { + const requestBody = await eventStreamSerde.serializeEventStream({ + eventStream: eventStreamCallerInput, + requestSchema: NormalizedSchema.of(eventStreamContainerSchema), + initialRequest: { + dateMember: new Date(0), + blobMember: new Uint8Array([0, 1, 2, 3]), + }, + }); + + expect(await collectTranslate(requestBody)).toEqual([ + { + headers: { + ":event-type": { type: "string", value: "initial-request" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "application/cbor" }, + }, + body: { + blobMember: new Uint8Array([0, 1, 2, 3]), + dateMember: dateToTag(new Date(0)), + }, + }, + ...canonicalEvents, + ]); }); + }); - const collect = []; - for await (const event of asyncIterable) { - collect.push(event); + describe("deserialization", () => { + /** + * Converts a canonical event to a JS object representation + * of an event stream event. + */ + function messageSerializer(event: any): EventMessage { + const eventType = Object.keys(event)[0]; + return { + headers: { + ":message-type": { type: "string", value: "event" }, + ":event-type": { type: "string", value: eventType }, + ":content-type": { type: "string", value: "application/cbor" }, + }, + body: cbor.serialize(event[eventType]), + }; } - expect(collect).toEqual([ - { A: { name: `a` } }, - { B: { name: `b` } }, - { C: { name: `c` } }, - { $unknown: { D: { headers: {}, body: { name: "d" } } } }, - ]); - }); - it("serializes event streams containing an initial-request", async () => { - const eventStream = { + const eventStreamMarshaller = impl.getEventStreamMarshaller(); + + const canonicalEvents = { async *[Symbol.asyncIterator]() { yield { A: { name: "a" } }; yield { B: { name: "b" } }; yield { C: { name: "c" } }; - yield { $unknown: ["D", { name: "d" }] }; + yield { D: { name: "d" } }; }, }; - const requestBody = await serializeEventStream({ - eventStream, - requestSchema: NormalizedSchema.of(eventStreamContainerSchema), - initialRequest: { - dateMember: new Date(0), - blobMember: new Uint8Array([0, 1, 2, 3]), + const $unknownEvent = { + $unknown: { + D: { + headers: { + ":message-type": { type: "string", value: "event" }, + ":event-type": { type: "string", value: "D" }, + ":content-type": { type: "string", value: "application/cbor" }, + }, + body: Uint8Array.from(cbor.serialize({ name: "d" })), + }, }, - }); + }; + void $unknownEvent; - const collect = []; - for await (const chunk of requestBody) { - collect.push(chunk); + async function collect(asyncIterable: AsyncIterable): Promise { + const collect = []; + for await (const event of asyncIterable) { + collect.push(event); + } + return collect; } - expect( - collect.map((item) => { - return { - headers: item.headers, - body: toUtf8(item.body).replace(/\s+/g, ""), - }; - }) - ).toEqual([ - { - headers: { - ":event-type": { type: "string", value: "initial-request" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"dateMember":"1970-01-01T00:00:00.000Z","blobMember":{"0":0,"1":1,"2":2,"3":3}}`, - }, - { - headers: { - ":event-type": { type: "string", value: "A" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"a"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "B" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"b"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "C" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"c"}`, - }, - { - headers: { - ":event-type": { type: "string", value: "D" }, - ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: "unit/test" }, - }, - body: `{"name":"d"}`, - }, - ]); - }); - it("deserializes event streams containing an initial-response", async () => { - const response = new HttpResponse({ - statusCode: 200, - body: { - async *[Symbol.asyncIterator]() { - yield { - "initial-response": { - headers: {}, - body: { dateMember: "0", blobMember: "AAECAw==" }, - }, - }; - yield { A: { headers: {}, body: { name: "a" } } }; - yield { B: { headers: {}, body: { name: "b" } } }; - yield { C: { headers: {}, body: { name: "c" } } }; - yield { D: { headers: {}, body: { name: "d" } } }; - }, - }, - }); + it("deserializes event streams", async () => { + const response = new HttpResponse({ + statusCode: 200, + body: eventStreamMarshaller.serialize(canonicalEvents, messageSerializer), + }); - const initialResponseContainer = {} as any; + const asyncIterable = await eventStreamSerde.deserializeEventStream({ + response, + responseSchema: NormalizedSchema.of(eventStreamContainerSchema), + }); - const asyncIterable = await deserializeEventStream({ - response, - responseSchema: NormalizedSchema.of(eventStreamContainerSchema), - initialResponseContainer, + expect(await collect(asyncIterable)).toEqual([ + { A: { name: `a` } }, + { B: { name: `b` } }, + { C: { name: `c` } }, + // todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events. + // $unknownEvent, + ]); }); - const collect = []; - for await (const event of asyncIterable) { - collect.push(event); - } - expect(collect).toEqual([ - { A: { name: `a` } }, - { B: { name: `b` } }, - { C: { name: `c` } }, - { $unknown: { D: { headers: {}, body: { name: "d" } } } }, - ]); - expect(initialResponseContainer).toEqual({ - dateMember: new Date(0), - blobMember: new Uint8Array([0, 1, 2, 3]), + it("deserializes event streams containing an initial-response", async () => { + const response = new HttpResponse({ + statusCode: 200, + body: eventStreamMarshaller.serialize( + { + async *[Symbol.asyncIterator]() { + yield { + "initial-response": { dateMember: 0, blobMember: "AAECAw==" }, + }; + for await (const it of canonicalEvents) { + yield it; + } + }, + }, + messageSerializer + ), + }); + + const initialResponseContainer = {} as any; + + const asyncIterable = await eventStreamSerde.deserializeEventStream({ + response, + responseSchema: NormalizedSchema.of(eventStreamContainerSchema), + initialResponseContainer, + }); + + expect(await collect(asyncIterable)).toEqual([ + { A: { name: `a` } }, + { B: { name: `b` } }, + { C: { name: `c` } }, + // todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events. + // $unknownEvent, + ]); + + expect(initialResponseContainer).toEqual({ + dateMember: new Date(0), + blobMember: new Uint8Array([0, 1, 2, 3]), + }); }); }); }); diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts index 0e4da74aef5..8b8202ab559 100644 --- a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts @@ -14,6 +14,7 @@ import { fromUtf8 } from "@smithy/util-utf8"; /** * Separated module for async mixin of EventStream serde capability. + * This is used by the HttpProtocol base class from \@smithy/core/protocols. * * @alpha */ @@ -24,6 +25,9 @@ export class EventStreamSerde { private readonly serdeContext?: SerdeFunctions; private readonly defaultContentType: string; + /** + * Properties are injected by the HttpProtocol. + */ public constructor({ marshaller, serializer, @@ -59,7 +63,7 @@ export class EventStreamSerde { eventStream: AsyncIterable; requestSchema: NormalizedSchema; initialRequest?: any; - }): Promise { + }): Promise { const marshaller = this.marshaller; const eventStreamMember = requestSchema.getEventStreamMember(); const unionSchema = requestSchema.getMemberSchema(eventStreamMember); @@ -176,7 +180,7 @@ export class EventStreamSerde { * @param unionSchema - schema of the event stream container (struct). * @param [initialResponseContainer] - provided and written to only if the initial response is part of the event stream (RPC). * - * @returns the asyncIterable of the event stream. + * @returns the asyncIterable of the event stream for the end-user. */ public async deserializeEventStream({ response, diff --git a/yarn.lock b/yarn.lock index 2a89dfe091d..1e3a89904d3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2474,6 +2474,7 @@ __metadata: version: 0.0.0-use.local resolution: "@smithy/core@workspace:packages/core" dependencies: + "@smithy/eventstream-serde-node": "workspace:^" "@smithy/middleware-serde": "workspace:^" "@smithy/protocol-http": "workspace:^" "@smithy/types": "workspace:^" @@ -2555,7 +2556,7 @@ __metadata: languageName: unknown linkType: soft -"@smithy/eventstream-serde-node@workspace:packages/eventstream-serde-node": +"@smithy/eventstream-serde-node@workspace:^, @smithy/eventstream-serde-node@workspace:packages/eventstream-serde-node": version: 0.0.0-use.local resolution: "@smithy/eventstream-serde-node@workspace:packages/eventstream-serde-node" dependencies: From fcedfb39ac141d92c2cea3591dbb4337f9bbd1d1 Mon Sep 17 00:00:00 2001 From: George Fu Date: Thu, 28 Aug 2025 11:19:24 -0400 Subject: [PATCH 4/5] handle eventHeader and eventPayload traits --- .../eventStreams/EventStreamSerde.spec.ts | 63 +++++++- .../eventStreams/EventStreamSerde.ts | 149 +++++++++++------- .../schema/schemas/NormalizedSchema.spec.ts | 9 ++ .../schema/schemas/NormalizedSchema.ts | 12 ++ .../schema/SchemaTraitFilterIndex.java | 4 +- 5 files changed, 179 insertions(+), 58 deletions(-) diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts index e38cc41b23a..44a5cf7fa4e 100644 --- a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts @@ -1,5 +1,5 @@ import { cbor, CborCodec, dateToTag } from "@smithy/core/cbor"; -import { NormalizedSchema, SCHEMA, struct } from "@smithy/core/schema"; +import { NormalizedSchema, SCHEMA, sim, struct } from "@smithy/core/schema"; import { EventStreamMarshaller } from "@smithy/eventstream-serde-node"; import { HttpResponse } from "@smithy/protocol-http"; import { Message as EventMessage } from "@smithy/types"; @@ -42,9 +42,28 @@ describe(EventStreamSerde.name, () => { "ns", "EventStreamStructure", { streaming: 1 }, - ["A", "B", "C"], + ["A", "B", "C", "Payload", "TextPayload", "CustomHeaders"], // D is omitted to represent an unknown event. - [struct("ns", "A", 0, ["name"], [0]), struct("ns", "B", 0, ["name"], [0]), struct("ns", "C", 0, ["name"], [0])] + [ + struct("ns", "A", 0, ["name"], [0]), + struct("ns", "B", 0, ["name"], [0]), + struct("ns", "C", 0, ["name"], [0]), + struct( + "ns", + "Payload", + 0, + ["payload"], + [sim("ns", "StreamingBlobPayload", SCHEMA.STREAMING_BLOB, { eventPayload: 1 })] + ), + struct("ns", "TextPayload", 0, ["payload"], [sim("ns", "TextPayload", SCHEMA.STRING, { eventPayload: 1 })]), + struct( + "ns", + "CustomHeaders", + 0, + ["header1", "header2"], + [sim("ns", "EventHeader", 0, { eventHeader: 1 }), sim("ns", "EventHeader", 0, { eventHeader: 1 })] + ), + ] ); const eventStreamContainerSchema = struct( @@ -68,6 +87,9 @@ describe(EventStreamSerde.name, () => { yield { B: { name: "b" } }; yield { C: { name: "c" } }; yield { $unknown: ["D", { name: "d" }] }; + yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } }; + yield { TextPayload: { payload: "beep boop" } }; + yield { CustomHeaders: { header1: "h1", header2: "h2" } }; }, }; @@ -104,6 +126,32 @@ describe(EventStreamSerde.name, () => { }, body: { name: "d" }, }, + { + headers: { + ":event-type": { type: "string", value: "Payload" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "application/octet-stream" }, + }, + body: new Uint8Array([0, 1, 2, 3, 4, 5, 6]), + }, + { + headers: { + ":event-type": { type: "string", value: "TextPayload" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "text/plain" }, + }, + body: "beep boop", + }, + { + headers: { + ":event-type": { type: "string", value: "CustomHeaders" }, + ":message-type": { type: "string", value: "event" }, + ":content-type": { type: "string", value: "application/cbor" }, + header1: { type: "string", value: "h1" }, + header2: { type: "string", value: "h2" }, + }, + body: {}, + }, ]; /** @@ -188,6 +236,9 @@ describe(EventStreamSerde.name, () => { yield { B: { name: "b" } }; yield { C: { name: "c" } }; yield { D: { name: "d" } }; + yield { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } }; + yield { TextPayload: { payload: "boop beep" } }; + yield { CustomHeaders: { header1: "h1", header2: "h2" } }; }, }; @@ -230,6 +281,9 @@ describe(EventStreamSerde.name, () => { { C: { name: `c` } }, // todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events. // $unknownEvent, + { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } }, + { TextPayload: { payload: "boop beep" } }, + { CustomHeaders: { header1: "h1", header2: "h2" } }, ]); }); @@ -265,6 +319,9 @@ describe(EventStreamSerde.name, () => { { C: { name: `c` } }, // todo(schema) getMessageUnmarshaller.ts must be patched to return unknown events. // $unknownEvent, + { Payload: { payload: new Uint8Array([0, 1, 2, 3, 4, 5, 6]) } }, + { TextPayload: { payload: "boop beep" } }, + { CustomHeaders: { header1: "h1", header2: "h2" } }, ]); expect(initialResponseContainer).toEqual({ diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts index 8b8202ab559..13b49841dff 100644 --- a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts +++ b/packages/core/src/submodules/eventStreams/EventStreamSerde.ts @@ -109,65 +109,19 @@ export class EventStreamSerde { Object.keys(event).find((key) => { return key !== "__type"; }) ?? ""; - const eventStreamSchema = memberSchemas[unionMember] ?? NormalizedSchema.of(SCHEMA.DOCUMENT); - - let messageSerialization: string | Uint8Array; - let eventType = unionMember; - - if (eventStreamSchema.isStructSchema()) { - serializer.write(eventStreamSchema, event[unionMember]); - messageSerialization = serializer.flush(); - } else { - // $unknown member - const [type, value] = event[unionMember]; - eventType = type; - serializer.write(NormalizedSchema.of(SCHEMA.DOCUMENT), value); - messageSerialization = serializer.flush(); - } - - const body = - typeof messageSerialization === "string" - ? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization) - : messageSerialization; + const { additionalHeaders, body, eventType, explicitPayloadContentType } = this.writeEventBody( + unionMember, + unionSchema, + event + ); const headers: MessageHeaders = { ":event-type": { type: "string", value: eventType }, ":message-type": { type: "string", value: "event" }, - ":content-type": { type: "string", value: defaultContentType }, + ":content-type": { type: "string", value: explicitPayloadContentType ?? defaultContentType }, + ...additionalHeaders, }; - // additional trait-annotated event headers. - if (eventStreamSchema.isStructSchema()) { - for (const [memberName, memberSchema] of eventStreamSchema.structIterator()) { - const isHeader = !!memberSchema.getMergedTraits().eventHeader; - if (!isHeader) { - continue; - } - const value = event[memberName]; - let type = "binary" as MessageHeaderValue["type"]; - if (memberSchema.isNumericSchema()) { - if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) { - type = "integer"; - } else { - type = "long"; - } - } else if (memberSchema.isTimestampSchema()) { - type = "timestamp"; - } else if (memberSchema.isStringSchema()) { - type = "string"; - } else if (memberSchema.isBooleanSchema()) { - type = "boolean"; - } - - if (isHeader && value != undefined) { - headers[memberName] = { - type, - value, - }; - } - } - } - return { headers, body, @@ -263,4 +217,93 @@ export class EventStreamSerde { }, }; } + + /** + * @param unionMember - member name within the structure that contains an event stream union. + * @param unionSchema - schema of the union. + * @param event + * + * @returns the event body (bytes) and event type (string). + */ + private writeEventBody(unionMember: string, unionSchema: NormalizedSchema, event: any) { + const serializer = this.serializer; + let eventType = unionMember; + let explicitPayloadMember = null as null | string; + let explicitPayloadContentType: undefined | string; + + const isKnownSchema = unionSchema.hasMemberSchema(unionMember); + const additionalHeaders: MessageHeaders = {}; + + if (!isKnownSchema) { + // $unknown member + const [type, value] = event[unionMember]; + eventType = type; + serializer.write(SCHEMA.DOCUMENT, value); + } else { + const eventSchema = unionSchema.getMemberSchema(unionMember); + + if (eventSchema.isStructSchema()) { + for (const [memberName, memberSchema] of eventSchema.structIterator()) { + const { eventHeader, eventPayload } = memberSchema.getMergedTraits(); + + if (eventPayload) { + explicitPayloadMember = memberName; + break; + } else if (eventHeader) { + const value = event[unionMember][memberName]; + let type = "binary" as MessageHeaderValue["type"]; + if (memberSchema.isNumericSchema()) { + if ((-2) ** 31 <= value && value <= 2 ** 31 - 1) { + type = "integer"; + } else { + type = "long"; + } + } else if (memberSchema.isTimestampSchema()) { + type = "timestamp"; + } else if (memberSchema.isStringSchema()) { + type = "string"; + } else if (memberSchema.isBooleanSchema()) { + type = "boolean"; + } + + if (value != null) { + additionalHeaders[memberName] = { + type, + value, + }; + delete event[unionMember][memberName]; + } + } + } + + if (explicitPayloadMember !== null) { + const payloadSchema = eventSchema.getMemberSchema(explicitPayloadMember); + if (payloadSchema.isBlobSchema()) { + explicitPayloadContentType = "application/octet-stream"; + } else if (payloadSchema.isStringSchema()) { + explicitPayloadContentType = "text/plain"; + } + serializer.write(payloadSchema, event[unionMember][explicitPayloadMember]); + } else { + serializer.write(eventSchema, event[unionMember]); + } + } else { + throw new Error("@smithy/core/eventStreams - non-struct member not supported in event stream union."); + } + } + + const messageSerialization: string | Uint8Array = serializer.flush(); + + const body = + typeof messageSerialization === "string" + ? (this.serdeContext?.utf8Decoder ?? fromUtf8)(messageSerialization) + : messageSerialization; + + return { + body, + eventType, + explicitPayloadContentType, + additionalHeaders, + }; + } } diff --git a/packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts b/packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts index cf38e29e8e4..0ac6744900e 100644 --- a/packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts +++ b/packages/core/src/submodules/schema/schemas/NormalizedSchema.spec.ts @@ -173,6 +173,15 @@ describe(NormalizedSchema.name, () => { }); }); + it("can identify whether a member exists", () => { + expect(ns.hasMemberSchema("list")).toBe(true); + expect(ns.hasMemberSchema("map")).toBe(true); + expect(ns.hasMemberSchema("struct")).toBe(true); + expect(ns.hasMemberSchema("a")).toBe(false); + expect(ns.hasMemberSchema("b")).toBe(false); + expect(ns.hasMemberSchema("c")).toBe(false); + }); + describe("iteration", () => { it("iterates over member schemas", () => { const iteration = Array.from(ns.structIterator()); diff --git a/packages/core/src/submodules/schema/schemas/NormalizedSchema.ts b/packages/core/src/submodules/schema/schemas/NormalizedSchema.ts index 7912c4ed338..176a28bbb59 100644 --- a/packages/core/src/submodules/schema/schemas/NormalizedSchema.ts +++ b/packages/core/src/submodules/schema/schemas/NormalizedSchema.ts @@ -373,6 +373,18 @@ export class NormalizedSchema implements INormalizedSchema { throw new Error(`@smithy/core/schema - the schema ${this.getName(true)} does not have a value member.`); } + /** + * @param member - to query. + * @returns whether there is a memberSchema with the given member name. False if not a structure (or union). + */ + public hasMemberSchema(member: string): boolean { + if (this.isStructSchema()) { + const struct = this.getSchema() as StructureSchema; + return member in struct.members; + } + return false; + } + /** * @returns the NormalizedSchema for the given member name. The returned instance will return true for `isMemberSchema()` * and will have the member name given. diff --git a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java index 84fed4c6630..7a687b5e472 100644 --- a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java +++ b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java @@ -74,8 +74,8 @@ final class SchemaTraitFilterIndex implements KnowledgeIndex { ErrorTrait.ID, // set by the ServiceException runtime classes. RequiresLengthTrait.ID, // unhandled - EventHeaderTrait.ID, - EventPayloadTrait.ID, + EventHeaderTrait.ID, // @smithy/core/EventStreams::EventStreamSerde + EventPayloadTrait.ID, // @smithy/core/EventStreams::EventStreamSerde // afaict, HttpErrorTrait is ignored by the client. The discriminator selects the error structure // but the actual HTTP response status code is used with no particular comparison From 133568018defae9614a84bf9b5aafebb5af2cc3a Mon Sep 17 00:00:00 2001 From: George Fu Date: Thu, 28 Aug 2025 13:10:56 -0400 Subject: [PATCH 5/5] rename eventStreams submodule to event-streams --- .../{eventStreams.d.ts => event-streams.d.ts} | 4 ++-- .../core/{eventStreams.js => event-streams.js} | 2 +- packages/core/package.json | 16 ++++++++-------- .../EventStreamSerde.spec.ts | 0 .../EventStreamSerde.ts | 2 +- .../{eventStreams => event-streams}/index.ts | 0 .../src/submodules/protocols/HttpProtocol.ts | 4 ++-- packages/core/tsconfig.cjs.json | 2 +- packages/core/tsconfig.es.json | 2 +- packages/core/tsconfig.types.json | 2 +- .../codegen/schema/SchemaTraitFilterIndex.java | 4 ++-- 11 files changed, 19 insertions(+), 19 deletions(-) rename packages/core/{eventStreams.d.ts => event-streams.d.ts} (50%) rename packages/core/{eventStreams.js => event-streams.js} (62%) rename packages/core/src/submodules/{eventStreams => event-streams}/EventStreamSerde.spec.ts (100%) rename packages/core/src/submodules/{eventStreams => event-streams}/EventStreamSerde.ts (98%) rename packages/core/src/submodules/{eventStreams => event-streams}/index.ts (100%) diff --git a/packages/core/eventStreams.d.ts b/packages/core/event-streams.d.ts similarity index 50% rename from packages/core/eventStreams.d.ts rename to packages/core/event-streams.d.ts index 88f25af8bc7..8637fa8bca0 100644 --- a/packages/core/eventStreams.d.ts +++ b/packages/core/event-streams.d.ts @@ -2,6 +2,6 @@ * Do not edit: * This is a compatibility redirect for contexts that do not understand package.json exports field. */ -declare module "@smithy/core/eventStreams" { - export * from "@smithy/core/dist-types/submodules/eventStreams/index.d"; +declare module "@smithy/core/event-streams" { + export * from "@smithy/core/dist-types/submodules/event-streams/index.d"; } diff --git a/packages/core/eventStreams.js b/packages/core/event-streams.js similarity index 62% rename from packages/core/eventStreams.js rename to packages/core/event-streams.js index f3a53171333..e79eab773b2 100644 --- a/packages/core/eventStreams.js +++ b/packages/core/event-streams.js @@ -3,4 +3,4 @@ * Do not edit: * This is a compatibility redirect for contexts that do not understand package.json exports field. */ -module.exports = require("./dist-cjs/submodules/eventStreams/index.js"); +module.exports = require("./dist-cjs/submodules/event-streams/index.js"); diff --git a/packages/core/package.json b/packages/core/package.json index 1e884bf67db..cf437721c66 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -61,12 +61,12 @@ "import": "./dist-es/submodules/schema/index.js", "require": "./dist-cjs/submodules/schema/index.js" }, - "./eventStreams": { - "types": "./dist-types/submodules/eventStreams/index.d.ts", - "module": "./dist-es/submodules/eventStreams/index.js", - "node": "./dist-cjs/submodules/eventStreams/index.js", - "import": "./dist-es/submodules/eventStreams/index.js", - "require": "./dist-cjs/submodules/eventStreams/index.js" + "./event-streams": { + "types": "./dist-types/submodules/event-streams/index.d.ts", + "module": "./dist-es/submodules/event-streams/index.js", + "node": "./dist-cjs/submodules/event-streams/index.js", + "import": "./dist-es/submodules/event-streams/index.js", + "require": "./dist-cjs/submodules/event-streams/index.js" } }, "author": { @@ -102,8 +102,8 @@ "files": [ "./cbor.d.ts", "./cbor.js", - "./eventStreams.d.ts", - "./eventStreams.js", + "./event-streams.d.ts", + "./event-streams.js", "./protocols.d.ts", "./protocols.js", "./schema.d.ts", diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts b/packages/core/src/submodules/event-streams/EventStreamSerde.spec.ts similarity index 100% rename from packages/core/src/submodules/eventStreams/EventStreamSerde.spec.ts rename to packages/core/src/submodules/event-streams/EventStreamSerde.spec.ts diff --git a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts b/packages/core/src/submodules/event-streams/EventStreamSerde.ts similarity index 98% rename from packages/core/src/submodules/eventStreams/EventStreamSerde.ts rename to packages/core/src/submodules/event-streams/EventStreamSerde.ts index 13b49841dff..9cc3a2f59b9 100644 --- a/packages/core/src/submodules/eventStreams/EventStreamSerde.ts +++ b/packages/core/src/submodules/event-streams/EventStreamSerde.ts @@ -288,7 +288,7 @@ export class EventStreamSerde { serializer.write(eventSchema, event[unionMember]); } } else { - throw new Error("@smithy/core/eventStreams - non-struct member not supported in event stream union."); + throw new Error("@smithy/core/event-streams - non-struct member not supported in event stream union."); } } diff --git a/packages/core/src/submodules/eventStreams/index.ts b/packages/core/src/submodules/event-streams/index.ts similarity index 100% rename from packages/core/src/submodules/eventStreams/index.ts rename to packages/core/src/submodules/event-streams/index.ts diff --git a/packages/core/src/submodules/protocols/HttpProtocol.ts b/packages/core/src/submodules/protocols/HttpProtocol.ts index eddb9717734..ec984c79a68 100644 --- a/packages/core/src/submodules/protocols/HttpProtocol.ts +++ b/packages/core/src/submodules/protocols/HttpProtocol.ts @@ -1,4 +1,4 @@ -import type { EventStreamSerde } from "@smithy/core/eventStreams"; +import type { EventStreamSerde } from "@smithy/core/event-streams"; import { NormalizedSchema } from "@smithy/core/schema"; import { HttpRequest, HttpResponse } from "@smithy/protocol-http"; import type { @@ -193,7 +193,7 @@ export abstract class HttpProtocol implements ClientProtocol { - const { EventStreamSerde } = await import("@smithy/core/eventStreams"); + const { EventStreamSerde } = await import("@smithy/core/event-streams"); return new EventStreamSerde({ marshaller: this.getEventStreamMarshaller(), serializer: this.serializer, diff --git a/packages/core/tsconfig.cjs.json b/packages/core/tsconfig.cjs.json index a59d097b9b4..e9e9ab91f48 100644 --- a/packages/core/tsconfig.cjs.json +++ b/packages/core/tsconfig.cjs.json @@ -8,7 +8,7 @@ "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], "@smithy/core/schema": ["./src/submodules/schema/index.ts"], - "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] + "@smithy/core/event-streams": ["./src/submodules/event-streams/index.ts"] } }, "extends": "../../tsconfig.cjs.json", diff --git a/packages/core/tsconfig.es.json b/packages/core/tsconfig.es.json index 6e3f93fcbbc..8cab10dd9fb 100644 --- a/packages/core/tsconfig.es.json +++ b/packages/core/tsconfig.es.json @@ -9,7 +9,7 @@ "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], "@smithy/core/schema": ["./src/submodules/schema/index.ts"], - "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] + "@smithy/core/event-streams": ["./src/submodules/event-streams/index.ts"] } }, "extends": "../../tsconfig.es.json", diff --git a/packages/core/tsconfig.types.json b/packages/core/tsconfig.types.json index 4b20a12fb3f..14f10909722 100644 --- a/packages/core/tsconfig.types.json +++ b/packages/core/tsconfig.types.json @@ -8,7 +8,7 @@ "@smithy/core/protocols": ["./src/submodules/protocols/index.ts"], "@smithy/core/serde": ["./src/submodules/serde/index.ts"], "@smithy/core/schema": ["./src/submodules/schema/index.ts"], - "@smithy/core/eventStreams": ["./src/submodules/eventStreams/index.ts"] + "@smithy/core/event-streams": ["./src/submodules/event-streams/index.ts"] } }, "extends": "../../tsconfig.types.json", diff --git a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java index 7a687b5e472..a9d5d6c6832 100644 --- a/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java +++ b/smithy-typescript-codegen/src/main/java/software/amazon/smithy/typescript/codegen/schema/SchemaTraitFilterIndex.java @@ -74,8 +74,8 @@ final class SchemaTraitFilterIndex implements KnowledgeIndex { ErrorTrait.ID, // set by the ServiceException runtime classes. RequiresLengthTrait.ID, // unhandled - EventHeaderTrait.ID, // @smithy/core/EventStreams::EventStreamSerde - EventPayloadTrait.ID, // @smithy/core/EventStreams::EventStreamSerde + EventHeaderTrait.ID, // @smithy/core/event-streams::EventStreamSerde + EventPayloadTrait.ID, // @smithy/core/event-streams::EventStreamSerde // afaict, HttpErrorTrait is ignored by the client. The discriminator selects the error structure // but the actual HTTP response status code is used with no particular comparison