Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,11 @@ They implement the following public methods:
* `options`, composed by
* `messageSchemas` – the `zod` schemas for all supported messages;
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. **Note:** It is not supported for Kafka publisher
* `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. Supports three modes:
* `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
* `{ literal: 'my.message.type' }` - use a constant type for all messages
* `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function
**Note:** It is not supported for Kafka publisher. See `@message-queue-toolkit/core` README for detailed documentation and examples.
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`;
* `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information);
Expand Down Expand Up @@ -94,7 +98,7 @@ Multi-schema consumers support multiple message types via handler configs. They
* `dependencies` – a set of dependencies depending on the protocol;
* `options`, composed by
* `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details;
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler; **Note:** It is not supported for Kafka consumer
* `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler. See Publishers section above for details. **Note:** It is not supported for Kafka consumer.
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or an ISO-8601 date string;
* `maxRetryDuration` - how long (in seconds) the message should be retried due to the `retryLater` result before marking it as consumed (and sending to DLQ, if one is configured). This is used to avoid infinite loops. Default is 4 days;
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)
Expand Down
100 changes: 100 additions & 0 deletions UPGRADING.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,105 @@
# Upgrading Guide

## Upgrading </br> `core` `25.x.x` -> `26.0.0` </br> `sqs` `xx.x.x` -> `xx.0.0` </br> `sns` `xx.x.x` -> `xx.0.0` </br> `amqp` `xx.x.x` -> `xx.0.0` </br> `gcp-pubsub` `2.x.x` -> `3.0.0`

### Description of Breaking Changes

- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.

- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.

### Migration Steps

#### Replacing `messageTypeField` with `messageTypeResolver`

Replace `messageTypeField: 'fieldName'` with `messageTypeResolver: { messageTypePath: 'fieldName' }`:

```typescript
// Before
super(dependencies, {
messageTypeField: 'type',
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})

// After
super(dependencies, {
messageTypeResolver: { messageTypePath: 'type' },
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})
```

## Upgrading </br> `core` `24.x.x` -> `25.0.0` </br> `gcp-pubsub` `1.x.x` -> `2.0.0`

### Description of Breaking Changes

- **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead.

- **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes:
- `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
- `{ literal: 'my.message.type' }` - use a constant type for all messages
- `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function

- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time.

### Migration Steps

#### If using `NO_MESSAGE_TYPE_FIELD`

Replace `messageTypeField: NO_MESSAGE_TYPE_FIELD` with `messageTypeResolver: { literal: 'your.message.type' }`:

```typescript
// Before
import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'

super(dependencies, {
messageTypeField: NO_MESSAGE_TYPE_FIELD,
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})

