From 13c20255e0984b03b893343018a7f3110f8f0e9b Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 13:13:43 +0200 Subject: [PATCH 1/9] cleanup --- UPGRADING.md | 2 +- packages/core/test/queues/HandlerContainer.spec.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index f30d5c0c..d87db8d7 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -6,7 +6,7 @@ - **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead. -- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes. +- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically. ### Migration Steps diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index 2d073acf..fd3f1752 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -160,7 +160,7 @@ describe('HandlerContainer', () => { }) describe('resolveMessageType', () => { - describe('with messageTypeField (legacy)', () => { + describe('with messageTypePath resolver', () => { it('should extract message type from the specified field', () => { const configs = new MessageHandlerConfigBuilder() .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) @@ -280,7 +280,7 @@ describe('HandlerContainer', () => { }) describe('with no configuration', () => { - it('should throw error when neither messageTypeField nor messageTypeResolver is configured', () => { + it('should throw error when messageTypeResolver is not configured', () => { const configs = new MessageHandlerConfigBuilder() .addConfig( USER_MESSAGE_SCHEMA, From 12ae8176ee389a7ba352a7192761da591397c5a9 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 13:29:55 +0200 Subject: [PATCH 2/9] improve handling of edge cases --- packages/core/lib/queues/HandlerContainer.ts | 6 ++--- .../core/lib/queues/MessageSchemaContainer.ts | 26 ++++++++++++++++++- .../core/lib/queues/MessageTypeResolver.ts | 15 +++++++++-- .../core/test/queues/HandlerContainer.spec.ts | 6 ++--- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/packages/core/lib/queues/HandlerContainer.ts b/packages/core/lib/queues/HandlerContainer.ts index e5512bc9..cbfb8c0a 100644 --- a/packages/core/lib/queues/HandlerContainer.ts +++ b/packages/core/lib/queues/HandlerContainer.ts @@ -360,9 +360,9 @@ export class HandlerContainer< if (!messageType) { throw new Error( - 'Unable to determine message type for handler. ' + - 'Either provide messageType in handler options, use a literal resolver, ' + - 'or ensure the schema has a literal type field matching messageTypePath.', + 'Unable to determine message type for handler at registration time. ' + + 'Either provide explicit messageType in handler options (required for custom resolver functions), ' + + 'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.', ) } diff --git a/packages/core/lib/queues/MessageSchemaContainer.ts b/packages/core/lib/queues/MessageSchemaContainer.ts index bfc57d12..8980000f 100644 --- a/packages/core/lib/queues/MessageSchemaContainer.ts +++ b/packages/core/lib/queues/MessageSchemaContainer.ts @@ -5,6 +5,7 @@ import { extractMessageTypeFromSchema, isMessageTypeLiteralConfig, isMessageTypePathConfig, + isMessageTypeResolverFnConfig, type MessageTypeResolverConfig, type MessageTypeResolverContext, resolveMessageType, @@ -45,7 +46,12 @@ export class MessageSchemaContainer { message: Record, attributes?: Record, ): Either> { - const messageType = this.resolveMessageTypeFromData(message, attributes) + let messageType: string | undefined + try { + messageType = this.resolveMessageTypeFromData(message, attributes) + } catch (e) { + return { error: e instanceof Error ? e : new Error(String(e)) } + } const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY] if (!schema) { @@ -103,6 +109,24 @@ export class MessageSchemaContainer { const literalType = this.getLiteralMessageType() const messageTypePath = this.getMessageTypePathForSchema() + // Validate: custom resolver function cannot be used with multiple schemas. + // The resolver works fine for runtime type resolution (when messages arrive), + // but at registration time we need to map each schema to its message type. + // With a custom resolver, we can't know what types it will return until runtime, + // so we can't build the type→schema lookup map for multiple schemas. + if ( + this.messageTypeResolver && + isMessageTypeResolverFnConfig(this.messageTypeResolver) && + array.length > 1 + ) { + throw new Error( + 'Custom resolver function cannot be used with multiple schemas. ' + + 'The resolver works for runtime type resolution, but at registration time ' + + 'we cannot determine which schema corresponds to which type. ' + + 'Use messageTypePath config (to extract types from schema literals) or register only a single schema.', + ) + } + for (const item of array) { let type: string | undefined diff --git a/packages/core/lib/queues/MessageTypeResolver.ts b/packages/core/lib/queues/MessageTypeResolver.ts index afdde16a..93269a33 100644 --- a/packages/core/lib/queues/MessageTypeResolver.ts +++ b/packages/core/lib/queues/MessageTypeResolver.ts @@ -180,7 +180,7 @@ export function resolveMessageType( * * @param schema - Zod schema with shape property * @param messageTypePath - Field name containing the type literal - * @returns The literal type value from the schema, or undefined + * @returns The literal type value from the schema, or undefined if field doesn't exist or isn't a literal */ export function extractMessageTypeFromSchema( // biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any @@ -190,5 +190,16 @@ export function extractMessageTypeFromSchema( if (!messageTypePath) { return undefined } - return schema.shape?.[messageTypePath]?.value as string | undefined + + const field = schema.shape?.[messageTypePath] + if (!field) { + return undefined + } + + // Check if the field has a literal value (z.literal() creates a field with .value) + if (!('value' in field)) { + return undefined + } + + return field.value as string | undefined } diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index fd3f1752..bcc46b02 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -311,9 +311,9 @@ describe('HandlerContainer', () => { messageHandlers: configs, }), ).toThrow( - 'Unable to determine message type for handler. ' + - 'Either provide messageType in handler options, use a literal resolver, ' + - 'or ensure the schema has a literal type field matching messageTypePath.', + 'Unable to determine message type for handler at registration time. ' + + 'Either provide explicit messageType in handler options (required for custom resolver functions), ' + + 'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.', ) }) From 4245042806bfedfcf9d9af5eeee302d4e6a6b490 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 13:34:47 +0200 Subject: [PATCH 3/9] More improvements --- .../core/lib/queues/MessageSchemaContainer.ts | 77 +++++---- .../queues/MessageSchemaContainer.spec.ts | 161 ++++++++++++++++++ 2 files changed, 207 insertions(+), 31 deletions(-) create mode 100644 packages/core/test/queues/MessageSchemaContainer.spec.ts diff --git a/packages/core/lib/queues/MessageSchemaContainer.ts b/packages/core/lib/queues/MessageSchemaContainer.ts index 8980000f..cb95aa31 100644 --- a/packages/core/lib/queues/MessageSchemaContainer.ts +++ b/packages/core/lib/queues/MessageSchemaContainer.ts @@ -46,19 +46,28 @@ export class MessageSchemaContainer { message: Record, attributes?: Record, ): Either> { - let messageType: string | undefined + // If no resolver configured, use the single default schema + if (!this.messageTypeResolver) { + const schema = this.messageSchemas[DEFAULT_SCHEMA_KEY] + if (!schema) { + return { + error: new Error('No messageTypeResolver configured and no default schema available'), + } + } + return { result: schema } + } + + let messageType: string try { messageType = this.resolveMessageTypeFromData(message, attributes) } catch (e) { return { error: e instanceof Error ? e : new Error(String(e)) } } - const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY] + const schema = this.messageSchemas[messageType] if (!schema) { return { - error: new Error( - `Unsupported message type: ${messageType ?? DEFAULT_SCHEMA_KEY.toString()}`, - ), + error: new Error(`Unsupported message type: ${messageType}`), } } return { result: schema } @@ -66,17 +75,16 @@ export class MessageSchemaContainer { /** * Resolves message type from message data and optional attributes. + * Only called when messageTypeResolver is configured. */ private resolveMessageTypeFromData( messageData: unknown, messageAttributes?: Record, - ): string | undefined { - if (this.messageTypeResolver) { - const context: MessageTypeResolverContext = { messageData, messageAttributes } - return resolveMessageType(this.messageTypeResolver, context) - } - - return undefined + ): string { + // This method is only called after checking messageTypeResolver exists in resolveSchema + const resolver = this.messageTypeResolver as MessageTypeResolverConfig + const context: MessageTypeResolverContext = { messageData, messageAttributes } + return resolveMessageType(resolver, context) } /** @@ -101,24 +109,21 @@ export class MessageSchemaContainer { return undefined } - private resolveMap>( - array: readonly T[], - ): Record { - const result: Record = {} - - const literalType = this.getLiteralMessageType() - const messageTypePath = this.getMessageTypePathForSchema() + /** + * Validates that multiple schemas can be properly mapped at registration time. + */ + private validateMultipleSchemas(schemaCount: number): void { + if (schemaCount <= 1) return - // Validate: custom resolver function cannot be used with multiple schemas. - // The resolver works fine for runtime type resolution (when messages arrive), - // but at registration time we need to map each schema to its message type. - // With a custom resolver, we can't know what types it will return until runtime, - // so we can't build the type→schema lookup map for multiple schemas. - if ( - this.messageTypeResolver && - isMessageTypeResolverFnConfig(this.messageTypeResolver) && - array.length > 1 - ) { + if (!this.messageTypeResolver) { + throw new Error( + 'Multiple schemas require messageTypeResolver to be configured. ' + + 'Use messageTypePath config (to extract types from schema literals) or literal config.', + ) + } + // Custom resolver function cannot be used with multiple schemas because + // we can't know what types it will return until runtime. + if (isMessageTypeResolverFnConfig(this.messageTypeResolver)) { throw new Error( 'Custom resolver function cannot be used with multiple schemas. ' + 'The resolver works for runtime type resolution, but at registration time ' + @@ -126,6 +131,17 @@ export class MessageSchemaContainer { 'Use messageTypePath config (to extract types from schema literals) or register only a single schema.', ) } + } + + private resolveMap>( + array: readonly T[], + ): Record { + const result: Record = {} + + this.validateMultipleSchemas(array.length) + + const literalType = this.getLiteralMessageType() + const messageTypePath = this.getMessageTypePathForSchema() for (const item of array) { let type: string | undefined @@ -141,8 +157,7 @@ export class MessageSchemaContainer { : // @ts-expect-error - ZodSchema has shape property at runtime extractMessageTypeFromSchema(item, messageTypePath) } - // For custom resolver without field path, we can't extract from schema - // All schemas will use DEFAULT_SCHEMA_KEY + // For single schema without resolver, use DEFAULT_SCHEMA_KEY const key = type ?? DEFAULT_SCHEMA_KEY if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`) diff --git a/packages/core/test/queues/MessageSchemaContainer.spec.ts b/packages/core/test/queues/MessageSchemaContainer.spec.ts new file mode 100644 index 00000000..41f96340 --- /dev/null +++ b/packages/core/test/queues/MessageSchemaContainer.spec.ts @@ -0,0 +1,161 @@ +import { describe, expect, it } from 'vitest' +import z from 'zod/v4' + +import { MessageSchemaContainer } from '../../lib/queues/MessageSchemaContainer.ts' + +const MESSAGE_SCHEMA_A = z.object({ + type: z.literal('message.a'), + payload: z.string(), +}) + +const MESSAGE_SCHEMA_B = z.object({ + type: z.literal('message.b'), + payload: z.number(), +}) + +const MESSAGE_SCHEMA_NO_TYPE = z.object({ + payload: z.string(), +}) + +type MessageA = z.infer +type MessageB = z.infer + +describe('MessageSchemaContainer', () => { + describe('resolveSchema', () => { + it('should resolve schema using messageTypePath', () => { + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + messageDefinitions: [], + messageTypeResolver: { messageTypePath: 'type' }, + }) + + const resultA = container.resolveSchema({ type: 'message.a', payload: 'test' }) + expect('result' in resultA).toBe(true) + + const resultB = container.resolveSchema({ type: 'message.b', payload: 123 }) + expect('result' in resultB).toBe(true) + }) + + it('should return error for unknown message type', () => { + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A], + messageDefinitions: [], + messageTypeResolver: { messageTypePath: 'type' }, + }) + + const result = container.resolveSchema({ type: 'unknown.type', payload: 'test' }) + expect('error' in result).toBe(true) + if ('error' in result && result.error) { + expect(result.error.message).toContain('Unsupported message type: unknown.type') + } + }) + + it('should catch resolver errors and return as Either error', () => { + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A], + messageDefinitions: [], + messageTypeResolver: { + resolver: () => { + throw new Error('Resolver failed') + }, + }, + }) + + const result = container.resolveSchema({ payload: 'test' }) + expect('error' in result).toBe(true) + if ('error' in result && result.error) { + expect(result.error.message).toBe('Resolver failed') + } + }) + + it('should resolve schema using literal type', () => { + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A], + messageDefinitions: [], + messageTypeResolver: { literal: 'message.a' }, + }) + + // Any message resolves to the single schema + const result = container.resolveSchema({ anything: 'works' }) + expect('result' in result).toBe(true) + }) + + it('should resolve schema using attributes with custom resolver', () => { + // Custom resolver extracts type from attributes for runtime resolution. + // For schema registration, we use messageTypePath to map schemas to types. + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + messageDefinitions: [], + messageTypeResolver: { messageTypePath: 'type' }, + }) + + // At runtime, we can still use attributes if needed (though this example uses data) + const result = container.resolveSchema({ type: 'message.a', payload: 'test' }) + expect('result' in result).toBe(true) + }) + }) + + describe('registration validation', () => { + it('should throw error when custom resolver is used with multiple schemas', () => { + expect( + () => + new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + messageDefinitions: [], + messageTypeResolver: { + resolver: () => 'some.type', + }, + }), + ).toThrow( + 'Custom resolver function cannot be used with multiple schemas. ' + + 'The resolver works for runtime type resolution, but at registration time ' + + 'we cannot determine which schema corresponds to which type. ' + + 'Use messageTypePath config (to extract types from schema literals) or register only a single schema.', + ) + }) + + it('should allow custom resolver with single schema', () => { + expect( + () => + new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A], + messageDefinitions: [], + messageTypeResolver: { + resolver: () => 'message.a', + }, + }), + ).not.toThrow() + }) + + it('should throw error for duplicate schema types', () => { + const DUPLICATE_SCHEMA = z.object({ + type: z.literal('message.a'), // Same type as MESSAGE_SCHEMA_A + payload: z.string(), // Same structure to satisfy type + }) + + expect( + () => + new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_A, DUPLICATE_SCHEMA], + messageDefinitions: [], + messageTypeResolver: { messageTypePath: 'type' }, + }), + ).toThrow('Duplicate schema for type: message.a') + }) + + it('should handle schemas without literal type field gracefully', () => { + // When schema doesn't have the expected literal field, it falls back to DEFAULT_SCHEMA_KEY + // With a single schema, this works fine + const container = new MessageSchemaContainer({ + messageSchemas: [MESSAGE_SCHEMA_NO_TYPE], + messageDefinitions: [], + messageTypeResolver: { messageTypePath: 'type' }, + }) + + // Since there's no 'type' field in schema, it uses default key + // Any message will fail to match since we're looking for a specific type + const result = container.resolveSchema({ type: 'any.type', payload: 'test' }) + expect('error' in result).toBe(true) + }) + }) +}) From cddb419e4473084599439704b3ceb770fbb6ab8e Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 13:35:51 +0200 Subject: [PATCH 4/9] More improvements --- UPGRADING.md | 4 ++++ packages/core/vitest.config.ts | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index d87db8d7..25e4ba98 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -45,6 +45,10 @@ super(dependencies, { - **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time. +- **Custom resolver validation**: Custom resolver functions (`{ resolver: fn }`) cannot be used with multiple schemas in `MessageSchemaContainer`. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. Use `messageTypePath` for multiple schemas, or register only a single schema with a custom resolver. + +- **Improved error handling in `MessageSchemaContainer.resolveSchema`**: The method now properly catches resolver errors and returns them as `Either` instead of throwing. This ensures consistent error handling regardless of resolver configuration. + ### Migration Steps #### If using `NO_MESSAGE_TYPE_FIELD` diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 66e288da..bfdcbc8b 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -16,10 +16,10 @@ export default defineConfig({ include: ['lib/**/*.ts'], exclude: ['vitest.config.ts', 'lib/**/index.ts'], thresholds: { - lines: 35, + lines: 42, functions: 80, branches: 85, - statements: 35, + statements: 42, }, }, }, From 4205603b556030b16b9e5b8de222f18d6b38b5c7 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 14:21:30 +0200 Subject: [PATCH 5/9] Make handling of MessageSchemaContainer more cons --- packages/amqp/test/utils/testContext.ts | 10 +- packages/core/README.md | 10 +- packages/core/lib/index.ts | 6 +- .../core/lib/queues/AbstractQueueService.ts | 18 ++-- .../core/lib/queues/MessageSchemaContainer.ts | 96 +++++++++++++++---- .../core/lib/queues/MessageTypeResolver.ts | 47 ++++++--- packages/core/package.json | 1 + .../core/test/queues/HandlerContainer.spec.ts | 2 +- .../queues/MessageSchemaContainer.spec.ts | 64 ++++++++----- .../test/queues/MessageTypeResolver.spec.ts | 90 ++++++++++++++++- packages/kafka/lib/AbstractKafkaPublisher.ts | 2 +- 11 files changed, 266 insertions(+), 80 deletions(-) diff --git a/packages/amqp/test/utils/testContext.ts b/packages/amqp/test/utils/testContext.ts index 717d72d4..c72f44ce 100644 --- a/packages/amqp/test/utils/testContext.ts +++ b/packages/amqp/test/utils/testContext.ts @@ -1,4 +1,9 @@ -import type { CommonLogger, ErrorReporter, ErrorResolver } from '@lokalise/node-core' +import { + type CommonLogger, + type ErrorReporter, + type ErrorResolver, + globalLogger, +} from '@lokalise/node-core' import type { MessageMetricsManager, TransactionObservabilityManager, @@ -11,7 +16,6 @@ import { import type { NameAndRegistrationPair } from 'awilix' import { asClass, asFunction, createContainer, Lifetime } from 'awilix' import { AwilixManager } from 'awilix-manager' -import pino from 'pino' import { z } from 'zod/v4' import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager.ts' import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager.ts' @@ -83,7 +87,7 @@ export const TestEvents = { export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][] export type TestEventPublishPayloadsType = z.output -const TestLogger: CommonLogger = pino() +const TestLogger: CommonLogger = globalLogger export async function registerDependencies( config: AmqpConfig, diff --git a/packages/core/README.md b/packages/core/README.md index eef51c71..015b05da 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -533,11 +533,17 @@ Manages Zod schemas and validates messages: import { MessageSchemaContainer } from '@message-queue-toolkit/core' const container = new MessageSchemaContainer({ - messageSchemas: [Schema1, Schema2], + messageSchemas: [{ schema: Schema1 }, { schema: Schema2 }], + messageDefinitions: [], messageTypeResolver: { messageTypePath: 'type' }, }) -const schema = container.resolveSchema(message.type) +const result = container.resolveSchema(message) +if ('error' in result) { + // Handle error +} else { + const schema = result.result +} ``` ### AbstractPublisherManager diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index eb778624..4fd464cd 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -72,7 +72,11 @@ export { type SpyResultInput, TYPE_NOT_RESOLVED, } from './queues/HandlerSpy.ts' -export type { MessageSchemaContainerOptions } from './queues/MessageSchemaContainer.ts' +export type { + DefinitionEntry, + MessageSchemaContainerOptions, + SchemaEntry, +} from './queues/MessageSchemaContainer.ts' export { MessageSchemaContainer } from './queues/MessageSchemaContainer.ts' export type { MessageTypeResolverConfig, diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 29960ccd..3b09e2b0 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -8,7 +8,6 @@ import { stringValueSerializer, } from '@lokalise/node-core' import type { MakeRequired } from '@lokalise/universal-ts-utils/node' -import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' import { MESSAGE_DEDUPLICATION_OPTIONS_SCHEMA, type MessageDeduplicationOptions, @@ -177,10 +176,17 @@ export abstract class AbstractQueueService< handlers: MessageHandlerConfig[] messageTypeResolver?: MessageTypeResolverConfig }) { - const messageSchemas = options.handlers.map((entry) => entry.schema) - const messageDefinitions: CommonEventDefinition[] = options.handlers - .map((entry) => entry.definition) - .filter((entry) => entry !== undefined) + const messageSchemas = options.handlers.map((entry) => ({ + schema: entry.schema, + messageType: entry.messageType, + })) + const messageDefinitions = options.handlers + .filter((entry) => entry.definition !== undefined) + .map((entry) => ({ + // biome-ignore lint/style/noNonNullAssertion: filtered above + definition: entry.definition!, + messageType: entry.messageType, + })) return new MessageSchemaContainer({ messageTypeResolver: options.messageTypeResolver, @@ -193,7 +199,7 @@ export abstract class AbstractQueueService< messageSchemas: readonly ZodSchema[] messageTypeResolver?: MessageTypeResolverConfig }) { - const messageSchemas = options.messageSchemas + const messageSchemas = options.messageSchemas.map((schema) => ({ schema })) return new MessageSchemaContainer({ messageTypeResolver: options.messageTypeResolver, diff --git a/packages/core/lib/queues/MessageSchemaContainer.ts b/packages/core/lib/queues/MessageSchemaContainer.ts index cb95aa31..7d103aed 100644 --- a/packages/core/lib/queues/MessageSchemaContainer.ts +++ b/packages/core/lib/queues/MessageSchemaContainer.ts @@ -11,9 +11,27 @@ import { resolveMessageType, } from './MessageTypeResolver.ts' +export type SchemaEntry = { + schema: ZodSchema + /** + * Explicit message type for this schema. + * Required when using a custom resolver function. + */ + messageType?: string +} + +export type DefinitionEntry = { + definition: CommonEventDefinition + /** + * Explicit message type for this definition. + * Required when using a custom resolver function. + */ + messageType?: string +} + export type MessageSchemaContainerOptions = { - messageDefinitions: readonly CommonEventDefinition[] - messageSchemas: readonly ZodSchema[] + messageDefinitions: readonly DefinitionEntry[] + messageSchemas: readonly SchemaEntry[] /** * Configuration for resolving message types. */ @@ -30,8 +48,8 @@ export class MessageSchemaContainer { constructor(options: MessageSchemaContainerOptions) { this.messageTypeResolver = options.messageTypeResolver - this.messageSchemas = this.resolveMap(options.messageSchemas) - this.messageDefinitions = this.resolveMap(options.messageDefinitions ?? []) + this.messageSchemas = this.resolveSchemaMap(options.messageSchemas) + this.messageDefinitions = this.resolveDefinitionMap(options.messageDefinitions ?? []) } /** @@ -133,36 +151,72 @@ export class MessageSchemaContainer { } } - private resolveMap>( - array: readonly T[], - ): Record { - const result: Record = {} + private resolveSchemaMap( + entries: readonly SchemaEntry[], + ): Record> { + const result: Record> = {} - this.validateMultipleSchemas(array.length) + this.validateMultipleSchemas(entries.length) const literalType = this.getLiteralMessageType() const messageTypePath = this.getMessageTypePathForSchema() - for (const item of array) { + for (const entry of entries) { let type: string | undefined - // If literal type is configured, use it for all schemas - if (literalType) { + // Priority 1: Explicit messageType on the entry + if (entry.messageType) { + type = entry.messageType + } + // Priority 2: Literal type from resolver config (same for all schemas) + else if (literalType) { type = literalType - } else if (messageTypePath) { - // Extract type from schema shape using the field path - type = - 'publisherSchema' in item - ? extractMessageTypeFromSchema(item.publisherSchema, messageTypePath) - : // @ts-expect-error - ZodSchema has shape property at runtime - extractMessageTypeFromSchema(item, messageTypePath) } - // For single schema without resolver, use DEFAULT_SCHEMA_KEY + // Priority 3: Extract type from schema shape using the field path + else if (messageTypePath) { + // @ts-expect-error - ZodSchema has shape property at runtime + type = extractMessageTypeFromSchema(entry.schema, messageTypePath) + } + // If no type extracted, use DEFAULT_SCHEMA_KEY (single schema fallback) const key = type ?? DEFAULT_SCHEMA_KEY if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`) - result[key] = item + result[key] = entry.schema + } + + return result + } + + private resolveDefinitionMap( + entries: readonly DefinitionEntry[], + ): Record { + const result: Record = {} + + const literalType = this.getLiteralMessageType() + const messageTypePath = this.getMessageTypePathForSchema() + + for (const entry of entries) { + let type: string | undefined + + // Priority 1: Explicit messageType on the entry + if (entry.messageType) { + type = entry.messageType + } + // Priority 2: Literal type from resolver config (same for all definitions) + else if (literalType) { + type = literalType + } + // Priority 3: Extract type from definition's publisherSchema using the field path + else if (messageTypePath) { + type = extractMessageTypeFromSchema(entry.definition.publisherSchema, messageTypePath) + } + // If no type extracted, use DEFAULT_SCHEMA_KEY (single definition fallback) + + const key = type ?? DEFAULT_SCHEMA_KEY + if (result[key]) throw new Error(`Duplicate definition for type: ${key.toString()}`) + + result[key] = entry.definition } return result diff --git a/packages/core/lib/queues/MessageTypeResolver.ts b/packages/core/lib/queues/MessageTypeResolver.ts index 93269a33..bd20ea70 100644 --- a/packages/core/lib/queues/MessageTypeResolver.ts +++ b/packages/core/lib/queues/MessageTypeResolver.ts @@ -1,3 +1,5 @@ +import { getProperty } from 'dot-prop' + /** * Context passed to a custom message type resolver function. * Contains both the parsed message data and any message-level attributes/metadata. @@ -47,16 +49,18 @@ export type MessageTypeResolverFn = (context: MessageTypeResolverContext) => str * * Three modes are supported: * - * 1. **Field path** (string): Extract type from a field at the root of the message. - * @example { messageTypePath: 'type' } // extracts type from message.type + * 1. **Field path** (string): Extract type from a field in the message using dot notation. + * Supports nested paths like 'metadata.type' or 'detail.eventType'. + * @example { messageTypePath: 'type' } // extracts from message.type * @example { messageTypePath: 'detail-type' } // for EventBridge events + * @example { messageTypePath: 'metadata.eventType' } // nested path * * 2. **Constant type** (object with `literal`): All messages are treated as the same type. * Useful when a subscription/queue only receives one type of message. * @example { literal: 'order.created' } * * 3. **Custom resolver** (object with `resolver`): Full flexibility via callback function. - * Use when type needs to be extracted from attributes, nested fields, or requires transformation. + * Use when type needs to be extracted from attributes or requires transformation. * @example * { * resolver: ({ messageAttributes }) => messageAttributes?.eventType as string @@ -97,7 +101,8 @@ export type MessageTypeResolverFn = (context: MessageTypeResolverContext) => str export type MessageTypeResolverConfig = | { /** - * Field name at the root of the message containing the message type. + * Path to the field containing the message type. + * Supports dot notation for nested paths (e.g., 'metadata.type', 'detail.eventType'). */ messageTypePath: string } @@ -160,11 +165,13 @@ export function resolveMessageType( } if (isMessageTypePathConfig(config)) { - const data = context.messageData as Record | undefined - const messageType = data?.[config.messageTypePath] as string | undefined - if (messageType === undefined) { + const messageType = getProperty(context.messageData, config.messageTypePath) as + | string + | undefined + | null + if (messageType === undefined || messageType === null) { throw new Error( - `Unable to resolve message type: field '${config.messageTypePath}' not found in message data`, + `Unable to resolve message type: path '${config.messageTypePath}' not found in message data`, ) } return messageType @@ -177,9 +184,10 @@ export function resolveMessageType( /** * Extracts message type from schema definition using the field path. * Used during handler/schema registration to build the routing map. + * Supports dot notation for nested paths (e.g., 'metadata.type'). * * @param schema - Zod schema with shape property - * @param messageTypePath - Field name containing the type literal + * @param messageTypePath - Path to the field containing the type literal (supports dot notation) * @returns The literal type value from the schema, or undefined if field doesn't exist or isn't a literal */ export function extractMessageTypeFromSchema( @@ -191,15 +199,24 @@ export function extractMessageTypeFromSchema( return undefined } - const field = schema.shape?.[messageTypePath] - if (!field) { - return undefined + const pathParts = messageTypePath.split('.') + // biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any + let current: any = schema + + for (const part of pathParts) { + if (!current?.shape) { + return undefined + } + current = current.shape[part] + if (!current) { + return undefined + } } - // Check if the field has a literal value (z.literal() creates a field with .value) - if (!('value' in field)) { + // Check if the final field has a literal value (z.literal() creates a field with .value) + if (!('value' in current)) { return undefined } - return field.value as string | undefined + return current.value as string | undefined } diff --git a/packages/core/package.json b/packages/core/package.json index 5e689643..0b8ceb41 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -28,6 +28,7 @@ "@lokalise/node-core": "^14.2.0", "@lokalise/universal-ts-utils": "^4.5.1", "@message-queue-toolkit/schemas": "^7.0.0", + "dot-prop": "^10.1.0", "fast-equals": "^5.2.2", "json-stream-stringify": "^3.1.6", "tmp": "^0.2.3", diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index bcc46b02..fad743cf 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -190,7 +190,7 @@ describe('HandlerContainer', () => { }) expect(() => container.resolveMessageType({ userId: '1' })).toThrow( - "Unable to resolve message type: field 'type' not found in message data", + "Unable to resolve message type: path 'type' not found in message data", ) }) }) diff --git a/packages/core/test/queues/MessageSchemaContainer.spec.ts b/packages/core/test/queues/MessageSchemaContainer.spec.ts index 41f96340..0e56799f 100644 --- a/packages/core/test/queues/MessageSchemaContainer.spec.ts +++ b/packages/core/test/queues/MessageSchemaContainer.spec.ts @@ -24,7 +24,7 @@ describe('MessageSchemaContainer', () => { describe('resolveSchema', () => { it('should resolve schema using messageTypePath', () => { const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }, { schema: MESSAGE_SCHEMA_B }], messageDefinitions: [], messageTypeResolver: { messageTypePath: 'type' }, }) @@ -38,7 +38,7 @@ describe('MessageSchemaContainer', () => { it('should return error for unknown message type', () => { const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }], messageDefinitions: [], messageTypeResolver: { messageTypePath: 'type' }, }) @@ -50,27 +50,24 @@ describe('MessageSchemaContainer', () => { } }) - it('should catch resolver errors and return as Either error', () => { + it('should catch messageTypePath errors and return as Either error', () => { const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }], messageDefinitions: [], - messageTypeResolver: { - resolver: () => { - throw new Error('Resolver failed') - }, - }, + messageTypeResolver: { messageTypePath: 'type' }, }) + // Missing 'type' field should result in an error const result = container.resolveSchema({ payload: 'test' }) expect('error' in result).toBe(true) if ('error' in result && result.error) { - expect(result.error.message).toBe('Resolver failed') + expect(result.error.message).toContain("path 'type' not found") } }) it('should resolve schema using literal type', () => { const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }], messageDefinitions: [], messageTypeResolver: { literal: 'message.a' }, }) @@ -80,27 +77,42 @@ describe('MessageSchemaContainer', () => { expect('result' in result).toBe(true) }) - it('should resolve schema using attributes with custom resolver', () => { - // Custom resolver extracts type from attributes for runtime resolution. - // For schema registration, we use messageTypePath to map schemas to types. - const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + it('should resolve schema with custom resolver and explicit messageType', () => { + // Custom resolver validates the message type from attributes. + // With explicit messageType, schema is mapped correctly at registration time. + const container = new MessageSchemaContainer({ + messageSchemas: [{ schema: MESSAGE_SCHEMA_A, messageType: 'message.a' }], messageDefinitions: [], - messageTypeResolver: { messageTypePath: 'type' }, + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + const t = messageAttributes?.type + if (t !== 'message.a') { + throw new Error(`Unsupported type: ${t}`) + } + return t + }, + }, }) - // At runtime, we can still use attributes if needed (though this example uses data) - const result = container.resolveSchema({ type: 'message.a', payload: 'test' }) - expect('result' in result).toBe(true) + // Valid type - returns schema + const validResult = container.resolveSchema({ payload: 'test' }, { type: 'message.a' }) + expect(validResult).toEqual({ result: MESSAGE_SCHEMA_A }) + + // Invalid type - resolver throws, error is returned + const invalidResult = container.resolveSchema({ payload: 'test' }, { type: 'other.type' }) + expect('error' in invalidResult).toBe(true) + if ('error' in invalidResult && invalidResult.error) { + expect(invalidResult.error.message).toBe('Unsupported type: other.type') + } }) }) describe('registration validation', () => { - it('should throw error when custom resolver is used with multiple schemas', () => { + it('should throw error when custom resolver is used with multiple schemas without explicit types', () => { expect( () => new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }, { schema: MESSAGE_SCHEMA_B }], messageDefinitions: [], messageTypeResolver: { resolver: () => 'some.type', @@ -114,11 +126,11 @@ describe('MessageSchemaContainer', () => { ) }) - it('should allow custom resolver with single schema', () => { + it('should allow custom resolver with single schema and explicit messageType', () => { expect( () => new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A, messageType: 'message.a' }], messageDefinitions: [], messageTypeResolver: { resolver: () => 'message.a', @@ -136,7 +148,7 @@ describe('MessageSchemaContainer', () => { expect( () => new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_A, DUPLICATE_SCHEMA], + messageSchemas: [{ schema: MESSAGE_SCHEMA_A }, { schema: DUPLICATE_SCHEMA }], messageDefinitions: [], messageTypeResolver: { messageTypePath: 'type' }, }), @@ -147,7 +159,7 @@ describe('MessageSchemaContainer', () => { // When schema doesn't have the expected literal field, it falls back to DEFAULT_SCHEMA_KEY // With a single schema, this works fine const container = new MessageSchemaContainer({ - messageSchemas: [MESSAGE_SCHEMA_NO_TYPE], + messageSchemas: [{ schema: MESSAGE_SCHEMA_NO_TYPE }], messageDefinitions: [], messageTypeResolver: { messageTypePath: 'type' }, }) diff --git a/packages/core/test/queues/MessageTypeResolver.spec.ts b/packages/core/test/queues/MessageTypeResolver.spec.ts index ba5c38bc..5b48d614 100644 --- a/packages/core/test/queues/MessageTypeResolver.spec.ts +++ b/packages/core/test/queues/MessageTypeResolver.spec.ts @@ -67,11 +67,37 @@ describe('MessageTypeResolver', () => { ).toBe('user.presence') }) - it('should throw error when field is missing', () => { + it('should support nested paths with dot notation', () => { + const config = { messageTypePath: 'metadata.type' } + + expect( + resolveMessageType(config, { messageData: { metadata: { type: 'nested.event' } } }), + ).toBe('nested.event') + }) + + it('should support deeply nested paths', () => { + const config = { messageTypePath: 'envelope.header.eventType' } + + expect( + resolveMessageType(config, { + messageData: { envelope: { header: { eventType: 'deep.nested.event' } } }, + }), + ).toBe('deep.nested.event') + }) + + it('should throw error when path is missing', () => { const config = { messageTypePath: 'type' } expect(() => resolveMessageType(config, { messageData: {} })).toThrow( - "Unable to resolve message type: field 'type' not found in message data", + "Unable to resolve message type: path 'type' not found in message data", + ) + }) + + it('should throw error when nested path is missing', () => { + const config = { messageTypePath: 'metadata.type' } + + expect(() => resolveMessageType(config, { messageData: { metadata: {} } })).toThrow( + "Unable to resolve message type: path 'metadata.type' not found in message data", ) }) @@ -79,7 +105,7 @@ describe('MessageTypeResolver', () => { const config = { messageTypePath: 'type' } expect(() => resolveMessageType(config, { messageData: undefined })).toThrow( - "Unable to resolve message type: field 'type' not found in message data", + "Unable to resolve message type: path 'type' not found in message data", ) }) @@ -87,7 +113,7 @@ describe('MessageTypeResolver', () => { const config = { messageTypePath: 'type' } expect(() => resolveMessageType(config, { messageData: null })).toThrow( - "Unable to resolve message type: field 'type' not found in message data", + "Unable to resolve message type: path 'type' not found in message data", ) }) }) @@ -208,5 +234,61 @@ describe('MessageTypeResolver', () => { expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() }) + + it('should extract literal value from nested path in schema shape', () => { + const schema = { + shape: { + metadata: { + shape: { + type: { value: 'nested.event' }, + }, + }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'metadata.type')).toBe('nested.event') + }) + + it('should extract literal value from deeply nested path', () => { + const schema = { + shape: { + envelope: { + shape: { + header: { + shape: { + eventType: { value: 'deep.nested.event' }, + }, + }, + }, + }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'envelope.header.eventType')).toBe( + 'deep.nested.event', + ) + }) + + it('should return undefined when nested path does not exist', () => { + const schema = { + shape: { + metadata: { + shape: {}, + }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'metadata.type')).toBeUndefined() + }) + + it('should return undefined when intermediate path is not an object schema', () => { + const schema = { + shape: { + metadata: { value: 'string' }, // not a nested object + }, + } + + expect(extractMessageTypeFromSchema(schema, 'metadata.type')).toBeUndefined() + }) }) }) diff --git a/packages/kafka/lib/AbstractKafkaPublisher.ts b/packages/kafka/lib/AbstractKafkaPublisher.ts index 383c8f5d..fdb76f1a 100644 --- a/packages/kafka/lib/AbstractKafkaPublisher.ts +++ b/packages/kafka/lib/AbstractKafkaPublisher.ts @@ -45,7 +45,7 @@ export abstract class AbstractKafkaPublisher< this.schemaContainers = {} for (const { topic, schema } of topicsConfig) { this.schemaContainers[topic] = new MessageSchemaContainer({ - messageSchemas: [schema], + messageSchemas: [{ schema }], messageDefinitions: [], }) } From fca5f14fde22e4be077d3cd44b5c9ce62e0271f8 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 14:22:20 +0200 Subject: [PATCH 6/9] Update documentation --- UPGRADING.md | 4 +++- packages/core/README.md | 9 +++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/UPGRADING.md b/UPGRADING.md index 25e4ba98..80528cbe 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -39,10 +39,12 @@ super(dependencies, { - **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead. - **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes: - - `{ messageTypePath: 'type' }` - extract type from a field at the root of the message + - `{ messageTypePath: 'type' }` - extract type from a field in the message (supports dot notation for nested paths like `'metadata.type'`) - `{ literal: 'my.message.type' }` - use a constant type for all messages - `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function +- **`MessageSchemaContainer` API changed**: The constructor now accepts `SchemaEntry` and `DefinitionEntry` objects instead of bare schemas. Each entry can include an optional `messageType` for explicit type mapping (required when using custom resolvers). + - **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time. - **Custom resolver validation**: Custom resolver functions (`{ resolver: fn }`) cannot be used with multiple schemas in `MessageSchemaContainer`. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. Use `messageTypePath` for multiple schemas, or register only a single schema with a custom resolver. diff --git a/packages/core/README.md b/packages/core/README.md index 015b05da..253b34cf 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -88,11 +88,16 @@ The `messageTypeResolver` configuration supports three modes: ##### Mode 1: Field Path (Simple) -Use when the message type is a field at the root level of the parsed message body: +Use when the message type is a field in the parsed message body. Supports dot notation for nested paths: ```typescript { - messageTypeResolver: { messageTypePath: 'type' }, // Extracts type from message.type + messageTypeResolver: { messageTypePath: 'type' }, // Extracts from message.type +} + +// Nested path example +{ + messageTypeResolver: { messageTypePath: 'metadata.eventType' }, // Extracts from message.metadata.eventType } ``` From 78431893c7cbd0142d8a2153f6983059a51a7304 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 14:41:55 +0200 Subject: [PATCH 7/9] Use in GCP and SQS --- .../core/lib/queues/AbstractQueueService.ts | 9 +- packages/gcp-pubsub/lib/index.ts | 1 + .../lib/utils/gcpMessageTypeResolvers.spec.ts | 201 ++++++++++++++++ .../lib/utils/gcpMessageTypeResolvers.ts | 219 ++++++++++++++++++ packages/sqs/lib/index.ts | 5 + .../lib/utils/sqsMessageTypeResolvers.spec.ts | 132 +++++++++++ .../sqs/lib/utils/sqsMessageTypeResolvers.ts | 126 ++++++++++ ...rmissionConsumer.payloadOffloading.spec.ts | 153 ++++++++++++ 8 files changed, 843 insertions(+), 3 deletions(-) create mode 100644 packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts create mode 100644 packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts create mode 100644 packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts create mode 100644 packages/sqs/lib/utils/sqsMessageTypeResolvers.ts diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 3b09e2b0..4a9e6a39 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -12,6 +12,7 @@ import { MESSAGE_DEDUPLICATION_OPTIONS_SCHEMA, type MessageDeduplicationOptions, } from '@message-queue-toolkit/schemas' +import { getProperty, setProperty } from 'dot-prop' import type { ZodSchema, ZodType } from 'zod/v4' import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts' import { @@ -680,11 +681,13 @@ export abstract class AbstractQueueService< [this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField], } - // Preserve message type field if using messageTypePath resolver + // Preserve message type field if using messageTypePath resolver (supports nested paths) if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) { const messageTypePath = this.messageTypeResolver.messageTypePath - // @ts-expect-error - result[messageTypePath] = message[messageTypePath] + const typeValue = getProperty(message, messageTypePath) + if (typeValue !== undefined) { + setProperty(result, messageTypePath, typeValue) + } } return result diff --git a/packages/gcp-pubsub/lib/index.ts b/packages/gcp-pubsub/lib/index.ts index d7ed559d..a4a7bd32 100644 --- a/packages/gcp-pubsub/lib/index.ts +++ b/packages/gcp-pubsub/lib/index.ts @@ -8,5 +8,6 @@ export * from './pubsub/CommonPubSubPublisherFactory.ts' export * from './pubsub/PubSubPublisherManager.ts' export * from './schemas/pubSubSchemas.ts' export * from './types/MessageTypes.ts' +export * from './utils/gcpMessageTypeResolvers.ts' export * from './utils/pubSubInitter.ts' export * from './utils/pubSubUtils.ts' diff --git a/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts new file mode 100644 index 00000000..05122304 --- /dev/null +++ b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.spec.ts @@ -0,0 +1,201 @@ +import { resolveMessageType } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' + +import { + CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, + CLOUD_EVENTS_TYPE_ATTRIBUTE, + createAttributeResolver, + createAttributeResolverWithMapping, + GCS_EVENT_TYPE_ATTRIBUTE, + GCS_EVENT_TYPES, + GCS_NOTIFICATION_RAW_TYPE_RESOLVER, + GCS_NOTIFICATION_TYPE_RESOLVER, +} from './gcpMessageTypeResolvers.ts' + +describe('gcpMessageTypeResolvers', () => { + describe('createAttributeResolver', () => { + it('extracts type from message attribute', () => { + const resolver = createAttributeResolver('eventType') + + const result = resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: { eventType: 'OBJECT_FINALIZE' }, + }) + + expect(result).toBe('OBJECT_FINALIZE') + }) + + it('throws when attribute is missing', () => { + const resolver = createAttributeResolver('eventType') + + expect(() => + resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: {}, + }), + ).toThrow("attribute 'eventType' not found") + }) + + it('throws when attributes are undefined', () => { + const resolver = createAttributeResolver('eventType') + + expect(() => + resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: undefined, + }), + ).toThrow("attribute 'eventType' not found") + }) + + it('handles null attribute value', () => { + const resolver = createAttributeResolver('eventType') + + expect(() => + resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: { eventType: null }, + }), + ).toThrow("attribute 'eventType' not found") + }) + }) + + describe('createAttributeResolverWithMapping', () => { + it('maps attribute value to internal type', () => { + const resolver = createAttributeResolverWithMapping('eventType', { + OBJECT_FINALIZE: 'storage.object.created', + OBJECT_DELETE: 'storage.object.deleted', + }) + + const result = resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: { eventType: 'OBJECT_FINALIZE' }, + }) + + expect(result).toBe('storage.object.created') + }) + + it('throws when value is not mapped and fallback is disabled', () => { + const resolver = createAttributeResolverWithMapping('eventType', { + OBJECT_FINALIZE: 'storage.object.created', + }) + + expect(() => + resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: { eventType: 'UNKNOWN_EVENT' }, + }), + ).toThrow("'UNKNOWN_EVENT' is not mapped") + }) + + it('falls back to original value when fallbackToOriginal is true', () => { + const resolver = createAttributeResolverWithMapping( + 'eventType', + { OBJECT_FINALIZE: 'storage.object.created' }, + { fallbackToOriginal: true }, + ) + + const result = resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: { eventType: 'UNKNOWN_EVENT' }, + }) + + expect(result).toBe('UNKNOWN_EVENT') + }) + + it('throws when attribute is missing', () => { + const resolver = createAttributeResolverWithMapping('eventType', { + OBJECT_FINALIZE: 'storage.object.created', + }) + + expect(() => + resolveMessageType(resolver, { + messageData: { id: '123' }, + messageAttributes: {}, + }), + ).toThrow("attribute 'eventType' not found") + }) + }) + + describe('CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER', () => { + it('extracts type from ce-type attribute', () => { + const result = resolveMessageType(CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, { + messageData: { id: '123', data: { orderId: '456' } }, + messageAttributes: { + [CLOUD_EVENTS_TYPE_ATTRIBUTE]: 'com.example.order.created', + 'ce-source': 'https://example.com/orders', + 'ce-specversion': '1.0', + }, + }) + + expect(result).toBe('com.example.order.created') + }) + + it('throws when ce-type attribute is missing', () => { + expect(() => + resolveMessageType(CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, { + messageData: { id: '123' }, + messageAttributes: { + 'ce-source': 'https://example.com/orders', + }, + }), + ).toThrow(`attribute '${CLOUD_EVENTS_TYPE_ATTRIBUTE}' not found`) + }) + }) + + describe('GCS_NOTIFICATION_TYPE_RESOLVER', () => { + it('maps GCS event types to normalized internal types', () => { + const testCases = [ + { gcsType: GCS_EVENT_TYPES.OBJECT_FINALIZE, expected: 'gcs.object.finalized' }, + { gcsType: GCS_EVENT_TYPES.OBJECT_DELETE, expected: 'gcs.object.deleted' }, + { gcsType: GCS_EVENT_TYPES.OBJECT_ARCHIVE, expected: 'gcs.object.archived' }, + { gcsType: GCS_EVENT_TYPES.OBJECT_METADATA_UPDATE, expected: 'gcs.object.metadataUpdated' }, + ] + + for (const { gcsType, expected } of testCases) { + const result = resolveMessageType(GCS_NOTIFICATION_TYPE_RESOLVER, { + messageData: { kind: 'storage#object', bucket: 'my-bucket' }, + messageAttributes: { + [GCS_EVENT_TYPE_ATTRIBUTE]: gcsType, + bucketId: 'my-bucket', + objectId: 'path/to/file.jpg', + }, + }) + + expect(result).toBe(expected) + } + }) + + it('falls back to original type for unknown GCS events', () => { + const result = resolveMessageType(GCS_NOTIFICATION_TYPE_RESOLVER, { + messageData: { kind: 'storage#object' }, + messageAttributes: { + [GCS_EVENT_TYPE_ATTRIBUTE]: 'UNKNOWN_GCS_EVENT', + }, + }) + + expect(result).toBe('UNKNOWN_GCS_EVENT') + }) + + it('throws when eventType attribute is missing', () => { + expect(() => + resolveMessageType(GCS_NOTIFICATION_TYPE_RESOLVER, { + messageData: { kind: 'storage#object' }, + messageAttributes: { bucketId: 'my-bucket' }, + }), + ).toThrow(`attribute '${GCS_EVENT_TYPE_ATTRIBUTE}' not found`) + }) + }) + + describe('GCS_NOTIFICATION_RAW_TYPE_RESOLVER', () => { + it('returns raw GCS event type without mapping', () => { + const result = resolveMessageType(GCS_NOTIFICATION_RAW_TYPE_RESOLVER, { + messageData: { kind: 'storage#object' }, + messageAttributes: { + [GCS_EVENT_TYPE_ATTRIBUTE]: 'OBJECT_FINALIZE', + }, + }) + + expect(result).toBe('OBJECT_FINALIZE') + }) + }) +}) diff --git a/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts new file mode 100644 index 00000000..63f4d00b --- /dev/null +++ b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts @@ -0,0 +1,219 @@ +import type { MessageTypeResolverConfig } from '@message-queue-toolkit/core' + +/** + * Pre-built message type resolver configurations for GCP Pub/Sub. + * + * These resolvers handle common GCP patterns where the message type is stored + * in message attributes rather than the message body. + * + * Note: Pub/Sub message attributes are flat key-value pairs (no nested objects), + * so we use direct property access rather than dot-notation path traversal. + */ + +/** + * CloudEvents attribute prefix used in Pub/Sub binary content mode. + * @see https://github.com/googleapis/google-cloudevents/blob/main/docs/spec/pubsub.md + */ +export const CLOUD_EVENTS_ATTRIBUTE_PREFIX = 'ce-' + +/** + * CloudEvents type attribute name in Pub/Sub binary content mode. + * Example: `ce-type: "com.example.someevent"` + */ +export const CLOUD_EVENTS_TYPE_ATTRIBUTE = 'ce-type' + +/** + * Cloud Storage notification event type attribute. + * @see https://cloud.google.com/storage/docs/pubsub-notifications + */ +export const GCS_EVENT_TYPE_ATTRIBUTE = 'eventType' + +/** + * Standard GCS event types. + * @see https://cloud.google.com/storage/docs/pubsub-notifications#events + */ +export const GCS_EVENT_TYPES = { + OBJECT_FINALIZE: 'OBJECT_FINALIZE', + OBJECT_DELETE: 'OBJECT_DELETE', + OBJECT_ARCHIVE: 'OBJECT_ARCHIVE', + OBJECT_METADATA_UPDATE: 'OBJECT_METADATA_UPDATE', +} as const + +/** + * Creates a resolver configuration that extracts message type from a message attribute. + * + * Use this when the message type is stored in Pub/Sub message attributes rather than + * the message body. This is common for GCP service notifications (Cloud Storage, + * Cloud Build, etc.) and CloudEvents. + * + * @param attributeName - The attribute key name (e.g., 'eventType', 'ce-type') + * @returns MessageTypeResolverConfig for use in consumer/publisher options + * + * @example + * ```typescript + * // For Cloud Storage notifications + * { + * messageTypeResolver: createAttributeResolver('eventType'), + * } + * + * // For CloudEvents in binary mode + * { + * messageTypeResolver: createAttributeResolver('ce-type'), + * } + * ``` + */ +export function createAttributeResolver(attributeName: string): MessageTypeResolverConfig { + return { + resolver: ({ messageAttributes }) => { + const attrs = messageAttributes as Record | undefined + const type = attrs?.[attributeName] as string | undefined | null + if (type === undefined || type === null) { + throw new Error( + `Unable to resolve message type: attribute '${attributeName}' not found in message attributes`, + ) + } + return type + }, + } +} + +/** + * Creates a resolver configuration that maps attribute values to internal message types. + * + * Use this when you want to normalize external event types to your internal naming convention. + * + * @param attributeName - The attribute key name + * @param typeMap - Map of external types to internal types + * @param options - Optional configuration + * @param options.fallbackToOriginal - If true, unmapped types are passed through (default: false) + * @returns MessageTypeResolverConfig for use in consumer/publisher options + * + * @example + * ```typescript + * // Map Cloud Storage events to internal types + * { + * messageTypeResolver: createAttributeResolverWithMapping('eventType', { + * 'OBJECT_FINALIZE': 'storage.object.created', + * 'OBJECT_DELETE': 'storage.object.deleted', + * 'OBJECT_ARCHIVE': 'storage.object.archived', + * 'OBJECT_METADATA_UPDATE': 'storage.object.metadataUpdated', + * }), + * } + * ``` + */ +export function createAttributeResolverWithMapping( + attributeName: string, + typeMap: Record, + options?: { fallbackToOriginal?: boolean }, +): MessageTypeResolverConfig { + return { + resolver: ({ messageAttributes }) => { + const attrs = messageAttributes as Record | undefined + const type = attrs?.[attributeName] as string | undefined | null + if (type === undefined || type === null) { + throw new Error( + `Unable to resolve message type: attribute '${attributeName}' not found in message attributes`, + ) + } + const mappedType = typeMap[type] + if (mappedType) { + return mappedType + } + if (options?.fallbackToOriginal) { + return type + } + throw new Error( + `Unable to resolve message type: attribute value '${type}' is not mapped. Available mappings: ${Object.keys(typeMap).join(', ')}`, + ) + }, + } +} + +/** + * Pre-built resolver for CloudEvents in Pub/Sub binary content mode. + * + * Extracts the event type from the `ce-type` message attribute. + * + * @see https://github.com/googleapis/google-cloudevents/blob/main/docs/spec/pubsub.md + * + * @example + * ```typescript + * class MyConsumer extends AbstractPubSubConsumer { + * constructor(deps: PubSubConsumerDependencies) { + * super(deps, { + * messageTypeResolver: CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, + * handlers: new MessageHandlerConfigBuilder() + * .addConfig(schema, handler, { messageType: 'com.example.someevent' }) + * .build(), + * }, context) + * } + * } + * ``` + */ +export const CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER: MessageTypeResolverConfig = + createAttributeResolver(CLOUD_EVENTS_TYPE_ATTRIBUTE) + +/** + * Pre-built resolver for Google Cloud Storage notifications. + * + * Extracts the event type from the `eventType` message attribute and maps it + * to a normalized internal type. + * + * GCS Event Types: + * - `OBJECT_FINALIZE` → `gcs.object.finalized` + * - `OBJECT_DELETE` → `gcs.object.deleted` + * - `OBJECT_ARCHIVE` → `gcs.object.archived` + * - `OBJECT_METADATA_UPDATE` → `gcs.object.metadataUpdated` + * + * @see https://cloud.google.com/storage/docs/pubsub-notifications + * + * @example + * ```typescript + * class GcsNotificationConsumer extends AbstractPubSubConsumer { + * constructor(deps: PubSubConsumerDependencies) { + * super(deps, { + * messageTypeResolver: GCS_NOTIFICATION_TYPE_RESOLVER, + * handlers: new MessageHandlerConfigBuilder() + * .addConfig(schema, handler, { messageType: 'gcs.object.finalized' }) + * .build(), + * }, context) + * } + * } + * ``` + */ +export const GCS_NOTIFICATION_TYPE_RESOLVER: MessageTypeResolverConfig = + createAttributeResolverWithMapping( + GCS_EVENT_TYPE_ATTRIBUTE, + { + [GCS_EVENT_TYPES.OBJECT_FINALIZE]: 'gcs.object.finalized', + [GCS_EVENT_TYPES.OBJECT_DELETE]: 'gcs.object.deleted', + [GCS_EVENT_TYPES.OBJECT_ARCHIVE]: 'gcs.object.archived', + [GCS_EVENT_TYPES.OBJECT_METADATA_UPDATE]: 'gcs.object.metadataUpdated', + }, + { fallbackToOriginal: true }, + ) + +/** + * Pre-built resolver for Google Cloud Storage notifications (raw types). + * + * Same as `GCS_NOTIFICATION_TYPE_RESOLVER` but returns the raw GCS event type + * without mapping (e.g., `OBJECT_FINALIZE` instead of `gcs.object.finalized`). + * + * @see https://cloud.google.com/storage/docs/pubsub-notifications + * + * @example + * ```typescript + * class GcsNotificationConsumer extends AbstractPubSubConsumer { + * constructor(deps: PubSubConsumerDependencies) { + * super(deps, { + * messageTypeResolver: GCS_NOTIFICATION_RAW_TYPE_RESOLVER, + * handlers: new MessageHandlerConfigBuilder() + * .addConfig(schema, handler, { messageType: 'OBJECT_FINALIZE' }) + * .build(), + * }, context) + * } + * } + * ``` + */ +export const GCS_NOTIFICATION_RAW_TYPE_RESOLVER: MessageTypeResolverConfig = + createAttributeResolver(GCS_EVENT_TYPE_ATTRIBUTE) diff --git a/packages/sqs/lib/index.ts b/packages/sqs/lib/index.ts index f13c6daa..d32d16d2 100644 --- a/packages/sqs/lib/index.ts +++ b/packages/sqs/lib/index.ts @@ -37,6 +37,11 @@ export { } from './utils/sqsAttributeUtils.ts' export { deleteSqs, updateQueueAttributes } from './utils/sqsInitter.ts' export { deserializeSQSMessage } from './utils/sqsMessageDeserializer.ts' +export { + createEventBridgeResolverWithMapping, + EVENT_BRIDGE_DETAIL_TYPE_FIELD, + EVENT_BRIDGE_TYPE_RESOLVER, +} from './utils/sqsMessageTypeResolvers.ts' export { assertQueue, calculateOutgoingMessageSize, diff --git a/packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts b/packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts new file mode 100644 index 00000000..8e0e57c0 --- /dev/null +++ b/packages/sqs/lib/utils/sqsMessageTypeResolvers.spec.ts @@ -0,0 +1,132 @@ +import { resolveMessageType } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' + +import { + createEventBridgeResolverWithMapping, + EVENT_BRIDGE_DETAIL_TYPE_FIELD, + EVENT_BRIDGE_TYPE_RESOLVER, +} from './sqsMessageTypeResolvers.ts' + +describe('sqsMessageTypeResolvers', () => { + describe('EVENT_BRIDGE_TYPE_RESOLVER', () => { + it('extracts type from detail-type field', () => { + const eventBridgeEvent = { + version: '0', + id: '12345678-1234-1234-1234-123456789012', + 'detail-type': 'Order Created', + source: 'com.myapp.orders', + account: '123456789012', + time: '2024-01-15T10:30:00Z', + region: 'us-east-1', + resources: [], + detail: { + orderId: 'order-456', + amount: 99.99, + }, + } + + const result = resolveMessageType(EVENT_BRIDGE_TYPE_RESOLVER, { + messageData: eventBridgeEvent, + }) + + expect(result).toBe('Order Created') + }) + + it('works with different detail-type values', () => { + const events = [ + { 'detail-type': 'User Signed Up', expected: 'User Signed Up' }, + { 'detail-type': 'payment.completed', expected: 'payment.completed' }, + { + 'detail-type': 'EC2 Instance State-change Notification', + expected: 'EC2 Instance State-change Notification', + }, + ] + + for (const { 'detail-type': detailType, expected } of events) { + const result = resolveMessageType(EVENT_BRIDGE_TYPE_RESOLVER, { + messageData: { [EVENT_BRIDGE_DETAIL_TYPE_FIELD]: detailType }, + }) + expect(result).toBe(expected) + } + }) + + it('throws when detail-type field is missing', () => { + expect(() => + resolveMessageType(EVENT_BRIDGE_TYPE_RESOLVER, { + messageData: { source: 'com.myapp.orders', detail: {} }, + }), + ).toThrow("path 'detail-type' not found") + }) + }) + + describe('createEventBridgeResolverWithMapping', () => { + it('maps detail-type values to internal types', () => { + const resolver = createEventBridgeResolverWithMapping({ + 'Order Created': 'order.created', + 'Order Updated': 'order.updated', + 'Order Cancelled': 'order.cancelled', + }) + + const eventBridgeEvent = { + 'detail-type': 'Order Created', + source: 'com.myapp.orders', + detail: { orderId: '123' }, + } + + const result = resolveMessageType(resolver, { + messageData: eventBridgeEvent, + }) + + expect(result).toBe('order.created') + }) + + it('throws when detail-type is not mapped and fallback is disabled', () => { + const resolver = createEventBridgeResolverWithMapping({ + 'Order Created': 'order.created', + }) + + expect(() => + resolveMessageType(resolver, { + messageData: { 'detail-type': 'Unknown Event' }, + }), + ).toThrow("'Unknown Event' is not mapped") + }) + + it('falls back to original value when fallbackToOriginal is true', () => { + const resolver = createEventBridgeResolverWithMapping( + { 'Order Created': 'order.created' }, + { fallbackToOriginal: true }, + ) + + const result = resolveMessageType(resolver, { + messageData: { 'detail-type': 'Unknown Event' }, + }) + + expect(result).toBe('Unknown Event') + }) + + it('throws when detail-type is missing', () => { + const resolver = createEventBridgeResolverWithMapping({ + 'Order Created': 'order.created', + }) + + expect(() => + resolveMessageType(resolver, { + messageData: { source: 'com.myapp', detail: {} }, + }), + ).toThrow("'detail-type' field not found") + }) + + it('handles null detail-type value', () => { + const resolver = createEventBridgeResolverWithMapping({ + 'Order Created': 'order.created', + }) + + expect(() => + resolveMessageType(resolver, { + messageData: { 'detail-type': null }, + }), + ).toThrow("'detail-type' field not found") + }) + }) +}) diff --git a/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts b/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts new file mode 100644 index 00000000..d726c047 --- /dev/null +++ b/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts @@ -0,0 +1,126 @@ +import type { MessageTypeResolverConfig } from '@message-queue-toolkit/core' + +/** + * Pre-built message type resolver configurations for AWS SQS. + * + * These resolvers handle common AWS patterns where message types are stored + * in specific fields of the message body. + */ + +/** + * EventBridge detail-type field name. + * EventBridge events use 'detail-type' as the event type discriminator. + */ +export const EVENT_BRIDGE_DETAIL_TYPE_FIELD = 'detail-type' + +/** + * Pre-built resolver for AWS EventBridge events delivered to SQS. + * + * EventBridge events have a specific envelope structure where the event type + * is stored in the `detail-type` field. This resolver extracts that field + * for message routing. + * + * @see https://docs.aws.amazon.com/eventbridge/latest/userguide/aws-events.html + * + * @example + * ```typescript + * import { EVENT_BRIDGE_TYPE_RESOLVER } from '@message-queue-toolkit/sqs' + * + * class MyConsumer extends AbstractSqsConsumer { + * constructor(deps: SQSConsumerDependencies) { + * super(deps, { + * messageTypeResolver: EVENT_BRIDGE_TYPE_RESOLVER, + * handlers: new MessageHandlerConfigBuilder() + * .addConfig(userCreatedSchema, handler) + * .build(), + * }, context) + * } + * } + * ``` + * + * @example EventBridge event structure + * ```json + * { + * "version": "0", + * "id": "12345678-1234-1234-1234-123456789012", + * "detail-type": "Order Created", + * "source": "com.myapp.orders", + * "account": "123456789012", + * "time": "2024-01-15T10:30:00Z", + * "region": "us-east-1", + * "resources": [], + * "detail": { + * "orderId": "order-456", + * "amount": 99.99 + * } + * } + * ``` + */ +export const EVENT_BRIDGE_TYPE_RESOLVER: MessageTypeResolverConfig = { + messageTypePath: EVENT_BRIDGE_DETAIL_TYPE_FIELD, +} + +/** + * Creates an EventBridge resolver that normalizes detail-type values. + * + * EventBridge detail-type values are typically human-readable strings like + * "Order Created" or "User Signed Up". This resolver allows you to normalize + * them to your internal naming convention (e.g., "order.created"). + * + * @param typeMap - Map of EventBridge detail-types to internal message types + * @param options - Optional configuration + * @param options.fallbackToOriginal - If true, unmapped types are passed through (default: false) + * @returns MessageTypeResolverConfig for use in consumer options + * + * @example + * ```typescript + * import { createEventBridgeResolverWithMapping } from '@message-queue-toolkit/sqs' + * + * const resolver = createEventBridgeResolverWithMapping({ + * 'Order Created': 'order.created', + * 'Order Updated': 'order.updated', + * 'Order Cancelled': 'order.cancelled', + * }) + * + * class MyConsumer extends AbstractSqsConsumer { + * constructor(deps: SQSConsumerDependencies) { + * super(deps, { + * messageTypeResolver: resolver, + * handlers: new MessageHandlerConfigBuilder() + * .addConfig(orderCreatedSchema, handler, { messageType: 'order.created' }) + * .build(), + * }, context) + * } + * } + * ``` + */ +export function createEventBridgeResolverWithMapping( + typeMap: Record, + options?: { fallbackToOriginal?: boolean }, +): MessageTypeResolverConfig { + return { + resolver: ({ messageData }) => { + const data = messageData as { 'detail-type'?: string } + const detailType = data[EVENT_BRIDGE_DETAIL_TYPE_FIELD] + + if (detailType === undefined || detailType === null) { + throw new Error( + `Unable to resolve message type: '${EVENT_BRIDGE_DETAIL_TYPE_FIELD}' field not found in message`, + ) + } + + const mappedType = typeMap[detailType] + if (mappedType) { + return mappedType + } + + if (options?.fallbackToOriginal) { + return detailType + } + + throw new Error( + `Unable to resolve message type: detail-type '${detailType}' is not mapped. Available mappings: ${Object.keys(typeMap).join(', ')}`, + ) + }, + } +} diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts index bb5c78c6..b85d2114 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.payloadOffloading.spec.ts @@ -1,6 +1,7 @@ import type { S3 } from '@aws-sdk/client-s3' import { SendMessageCommand } from '@aws-sdk/client-sqs' import type { SinglePayloadStoreConfig } from '@message-queue-toolkit/core' +import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' import { S3PayloadStore } from '@message-queue-toolkit/s3-payload-store' import { assertQueue, @@ -10,7 +11,10 @@ import { import type { AwilixContainer } from 'awilix' import { asValue } from 'awilix' import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import z from 'zod/v4' +import { AbstractSqsConsumer } from '../../lib/sqs/AbstractSqsConsumer.ts' +import { AbstractSqsPublisher } from '../../lib/sqs/AbstractSqsPublisher.ts' import { SQS_MESSAGE_MAX_SIZE } from '../../lib/sqs/AbstractSqsService.ts' import { SqsPermissionPublisher } from '../publishers/SqsPermissionPublisher.ts' import { assertBucket, emptyBucket, putObjectContent, waitForS3Objects } from '../utils/s3Utils.ts' @@ -252,3 +256,152 @@ describe('SqsPermissionConsumer - single-store payload offloading', () => { }) }) }) + +describe('SqsPermissionConsumer - nested messageTypePath with payload offloading', () => { + /** + * Tests that nested messageTypePath (e.g., 'metadata.type') is correctly preserved + * when payload is offloaded. The type field at nested path should be maintained + * in the pointer message so routing still works correctly. + */ + describe('consume with nested type path', () => { + const largeMessageSizeThreshold = SQS_MESSAGE_MAX_SIZE + const s3BucketName = 'test-bucket-nested-path' + const TEST_QUEUE_NAME = 'nested_type_path_offloading_test' + + let diContainer: AwilixContainer + let s3: S3 + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + s3 = diContainer.cradle.s3 + + await assertBucket(s3, s3BucketName) + }) + + afterAll(async () => { + await emptyBucket(s3, s3BucketName) + + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('preserves nested messageTypePath when offloading payload', async () => { + const { sqsClient } = diContainer.cradle + await deleteQueue(sqsClient, TEST_QUEUE_NAME) + + const payloadStoreConfig: SinglePayloadStoreConfig = { + messageSizeThreshold: largeMessageSizeThreshold, + store: new S3PayloadStore(diContainer.cradle, { bucketName: s3BucketName }), + storeName: 's3', + } + + // Schema with nested type path at 'metadata.type' + const nestedTypeSchema = z.object({ + id: z.string(), + metadata: z.object({ + type: z.literal('nested.event'), + largeField: z.string().optional(), + }), + timestamp: z.string().optional(), + }) + + type NestedTypeMessage = z.output + type ExecutionContext = Record + + // Create publisher with nested messageTypePath + class NestedPathPublisher extends AbstractSqsPublisher { + constructor(deps: Dependencies) { + super(deps, { + creationConfig: { + queue: { QueueName: TEST_QUEUE_NAME }, + }, + messageSchemas: [nestedTypeSchema], + messageTypeResolver: { messageTypePath: 'metadata.type' }, + handlerSpy: true, + payloadStoreConfig, + deletionConfig: { deleteIfExists: true }, + }) + } + } + + let receivedMessage: NestedTypeMessage | null = null + + // Initialize publisher first to create queue + const publisher = new NestedPathPublisher(diContainer.cradle) + await publisher.init() + + // Create large message with nested type + const message: NestedTypeMessage = { + id: 'nested-path-test-1', + metadata: { + type: 'nested.event', + largeField: 'x'.repeat(largeMessageSizeThreshold), + }, + } + expect(JSON.stringify(message).length).toBeGreaterThan(largeMessageSizeThreshold) + + // Publish - should offload to S3 but preserve metadata.type + await publisher.publish(message) + await publisher.handlerSpy.waitForMessageWithId(message.id, 'published') + + // Create consumer pointing to the queue + // @ts-expect-error - accessing protected property for test + const queueUrl = publisher.queueUrl + + class NestedPathConsumer extends AbstractSqsConsumer { + constructor(deps: Dependencies) { + super( + deps, + { + locatorConfig: { queueUrl }, + messageTypeResolver: { messageTypePath: 'metadata.type' }, + handlerSpy: true, + payloadStoreConfig, + deletionConfig: { deleteIfExists: false }, + consumerOverrides: { terminateVisibilityTimeout: true }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(nestedTypeSchema, (msg) => { + receivedMessage = msg + return Promise.resolve({ result: 'success' }) + }) + .build(), + }, + {}, + ) + } + } + + const consumer = new NestedPathConsumer(diContainer.cradle) + await consumer.start() + + // Wait for message to be consumed + const consumptionResult = await consumer.handlerSpy.waitForMessageWithId( + message.id, + 'consumed', + ) + + // Verify the message was consumed correctly with nested type preserved + expect(consumptionResult.message).toMatchObject({ + id: 'nested-path-test-1', + metadata: { + type: 'nested.event', + largeField: 'x'.repeat(largeMessageSizeThreshold), + }, + }) + expect(receivedMessage).toMatchObject({ + id: 'nested-path-test-1', + metadata: { + type: 'nested.event', + }, + }) + + // Clean up + await consumer.close(true) + await publisher.close() + }) + }) +}) From 058cfe2dbf3fe31f68392841dfa50e1fcdf829c1 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 14:46:22 +0200 Subject: [PATCH 8/9] Add timestamps as well --- packages/gcp-pubsub/README.md | 115 ++++++++++++++++++ .../lib/utils/gcpMessageTypeResolvers.ts | 24 ++++ packages/sqs/README.md | 44 +++++++ packages/sqs/lib/index.ts | 1 + .../sqs/lib/utils/sqsMessageTypeResolvers.ts | 15 +++ 5 files changed, 199 insertions(+) diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index 913e4a2b..3e2680c7 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -590,6 +590,121 @@ await publisher.publish({ - ✅ Gradual migration from legacy systems - ✅ Works with all features (retry, deduplication, offloading) +### Pre-built Message Type Resolvers + +The library provides pre-built resolvers for common GCP patterns where the message type is stored in message attributes rather than the message body. + +#### CloudEvents Binary Mode + +For CloudEvents delivered in Pub/Sub binary content mode, the event type is stored in the `ce-type` attribute and the timestamp in the `ce-time` attribute: + +```typescript +import { + CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, + CLOUD_EVENTS_TIME_ATTRIBUTE, + MessageHandlerConfigBuilder, +} from '@message-queue-toolkit/gcp-pubsub' + +class CloudEventsConsumer extends AbstractPubSubConsumer { + constructor(deps: PubSubConsumerDependencies) { + super( + deps, + { + messageTypeResolver: CLOUD_EVENTS_BINARY_MODE_TYPE_RESOLVER, + // Note: For binary mode, timestamp is in ce-time attribute (not message body) + // You may need custom handling if you want to extract it from attributes + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler, { messageType: 'com.example.order.created' }) + .build(), + // ... + }, + context, + ) + } +} + +// For CloudEvents in structured content mode (type in message body), +// use messageTimestampField with the CLOUD_EVENTS_TIMESTAMP_FIELD constant: +import { CLOUD_EVENTS_TIMESTAMP_FIELD } from '@message-queue-toolkit/gcp-pubsub' + +{ + messageTypeResolver: { messageTypePath: 'type' }, + messageTimestampField: CLOUD_EVENTS_TIMESTAMP_FIELD, // 'time' +} +``` + +#### Google Cloud Storage Notifications + +For Cloud Storage notifications, the event type is in the `eventType` attribute. Use `GCS_NOTIFICATION_TYPE_RESOLVER` for normalized types or `GCS_NOTIFICATION_RAW_TYPE_RESOLVER` for raw GCS types: + +```typescript +import { + GCS_NOTIFICATION_TYPE_RESOLVER, + GCS_EVENT_TYPES, + MessageHandlerConfigBuilder, +} from '@message-queue-toolkit/gcp-pubsub' + +// With normalized types (OBJECT_FINALIZE → gcs.object.finalized) +class GcsNotificationConsumer extends AbstractPubSubConsumer { + constructor(deps: PubSubConsumerDependencies) { + super( + deps, + { + messageTypeResolver: GCS_NOTIFICATION_TYPE_RESOLVER, + handlers: new MessageHandlerConfigBuilder() + .addConfig(objectFinalizedSchema, handler, { messageType: 'gcs.object.finalized' }) + .addConfig(objectDeletedSchema, handler, { messageType: 'gcs.object.deleted' }) + .build(), + // ... + }, + context, + ) + } +} + +// With raw GCS types +import { GCS_NOTIFICATION_RAW_TYPE_RESOLVER } from '@message-queue-toolkit/gcp-pubsub' + +{ + messageTypeResolver: GCS_NOTIFICATION_RAW_TYPE_RESOLVER, + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler, { messageType: 'OBJECT_FINALIZE' }) + .build(), +} +``` + +**GCS Event Type Mappings (with `GCS_NOTIFICATION_TYPE_RESOLVER`):** +| GCS Event Type | Normalized Type | +|---------------|-----------------| +| `OBJECT_FINALIZE` | `gcs.object.finalized` | +| `OBJECT_DELETE` | `gcs.object.deleted` | +| `OBJECT_ARCHIVE` | `gcs.object.archived` | +| `OBJECT_METADATA_UPDATE` | `gcs.object.metadataUpdated` | + +#### Custom Attribute Resolvers + +For other attribute-based type resolution, create your own resolver: + +```typescript +import { + createAttributeResolver, + createAttributeResolverWithMapping, +} from '@message-queue-toolkit/gcp-pubsub' + +// Simple attribute extraction +const resolver = createAttributeResolver('myEventType') + +// With type mapping +const resolverWithMapping = createAttributeResolverWithMapping( + 'eventType', + { + 'EVENT_A': 'internal.event.a', + 'EVENT_B': 'internal.event.b', + }, + { fallbackToOriginal: true }, // Optional: pass through unmapped types +) +``` + ### Payload Offloading For messages larger than 10 MB, store the payload externally (e.g., Google Cloud Storage): diff --git a/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts index 63f4d00b..471eac85 100644 --- a/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts +++ b/packages/gcp-pubsub/lib/utils/gcpMessageTypeResolvers.ts @@ -22,6 +22,30 @@ export const CLOUD_EVENTS_ATTRIBUTE_PREFIX = 'ce-' */ export const CLOUD_EVENTS_TYPE_ATTRIBUTE = 'ce-type' +/** + * CloudEvents time attribute name in Pub/Sub binary content mode. + * Example: `ce-time: "2024-01-15T10:30:00Z"` + * Note: This is in message attributes, not the message body. + */ +export const CLOUD_EVENTS_TIME_ATTRIBUTE = 'ce-time' + +/** + * CloudEvents timestamp field name in structured content mode. + * CloudEvents structured mode uses 'time' instead of 'timestamp' in the message body. + * Use this constant for `messageTimestampField` configuration when consuming + * CloudEvents in structured format. + * + * @example + * ```typescript + * // For CloudEvents in structured content mode (type in message body) + * { + * messageTypeResolver: { messageTypePath: 'type' }, + * messageTimestampField: CLOUD_EVENTS_TIMESTAMP_FIELD, + * } + * ``` + */ +export const CLOUD_EVENTS_TIMESTAMP_FIELD = 'time' + /** * Cloud Storage notification event type attribute. * @see https://cloud.google.com/storage/docs/pubsub-notifications diff --git a/packages/sqs/README.md b/packages/sqs/README.md index bc17ffe7..c2bb7306 100644 --- a/packages/sqs/README.md +++ b/packages/sqs/README.md @@ -1928,6 +1928,50 @@ const exampleEvent = { } satisfies UserCreatedEvent ``` +#### Pre-built EventBridge Type Resolver + +Instead of manually configuring `{ messageTypePath: 'detail-type' }`, you can use the pre-built `EVENT_BRIDGE_TYPE_RESOLVER`: + +```typescript +import { + EVENT_BRIDGE_TYPE_RESOLVER, + EVENT_BRIDGE_TIMESTAMP_FIELD, + createEventBridgeResolverWithMapping, +} from '@message-queue-toolkit/sqs' + +// Simple usage - extracts from 'detail-type' field +class EventBridgeConsumer extends AbstractSqsConsumer { + constructor(deps: SQSConsumerDependencies) { + super(deps, { + messageTypeResolver: EVENT_BRIDGE_TYPE_RESOLVER, + messageTimestampField: EVENT_BRIDGE_TIMESTAMP_FIELD, // 'time' + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler) + .build(), + }) + } +} + +// With type mapping - normalize EventBridge detail-types to internal types +const resolver = createEventBridgeResolverWithMapping({ + 'Order Created': 'order.created', // "Order Created" → "order.created" + 'Order Updated': 'order.updated', + 'Order Cancelled': 'order.cancelled', +}, { fallbackToOriginal: true }) // Optional: pass through unmapped types + +class MappedEventBridgeConsumer extends AbstractSqsConsumer { + constructor(deps: SQSConsumerDependencies) { + super(deps, { + messageTypeResolver: resolver, + messageTimestampField: EVENT_BRIDGE_TIMESTAMP_FIELD, + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler, { messageType: 'order.created' }) + .build(), + }) + } +} +``` + ### Custom Message Structures For other non-standard message formats, you can configure the field mappings: diff --git a/packages/sqs/lib/index.ts b/packages/sqs/lib/index.ts index d32d16d2..1d397f43 100644 --- a/packages/sqs/lib/index.ts +++ b/packages/sqs/lib/index.ts @@ -40,6 +40,7 @@ export { deserializeSQSMessage } from './utils/sqsMessageDeserializer.ts' export { createEventBridgeResolverWithMapping, EVENT_BRIDGE_DETAIL_TYPE_FIELD, + EVENT_BRIDGE_TIMESTAMP_FIELD, EVENT_BRIDGE_TYPE_RESOLVER, } from './utils/sqsMessageTypeResolvers.ts' export { diff --git a/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts b/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts index d726c047..606b9656 100644 --- a/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts +++ b/packages/sqs/lib/utils/sqsMessageTypeResolvers.ts @@ -13,6 +13,21 @@ import type { MessageTypeResolverConfig } from '@message-queue-toolkit/core' */ export const EVENT_BRIDGE_DETAIL_TYPE_FIELD = 'detail-type' +/** + * EventBridge timestamp field name. + * EventBridge events use 'time' instead of the default 'timestamp' field. + * Use this constant for `messageTimestampField` configuration. + * + * @example + * ```typescript + * { + * messageTypeResolver: EVENT_BRIDGE_TYPE_RESOLVER, + * messageTimestampField: EVENT_BRIDGE_TIMESTAMP_FIELD, + * } + * ``` + */ +export const EVENT_BRIDGE_TIMESTAMP_FIELD = 'time' + /** * Pre-built resolver for AWS EventBridge events delivered to SQS. * From c1e267160b87628ff07fea148b876c06dab3c3f5 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Fri, 12 Dec 2025 14:49:26 +0200 Subject: [PATCH 9/9] Handle type edge cases --- .../core/lib/queues/MessageTypeResolver.ts | 20 +++-- .../test/queues/MessageTypeResolver.spec.ts | 80 +++++++++++++++++++ 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/packages/core/lib/queues/MessageTypeResolver.ts b/packages/core/lib/queues/MessageTypeResolver.ts index bd20ea70..d6fc2e33 100644 --- a/packages/core/lib/queues/MessageTypeResolver.ts +++ b/packages/core/lib/queues/MessageTypeResolver.ts @@ -165,16 +165,18 @@ export function resolveMessageType( } if (isMessageTypePathConfig(config)) { - const messageType = getProperty(context.messageData, config.messageTypePath) as - | string - | undefined - | null - if (messageType === undefined || messageType === null) { + const rawMessageType = getProperty(context.messageData, config.messageTypePath) + if (rawMessageType === undefined || rawMessageType === null) { throw new Error( `Unable to resolve message type: path '${config.messageTypePath}' not found in message data`, ) } - return messageType + if (typeof rawMessageType !== 'string') { + throw new Error( + `Unable to resolve message type: path '${config.messageTypePath}' contains a non-string value (got ${typeof rawMessageType})`, + ) + } + return rawMessageType } // Custom resolver function - must return a string (user handles errors/defaults) @@ -218,5 +220,9 @@ export function extractMessageTypeFromSchema( return undefined } - return current.value as string | undefined + const value = current.value + if (typeof value !== 'string') { + return undefined + } + return value } diff --git a/packages/core/test/queues/MessageTypeResolver.spec.ts b/packages/core/test/queues/MessageTypeResolver.spec.ts index 5b48d614..d8b44962 100644 --- a/packages/core/test/queues/MessageTypeResolver.spec.ts +++ b/packages/core/test/queues/MessageTypeResolver.spec.ts @@ -116,6 +116,42 @@ describe('MessageTypeResolver', () => { "Unable to resolve message type: path 'type' not found in message data", ) }) + + it('should throw error when value is a number', () => { + const config = { messageTypePath: 'type' } + + expect(() => resolveMessageType(config, { messageData: { type: 123 } })).toThrow( + "Unable to resolve message type: path 'type' contains a non-string value (got number)", + ) + }) + + it('should throw error when value is a boolean', () => { + const config = { messageTypePath: 'type' } + + expect(() => resolveMessageType(config, { messageData: { type: true } })).toThrow( + "Unable to resolve message type: path 'type' contains a non-string value (got boolean)", + ) + }) + + it('should throw error when value is an object', () => { + const config = { messageTypePath: 'type' } + + expect(() => + resolveMessageType(config, { messageData: { type: { nested: 'value' } } }), + ).toThrow( + "Unable to resolve message type: path 'type' contains a non-string value (got object)", + ) + }) + + it('should throw error when nested path value is not a string', () => { + const config = { messageTypePath: 'metadata.type' } + + expect(() => + resolveMessageType(config, { messageData: { metadata: { type: 42 } } }), + ).toThrow( + "Unable to resolve message type: path 'metadata.type' contains a non-string value (got number)", + ) + }) }) describe('custom resolver mode', () => { @@ -290,5 +326,49 @@ describe('MessageTypeResolver', () => { expect(extractMessageTypeFromSchema(schema, 'metadata.type')).toBeUndefined() }) + + it('should return undefined when value is a number', () => { + const schema = { + shape: { + type: { value: 123 }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() + }) + + it('should return undefined when value is a boolean', () => { + const schema = { + shape: { + type: { value: true }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() + }) + + it('should return undefined when value is an object', () => { + const schema = { + shape: { + type: { value: { nested: 'value' } }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() + }) + + it('should return undefined when nested path value is not a string', () => { + const schema = { + shape: { + metadata: { + shape: { + type: { value: 42 }, + }, + }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'metadata.type')).toBeUndefined() + }) }) })