diff --git a/README.md b/README.md index dbbcb8c2..a3886060 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,11 @@ They implement the following public methods: * `options`, composed by * `messageSchemas` – the `zod` schemas for all supported messages; * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer; - * `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. **Note:** It is not supported for Kafka publisher + * `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. Supports three modes: + * `{ messageTypePath: 'type' }` - extract type from a field at the root of the message + * `{ literal: 'my.message.type' }` - use a constant type for all messages + * `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function + **Note:** It is not supported for Kafka publisher. See `@message-queue-toolkit/core` README for detailed documentation and examples. * `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`. * `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`; * `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information); @@ -94,7 +98,7 @@ Multi-schema consumers support multiple message types via handler configs. They * `dependencies` – a set of dependencies depending on the protocol; * `options`, composed by * `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details; - * `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler; **Note:** It is not supported for Kafka consumer + * `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler. See Publishers section above for details. **Note:** It is not supported for Kafka consumer. * `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or an ISO-8601 date string; * `maxRetryDuration` - how long (in seconds) the message should be retried due to the `retryLater` result before marking it as consumed (and sending to DLQ, if one is configured). This is used to avoid infinite loops. Default is 4 days; * `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name) diff --git a/UPGRADING.md b/UPGRADING.md index 43656e9f..f30d5c0c 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,5 +1,105 @@ # Upgrading Guide +## Upgrading
`core` `25.x.x` -> `26.0.0`
`sqs` `xx.x.x` -> `xx.0.0`
`sns` `xx.x.x` -> `xx.0.0`
`amqp` `xx.x.x` -> `xx.0.0`
`gcp-pubsub` `2.x.x` -> `3.0.0` + +### Description of Breaking Changes + +- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead. + +- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes. + +### Migration Steps + +#### Replacing `messageTypeField` with `messageTypeResolver` + +Replace `messageTypeField: 'fieldName'` with `messageTypeResolver: { messageTypePath: 'fieldName' }`: + +```typescript +// Before +super(dependencies, { + messageTypeField: 'type', + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler) + .build(), +}) + +// After +super(dependencies, { + messageTypeResolver: { messageTypePath: 'type' }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler) + .build(), +}) +``` + +## Upgrading
`core` `24.x.x` -> `25.0.0`
`gcp-pubsub` `1.x.x` -> `2.0.0` + +### Description of Breaking Changes + +- **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead. + +- **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes: + - `{ messageTypePath: 'type' }` - extract type from a field at the root of the message + - `{ literal: 'my.message.type' }` - use a constant type for all messages + - `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function + +- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time. + +### Migration Steps + +#### If using `NO_MESSAGE_TYPE_FIELD` + +Replace `messageTypeField: NO_MESSAGE_TYPE_FIELD` with `messageTypeResolver: { literal: 'your.message.type' }`: + +```typescript +// Before +import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core' + +super(dependencies, { + messageTypeField: NO_MESSAGE_TYPE_FIELD, + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler) + .build(), +}) + +// After +super(dependencies, { + messageTypeResolver: { literal: 'your.message.type' }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(schema, handler, { messageType: 'your.message.type' }) + .build(), +}) +``` + +#### If using custom resolver function + +When using `messageTypeResolver: { resolver: fn }`, provide explicit `messageType` in handler options: + +```typescript +super(dependencies, { + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + // Map external event types to internal types + const eventType = messageAttributes?.eventType as string + if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created' + throw new Error(`Unknown event type: ${eventType}`) + }, + }, + handlers: new MessageHandlerConfigBuilder() + .addConfig(ObjectSchema, handler, { messageType: 'storage.object.created' }) + .build(), +}) +``` + +#### GCP Pub/Sub DLQ Consumer + +The `AbstractPubSubDlqConsumer` now uses `DLQ_MESSAGE_TYPE` constant internally. If you import this constant, update your import: + +```typescript +import { DLQ_MESSAGE_TYPE } from '@message-queue-toolkit/gcp-pubsub' +// DLQ_MESSAGE_TYPE = 'dlq.message' +``` + ## Upgrading
`core` `19.0.0` -> `20.0.0`
`sqs` `19.0.0` -> `20.0.0`
`sns` `20.0.0` -> `21.0.0`
`amqp` `18.0.0` -> `19.0.0`
`metrics` `2.0.0` -> `3.0.0` ### Description of Breaking Changes diff --git a/examples/sns-sqs/lib/common/TestPublisherManager.ts b/examples/sns-sqs/lib/common/TestPublisherManager.ts index 584c50e8..93bd8419 100644 --- a/examples/sns-sqs/lib/common/TestPublisherManager.ts +++ b/examples/sns-sqs/lib/common/TestPublisherManager.ts @@ -29,7 +29,7 @@ export const publisherManager = new SnsPublisherManager< newPublisherOptions: { handlerSpy: true, messageIdField: 'id', - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, deletionConfig: { deleteIfExists: isTest, // only enable this in tests // and ensure that the owning side is doing the deletion. diff --git a/examples/sns-sqs/lib/common/UserConsumer.ts b/examples/sns-sqs/lib/common/UserConsumer.ts index 0ca52872..a52edf12 100644 --- a/examples/sns-sqs/lib/common/UserConsumer.ts +++ b/examples/sns-sqs/lib/common/UserConsumer.ts @@ -27,7 +27,7 @@ export class UserConsumer extends AbstractSnsSqsConsumer({ - messageTypeField: this.messageTypeField, + messageTypeResolver: this.messageTypeResolver, messageHandlers: options.handlers, }) this.executionContext = executionContext @@ -151,12 +151,8 @@ export abstract class AbstractAmqpConsumer< } const { originalMessage, parsedMessage } = deserializedMessage.result - // @ts-expect-error - const messageType = parsedMessage[this.messageTypeField] - const transactionSpanId = `queue_${this.queueName}:${ - // @ts-expect-error - parsedMessage[this.messageTypeField] - }` + const messageType = this.resolveMessageTypeFromMessage(parsedMessage) ?? 'unknown' + const transactionSpanId = `queue_${this.queueName}:${messageType}` // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 439910d0..5b8aabdd 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -88,8 +88,8 @@ export abstract class AbstractAmqpPublisher< } if (this.logMessages) { - // @ts-expect-error - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' + const resolvedLogMessage = this.resolveMessageLog(message, messageType) this.logMessage(resolvedLogMessage) } @@ -122,8 +122,7 @@ export abstract class AbstractAmqpPublisher< // @ts-expect-error queueName: this.queueName, exchange: this.exchange, - // @ts-expect-error - messageType: message[this.messageTypeField] ?? 'unknown', + messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown', }), cause: err as Error, }) diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts index 064d7dff..95217645 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.ts @@ -75,7 +75,7 @@ export class AmqpPermissionConsumer extends AbstractAmqpQueueConsumer< deadLetterQueue: options?.deadLetterQueue, logMessages: options?.logMessages, handlerSpy: true, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, deletionConfig: { deleteIfExists: true }, maxRetryDuration: options?.maxRetryDuration, handlers: new MessageHandlerConfigBuilder< diff --git a/packages/amqp/test/fakes/CustomFakeConsumer.ts b/packages/amqp/test/fakes/CustomFakeConsumer.ts index 942e315b..616ff54f 100644 --- a/packages/amqp/test/fakes/CustomFakeConsumer.ts +++ b/packages/amqp/test/fakes/CustomFakeConsumer.ts @@ -5,6 +5,11 @@ import type { ZodSchema } from 'zod/v4' import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer.ts' import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService.ts' +/** + * Message type used for the catch-all handler in CustomFakeConsumer. + */ +const CUSTOM_FAKE_MESSAGE_TYPE = 'custom.fake.message' + export class CustomFakeConsumer extends AbstractAmqpQueueConsumer< PublisherBaseMessageType, unknown @@ -22,9 +27,12 @@ export class CustomFakeConsumer extends AbstractAmqpQueueConsumer< }, }, handlerSpy: true, - messageTypeField: 'messageType', + // Use literal resolver so all messages route to the same handler + messageTypeResolver: { literal: CUSTOM_FAKE_MESSAGE_TYPE }, handlers: new MessageHandlerConfigBuilder() - .addConfig(schema, () => Promise.resolve({ result: 'success' })) + .addConfig(schema, () => Promise.resolve({ result: 'success' }), { + messageType: CUSTOM_FAKE_MESSAGE_TYPE, + }) .build(), }, {}, diff --git a/packages/amqp/test/fakes/FakeQueueConsumer.ts b/packages/amqp/test/fakes/FakeQueueConsumer.ts index 2a08ce17..959065c9 100644 --- a/packages/amqp/test/fakes/FakeQueueConsumer.ts +++ b/packages/amqp/test/fakes/FakeQueueConsumer.ts @@ -25,7 +25,7 @@ export class FakeQueueConsumer extends AbstractAmqpQueueConsumer< deleteIfExists: true, }, handlerSpy: true, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, handlers: new MessageHandlerConfigBuilder() .addConfig(eventDefinition.consumerSchema, () => Promise.resolve({ result: 'success' })) .build(), diff --git a/packages/amqp/test/fakes/FakeTopicConsumer.ts b/packages/amqp/test/fakes/FakeTopicConsumer.ts index 5232f648..cdc7d21f 100644 --- a/packages/amqp/test/fakes/FakeTopicConsumer.ts +++ b/packages/amqp/test/fakes/FakeTopicConsumer.ts @@ -41,7 +41,7 @@ export class FakeTopicConsumer extends AbstractAmqpTopicConsumer< deleteIfExists: true, }, handlerSpy: true, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, handlers: new MessageHandlerConfigBuilder() .addConfig(eventDefinition.consumerSchema, () => { this.messageCounter++ diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts index 901cf705..e4734b2c 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.ts @@ -48,7 +48,7 @@ export class AmqpPermissionPublisher extends AbstractAmqpQueuePublisher ``` +### Message Type Resolution + +#### What is Message Type? + +The **message type** is a discriminator field that identifies what kind of event or command a message represents. It's used for: + +1. **Routing**: Directing messages to the appropriate handler based on their type +2. **Schema validation**: Selecting the correct Zod schema to validate the message +3. **Observability**: Tracking metrics and logs per message type + +In a typical event-driven architecture, a single queue or topic may receive multiple types of messages. For example, a `user-events` queue might receive `user.created`, `user.updated`, and `user.deleted` events. The message type tells the consumer which handler should process each message. + +#### Configuration Options + +The `messageTypeResolver` configuration supports three modes: + +##### Mode 1: Field Path (Simple) + +Use when the message type is a field at the root level of the parsed message body: + +```typescript +{ + messageTypeResolver: { messageTypePath: 'type' }, // Extracts type from message.type +} +``` + +##### Mode 2: Literal (Constant) + +Use when all messages are of the same type: + +```typescript +{ + messageTypeResolver: { literal: 'order.created' }, // All messages treated as this type +} +``` + +##### Mode 3: Custom Resolver (Flexible) + +Use for complex scenarios where the type needs to be extracted from message attributes, nested fields, or requires transformation: + +```typescript +import type { MessageTypeResolverConfig } from '@message-queue-toolkit/core' + +const resolverConfig: MessageTypeResolverConfig = { + resolver: ({ messageData, messageAttributes }) => { + // Your custom logic here + return 'resolved.type' + }, +} +``` + +**Important:** The resolver function must always return a valid string. If the type cannot be determined, either return a default type or throw an error with a descriptive message. + +#### Real-World Examples by Platform + +##### AWS SQS (Plain) + +When publishing your own events directly to SQS, you control the message format: + +```typescript +// Message format you control +{ + "id": "msg-123", + "type": "order.created", // Your type field + "timestamp": "2024-01-15T10:30:00Z", + "payload": { + "orderId": "order-456", + "amount": 99.99 + } +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'type' }, +} +``` + +##### AWS EventBridge → SQS + +EventBridge events have a specific structure with `detail-type`: + +```typescript +// EventBridge event structure delivered to SQS +{ + "version": "0", + "id": "12345678-1234-1234-1234-123456789012", + "detail-type": "Order Created", // EventBridge uses detail-type + "source": "com.myapp.orders", + "account": "123456789012", + "time": "2024-01-15T10:30:00Z", + "region": "us-east-1", + "detail": { + "orderId": "order-456", + "amount": 99.99 + } +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'detail-type' }, +} + +// Or with resolver for normalization +{ + messageTypeResolver: { + resolver: ({ messageData }) => { + const data = messageData as { 'detail-type'?: string; source?: string } + const detailType = data['detail-type'] + if (!detailType) throw new Error('detail-type is required') + // Optionally normalize: "Order Created" → "order.created" + return detailType.toLowerCase().replace(/ /g, '.') + }, + }, +} +``` + +##### AWS SNS → SQS + +SNS messages wrapped in SQS have the actual payload in the `Message` field (handled automatically by the library after unwrapping): + +```typescript +// After SNS envelope unwrapping, you get your original message +{ + "id": "msg-123", + "type": "user.signup.completed", + "userId": "user-789", + "email": "user@example.com" +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'type' }, +} +``` + +##### Apache Kafka + +Kafka typically uses topic-based routing, but you may still need message types within a topic: + +```typescript +// Kafka message value (JSON) +{ + "eventType": "inventory.reserved", + "eventId": "evt-123", + "timestamp": 1705312200000, + "data": { + "sku": "PROD-001", + "quantity": 5 + } +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'eventType' }, +} + +// Or using Kafka headers (via custom resolver) +{ + messageTypeResolver: { + resolver: ({ messageData, messageAttributes }) => { + // Kafka headers are passed as messageAttributes + if (messageAttributes?.['ce_type']) { + return messageAttributes['ce_type'] as string // CloudEvents header + } + const data = messageData as { eventType?: string } + if (!data.eventType) throw new Error('eventType required') + return data.eventType + }, + }, +} +``` + +##### Google Cloud Pub/Sub (Your Own Events) + +When you control the message format in Pub/Sub: + +```typescript +// Your message (base64-decoded from data field) +{ + "type": "payment.processed", + "paymentId": "pay-123", + "amount": 150.00, + "currency": "USD" +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'type' }, +} +``` + +##### Google Cloud Pub/Sub (Cloud Storage Notifications) + +Cloud Storage notifications put the event type in message **attributes**, not the data payload: + +```typescript +// Pub/Sub message structure for Cloud Storage notifications +{ + "data": "eyJraW5kIjoic3RvcmFnZSMgb2JqZWN0In0=", // Base64-encoded object metadata + "attributes": { + "eventType": "OBJECT_FINALIZE", // Type is HERE, not in data! + "bucketId": "my-bucket", + "objectId": "path/to/file.jpg", + "objectGeneration": "1705312200000" + }, + "messageId": "123456789", + "publishTime": "2024-01-15T10:30:00Z" +} + +// Configuration - must use resolver to access attributes +{ + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + const eventType = messageAttributes?.eventType as string + if (!eventType) { + throw new Error('eventType attribute required for Cloud Storage notifications') + } + // Map GCS event types to your internal types + const typeMap: Record = { + 'OBJECT_FINALIZE': 'storage.object.created', + 'OBJECT_DELETE': 'storage.object.deleted', + 'OBJECT_ARCHIVE': 'storage.object.archived', + 'OBJECT_METADATA_UPDATE': 'storage.object.metadataUpdated', + } + return typeMap[eventType] ?? eventType + }, + }, +} +``` + +##### Google Cloud Pub/Sub (Eventarc / CloudEvents) + +Eventarc delivers events in CloudEvents format: + +```typescript +// CloudEvents structured format +{ + "specversion": "1.0", + "type": "google.cloud.storage.object.v1.finalized", // CloudEvents type + "source": "//storage.googleapis.com/projects/_/buckets/my-bucket", + "id": "1234567890", + "time": "2024-01-15T10:30:00Z", + "datacontenttype": "application/json", + "data": { + "bucket": "my-bucket", + "name": "path/to/file.jpg", + "contentType": "image/jpeg" + } +} + +// Configuration +{ + messageTypeResolver: { messageTypePath: 'type' }, // CloudEvents type is at root level +} + +// Or with mapping to simpler types +{ + messageTypeResolver: { + resolver: ({ messageData }) => { + const data = messageData as { type?: string } + const ceType = data.type + if (!ceType) throw new Error('CloudEvents type required') + // Map verbose CloudEvents types to simpler names + if (ceType.includes('storage.object') && ceType.includes('finalized')) { + return 'storage.object.created' + } + if (ceType.includes('storage.object') && ceType.includes('deleted')) { + return 'storage.object.deleted' + } + return ceType + }, + }, +} +``` + +##### Single-Type Queues (Any Platform) + +When a queue/subscription only ever receives one type of message, use `literal`: + +```typescript +// Dedicated queue for order.created events only +{ + messageTypeResolver: { + literal: 'order.created', + }, +} +``` + +This is useful for: +- Dedicated queues/subscriptions filtered to a single event type +- Legacy systems where messages don't have a type field +- Simple integrations where you know exactly what you're receiving + ### Handler Configuration Use `MessageHandlerConfigBuilder` to configure handlers for different message types: @@ -167,6 +460,56 @@ const handlers = new MessageHandlerConfigBuilder< .build() ``` +#### Handler Configuration Options + +The third parameter to `addConfig` accepts these options: + +| Option | Type | Description | +|--------|------|-------------| +| `messageType` | `string` | Explicit message type for routing. Required when using custom resolver. | +| `messageLogFormatter` | `(message) => unknown` | Custom formatter for logging | +| `preHandlers` | `Prehandler[]` | Middleware functions run before the handler | +| `preHandlerBarrier` | `BarrierCallback` | Barrier function for out-of-order message handling | + +#### Explicit Message Type + +When using a custom resolver function (`messageTypeResolver: { resolver: fn }`), the message type cannot be automatically extracted from schemas at registration time. You must provide an explicit `messageType` for each handler: + +```typescript +const handlers = new MessageHandlerConfigBuilder() + .addConfig( + STORAGE_OBJECT_SCHEMA, + handleObjectCreated, + { messageType: 'storage.object.created' } // Required for custom resolver + ) + .addConfig( + STORAGE_DELETE_SCHEMA, + handleObjectDeleted, + { messageType: 'storage.object.deleted' } // Required for custom resolver + ) + .build() + +const container = new HandlerContainer({ + messageHandlers: handlers, + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + // Map external event types to your internal types + const eventType = messageAttributes?.eventType as string + if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created' + if (eventType === 'OBJECT_DELETE') return 'storage.object.deleted' + throw new Error(`Unknown event type: ${eventType}`) + }, + }, +}) +``` + +**Priority for determining handler message type:** +1. Explicit `messageType` in handler options (highest priority) +2. Literal type from `messageTypeResolver: { literal: 'type' }` +3. Extract from schema's literal field using `messageTypePath` + +If the message type cannot be determined, an error is thrown during container construction. + ### HandlerContainer Routes messages to appropriate handlers based on message type: @@ -176,7 +519,7 @@ import { HandlerContainer } from '@message-queue-toolkit/core' const container = new HandlerContainer({ messageHandlers: handlers, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, }) const handler = container.resolveHandler(message.type) @@ -191,7 +534,7 @@ import { MessageSchemaContainer } from '@message-queue-toolkit/core' const container = new MessageSchemaContainer({ messageSchemas: [Schema1, Schema2], - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, }) const schema = container.resolveSchema(message.type) @@ -229,25 +572,6 @@ await emitter.emit('user.created', { userId: 'user-123' }) ## Utilities -### NO_MESSAGE_TYPE_FIELD - -Use this constant when your consumer should accept all message types without routing: - -```typescript -import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core' - -// Consumer will use a single handler for all messages -{ - messageTypeField: NO_MESSAGE_TYPE_FIELD, - handlers: new MessageHandlerConfigBuilder() - .addConfig(PassthroughSchema, async (message) => { - // Handles any message type - return { result: 'success' } - }) - .build(), -} -``` - ### Error Classes ```typescript @@ -327,6 +651,21 @@ type BarrierCallback = ( type BarrierResult = | { isPassing: true; output: Output } | { isPassing: false; output?: never } + +// Message type resolver context +type MessageTypeResolverContext = { + messageData: unknown + messageAttributes?: Record +} + +// Message type resolver function +type MessageTypeResolverFn = (context: MessageTypeResolverContext) => string + +// Message type resolver configuration +type MessageTypeResolverConfig = + | { messageTypePath: string } // Extract from field at root of message data + | { literal: string } // Constant type for all messages + | { resolver: MessageTypeResolverFn } // Custom resolver function ``` ### Utility Functions diff --git a/packages/core/lib/events/DomainEventEmitter.ts b/packages/core/lib/events/DomainEventEmitter.ts index 5633c7b1..b2da7b5e 100644 --- a/packages/core/lib/events/DomainEventEmitter.ts +++ b/packages/core/lib/events/DomainEventEmitter.ts @@ -180,6 +180,7 @@ export class DomainEventEmitter processingResult: { status: 'consumed' }, }, event.id, + event.type, ) }) this.inProgressBackgroundHandlerByEventId.set(event.id, bgPromise) diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index 9f2019b5..eb778624 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -61,18 +61,31 @@ export { HandlerContainer, MessageHandlerConfig, MessageHandlerConfigBuilder, - NO_MESSAGE_TYPE_FIELD, type PrehandlerResult, } from './queues/HandlerContainer.ts' export { + ANY_MESSAGE_TYPE, HandlerSpy, type HandlerSpyParams, type PublicHandlerSpy, resolveHandlerSpy, type SpyResultInput, + TYPE_NOT_RESOLVED, } from './queues/HandlerSpy.ts' export type { MessageSchemaContainerOptions } from './queues/MessageSchemaContainer.ts' export { MessageSchemaContainer } from './queues/MessageSchemaContainer.ts' +export type { + MessageTypeResolverConfig, + MessageTypeResolverContext, + MessageTypeResolverFn, +} from './queues/MessageTypeResolver.ts' +export { + extractMessageTypeFromSchema, + isMessageTypeLiteralConfig, + isMessageTypePathConfig, + isMessageTypeResolverFnConfig, + resolveMessageType, +} from './queues/MessageTypeResolver.ts' export type { AsyncPublisher, ExtraParams, diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 0814fae1..29960ccd 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -59,8 +59,14 @@ import type { PrehandlerResult, } from './HandlerContainer.ts' import type { HandlerSpy, PublicHandlerSpy } from './HandlerSpy.ts' -import { resolveHandlerSpy } from './HandlerSpy.ts' +import { resolveHandlerSpy, TYPE_NOT_RESOLVED } from './HandlerSpy.ts' import { MessageSchemaContainer } from './MessageSchemaContainer.ts' +import { + isMessageTypePathConfig, + type MessageTypeResolverConfig, + type MessageTypeResolverContext, + resolveMessageType, +} from './MessageTypeResolver.ts' export type Deserializer = ( message: unknown, @@ -111,7 +117,10 @@ export abstract class AbstractQueueService< protected readonly errorReporter: ErrorReporter public readonly logger: CommonLogger protected readonly messageIdField: string - protected readonly messageTypeField: string + /** + * Configuration for resolving message types. + */ + protected readonly messageTypeResolver?: MessageTypeResolverConfig protected readonly logMessages: boolean protected readonly creationConfig?: QueueConfiguration protected readonly locatorConfig?: QueueLocatorType @@ -143,7 +152,7 @@ export abstract class AbstractQueueService< this.messageMetricsManager = messageMetricsManager this.messageIdField = options.messageIdField ?? 'id' - this.messageTypeField = options.messageTypeField + this.messageTypeResolver = options.messageTypeResolver this.messageTimestampField = options.messageTimestampField ?? 'timestamp' this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId' this.messageDeduplicationOptionsField = @@ -166,7 +175,7 @@ export abstract class AbstractQueueService< protected resolveConsumerMessageSchemaContainer(options: { handlers: MessageHandlerConfig[] - messageTypeField: string + messageTypeResolver?: MessageTypeResolverConfig }) { const messageSchemas = options.handlers.map((entry) => entry.schema) const messageDefinitions: CommonEventDefinition[] = options.handlers @@ -174,7 +183,7 @@ export abstract class AbstractQueueService< .filter((entry) => entry !== undefined) return new MessageSchemaContainer({ - messageTypeField: options.messageTypeField, + messageTypeResolver: options.messageTypeResolver, messageSchemas, messageDefinitions, }) @@ -182,17 +191,36 @@ export abstract class AbstractQueueService< protected resolvePublisherMessageSchemaContainer(options: { messageSchemas: readonly ZodSchema[] - messageTypeField: string + messageTypeResolver?: MessageTypeResolverConfig }) { const messageSchemas = options.messageSchemas return new MessageSchemaContainer({ - messageTypeField: options.messageTypeField, + messageTypeResolver: options.messageTypeResolver, messageSchemas, messageDefinitions: [], }) } + /** + * Resolves message type from message data and optional attributes using messageTypeResolver. + * + * @param messageData - The parsed message data + * @param messageAttributes - Optional message-level attributes (e.g., PubSub attributes) + * @returns The resolved message type, or undefined if not configured + */ + protected resolveMessageTypeFromMessage( + messageData: unknown, + messageAttributes?: Record, + ): string | undefined { + if (this.messageTypeResolver) { + const context: MessageTypeResolverContext = { messageData, messageAttributes } + return resolveMessageType(this.messageTypeResolver, context) + } + + return undefined + } + protected abstract resolveSchema( message: MessagePayloadSchemas, ): Either> @@ -236,12 +264,17 @@ export abstract class AbstractQueueService< const { message, processingResult, messageId } = params const messageProcessingEndTimestamp = Date.now() + // Resolve message type once and pass to spy for consistent type resolution + const messageType = message + ? (this.resolveMessageTypeFromMessage(message) ?? TYPE_NOT_RESOLVED) + : TYPE_NOT_RESOLVED this._handlerSpy?.addProcessedMessage( { message, processingResult, }, messageId, + messageType, ) const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') @@ -278,11 +311,7 @@ export abstract class AbstractQueueService< const resolvedMessageId: string | undefined = message?.[this.messageIdField] ?? messageId const messageTimestamp = message ? this.tryToExtractTimestamp(message)?.getTime() : undefined - const messageType = - message && this.messageTypeField in message - ? // @ts-ignore - message[this.messageTypeField] - : undefined + const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined const messageDeduplicationId = message && this.messageDeduplicationIdField in message ? // @ts-ignore @@ -625,7 +654,7 @@ export abstract class AbstractQueueService< } // Return message with both new and legacy formats for backward compatibility - return { + const result: OffloadedPayloadPointerPayload = { // Extended payload reference format payloadRef: { id: payloadId, @@ -638,14 +667,21 @@ export abstract class AbstractQueueService< // @ts-expect-error [this.messageIdField]: message[this.messageIdField], // @ts-expect-error - [this.messageTypeField]: message[this.messageTypeField], - // @ts-expect-error [this.messageTimestampField]: message[this.messageTimestampField], // @ts-expect-error [this.messageDeduplicationIdField]: message[this.messageDeduplicationIdField], // @ts-expect-error [this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField], } + + // Preserve message type field if using messageTypePath resolver + if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) { + const messageTypePath = this.messageTypeResolver.messageTypePath + // @ts-expect-error + result[messageTypePath] = message[messageTypePath] + } + + return result } /** diff --git a/packages/core/lib/queues/HandlerContainer.ts b/packages/core/lib/queues/HandlerContainer.ts index f6be05fd..e5512bc9 100644 --- a/packages/core/lib/queues/HandlerContainer.ts +++ b/packages/core/lib/queues/HandlerContainer.ts @@ -5,6 +5,14 @@ import type { ZodSchema } from 'zod/v4' import type { DoNotProcessMessageError } from '../errors/DoNotProcessError.ts' import type { RetryMessageLaterError } from '../errors/RetryMessageLaterError.ts' +import { + extractMessageTypeFromSchema, + isMessageTypeLiteralConfig, + isMessageTypePathConfig, + type MessageTypeResolverConfig, + type MessageTypeResolverContext, + resolveMessageType, +} from './MessageTypeResolver.ts' export type PreHandlingOutputs = { preHandlerOutput: PrehandlerOutput @@ -55,6 +63,12 @@ export type HandlerConfigOptions< PrehandlerOutput, BarrierOutput, > = { + /** + * Explicit message type for this handler. + * Required when using a custom resolver function that cannot extract types from schemas. + * Optional when using messageTypePath or literal resolver modes. + */ + messageType?: string messageLogFormatter?: LogFormatter preHandlerBarrier?: BarrierCallback< MessagePayloadSchema, @@ -73,6 +87,11 @@ export class MessageHandlerConfig< > { public readonly schema: ZodSchema public readonly definition?: CommonEventDefinition + /** + * Explicit message type for this handler, if provided. + * Used for routing when type cannot be extracted from schema. + */ + public readonly messageType?: string public readonly handler: Handler< MessagePayloadSchema, ExecutionContext, @@ -105,6 +124,7 @@ export class MessageHandlerConfig< ) { this.schema = schema this.definition = eventDefinition + this.messageType = options?.messageType this.handler = handler this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter this.preHandlerBarrier = options?.preHandlerBarrier @@ -222,18 +242,12 @@ export type HandlerContainerOptions< PrehandlerOutput = undefined, > = { messageHandlers: MessageHandlerConfig[] - messageTypeField: string + /** + * Configuration for resolving message types. + */ + messageTypeResolver?: MessageTypeResolverConfig } -/** - * Use this constant as `messageTypeField` value when your consumer should accept - * all message types without routing by type. When used, a single handler will - * process all incoming messages regardless of their type field value. - */ -export const NO_MESSAGE_TYPE_FIELD = '' - -const DEFAULT_HANDLER_KEY = 'NO_MESSAGE_TYPE' - export class HandlerContainer< MessagePayloadSchemas extends object, ExecutionContext, @@ -243,37 +257,76 @@ export class HandlerContainer< string, MessageHandlerConfig > - private readonly messageTypeField: string + private readonly messageTypeResolver?: MessageTypeResolverConfig constructor( options: HandlerContainerOptions, ) { - this.messageTypeField = options.messageTypeField + this.messageTypeResolver = options.messageTypeResolver this.messageHandlers = this.resolveHandlerMap(options.messageHandlers) } /** * Resolves a handler for the given message type. - * When messageTypeField is NO_MESSAGE_TYPE_FIELD (empty string), pass undefined as messageType - * to get the default handler. */ public resolveHandler( - messageType: string | undefined, + messageType: string, ): MessageHandlerConfig< MessagePayloadSchemas, ExecutionContext, PrehandlerOutput, BarrierOutput > { - const handlerKey = messageType ?? DEFAULT_HANDLER_KEY - const handler = this.messageHandlers[handlerKey] + const handler = this.messageHandlers[messageType] if (!handler) { - throw new Error(`Unsupported message type: ${handlerKey}`) + throw new Error(`Unsupported message type: ${messageType}`) } // @ts-expect-error return handler } + /** + * Resolves message type from message data and optional attributes using the configured resolver. + * + * @param messageData - The parsed message data + * @param messageAttributes - Optional message-level attributes (e.g., PubSub attributes) + * @returns The resolved message type + * @throws Error if message type cannot be resolved + */ + public resolveMessageType( + messageData: unknown, + messageAttributes?: Record, + ): string { + if (this.messageTypeResolver) { + const context: MessageTypeResolverContext = { messageData, messageAttributes } + return resolveMessageType(this.messageTypeResolver, context) + } + + throw new Error('Unable to resolve message type: messageTypeResolver is not configured') + } + + /** + * Gets the field path used for extracting message type from schemas during registration. + * Returns undefined for literal or custom resolver modes. + */ + private getMessageTypePathForSchema(): string | undefined { + if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) { + return this.messageTypeResolver.messageTypePath + } + // For literal or custom resolver, we don't extract type from schema + return undefined + } + + /** + * Gets the literal message type if configured. + */ + private getLiteralMessageType(): string | undefined { + if (this.messageTypeResolver && isMessageTypeLiteralConfig(this.messageTypeResolver)) { + return this.messageTypeResolver.literal + } + return undefined + } + private resolveHandlerMap( supportedHandlers: MessageHandlerConfig< MessagePayloadSchemas, @@ -284,17 +337,40 @@ export class HandlerContainer< string, MessageHandlerConfig > { + const literalType = this.getLiteralMessageType() + const messageTypePath = this.getMessageTypePathForSchema() + return supportedHandlers.reduce( (acc, entry) => { - // When messageTypeField is empty (NO_MESSAGE_TYPE_FIELD), use DEFAULT_HANDLER_KEY - // This allows a single handler to process all message types let messageType: string | undefined - if (this.messageTypeField) { - // @ts-expect-error - messageType = entry.schema.shape[this.messageTypeField]?.value + + // Priority 1: Explicit messageType on the handler config + if (entry.messageType) { + messageType = entry.messageType + } + // Priority 2: Literal type from resolver config (same for all handlers) + else if (literalType) { + messageType = literalType + } + // Priority 3: Extract type from schema shape using the field path + else if (messageTypePath) { + // @ts-expect-error - ZodSchema has shape property at runtime + messageType = extractMessageTypeFromSchema(entry.schema, messageTypePath) + } + + if (!messageType) { + throw new Error( + 'Unable to determine message type for handler. ' + + 'Either provide messageType in handler options, use a literal resolver, ' + + 'or ensure the schema has a literal type field matching messageTypePath.', + ) + } + + if (acc[messageType]) { + throw new Error(`Duplicate handler for message type: ${messageType}`) } - const handlerKey = messageType ?? DEFAULT_HANDLER_KEY - acc[handlerKey] = entry + + acc[messageType] = entry return acc }, {} as Record< diff --git a/packages/core/lib/queues/HandlerSpy.ts b/packages/core/lib/queues/HandlerSpy.ts index 2cad92a0..2573bb18 100644 --- a/packages/core/lib/queues/HandlerSpy.ts +++ b/packages/core/lib/queues/HandlerSpy.ts @@ -9,10 +9,30 @@ import type { } from '../types/MessageQueueTypes.ts' import { objectMatches } from '../utils/matchUtils.ts' +/** + * Symbol that can be used in place of a message type value in `waitForMessage` or `checkForMessage` + * to match messages regardless of their type. Useful when using custom message type resolvers + * where the type may not be available in the message payload itself. + * + * @example + * // Match any message with a specific ID, regardless of type + * await spy.waitForMessage({ id: '123', type: ANY_MESSAGE_TYPE }) + */ +export const ANY_MESSAGE_TYPE: unique symbol = Symbol('ANY_MESSAGE_TYPE') + +/** + * Symbol used to indicate that the message type could not be resolved. + * Typically used when message parsing failed before type resolution could occur. + * + * @example + * // For failed message parsing + * spy.addProcessedMessage({ message: null, processingResult }, messageId, TYPE_NOT_RESOLVED) + */ +export const TYPE_NOT_RESOLVED: unique symbol = Symbol('TYPE_NOT_RESOLVED') + export type HandlerSpyParams = { bufferSize?: number messageIdField?: string - messageTypeField?: string } export type SpyResultInput = { @@ -66,15 +86,12 @@ export class HandlerSpy { // biome-ignore lint/suspicious/noExplicitAny: This is expected private readonly messageBuffer: Fifo> private readonly messageIdField: keyof MessagePayloadSchemas - private readonly messageTypeField: keyof MessagePayloadSchemas private readonly spyPromises: SpyPromiseMetadata[] constructor(params: HandlerSpyParams = {}) { this.messageBuffer = new Fifo(params.bufferSize ?? 100) // @ts-expect-error this.messageIdField = params.messageIdField ?? 'id' - // @ts-expect-error - this.messageTypeField = params.messageTypeField ?? 'type' this.spyPromises = [] } @@ -83,8 +100,16 @@ export class HandlerSpy { fields: DeepPartial, status?: MessageProcessingResultStatus, ): boolean { + // Handle ANY_MESSAGE_TYPE symbol - if any field value is ANY_MESSAGE_TYPE, skip matching that field + const fieldsToMatch = { ...fields } + for (const [key, value] of Object.entries(fieldsToMatch)) { + if (value === ANY_MESSAGE_TYPE) { + delete fieldsToMatch[key as keyof typeof fieldsToMatch] + } + } + return ( - objectMatches(fields, spyResult.message) && + objectMatches(fieldsToMatch, spyResult.message) && (!status || spyResult.processingResult.status === status) ) } @@ -142,21 +167,35 @@ export class HandlerSpy { return Object.values(this.messageBuffer.items).map((item) => item.value) } - addProcessedMessage(processingResult: SpyResultInput, messageId?: string) { + /** + * Add a processed message to the spy buffer. + * @param processingResult - The processing result containing the message and status + * @param messageId - Optional message ID override (used if message parsing failed) + * @param messageType - The resolved message type, or TYPE_NOT_RESOLVED symbol if type couldn't be determined + */ + addProcessedMessage( + processingResult: SpyResultInput, + messageId: string | undefined, + messageType: string | typeof TYPE_NOT_RESOLVED, + ) { const resolvedMessageId = processingResult.message?.[this.messageIdField] ?? messageId ?? randomUUID() - const resolvedMessageType = - processingResult.message?.[this.messageTypeField] ?? 'FAILED_TO_RESOLVE' + // Use provided messageType, converting TYPE_NOT_RESOLVED symbol to string for storage + const resolvedMessageType: string = + messageType === TYPE_NOT_RESOLVED ? 'TYPE_NOT_RESOLVED' : messageType - // If we failed to parse message, let's store id and type at least + // If we failed to parse message, let's store id and type at least for debugging const resolvedProcessingResult = processingResult.message ? (processingResult as SpyResultOutput) : ({ ...processingResult, message: { [this.messageIdField]: messageId, - [this.messageTypeField]: resolvedMessageType, + type: + resolvedMessageType === 'TYPE_NOT_RESOLVED' + ? 'FAILED_TO_RESOLVE' + : resolvedMessageType, }, } as SpyResultOutput) diff --git a/packages/core/lib/queues/MessageSchemaContainer.ts b/packages/core/lib/queues/MessageSchemaContainer.ts index a57b5640..bfc57d12 100644 --- a/packages/core/lib/queues/MessageSchemaContainer.ts +++ b/packages/core/lib/queues/MessageSchemaContainer.ts @@ -1,11 +1,22 @@ import type { Either } from '@lokalise/node-core' import type { CommonEventDefinition } from '@message-queue-toolkit/schemas' import type { ZodSchema } from 'zod/v4' +import { + extractMessageTypeFromSchema, + isMessageTypeLiteralConfig, + isMessageTypePathConfig, + type MessageTypeResolverConfig, + type MessageTypeResolverContext, + resolveMessageType, +} from './MessageTypeResolver.ts' export type MessageSchemaContainerOptions = { messageDefinitions: readonly CommonEventDefinition[] messageSchemas: readonly ZodSchema[] - messageTypeField?: string + /** + * Configuration for resolving message types. + */ + messageTypeResolver?: MessageTypeResolverConfig } const DEFAULT_SCHEMA_KEY = Symbol('NO_MESSAGE_TYPE') @@ -14,19 +25,27 @@ export class MessageSchemaContainer { public readonly messageDefinitions: Record private readonly messageSchemas: Record> - private readonly messageTypeField?: string + private readonly messageTypeResolver?: MessageTypeResolverConfig constructor(options: MessageSchemaContainerOptions) { - this.messageTypeField = options.messageTypeField + this.messageTypeResolver = options.messageTypeResolver this.messageSchemas = this.resolveMap(options.messageSchemas) this.messageDefinitions = this.resolveMap(options.messageDefinitions ?? []) } + /** + * Resolves the schema for a message based on its type. + * + * @param message - The parsed message data + * @param attributes - Optional message-level attributes (e.g., PubSub attributes) + * @returns Either an error or the resolved schema + */ public resolveSchema( // biome-ignore lint/suspicious/noExplicitAny: This is expected message: Record, + attributes?: Record, ): Either> { - const messageType = this.messageTypeField ? message[this.messageTypeField] : undefined + const messageType = this.resolveMessageTypeFromData(message, attributes) const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY] if (!schema) { @@ -39,22 +58,67 @@ export class MessageSchemaContainer { return { result: schema } } + /** + * Resolves message type from message data and optional attributes. + */ + private resolveMessageTypeFromData( + messageData: unknown, + messageAttributes?: Record, + ): string | undefined { + if (this.messageTypeResolver) { + const context: MessageTypeResolverContext = { messageData, messageAttributes } + return resolveMessageType(this.messageTypeResolver, context) + } + + return undefined + } + + /** + * Gets the field path used for extracting message type from schemas during registration. + * Returns undefined for literal or custom resolver modes. + */ + private getMessageTypePathForSchema(): string | undefined { + if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) { + return this.messageTypeResolver.messageTypePath + } + // For literal or custom resolver, we don't extract type from schema + return undefined + } + + /** + * Gets the literal message type if configured. + */ + private getLiteralMessageType(): string | undefined { + if (this.messageTypeResolver && isMessageTypeLiteralConfig(this.messageTypeResolver)) { + return this.messageTypeResolver.literal + } + return undefined + } + private resolveMap>( array: readonly T[], ): Record { const result: Record = {} + const literalType = this.getLiteralMessageType() + const messageTypePath = this.getMessageTypePathForSchema() + for (const item of array) { let type: string | undefined - if (this.messageTypeField) { + // If literal type is configured, use it for all schemas + if (literalType) { + type = literalType + } else if (messageTypePath) { + // Extract type from schema shape using the field path type = 'publisherSchema' in item - ? // @ts-expect-error - item.publisherSchema?.shape[this.messageTypeField]?.value - : // @ts-expect-error - item.shape?.[this.messageTypeField]?.value + ? extractMessageTypeFromSchema(item.publisherSchema, messageTypePath) + : // @ts-expect-error - ZodSchema has shape property at runtime + extractMessageTypeFromSchema(item, messageTypePath) } + // For custom resolver without field path, we can't extract from schema + // All schemas will use DEFAULT_SCHEMA_KEY const key = type ?? DEFAULT_SCHEMA_KEY if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`) diff --git a/packages/core/lib/queues/MessageTypeResolver.ts b/packages/core/lib/queues/MessageTypeResolver.ts new file mode 100644 index 00000000..afdde16a --- /dev/null +++ b/packages/core/lib/queues/MessageTypeResolver.ts @@ -0,0 +1,194 @@ +/** + * Context passed to a custom message type resolver function. + * Contains both the parsed message data and any message-level attributes/metadata. + */ +export type MessageTypeResolverContext = { + /** + * The parsed/decoded message body (e.g., JSON-parsed data field in PubSub) + */ + messageData: unknown + /** + * Message-level attributes/metadata (e.g., PubSub message attributes, SQS message attributes) + * This is where Cloud Storage notifications put eventType, for example. + */ + messageAttributes?: Record +} + +/** + * Function that extracts the message type from a message. + * Used for routing messages to appropriate handlers and schemas. + * + * The function MUST return a valid message type string. If the type cannot be + * determined, the function should either: + * - Return a default type (e.g., 'unknown' or a fallback handler type) + * - Throw an error with a descriptive message + * + * @example + * // Extract from attributes (e.g., Cloud Storage notifications) + * const resolver: MessageTypeResolverFn = ({ messageAttributes }) => { + * const eventType = messageAttributes?.eventType as string | undefined + * if (!eventType) { + * throw new Error('eventType attribute is required') + * } + * return eventType + * } + * + * @example + * // Extract from nested data with fallback + * const resolver: MessageTypeResolverFn = ({ messageData }) => { + * const data = messageData as { metadata?: { eventName?: string } } + * return data.metadata?.eventName ?? 'default.event' + * } + */ +export type MessageTypeResolverFn = (context: MessageTypeResolverContext) => string + +/** + * Configuration for resolving message types. + * + * Three modes are supported: + * + * 1. **Field path** (string): Extract type from a field at the root of the message. + * @example { messageTypePath: 'type' } // extracts type from message.type + * @example { messageTypePath: 'detail-type' } // for EventBridge events + * + * 2. **Constant type** (object with `literal`): All messages are treated as the same type. + * Useful when a subscription/queue only receives one type of message. + * @example { literal: 'order.created' } + * + * 3. **Custom resolver** (object with `resolver`): Full flexibility via callback function. + * Use when type needs to be extracted from attributes, nested fields, or requires transformation. + * @example + * { + * resolver: ({ messageAttributes }) => messageAttributes?.eventType as string + * } + * + * @example + * // Cloud Storage notifications via PubSub - type is in attributes + * const config: MessageTypeResolverConfig = { + * resolver: ({ messageAttributes }) => { + * const eventType = messageAttributes?.eventType as string | undefined + * if (!eventType) { + * throw new Error('eventType attribute is required for Cloud Storage notifications') + * } + * // Optionally map to your internal event types + * if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created' + * if (eventType === 'OBJECT_DELETE') return 'storage.object.deleted' + * return eventType + * } + * } + * + * @example + * // CloudEvents format - type might be in envelope or data + * const config: MessageTypeResolverConfig = { + * resolver: ({ messageData, messageAttributes }) => { + * // Check for CloudEvents binary mode (type in attributes) + * if (messageAttributes?.['ce-type']) { + * return messageAttributes['ce-type'] as string + * } + * // Fall back to type in message data + * const data = messageData as { type?: string } + * if (!data.type) { + * throw new Error('Message type not found in CloudEvents envelope or message data') + * } + * return data.type + * } + * } + */ +export type MessageTypeResolverConfig = + | { + /** + * Field name at the root of the message containing the message type. + */ + messageTypePath: string + } + | { + /** + * Constant message type for all messages. + * Use when all messages in a queue/subscription are of the same type. + */ + literal: string + } + | { + /** + * Custom function to extract message type from message data and/or attributes. + * Provides full flexibility for complex routing scenarios. + */ + resolver: MessageTypeResolverFn + } + +/** + * Type guard to check if config uses field path mode + */ +export function isMessageTypePathConfig( + config: MessageTypeResolverConfig, +): config is { messageTypePath: string } { + return 'messageTypePath' in config +} + +/** + * Type guard to check if config uses literal/constant mode + */ +export function isMessageTypeLiteralConfig( + config: MessageTypeResolverConfig, +): config is { literal: string } { + return 'literal' in config +} + +/** + * Type guard to check if config uses custom resolver mode + */ +export function isMessageTypeResolverFnConfig( + config: MessageTypeResolverConfig, +): config is { resolver: MessageTypeResolverFn } { + return 'resolver' in config +} + +/** + * Resolves message type using the provided configuration. + * + * @param config - The resolver configuration + * @param context - Context containing message data and attributes + * @returns The resolved message type + * @throws Error if message type cannot be resolved (for messageTypePath mode) + */ +export function resolveMessageType( + config: MessageTypeResolverConfig, + context: MessageTypeResolverContext, +): string { + if (isMessageTypeLiteralConfig(config)) { + return config.literal + } + + if (isMessageTypePathConfig(config)) { + const data = context.messageData as Record | undefined + const messageType = data?.[config.messageTypePath] as string | undefined + if (messageType === undefined) { + throw new Error( + `Unable to resolve message type: field '${config.messageTypePath}' not found in message data`, + ) + } + return messageType + } + + // Custom resolver function - must return a string (user handles errors/defaults) + return config.resolver(context) +} + +/** + * Extracts message type from schema definition using the field path. + * Used during handler/schema registration to build the routing map. + * + * @param schema - Zod schema with shape property + * @param messageTypePath - Field name containing the type literal + * @returns The literal type value from the schema, or undefined + */ +export function extractMessageTypeFromSchema( + // biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any + schema: { shape?: Record }, + messageTypePath: string | undefined, +): string | undefined { + if (!messageTypePath) { + return undefined + } + return schema.shape?.[messageTypePath]?.value as string | undefined +} diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index c56e0b52..9100af14 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -4,6 +4,7 @@ import type { MessageDeduplicationConfig } from '../message-deduplication/messag import type { PayloadStoreConfig } from '../payload-store/payloadStoreTypes.ts' import type { MessageHandlerConfig } from '../queues/HandlerContainer.ts' import type { HandlerSpy, HandlerSpyParams } from '../queues/HandlerSpy.ts' +import type { MessageTypeResolverConfig } from '../queues/MessageTypeResolver.ts' import type { MessageProcessingResult, TransactionObservabilityManager, @@ -21,9 +22,10 @@ export type ProcessedMessageMetadata messageAttributes?.eventType as string + * } + * } + */ + messageTypeResolver?: MessageTypeResolverConfig messageIdField?: string messageTimestampField?: string messageDeduplicationIdField?: string diff --git a/packages/core/package.json b/packages/core/package.json index c937aeb1..5e689643 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -48,7 +48,7 @@ "rimraf": "^6.0.1", "typescript": "^5.9.2", "vitest": "^3.2.4", - "zod": "^4.0.17" + "zod": "^4.1.13" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index ce0515e7..2d073acf 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -133,7 +133,7 @@ describe('HandlerContainer', () => { const container = new HandlerContainer({ messageHandlers: configs, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, }) const userHandler = container.resolveHandler('user.created') @@ -150,7 +150,7 @@ describe('HandlerContainer', () => { const container = new HandlerContainer({ messageHandlers: configs, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, }) expect(() => container.resolveHandler('unknown.type')).toThrow( @@ -159,6 +159,243 @@ describe('HandlerContainer', () => { }) }) + describe('resolveMessageType', () => { + describe('with messageTypeField (legacy)', () => { + it('should extract message type from the specified field', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + const messageType = container.resolveMessageType({ + type: 'user.created', + userId: '1', + email: 'test@test.com', + }) + expect(messageType).toBe('user.created') + }) + + it('should throw error when field is missing', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + expect(() => container.resolveMessageType({ userId: '1' })).toThrow( + "Unable to resolve message type: field 'type' not found in message data", + ) + }) + }) + + describe('with messageTypeResolver', () => { + it('should use literal resolver', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { literal: 'user.created' }, + }) + + const messageType = container.resolveMessageType({}) + expect(messageType).toBe('user.created') + }) + + it('should use messageTypePath resolver', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + const messageType = container.resolveMessageType({ type: 'user.created' }) + expect(messageType).toBe('user.created') + }) + + it('should use custom resolver function', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'user.created' }, // Required when using custom resolver + ) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + const eventType = messageAttributes?.eventType as string + if (!eventType) throw new Error('eventType required') + return eventType === 'USER_CREATED' ? 'user.created' : eventType + }, + }, + }) + + const messageType = container.resolveMessageType({}, { eventType: 'USER_CREATED' }) + expect(messageType).toBe('user.created') + }) + + it('should pass messageAttributes to custom resolver', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'user.created' }, // Required when using custom resolver + ) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { + resolver: ({ messageData, messageAttributes }) => { + // Prefer attributes over data + if (messageAttributes?.type) { + return messageAttributes.type as string + } + const data = messageData as { type?: string } + if (!data.type) throw new Error('type required') + return data.type + }, + }, + }) + + // From attributes + expect(container.resolveMessageType({}, { type: 'user.created' })).toBe('user.created') + // From data when no attributes + expect(container.resolveMessageType({ type: 'user.created' })).toBe('user.created') + }) + }) + + describe('with no configuration', () => { + it('should throw error when neither messageTypeField nor messageTypeResolver is configured', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'user.created' }, // Explicit type to allow handler registration + ) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + }) + + // Handler is registered but runtime resolution fails + expect(() => container.resolveMessageType({ type: 'user.created' })).toThrow( + 'Unable to resolve message type: messageTypeResolver is not configured', + ) + }) + + it('should throw error during registration if message type cannot be determined', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + // Without messageTypePath, literal resolver, or explicit messageType, registration should fail + expect( + () => + new HandlerContainer({ + messageHandlers: configs, + }), + ).toThrow( + 'Unable to determine message type for handler. ' + + 'Either provide messageType in handler options, use a literal resolver, ' + + 'or ensure the schema has a literal type field matching messageTypePath.', + ) + }) + + it('should throw error for duplicate message types', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const }), { + messageType: 'duplicate.type', + }) + .addConfig(ORDER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const }), { + messageType: 'duplicate.type', + }) + .build() + + expect( + () => + new HandlerContainer({ + messageHandlers: configs, + }), + ).toThrow('Duplicate handler for message type: duplicate.type') + }) + }) + + describe('with explicit messageType in handler options', () => { + it('should use explicit messageType over schema extraction', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'custom.type' }, // Override schema-derived type + ) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + // Handler is registered under 'custom.type', not 'user.created' + expect(() => container.resolveHandler('user.created')).toThrow( + 'Unsupported message type: user.created', + ) + const handler = container.resolveHandler('custom.type') + expect(handler.schema).toBe(USER_MESSAGE_SCHEMA) + }) + + it('should support multiple handlers with explicit types', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const }), { + messageType: 'storage.object.created', + }) + .addConfig(ORDER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const }), { + messageType: 'storage.object.deleted', + }) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { + resolver: ({ messageAttributes }) => { + const eventType = messageAttributes?.eventType as string + if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created' + if (eventType === 'OBJECT_DELETE') return 'storage.object.deleted' + throw new Error(`Unknown event type: ${eventType}`) + }, + }, + }) + + // Verify handlers are correctly registered + expect(container.resolveHandler('storage.object.created').schema).toBe(USER_MESSAGE_SCHEMA) + expect(container.resolveHandler('storage.object.deleted').schema).toBe(ORDER_MESSAGE_SCHEMA) + + // Verify runtime resolution works with the resolver + expect(container.resolveMessageType({}, { eventType: 'OBJECT_FINALIZE' })).toBe( + 'storage.object.created', + ) + expect(container.resolveMessageType({}, { eventType: 'OBJECT_DELETE' })).toBe( + 'storage.object.deleted', + ) + }) + }) + }) + describe('handler execution', () => { it('should execute handler with correct context', async () => { const context: TestContext = { @@ -174,7 +411,7 @@ describe('HandlerContainer', () => { const container = new HandlerContainer({ messageHandlers: configs, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, }) const handler = container.resolveHandler('user.created') diff --git a/packages/core/test/queues/HandlerContainer.types.spec.ts b/packages/core/test/queues/HandlerContainer.types.spec.ts new file mode 100644 index 00000000..fddaa10e --- /dev/null +++ b/packages/core/test/queues/HandlerContainer.types.spec.ts @@ -0,0 +1,241 @@ +/** + * TypeScript type tests for HandlerContainer and related types. + * These tests verify that the type system correctly enforces constraints. + * + * Run with: npx tsc --noEmit to check types + */ + +import type { Either } from '@lokalise/node-core' +import { describe, expectTypeOf, it } from 'vitest' +import z from 'zod/v4' + +import { + type HandlerConfigOptions, + HandlerContainer, + type HandlerContainerOptions, + MessageHandlerConfig, + MessageHandlerConfigBuilder, +} from '../../lib/queues/HandlerContainer.ts' +import type { MessageTypeResolverConfig } from '../../lib/queues/MessageTypeResolver.ts' + +// Test message types +type UserMessage = { + type: 'user.created' + userId: string + email: string +} + +type OrderMessage = { + type: 'order.placed' + orderId: string + amount: number +} + +type SupportedMessages = UserMessage | OrderMessage + +// Test context +type TestContext = { + processedMessages: unknown[] +} + +// Test schemas +const USER_MESSAGE_SCHEMA = z.object({ + type: z.literal('user.created'), + userId: z.string(), + email: z.string(), +}) + +const ORDER_MESSAGE_SCHEMA = z.object({ + type: z.literal('order.placed'), + orderId: z.string(), + amount: z.number(), +}) + +describe('HandlerContainer Types', () => { + describe('MessageHandlerConfigBuilder', () => { + it('should infer handler message type from schema', () => { + const builder = new MessageHandlerConfigBuilder() + + // Handler receives the correct message type inferred from schema + builder.addConfig(USER_MESSAGE_SCHEMA, (message, _context) => { + expectTypeOf(message).toEqualTypeOf() + expectTypeOf(message.userId).toBeString() + expectTypeOf(message.email).toBeString() + return Promise.resolve({ result: 'success' as const }) + }) + + builder.addConfig(ORDER_MESSAGE_SCHEMA, (message, _context) => { + expectTypeOf(message).toEqualTypeOf() + expectTypeOf(message.orderId).toBeString() + expectTypeOf(message.amount).toBeNumber() + return Promise.resolve({ result: 'success' as const }) + }) + }) + + it('should accept messageType in options', () => { + const builder = new MessageHandlerConfigBuilder() + + // messageType is optional in handler options + builder.addConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'custom.type' }, + ) + + // Other options should still work alongside messageType + builder.addConfig( + ORDER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { + messageType: 'another.type', + messageLogFormatter: (message) => ({ id: message.orderId }), + }, + ) + }) + + it('should type context correctly', () => { + const builder = new MessageHandlerConfigBuilder() + + builder.addConfig(USER_MESSAGE_SCHEMA, (_message, context) => { + expectTypeOf(context).toEqualTypeOf() + expectTypeOf(context.processedMessages).toBeArray() + return Promise.resolve({ result: 'success' as const }) + }) + }) + }) + + describe('MessageHandlerConfig', () => { + it('should store messageType as optional string', () => { + const config = new MessageHandlerConfig( + USER_MESSAGE_SCHEMA, + () => Promise.resolve({ result: 'success' as const }), + { messageType: 'explicit.type' }, + ) + + expectTypeOf(config.messageType).toEqualTypeOf() + }) + }) + + describe('HandlerConfigOptions', () => { + it('should have messageType as optional', () => { + type Options = HandlerConfigOptions + + expectTypeOf().toEqualTypeOf() + }) + }) + + describe('HandlerContainerOptions', () => { + it('should accept messageTypeResolver config', () => { + type Options = HandlerContainerOptions + + expectTypeOf().toEqualTypeOf< + MessageTypeResolverConfig | undefined + >() + }) + }) + + describe('HandlerContainer', () => { + it('resolveMessageType should return string', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + const result = container.resolveMessageType({ type: 'user.created' }) + expectTypeOf(result).toBeString() + }) + + it('resolveHandler should require string parameter', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + // This should compile - string is required + container.resolveHandler('user.created') + + // The parameter type should be string + expectTypeOf(container.resolveHandler).parameter(0).toBeString() + }) + + it('resolveHandler accepts any string (runtime validation)', () => { + const configs = new MessageHandlerConfigBuilder() + .addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const })) + .build() + + const container = new HandlerContainer({ + messageHandlers: configs, + messageTypeResolver: { messageTypePath: 'type' }, + }) + + // Note: resolveHandler accepts any string at compile time. + // Type validation happens at runtime - unknown types throw an error. + // This is by design: the message type comes from resolveMessageType() + // which extracts it from actual validated messages. + expectTypeOf(container.resolveHandler).parameter(0).toBeString() + expectTypeOf(container.resolveHandler).parameter(0).not.toEqualTypeOf<'user.created'>() + }) + }) + + describe('MessageTypeResolverConfig', () => { + it('should accept messageTypePath config', () => { + const config: MessageTypeResolverConfig = { messageTypePath: 'type' } + expectTypeOf(config).toMatchTypeOf() + }) + + it('should accept literal config', () => { + const config: MessageTypeResolverConfig = { literal: 'fixed.type' } + expectTypeOf(config).toMatchTypeOf() + }) + + it('should accept resolver function config', () => { + const config: MessageTypeResolverConfig = { + resolver: ({ messageData, messageAttributes }) => { + expectTypeOf(messageData).toBeUnknown() + expectTypeOf(messageAttributes).toEqualTypeOf | undefined>() + return 'resolved.type' + }, + } + expectTypeOf(config).toMatchTypeOf() + }) + + it('resolver function should return string', () => { + const config: MessageTypeResolverConfig = { + resolver: () => { + // Return type must be string + return 'type' + }, + } + + if ('resolver' in config) { + const result = config.resolver({ messageData: {} }) + expectTypeOf(result).toBeString() + } + }) + }) + + describe('Handler return type', () => { + it('should return Either with retryLater or success', () => { + const builder = new MessageHandlerConfigBuilder() + + builder.addConfig(USER_MESSAGE_SCHEMA, () => { + // Both return types should be valid + const successResult: Either<'retryLater', 'success'> = { result: 'success' } + const retryResult: Either<'retryLater', 'success'> = { error: 'retryLater' } + + expectTypeOf(successResult).toMatchTypeOf>() + expectTypeOf(retryResult).toMatchTypeOf>() + + return Promise.resolve(successResult) + }) + }) + }) +}) diff --git a/packages/core/test/queues/HandlerSpy.spec.ts b/packages/core/test/queues/HandlerSpy.spec.ts index 046193f0..0c5dd353 100644 --- a/packages/core/test/queues/HandlerSpy.spec.ts +++ b/packages/core/test/queues/HandlerSpy.spec.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from 'vitest' -import { HandlerSpy, isHandlerSpy } from '../../lib/queues/HandlerSpy.ts' +import { HandlerSpy, isHandlerSpy, TYPE_NOT_RESOLVED } from '../../lib/queues/HandlerSpy.ts' type Message = { id: string @@ -87,10 +87,14 @@ describe('HandlerSpy', () => { it('Remove stored events', () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2, + }, + undefined, + 'test.type', + ) spy.clear() @@ -103,15 +107,23 @@ describe('HandlerSpy', () => { it('Finds previously consumed event', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage({ status: 'done', @@ -123,15 +135,23 @@ describe('HandlerSpy', () => { it('Finds previously consumed event with type narrowing', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2B, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2B, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGEB, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGEB, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage({ status2: 'done', @@ -143,10 +163,14 @@ describe('HandlerSpy', () => { it('Finds previously consumed event with a deep match', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_3, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_3, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage({ payload: { @@ -160,15 +184,23 @@ describe('HandlerSpy', () => { it('Finds previously consumed event with a deep partial match', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_3, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_3, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_4, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_4, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage({ payload: { @@ -193,10 +225,14 @@ describe('HandlerSpy', () => { }, } - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message, + }, + undefined, + 'test.type', + ) const messageResult = await spy.waitForMessage({ payload: { @@ -212,15 +248,23 @@ describe('HandlerSpy', () => { it('Finds previously consumed event with an array match in order', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_5A, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_5A, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_5B, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_5B, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage({ payload: { @@ -236,15 +280,23 @@ describe('HandlerSpy', () => { it('Finds multiple previously consumed events', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'retryLater' }, - message: TEST_MESSAGE, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'retryLater' }, + message: TEST_MESSAGE, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessage( { @@ -267,19 +319,27 @@ describe('HandlerSpy', () => { it('Waits for an message to be consumed', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2, + }, + undefined, + 'test.type', + ) const spyPromise = spy.waitForMessage({ status: 'done', }) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE, + }, + undefined, + 'test.type', + ) const message = await spyPromise @@ -291,17 +351,25 @@ describe('HandlerSpy', () => { it('Waits for an message to be consumed by id', async () => { const spy = new HandlerSpy() - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2, + }, + undefined, + 'test.type', + ) const spyPromise = spy.waitForMessageWithId(TEST_MESSAGE.id) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE, + }, + undefined, + 'test.type', + ) const message = await spyPromise @@ -313,15 +381,23 @@ describe('HandlerSpy', () => { messageIdField: 'id2', }) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGE_2B, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGE_2B, + }, + undefined, + 'test.type', + ) - spy.addProcessedMessage({ - processingResult: { status: 'consumed' }, - message: TEST_MESSAGEB, - }) + spy.addProcessedMessage( + { + processingResult: { status: 'consumed' }, + message: TEST_MESSAGEB, + }, + undefined, + 'test.type', + ) const message = await spy.waitForMessageWithId(TEST_MESSAGEB.id2) @@ -337,6 +413,7 @@ describe('HandlerSpy', () => { message: null, }, 'abc', + TYPE_NOT_RESOLVED, ) const messageResult = await spy.waitForMessageWithId('abc') diff --git a/packages/core/test/queues/MessageTypeResolver.spec.ts b/packages/core/test/queues/MessageTypeResolver.spec.ts new file mode 100644 index 00000000..ba5c38bc --- /dev/null +++ b/packages/core/test/queues/MessageTypeResolver.spec.ts @@ -0,0 +1,212 @@ +import { describe, expect, it } from 'vitest' + +import { + extractMessageTypeFromSchema, + isMessageTypeLiteralConfig, + isMessageTypePathConfig, + isMessageTypeResolverFnConfig, + resolveMessageType, +} from '../../lib/queues/MessageTypeResolver.ts' + +describe('MessageTypeResolver', () => { + describe('type guards', () => { + it('isMessageTypePathConfig should return true for messageTypePath config', () => { + expect(isMessageTypePathConfig({ messageTypePath: 'type' })).toBe(true) + expect(isMessageTypePathConfig({ literal: 'test' })).toBe(false) + expect(isMessageTypePathConfig({ resolver: () => 'test' })).toBe(false) + }) + + it('isMessageTypeLiteralConfig should return true for literal config', () => { + expect(isMessageTypeLiteralConfig({ literal: 'test' })).toBe(true) + expect(isMessageTypeLiteralConfig({ messageTypePath: 'type' })).toBe(false) + expect(isMessageTypeLiteralConfig({ resolver: () => 'test' })).toBe(false) + }) + + it('isMessageTypeResolverFnConfig should return true for resolver config', () => { + expect(isMessageTypeResolverFnConfig({ resolver: () => 'test' })).toBe(true) + expect(isMessageTypeResolverFnConfig({ messageTypePath: 'type' })).toBe(false) + expect(isMessageTypeResolverFnConfig({ literal: 'test' })).toBe(false) + }) + }) + + describe('resolveMessageType', () => { + describe('literal mode', () => { + it('should return the literal value regardless of message content', () => { + const config = { literal: 'order.created' } + + expect(resolveMessageType(config, { messageData: {} })).toBe('order.created') + expect(resolveMessageType(config, { messageData: { type: 'different' } })).toBe( + 'order.created', + ) + expect(resolveMessageType(config, { messageData: null })).toBe('order.created') + }) + }) + + describe('messageTypePath mode', () => { + it('should extract type from the specified field path', () => { + const config = { messageTypePath: 'type' } + + expect(resolveMessageType(config, { messageData: { type: 'user.created' } })).toBe( + 'user.created', + ) + }) + + it('should work with different field names', () => { + const config = { messageTypePath: 'eventType' } + + expect(resolveMessageType(config, { messageData: { eventType: 'order.placed' } })).toBe( + 'order.placed', + ) + }) + + it('should work with kebab-case field names', () => { + const config = { messageTypePath: 'detail-type' } + + expect( + resolveMessageType(config, { messageData: { 'detail-type': 'user.presence' } }), + ).toBe('user.presence') + }) + + it('should throw error when field is missing', () => { + const config = { messageTypePath: 'type' } + + expect(() => resolveMessageType(config, { messageData: {} })).toThrow( + "Unable to resolve message type: field 'type' not found in message data", + ) + }) + + it('should throw error when messageData is undefined', () => { + const config = { messageTypePath: 'type' } + + expect(() => resolveMessageType(config, { messageData: undefined })).toThrow( + "Unable to resolve message type: field 'type' not found in message data", + ) + }) + + it('should throw error when messageData is null', () => { + const config = { messageTypePath: 'type' } + + expect(() => resolveMessageType(config, { messageData: null })).toThrow( + "Unable to resolve message type: field 'type' not found in message data", + ) + }) + }) + + describe('custom resolver mode', () => { + it('should use the custom resolver function', () => { + const config = { + resolver: ({ messageData }: { messageData: unknown }) => { + const data = messageData as { metadata?: { eventName?: string } } + return data.metadata?.eventName ?? 'default' + }, + } + + expect( + resolveMessageType(config, { messageData: { metadata: { eventName: 'custom.event' } } }), + ).toBe('custom.event') + }) + + it('should pass messageAttributes to the resolver', () => { + const config = { + resolver: ({ messageAttributes }: { messageAttributes?: Record }) => { + const eventType = messageAttributes?.eventType as string + if (!eventType) throw new Error('eventType required') + return eventType + }, + } + + expect( + resolveMessageType(config, { + messageData: {}, + messageAttributes: { eventType: 'OBJECT_FINALIZE' }, + }), + ).toBe('OBJECT_FINALIZE') + }) + + it('should support mapping event types', () => { + const config = { + resolver: ({ messageAttributes }: { messageAttributes?: Record }) => { + const eventType = messageAttributes?.eventType as string + if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created' + if (eventType === 'OBJECT_DELETE') return 'storage.object.deleted' + return eventType ?? 'unknown' + }, + } + + expect( + resolveMessageType(config, { + messageData: {}, + messageAttributes: { eventType: 'OBJECT_FINALIZE' }, + }), + ).toBe('storage.object.created') + + expect( + resolveMessageType(config, { + messageData: {}, + messageAttributes: { eventType: 'OBJECT_DELETE' }, + }), + ).toBe('storage.object.deleted') + }) + + it('should allow resolver to throw custom errors', () => { + const config = { + resolver: () => { + throw new Error('Custom error: event type missing') + }, + } + + expect(() => resolveMessageType(config, { messageData: {} })).toThrow( + 'Custom error: event type missing', + ) + }) + }) + }) + + describe('extractMessageTypeFromSchema', () => { + it('should extract literal value from schema shape', () => { + const schema = { + shape: { + type: { value: 'user.created' }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'type')).toBe('user.created') + }) + + it('should return undefined when field path is undefined', () => { + const schema = { + shape: { + type: { value: 'user.created' }, + }, + } + + expect(extractMessageTypeFromSchema(schema, undefined)).toBeUndefined() + }) + + it('should return undefined when field is not in schema shape', () => { + const schema = { + shape: { + type: { value: 'user.created' }, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'eventType')).toBeUndefined() + }) + + it('should return undefined when schema has no shape', () => { + const schema = {} + + expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() + }) + + it('should return undefined when field has no value', () => { + const schema = { + shape: { + type: {}, + }, + } + + expect(extractMessageTypeFromSchema(schema, 'type')).toBeUndefined() + }) + }) +}) diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 7a7fcdba..66e288da 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -7,6 +7,10 @@ export default defineConfig({ watch: false, restoreMocks: true, pool: 'threads', + typecheck: { + enabled: true, + include: ['**/*.types.spec.ts'], + }, coverage: { provider: 'v8', include: ['lib/**/*.ts'], diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index e8259f4c..913e4a2b 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -114,7 +114,7 @@ Consumers receive and process messages from Pub/Sub subscriptions. They handle: ### Message Schemas Messages are validated using Zod schemas. Each message must have: -- A unique message type field (discriminator for routing) - configurable via `messageTypeField` (required) +- A unique message type field (discriminator for routing) - configurable via `messageTypeResolver` (required) - A message ID field (for tracking and deduplication) - configurable via `messageIdField` (default: `'id'`) - A timestamp field (added automatically if missing) - configurable via `messageTimestampField` (default: `'timestamp'`) @@ -163,7 +163,7 @@ class UserEventPublisher extends AbstractPubSubPublisher { }, }, messageSchemas: [UserEventSchema], - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, logMessages: true, } ) @@ -211,7 +211,7 @@ class UserEventConsumer extends AbstractPubSubConsumer() .addConfig( UserEventSchema, @@ -412,7 +412,7 @@ When using `locatorConfig`, you connect to existing resources without creating t { // Required - Message Schema Configuration messageSchemas: [Schema1, Schema2], // Array of Zod schemas - messageTypeField: 'messageType', // Field containing message type discriminator + messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator // Topic Configuration (one of these required) creationConfig: { @@ -459,7 +459,7 @@ When using `locatorConfig`, you connect to existing resources without creating t { // Required - Message Handling Configuration handlers: MessageHandlerConfigBuilder.build(), // Message handlers configuration - messageTypeField: 'messageType', // Field containing message type discriminator + messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator // Topic and Subscription Configuration (one of these required) creationConfig: { @@ -560,7 +560,7 @@ class OrderPublisher extends AbstractPubSubPublisher { // Map library's internal fields to your custom fields messageIdField: 'messageId', // Default: 'id' - messageTypeField: 'eventType', // Required + messageTypeResolver: { messageTypePath: 'eventType' }, // Required messageTimestampField: 'createdAt', // Default: 'timestamp' messageDeduplicationIdField: 'txId', // Default: 'deduplicationId' messageDeduplicationOptionsField: 'txOptions', // Default: 'deduplicationOptions' @@ -613,7 +613,7 @@ class LargeMessagePublisher extends AbstractPubSubPublisher { topic: { name: 'large-messages' }, }, messageSchemas: [MyMessageSchema], - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, payloadStoreConfig: { store: payloadStore, messageSizeThreshold: PUBSUB_MESSAGE_MAX_SIZE, // 10 MB @@ -784,7 +784,11 @@ Dead Letter Queues capture messages that cannot be processed after multiple atte The library provides `AbstractPubSubDlqConsumer`, a convenience class for consuming messages from a DLQ topic. Unlike regular consumers that route messages by type, DLQ consumers accept any message structure since dead-lettered messages can come from various failed processing scenarios. ```typescript -import { AbstractPubSubDlqConsumer, type DlqMessage } from '@message-queue-toolkit/gcp-pubsub' +import { + AbstractPubSubDlqConsumer, + type DlqMessage, + DLQ_MESSAGE_TYPE // 'dlq.message' - the message type used for all DLQ messages +} from '@message-queue-toolkit/gcp-pubsub' class MyDlqConsumer extends AbstractPubSubDlqConsumer { constructor(dependencies: PubSubConsumerDependencies, context: MyContext) { @@ -820,7 +824,7 @@ await dlqConsumer.start() ``` **Key differences from AbstractPubSubConsumer:** -- Does NOT require `messageTypeField` (accepts all message types) +- Uses a literal message type resolver (`DLQ_MESSAGE_TYPE = 'dlq.message'`) - all messages are treated as the same type - Uses a passthrough schema that accepts any message with an `id` field - Simplified handler configuration (single handler for all messages) - The `DlqMessage` type includes `id: string` and passes through all other fields @@ -914,7 +918,7 @@ await publisher.publish(message, { ### Message Handlers -Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeField`): +Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeResolver`): ```typescript import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' @@ -1748,7 +1752,7 @@ it('publishes message', async () => { **Constructor Options:** - `messageSchemas`: Array of Zod schemas for messages -- `messageTypeField`: Field name containing message type +- `messageTypeResolver`: Configuration for resolving message type - `creationConfig` / `locatorConfig`: Topic configuration - `logMessages`: Enable message logging - `payloadStoreConfig`: Payload offloading configuration @@ -1769,7 +1773,7 @@ it('publishes message', async () => { **Constructor Options:** - `handlers`: Message handler configuration -- `messageTypeField`: Field name containing message type +- `messageTypeResolver`: Configuration for resolving message type - `creationConfig` / `locatorConfig`: Topic + subscription configuration - `logMessages`: Enable message logging - `payloadStoreConfig`: Payload retrieval configuration diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index c0144864..8d9f35b3 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -144,7 +144,7 @@ export abstract class AbstractPubSubConsumer< this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options) this.handlerContainer = new HandlerContainer({ messageHandlers: options.handlers, - messageTypeField: options.messageTypeField, + messageTypeResolver: options.messageTypeResolver, }) } @@ -282,7 +282,10 @@ export abstract class AbstractPubSubConsumer< messagePayload = retrievalResult.result } - const resolveSchemaResult = this.resolveSchema(messagePayload as MessagePayloadType) + const resolveSchemaResult = this.resolveSchema( + messagePayload as MessagePayloadType, + message.attributes as Record, + ) if (resolveSchemaResult.error) { this.handleMessageProcessed({ message: messagePayload as MessagePayloadType, @@ -352,8 +355,28 @@ export abstract class AbstractPubSubConsumer< const releaseLock = acquireLockResult.result - // @ts-expect-error - const messageType = validatedMessage[this.messageTypeField] + // Resolve message type using the new resolver (with attributes) or legacy field + let messageType: string + try { + messageType = this.handlerContainer.resolveMessageType( + validatedMessage, + message.attributes as Record, + ) + } catch (resolveError) { + await releaseLock.release() + this.handleError(resolveError) + this.handleMessageProcessed({ + message: validatedMessage, + processingResult: { + status: 'error', + errorReason: 'invalidMessage', + }, + messageProcessingStartTimestamp, + queueName: this.subscriptionName ?? this.topicName, + }) + this.handleTerminalError(message, 'invalidMessage') + return + } try { // Process message @@ -447,8 +470,11 @@ export abstract class AbstractPubSubConsumer< } } - protected override resolveSchema(messagePayload: MessagePayloadType) { - return this._messageSchemaContainer.resolveSchema(messagePayload) + protected override resolveSchema( + messagePayload: MessagePayloadType, + messageAttributes?: Record, + ) { + return this._messageSchemaContainer.resolveSchema(messagePayload, messageAttributes) } protected override processMessage( diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts index 1e929bed..9baacb1e 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts @@ -1,5 +1,5 @@ import type { Either } from '@lokalise/node-core' -import { MessageHandlerConfigBuilder, NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core' +import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' import { z } from 'zod/v4' import { AbstractPubSubConsumer, @@ -8,6 +8,11 @@ import { } from './AbstractPubSubConsumer.ts' import type { PubSubCreationConfig, PubSubQueueLocatorType } from './AbstractPubSubService.ts' +/** + * Message type used for DLQ consumers. All DLQ messages are treated as this type. + */ +export const DLQ_MESSAGE_TYPE = 'dlq.message' + /** * Base schema for DLQ messages. * Uses passthrough() to accept any message structure while ensuring basic fields exist. @@ -31,7 +36,7 @@ export type DlqMessageHandler = ( /** * Options for AbstractPubSubDlqConsumer. - * Omits messageTypeField and handlers since DLQ consumers handle all message types uniformly. + * Omits messageTypeResolver and handlers since DLQ consumers handle all message types uniformly. */ export type PubSubDlqConsumerOptions< ExecutionContext, @@ -45,7 +50,7 @@ export type PubSubDlqConsumerOptions< CreationConfigType, QueueLocatorType >, - 'messageTypeField' | 'handlers' + 'messageTypeResolver' | 'handlers' > & { /** * Handler function to process DLQ messages. @@ -62,7 +67,7 @@ export type PubSubDlqConsumerOptions< * any message structure since DLQ messages can come from various failed processing scenarios. * * Key differences from AbstractPubSubConsumer: - * - Does NOT use messageTypeField routing (accepts all message types) + * - Does NOT use messageTypeResolver routing (accepts all message types) * - Uses a passthrough schema that accepts any message with an 'id' field * - Simplified handler configuration (single handler for all messages) * @@ -111,9 +116,8 @@ export abstract class AbstractPubSubDlqConsumer< dependencies, { ...restOptions, - // NO_MESSAGE_TYPE_FIELD ensures all messages use the default handler - // for both schema storage and lookup, allowing a single handler for any message type - messageTypeField: NO_MESSAGE_TYPE_FIELD, + // Use literal resolver - all DLQ messages are treated as the same type + messageTypeResolver: { literal: DLQ_MESSAGE_TYPE }, handlers: new MessageHandlerConfigBuilder() .addConfig(DLQ_MESSAGE_SCHEMA, (message, context) => handler(message, context)) .build(), diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index 8089caac..d9484b43 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -69,8 +69,8 @@ export abstract class AbstractPubSubPublisher const parsedMessage = messageSchemaResult.result.parse(message) if (this.logMessages) { - // @ts-expect-error - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' + const resolvedLogMessage = this.resolveMessageLog(message, messageType) this.logMessage(resolvedLogMessage) } @@ -111,8 +111,7 @@ export abstract class AbstractPubSubPublisher details: { publisher: this.constructor.name, topicName: this.topicName, - // @ts-expect-error - messageType: message[this.messageTypeField] ?? 'unknown', + messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown', }, cause: err, }) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts index 68762907..ff759621 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts @@ -109,7 +109,7 @@ export class PubSubPermissionConsumer extends AbstractPubSubConsumer< dependencies, { ...options, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlerSpy: true, handlers: new MessageHandlerConfigBuilder< SupportedMessages, diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts index c5db7763..3eb70a28 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts @@ -45,7 +45,7 @@ export class PubSubPermissionPublisher extends AbstractPubSubPublisher { }), newPublisherOptions: { handlerSpy: true, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, logMessages: false, }, }, @@ -208,7 +208,7 @@ describe('PubSubPublisherManager', () => { }, }, handlerSpy: true, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, messageSchemas: [injectedSchema.consumerSchema], }) diff --git a/packages/gcs-payload-store/README.md b/packages/gcs-payload-store/README.md index a171cdda..87bde232 100644 --- a/packages/gcs-payload-store/README.md +++ b/packages/gcs-payload-store/README.md @@ -238,7 +238,7 @@ class LargeMessagePublisher extends AbstractSqsPublisher { queue: { QueueName: 'large-messages' }, }, messageSchemas: [MY_MESSAGE_SCHEMA], - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, payloadStoreConfig: { store: new GCSPayloadStore( { gcsStorage: storage }, @@ -267,7 +267,7 @@ class PubSubLargeMessagePublisher extends AbstractPubSubPublisher { topic: { name: 'large-events' }, }, messageSchemas: [MY_MESSAGE_SCHEMA], - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, payloadStoreConfig: { store: new GCSPayloadStore( { gcsStorage: storage }, diff --git a/packages/kafka/lib/AbstractKafkaService.ts b/packages/kafka/lib/AbstractKafkaService.ts index 8d5f2014..1528ee90 100644 --- a/packages/kafka/lib/AbstractKafkaService.ts +++ b/packages/kafka/lib/AbstractKafkaService.ts @@ -13,6 +13,7 @@ import { type MessageProcessingResult, type PublicHandlerSpy, resolveHandlerSpy, + TYPE_NOT_RESOLVED, } from '@message-queue-toolkit/core' import type { BaseOptions } from '@platformatic/kafka' import type { @@ -91,7 +92,12 @@ export abstract class AbstractKafkaService< const { message, processingResult } = params const messageId = this.resolveMessageId(message.value) - this._handlerSpy?.addProcessedMessage({ message: message.value, processingResult }, messageId) + // Kafka doesn't have unified message type resolution yet, use TYPE_NOT_RESOLVED + this._handlerSpy?.addProcessedMessage( + { message: message.value, processingResult }, + messageId, + TYPE_NOT_RESOLVED, + ) if (this.options.logMessages) { this.logger.debug( diff --git a/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts index ac28c38f..1f9aa825 100644 --- a/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts +++ b/packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts @@ -207,7 +207,7 @@ describe('PrometheusMessageQueueTimeMetric', () => { processingResult: { status: 'consumed' }, message: message, queueName, - messageTimestamp: Date.now(), + messageTimestamp: timestamp, messageProcessingStartTimestamp: timestamp + 53, messageProcessingEndTimestamp: timestamp, }) diff --git a/packages/metrics/package.json b/packages/metrics/package.json index 2050598c..dc226150 100644 --- a/packages/metrics/package.json +++ b/packages/metrics/package.json @@ -40,7 +40,7 @@ "@message-queue-toolkit/core": "*", "@vitest/coverage-v8": "^3.2.4", "rimraf": "^6.0.1", - "typescript": "^5.9.2", + "typescript": "^5.9.3", "vitest": "^3.2.4" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", diff --git a/packages/sns/README.md b/packages/sns/README.md index 1d9dc327..fbac96db 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -94,7 +94,7 @@ Publisher → SNS Topic → [Subscriptions] → SQS Queues → Consumers ### Message Schemas Messages use the same schema requirements as SQS. Each message must have: -- A unique message type field (discriminator for routing) - configurable via `messageTypeField` (required) +- A unique message type field (discriminator for routing) - configurable via `messageTypeResolver` (required) - A message ID field (for tracking and deduplication) - configurable via `messageIdField` (default: `'id'`) - A timestamp field (added automatically if missing) - configurable via `messageTimestampField` (default: `'timestamp'`) @@ -143,7 +143,7 @@ class UserEventsPublisher extends AbstractSnsPublisher { }, { messageSchemas: [UserCreatedSchema, UserUpdatedSchema], - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, creationConfig: { topic: { Name: 'user-events-topic', @@ -214,7 +214,7 @@ class UserEventsConsumer extends AbstractSnsSqsConsumer< }, }, { - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlers: new MessageHandlerConfigBuilder() .addConfig( UserCreatedSchema, @@ -287,7 +287,7 @@ class UserEventsFifoPublisher extends AbstractSnsPublisher { }, { messageSchemas: [UserCreatedSchema, UserUpdatedSchema], - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, fifoTopic: true, // Enable FIFO mode // Option 1: Use a field from the message as MessageGroupId @@ -374,7 +374,7 @@ class UserEventsFifoConsumer extends AbstractSnsSqsConsumer< }, { fifoQueue: true, // Enable FIFO mode for SQS queue - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlers: new MessageHandlerConfigBuilder() .addConfig(UserCreatedSchema, handleUserCreated) .addConfig(UserUpdatedSchema, handleUserUpdated) @@ -495,7 +495,7 @@ When using `locatorConfig`, you connect to an existing topic without creating it { // Required - Message Schema Configuration messageSchemas: [Schema1, Schema2], // Array of Zod schemas - messageTypeField: 'messageType', // Field containing message type discriminator + messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator // Topic Configuration (one of these required) creationConfig: { /* ... */ }, // Create topic if doesn't exist @@ -541,7 +541,7 @@ SNS consumers use the same options as SQS consumers, plus SNS-specific subscript // Required - Message Handling Configuration handlers: MessageHandlerConfigBuilder.build(), - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, // Topic & Queue Configuration creationConfig: { diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index a0bb1e97..cc32d0e3 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -126,8 +126,8 @@ export abstract class AbstractSnsPublisher this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown' if (this.logMessages) { - // @ts-expect-error - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' + const resolvedLogMessage = this.resolveMessageLog(message, messageType) this.logMessage(resolvedLogMessage) } @@ -173,8 +173,7 @@ export abstract class AbstractSnsPublisher details: { publisher: this.constructor.name, topic: this.topicArn, - // @ts-expect-error - messageType: message[this.messageTypeField] ?? 'unknown', + messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown', }, cause: err, }) diff --git a/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts index f9f7c1cd..e0ab9fa0 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts @@ -41,7 +41,7 @@ describe('SnsPublisherManager', () => { newPublisherOptions: { handlerSpy: true, messageIdField: 'id', - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, creationConfig: { updateAttributesIfExists: true, }, diff --git a/packages/sns/lib/sns/SnsPublisherManager.spec.ts b/packages/sns/lib/sns/SnsPublisherManager.spec.ts index 62479eea..c8986e9e 100644 --- a/packages/sns/lib/sns/SnsPublisherManager.spec.ts +++ b/packages/sns/lib/sns/SnsPublisherManager.spec.ts @@ -136,7 +136,7 @@ describe('SnsPublisherManager', () => { }, }, handlerSpy: true, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, messageSchemas: [TestEvents.created.publisherSchema], }) diff --git a/packages/sns/lib/sns/fakes/FakeConsumer.ts b/packages/sns/lib/sns/fakes/FakeConsumer.ts index db850d61..b9666171 100644 --- a/packages/sns/lib/sns/fakes/FakeConsumer.ts +++ b/packages/sns/lib/sns/fakes/FakeConsumer.ts @@ -36,7 +36,7 @@ export class FakeConsumer extends AbstractSn subscriptionConfig: { updateAttributesIfExists: true, }, - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, handlerSpy: true, }, dependencies, diff --git a/packages/sns/lib/utils/snsAttributeUtils.ts b/packages/sns/lib/utils/snsAttributeUtils.ts index 0611bde6..eb776e4d 100644 --- a/packages/sns/lib/utils/snsAttributeUtils.ts +++ b/packages/sns/lib/utils/snsAttributeUtils.ts @@ -46,11 +46,11 @@ export function generateTopicSubscriptionPolicy(params: TopicSubscriptionPolicyP export function generateFilterAttributes( // biome-ignore lint/suspicious/noExplicitAny: Expected messageSchemas: ZodSchema[], - messageTypeField: string, + messageTypePath: string, ) { const messageTypes = messageSchemas.map((schema) => { // @ts-expect-error - return schema.shape[messageTypeField].value as string + return schema.shape[messageTypePath].value as string }) return { diff --git a/packages/sns/package.json b/packages/sns/package.json index b0a9e449..da4fc6d9 100644 --- a/packages/sns/package.json +++ b/packages/sns/package.json @@ -59,7 +59,7 @@ "rimraf": "^6.0.1", "typescript": "^5.9.3", "vitest": "^3.2.4", - "zod": "^4.1.12" + "zod": "^4.1.13" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts index 6a1825cb..ec146846 100644 --- a/packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts +++ b/packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts @@ -36,7 +36,7 @@ export class CreateLocateConfigMixConsumer extends AbstractSnsSqsConsumer< .addConfig(TestEvents.created, entityCreatedHandler, {}) .addConfig(TestEvents.updated, entityUpdatedHandler, {}) .build(), - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, creationConfig: { queue: { QueueName: CreateLocateConfigMixConsumer.CONSUMED_QUEUE_NAME, diff --git a/packages/sns/test/consumers/SnsSqsEntityConsumer.ts b/packages/sns/test/consumers/SnsSqsEntityConsumer.ts index 9a87c26f..1c73d522 100644 --- a/packages/sns/test/consumers/SnsSqsEntityConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsEntityConsumer.ts @@ -79,7 +79,7 @@ export class SnsSqsEntityConsumer extends AbstractSnsSqsConsumer< topic: { Name: SnsSqsEntityConsumer.SUBSCRIBED_TOPIC_NAME }, }, }), - messageTypeField: 'type', + messageTypeResolver: { messageTypePath: 'type' }, subscriptionConfig: { updateAttributesIfExists: false, }, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts index d54a1e1a..3155b3fd 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.ts @@ -161,7 +161,7 @@ export class SnsSqsPermissionConsumer extends AbstractSnsSqsConsumer< topic: { Name: SnsSqsPermissionConsumer.SUBSCRIBED_TOPIC_NAME }, }, }), - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, subscriptionConfig: { updateAttributesIfExists: false, }, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts index 6a40be8e..c1b34884 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts @@ -165,7 +165,7 @@ export class SnsSqsPermissionConsumerFifo extends AbstractSnsSqsConsumer< }, }), fifoQueue: true, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, subscriptionConfig: { updateAttributesIfExists: false, }, diff --git a/packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts b/packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts index 8361d66a..47ca4f55 100644 --- a/packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts +++ b/packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts @@ -24,7 +24,7 @@ export class CreateLocateConfigMixPublisher extends AbstractSnsPublisher payloadStoreConfig: options?.payloadStoreConfig, messageSchemas: [PERMISSIONS_ADD_MESSAGE_SCHEMA, PERMISSIONS_REMOVE_MESSAGE_SCHEMA], handlerSpy: true, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, messageDeduplicationConfig: options?.messageDeduplicationConfig, enablePublisherDeduplication: options?.enablePublisherDeduplication, messageDeduplicationIdField: 'deduplicationId', diff --git a/packages/sns/test/publishers/SnsPermissionPublisherFifo.ts b/packages/sns/test/publishers/SnsPermissionPublisherFifo.ts index 0656ef1f..4a63c5db 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisherFifo.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisherFifo.ts @@ -42,7 +42,7 @@ export class SnsPermissionPublisherFifo extends AbstractSnsPublisher { }, { messageSchemas: [UserCreatedSchema, UserUpdatedSchema], - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, creationConfig: { queue: { QueueName: 'user-events-queue', @@ -193,7 +193,7 @@ class UserEventsConsumer extends AbstractSqsConsumer< }, }, { - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlers: new MessageHandlerConfigBuilder() .addConfig( UserCreatedSchema, @@ -247,7 +247,7 @@ class UserEventsFifoPublisher extends AbstractSqsPublisher { }, { messageSchemas: [UserCreatedSchema, UserUpdatedSchema], - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, fifoQueue: true, // Enable FIFO mode // Option 1: Use a field from the message as MessageGroupId @@ -327,7 +327,7 @@ class UserEventsFifoConsumer extends AbstractSqsConsumer< }, { fifoQueue: true, // Enable FIFO mode - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlers: new MessageHandlerConfigBuilder() .addConfig(UserCreatedSchema, handleUserCreated) .addConfig(UserUpdatedSchema, handleUserUpdated) @@ -426,7 +426,7 @@ When using `locatorConfig`, you connect to an existing queue without creating it { // Required - Message Schema Configuration messageSchemas: [Schema1, Schema2], // Array of Zod schemas - messageTypeField: 'messageType', // Field containing message type discriminator + messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator // Queue Configuration (one of these required) creationConfig: { /* ... */ }, // Create queue if doesn't exist @@ -474,7 +474,7 @@ When using `locatorConfig`, you connect to an existing queue without creating it { // Required - Message Handling Configuration handlers: MessageHandlerConfigBuilder.build(), // Message handlers configuration - messageTypeField: 'messageType', // Field containing message type discriminator + messageTypeResolver: { messageTypePath: 'messageType' }, // Field containing message type discriminator // Queue Configuration (one of these required) creationConfig: { /* ... */ }, @@ -567,7 +567,7 @@ class OrderPublisher extends AbstractSqsPublisher { // Map library's internal fields to your custom fields messageIdField: 'messageId', // Default: 'id' - messageTypeField: 'eventType', // Required + messageTypeResolver: { messageTypePath: 'eventType' }, // Required messageTimestampField: 'createdAt', // Default: 'timestamp' messageDeduplicationIdField: 'txId', // Default: 'deduplicationId' messageDeduplicationOptionsField: 'txOptions', // Default: 'deduplicationOptions' @@ -792,7 +792,7 @@ await publisher.publish({ ### Message Handlers -Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeField`): +Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeResolver`): ```typescript import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' @@ -1741,7 +1741,7 @@ class EventBridgeConsumer extends AbstractSqsConsumer() @@ -1878,7 +1878,7 @@ class UserEventConsumer extends AbstractSqsConsumer { }, // EventBridge field mappings - messageTypeField: 'detail-type', + messageTypeResolver: { messageTypePath: 'detail-type' }, messageTimestampField: 'time', handlers: new MessageHandlerConfigBuilder() @@ -1952,7 +1952,7 @@ class CustomConsumer extends AbstractSqsConsumer { creationConfig: { queue: { QueueName: 'custom-queue' } }, // Map your custom field names - messageTypeField: 'eventType', // Instead of 'type' + messageTypeResolver: { messageTypePath: 'eventType' }, // Instead of 'type' messageIdField: 'correlationId', // Instead of 'id' messageTimestampField: 'occurredAt', // Instead of 'timestamp' @@ -1971,7 +1971,7 @@ class CustomConsumer extends AbstractSqsConsumer { **Configuration Options:** -- `messageTypeField` (required) - Field name containing the message type for routing +- `messageTypeResolver` (required) - Configuration containing the message type for routing - `messageIdField` (optional, default: `'id'`) - Field name containing the message ID - `messageTimestampField` (optional, default: `'timestamp'`) - Field name containing the timestamp diff --git a/packages/sqs/examples/eventbridge-consumer-example.ts b/packages/sqs/examples/eventbridge-consumer-example.ts index 5d43ac4d..0facbd21 100644 --- a/packages/sqs/examples/eventbridge-consumer-example.ts +++ b/packages/sqs/examples/eventbridge-consumer-example.ts @@ -109,7 +109,7 @@ class EventBridgeConsumer extends AbstractSqsConsumer({ - messageTypeField: this.messageTypeField, + messageTypeResolver: this.messageTypeResolver, messageHandlers: options.handlers, }) this.isDeduplicationEnabled = !!options.enableConsumerDeduplication @@ -378,9 +378,7 @@ export abstract class AbstractSqsConsumer< return message } - // @ts-expect-error - // eslint-disable-next-line @typescript-eslint/restrict-template-expressions - const messageType = parsedMessage[this.messageTypeField] + const messageType = this.resolveMessageTypeFromMessage(parsedMessage) ?? 'unknown' const transactionSpanId = `queue_${this.queueName}:${messageType}` // @ts-expect-error @@ -527,8 +525,7 @@ export abstract class AbstractSqsConsumer< messageProcessingStartTimestamp: number, ): Promise { // Extract message type for barrier rechecks - // @ts-expect-error - const messageType = parsedMessage[this.messageTypeField] as string + const messageType = this.resolveMessageTypeFromMessage(parsedMessage) ?? 'unknown' // Sleep and periodically recheck barrier until maxRetryDuration is exceeded while (this.shouldBeRetried(originalMessage, this.maxRetryDuration)) { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index b77fd0be..620d821c 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -118,8 +118,8 @@ export abstract class AbstractSqsPublisher const parsedMessage = messageSchemaResult.result.parse(message) if (this.logMessages) { - // @ts-expect-error - const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField]) + const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' + const resolvedLogMessage = this.resolveMessageLog(message, messageType) this.logMessage(resolvedLogMessage) } @@ -164,8 +164,7 @@ export abstract class AbstractSqsPublisher publisher: this.constructor.name, queueArn: this.queueArn, queueName: this.queueName, - // @ts-expect-error - messageType: message[this.messageTypeField] ?? 'unknown', + messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown', }, cause: err, }) diff --git a/packages/sqs/package.json b/packages/sqs/package.json index f723f06c..72329547 100644 --- a/packages/sqs/package.json +++ b/packages/sqs/package.json @@ -52,9 +52,9 @@ "awilix-manager": "^6.1.0", "rimraf": "^6.0.1", "ioredis": "^5.6.1", - "typescript": "^5.9.2", + "typescript": "^5.9.3", "vitest": "^3.2.4", - "zod": "^4.0.17" + "zod": "^4.1.13" }, "homepage": "https://github.com/kibertoad/message-queue-toolkit", "repository": { diff --git a/packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts b/packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts index 8bdef003..bb59fffa 100644 --- a/packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts @@ -161,7 +161,7 @@ describe('SqsEventBridgeConsumer', () => { deletionConfig: { deleteIfExists: true, }, - messageTypeField: 'detail-type', + messageTypeResolver: { messageTypePath: 'detail-type' }, messageIdField: 'id', messageTimestampField: 'time', handlerSpy: true, diff --git a/packages/sqs/test/consumers/SqsEventBridgeConsumer.ts b/packages/sqs/test/consumers/SqsEventBridgeConsumer.ts index 55ff0e41..c17224e7 100644 --- a/packages/sqs/test/consumers/SqsEventBridgeConsumer.ts +++ b/packages/sqs/test/consumers/SqsEventBridgeConsumer.ts @@ -25,7 +25,7 @@ export type EventBridgeTestContext = { * EventBridge consumer demonstrating non-standard event format support. * * Key configurations: - * - messageTypeField: 'detail-type' (instead of default 'type') + * - messageTypeResolver: { messageTypePath: 'detail-type' } (instead of default 'type') * - messageTimestampField: 'time' (instead of default 'timestamp') * * How it works: @@ -53,7 +53,7 @@ export class SqsEventBridgeConsumer extends AbstractSqsConsumer< }, // EventBridge-specific field mappings - messageTypeField: 'detail-type', // EventBridge uses 'detail-type' instead of 'type' + messageTypeResolver: { messageTypePath: 'detail-type' }, // EventBridge uses 'detail-type' instead of 'type' messageIdField: 'id', // Standard field, same as default messageTimestampField: 'time', // EventBridge uses 'time' instead of 'timestamp' diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.ts index 03cd743b..3155d507 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.ts @@ -116,7 +116,7 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< deleteIfExists: true, }, deadLetterQueue: options.deadLetterQueue, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlerSpy: true, consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately diff --git a/packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts b/packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts index cca571ff..6862a0d9 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts @@ -120,7 +120,7 @@ export class SqsPermissionConsumerFifo extends AbstractSqsConsumer< deleteIfExists: true, }, deadLetterQueue: options.deadLetterQueue, - messageTypeField: 'messageType', + messageTypeResolver: { messageTypePath: 'messageType' }, handlerSpy: true, consumerOverrides: options.consumerOverrides ?? { terminateVisibilityTimeout: true, // this allows to retry failed messages immediately diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.ts index 3de0d12c..33ab0c15 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.ts @@ -49,7 +49,7 @@ export class SqsPermissionPublisher extends AbstractSqsPublisher