// After
super(dependencies, {
messageTypeResolver: { literal: 'your.message.type' },
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler, { messageType: 'your.message.type' })
.build(),
})
```

#### If using custom resolver function

When using `messageTypeResolver: { resolver: fn }`, provide explicit `messageType` in handler options:

```typescript
super(dependencies, {
messageTypeResolver: {
resolver: ({ messageAttributes }) => {
// Map external event types to internal types
const eventType = messageAttributes?.eventType as string
if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created'
throw new Error(`Unknown event type: ${eventType}`)
},
},
handlers: new MessageHandlerConfigBuilder()
.addConfig(ObjectSchema, handler, { messageType: 'storage.object.created' })
.build(),
})
```

#### GCP Pub/Sub DLQ Consumer

The `AbstractPubSubDlqConsumer` now uses `DLQ_MESSAGE_TYPE` constant internally. If you import this constant, update your import:

```typescript
import { DLQ_MESSAGE_TYPE } from '@message-queue-toolkit/gcp-pubsub'
// DLQ_MESSAGE_TYPE = 'dlq.message'
```

## Upgrading </br> `core` `19.0.0` -> `20.0.0` </br> `sqs` `19.0.0` -> `20.0.0` </br> `sns` `20.0.0` -> `21.0.0` </br> `amqp` `18.0.0` -> `19.0.0` </br> `metrics` `2.0.0` -> `3.0.0`

### Description of Breaking Changes
Expand Down
2 changes: 1 addition & 1 deletion examples/sns-sqs/lib/common/TestPublisherManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export const publisherManager = new SnsPublisherManager<
newPublisherOptions: {
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
deletionConfig: {
deleteIfExists: isTest, // only enable this in tests
// and ensure that the owning side is doing the deletion.
Expand Down
2 changes: 1 addition & 1 deletion examples/sns-sqs/lib/common/UserConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, Exec
.addConfig(UserEvents.created, userCreatedHandler, {})
.addConfig(UserEvents.updated, userUpdatedHandler, {})
.build(),
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
// Consumer creates its own queue
creationConfig: {
queue: {
Expand Down
10 changes: 3 additions & 7 deletions packages/amqp/lib/AbstractAmqpConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export abstract class AbstractAmqpConsumer<
ExecutionContext,
PrehandlerOutput
>({
messageTypeField: this.messageTypeField,
messageTypeResolver: this.messageTypeResolver,
messageHandlers: options.handlers,
})
this.executionContext = executionContext
Expand Down Expand Up @@ -151,12 +151,8 @@ export abstract class AbstractAmqpConsumer<
}
const { originalMessage, parsedMessage } = deserializedMessage.result

// @ts-expect-error
const messageType = parsedMessage[this.messageTypeField]
const transactionSpanId = `queue_${this.queueName}:${
// @ts-expect-error
parsedMessage[this.messageTypeField]
}`
const messageType = this.resolveMessageTypeFromMessage(parsedMessage) ?? 'unknown'
const transactionSpanId = `queue_${this.queueName}:${messageType}`

// @ts-expect-error
const uniqueTransactionKey = parsedMessage[this.messageIdField]
Expand Down
7 changes: 3 additions & 4 deletions packages/amqp/lib/AbstractAmqpPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ export abstract class AbstractAmqpPublisher<
}

if (this.logMessages) {
// @ts-expect-error
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
this.logMessage(resolvedLogMessage)
}

Expand Down Expand Up @@ -122,8 +122,7 @@ export abstract class AbstractAmqpPublisher<
// @ts-expect-error
queueName: this.queueName,
exchange: this.exchange,
// @ts-expect-error
messageType: message[this.messageTypeField] ?? 'unknown',
messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown',
}),
cause: err as Error,
})
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/test/consumers/AmqpPermissionConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ export class AmqpPermissionConsumer extends AbstractAmqpQueueConsumer<
deadLetterQueue: options?.deadLetterQueue,
logMessages: options?.logMessages,
handlerSpy: true,
messageTypeField: 'messageType',
messageTypeResolver: { messageTypePath: 'messageType' },
deletionConfig: { deleteIfExists: true },
maxRetryDuration: options?.maxRetryDuration,
handlers: new MessageHandlerConfigBuilder<
Expand Down
12 changes: 10 additions & 2 deletions packages/amqp/test/fakes/CustomFakeConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import type { ZodSchema } from 'zod/v4'
import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer.ts'
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService.ts'

/**
* Message type used for the catch-all handler in CustomFakeConsumer.
*/
const CUSTOM_FAKE_MESSAGE_TYPE = 'custom.fake.message'

export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<
PublisherBaseMessageType,
unknown
Expand All @@ -22,9 +27,12 @@ export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<
},
},
handlerSpy: true,
messageTypeField: 'messageType',
// Use literal resolver so all messages route to the same handler
messageTypeResolver: { literal: CUSTOM_FAKE_MESSAGE_TYPE },
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(schema, () => Promise.resolve({ result: 'success' }))
.addConfig(schema, () => Promise.resolve({ result: 'success' }), {
messageType: CUSTOM_FAKE_MESSAGE_TYPE,
})
.build(),
},
{},
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/test/fakes/FakeQueueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class FakeQueueConsumer extends AbstractAmqpQueueConsumer<
deleteIfExists: true,
},
handlerSpy: true,
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(eventDefinition.consumerSchema, () => Promise.resolve({ result: 'success' }))
.build(),
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/test/fakes/FakeTopicConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class FakeTopicConsumer extends AbstractAmqpTopicConsumer<
deleteIfExists: true,
},
handlerSpy: true,
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
.addConfig(eventDefinition.consumerSchema, () => {
this.messageCounter++
Expand Down
2 changes: 1 addition & 1 deletion packages/amqp/test/publishers/AmqpPermissionPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class AmqpPermissionPublisher extends AbstractAmqpQueuePublisher<Supporte
}),
logMessages: options.logMessages ?? true,
messageSchemas: [PERMISSIONS_ADD_MESSAGE_SCHEMA, PERMISSIONS_REMOVE_MESSAGE_SCHEMA],
messageTypeField: 'messageType',
messageTypeResolver: { messageTypePath: 'messageType' },
})
}
}
6 changes: 3 additions & 3 deletions packages/amqp/test/utils/testContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export async function registerDependencies(
isLazyInitEnabled: true,
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
},
})
},
Expand All @@ -173,7 +173,7 @@ export async function registerDependencies(
isLazyInitEnabled: false,
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
},
})
},
Expand All @@ -193,7 +193,7 @@ export async function registerDependencies(
newPublisherOptions: {
handlerSpy: true,
messageIdField: 'id',
messageTypeField: 'type',
messageTypeResolver: { messageTypePath: 'type' },
},
})
},
Expand Down
Loading
Loading