Skip to content

Commit ea77735

Browse files
authored
Add flexible type resolution (#373)
1 parent 1b4c7d6 commit ea77735

File tree

64 files changed

+1994
-288
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1994
-288
lines changed

README.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ They implement the following public methods:
3030
* `options`, composed by
3131
* `messageSchemas` – the `zod` schemas for all supported messages;
3232
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or ISO-8601 date string, if your message doesn't contain it the library will add one automatically to avoid infinite loops on consumer;
33-
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. **Note:** It is not supported for Kafka publisher
33+
* `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for resolving the correct schema for validation. Supports three modes:
34+
* `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
35+
* `{ literal: 'my.message.type' }` - use a constant type for all messages
36+
* `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function
37+
**Note:** It is not supported for Kafka publisher. See `@message-queue-toolkit/core` README for detailed documentation and examples.
3438
* `locatorConfig` - configuration for resolving existing queue and/or topic. Should not be specified together with the `creationConfig`.
3539
* `creationConfig` - configuration for queue and/or topic to create, if one does not exist. Should not be specified together with the `locatorConfig`;
3640
* `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information);
@@ -94,7 +98,7 @@ Multi-schema consumers support multiple message types via handler configs. They
9498
* `dependencies` – a set of dependencies depending on the protocol;
9599
* `options`, composed by
96100
* `handlers` – configuration for handling each of the supported message types. See "Multi-schema handler definition" for more details;
97-
* `messageTypeField` - which field in the message describes the type of a message. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler; **Note:** It is not supported for Kafka consumer
101+
* `messageTypeResolver` - configuration for resolving the message type. This field needs to be defined as `z.literal` in the schema and is used for routing the message to the correct handler. See Publishers section above for details. **Note:** It is not supported for Kafka consumer.
98102
* `messageTimestampField` - which field in the message contains the message creation date (by default it is `timestamp`). This field needs to be a `Date` object or an ISO-8601 date string;
99103
* `maxRetryDuration` - how long (in seconds) the message should be retried due to the `retryLater` result before marking it as consumed (and sending to DLQ, if one is configured). This is used to avoid infinite loops. Default is 4 days;
100104
* `queueName`; (for SNS publishers this is a misnomer which actually refers to a topic name)

UPGRADING.md

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,105 @@
11
# Upgrading Guide
22

3+
## Upgrading </br> `core` `25.x.x` -> `26.0.0` </br> `sqs` `xx.x.x` -> `xx.0.0` </br> `sns` `xx.x.x` -> `xx.0.0` </br> `amqp` `xx.x.x` -> `xx.0.0` </br> `gcp-pubsub` `2.x.x` -> `3.0.0`
4+
5+
### Description of Breaking Changes
6+
7+
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
8+
9+
- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
10+
11+
### Migration Steps
12+
13+
#### Replacing `messageTypeField` with `messageTypeResolver`
14+
15+
Replace `messageTypeField: 'fieldName'` with `messageTypeResolver: { messageTypePath: 'fieldName' }`:
16+
17+
```typescript
18+
// Before
19+
super(dependencies, {
20+
messageTypeField: 'type',
21+
handlers: new MessageHandlerConfigBuilder()
22+
.addConfig(schema, handler)
23+
.build(),
24+
})
25+
26+
// After
27+
super(dependencies, {
28+
messageTypeResolver: { messageTypePath: 'type' },
29+
handlers: new MessageHandlerConfigBuilder()
30+
.addConfig(schema, handler)
31+
.build(),
32+
})
33+
```
34+
35+
## Upgrading </br> `core` `24.x.x` -> `25.0.0` </br> `gcp-pubsub` `1.x.x` -> `2.0.0`
36+
37+
### Description of Breaking Changes
38+
39+
- **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead.
40+
41+
- **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes:
42+
- `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
43+
- `{ literal: 'my.message.type' }` - use a constant type for all messages
44+
- `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function
45+
46+
- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time.
47+
48+
### Migration Steps
49+
50+
#### If using `NO_MESSAGE_TYPE_FIELD`
51+
52+
Replace `messageTypeField: NO_MESSAGE_TYPE_FIELD` with `messageTypeResolver: { literal: 'your.message.type' }`:
53+
54+
```typescript
55+
// Before
56+
import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'
57+
58+
super(dependencies, {
59+
messageTypeField: NO_MESSAGE_TYPE_FIELD,
60+
handlers: new MessageHandlerConfigBuilder()
61+
.addConfig(schema, handler)
62+
.build(),
63+
})
64+
65+
// After
66+
super(dependencies, {
67+
messageTypeResolver: { literal: 'your.message.type' },
68+
handlers: new MessageHandlerConfigBuilder()
69+
.addConfig(schema, handler, { messageType: 'your.message.type' })
70+
.build(),
71+
})
72+
```
73+
74+
#### If using custom resolver function
75+
76+
When using `messageTypeResolver: { resolver: fn }`, provide explicit `messageType` in handler options:
77+
78+
```typescript
79+
super(dependencies, {
80+
messageTypeResolver: {
81+
resolver: ({ messageAttributes }) => {
82+
// Map external event types to internal types
83+
const eventType = messageAttributes?.eventType as string
84+
if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created'
85+
throw new Error(`Unknown event type: ${eventType}`)
86+
},
87+
},
88+
handlers: new MessageHandlerConfigBuilder()
89+
.addConfig(ObjectSchema, handler, { messageType: 'storage.object.created' })
90+
.build(),
91+
})
92+
```
93+
94+
#### GCP Pub/Sub DLQ Consumer
95+
96+
The `AbstractPubSubDlqConsumer` now uses `DLQ_MESSAGE_TYPE` constant internally. If you import this constant, update your import:
97+
98+
```typescript
99+
import { DLQ_MESSAGE_TYPE } from '@message-queue-toolkit/gcp-pubsub'
100+
// DLQ_MESSAGE_TYPE = 'dlq.message'
101+
```
102+
3103
## Upgrading </br> `core` `19.0.0` -> `20.0.0` </br> `sqs` `19.0.0` -> `20.0.0` </br> `sns` `20.0.0` -> `21.0.0` </br> `amqp` `18.0.0` -> `19.0.0` </br> `metrics` `2.0.0` -> `3.0.0`
4104

5105
### Description of Breaking Changes

examples/sns-sqs/lib/common/TestPublisherManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export const publisherManager = new SnsPublisherManager<
2929
newPublisherOptions: {
3030
handlerSpy: true,
3131
messageIdField: 'id',
32-
messageTypeField: 'type',
32+
messageTypeResolver: { messageTypePath: 'type' },
3333
deletionConfig: {
3434
deleteIfExists: isTest, // only enable this in tests
3535
// and ensure that the owning side is doing the deletion.

examples/sns-sqs/lib/common/UserConsumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class UserConsumer extends AbstractSnsSqsConsumer<SupportedMessages, Exec
2727
.addConfig(UserEvents.created, userCreatedHandler, {})
2828
.addConfig(UserEvents.updated, userUpdatedHandler, {})
2929
.build(),
30-
messageTypeField: 'type',
30+
messageTypeResolver: { messageTypePath: 'type' },
3131
// Consumer creates its own queue
3232
creationConfig: {
3333
queue: {

packages/amqp/lib/AbstractAmqpConsumer.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ export abstract class AbstractAmqpConsumer<
104104
ExecutionContext,
105105
PrehandlerOutput
106106
>({
107-
messageTypeField: this.messageTypeField,
107+
messageTypeResolver: this.messageTypeResolver,
108108
messageHandlers: options.handlers,
109109
})
110110
this.executionContext = executionContext
@@ -151,12 +151,8 @@ export abstract class AbstractAmqpConsumer<
151151
}
152152
const { originalMessage, parsedMessage } = deserializedMessage.result
153153

154-
// @ts-expect-error
155-
const messageType = parsedMessage[this.messageTypeField]
156-
const transactionSpanId = `queue_${this.queueName}:${
157-
// @ts-expect-error
158-
parsedMessage[this.messageTypeField]
159-
}`
154+
const messageType = this.resolveMessageTypeFromMessage(parsedMessage) ?? 'unknown'
155+
const transactionSpanId = `queue_${this.queueName}:${messageType}`
160156

161157
// @ts-expect-error
162158
const uniqueTransactionKey = parsedMessage[this.messageIdField]

packages/amqp/lib/AbstractAmqpPublisher.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ export abstract class AbstractAmqpPublisher<
8888
}
8989

9090
if (this.logMessages) {
91-
// @ts-expect-error
92-
const resolvedLogMessage = this.resolveMessageLog(message, message[this.messageTypeField])
91+
const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown'
92+
const resolvedLogMessage = this.resolveMessageLog(message, messageType)
9393
this.logMessage(resolvedLogMessage)
9494
}
9595

@@ -122,8 +122,7 @@ export abstract class AbstractAmqpPublisher<
122122
// @ts-expect-error
123123
queueName: this.queueName,
124124
exchange: this.exchange,
125-
// @ts-expect-error
126-
messageType: message[this.messageTypeField] ?? 'unknown',
125+
messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown',
127126
}),
128127
cause: err as Error,
129128
})

packages/amqp/test/consumers/AmqpPermissionConsumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ export class AmqpPermissionConsumer extends AbstractAmqpQueueConsumer<
7575
deadLetterQueue: options?.deadLetterQueue,
7676
logMessages: options?.logMessages,
7777
handlerSpy: true,
78-
messageTypeField: 'messageType',
78+
messageTypeResolver: { messageTypePath: 'messageType' },
7979
deletionConfig: { deleteIfExists: true },
8080
maxRetryDuration: options?.maxRetryDuration,
8181
handlers: new MessageHandlerConfigBuilder<

packages/amqp/test/fakes/CustomFakeConsumer.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ import type { ZodSchema } from 'zod/v4'
55
import { AbstractAmqpQueueConsumer } from '../../lib/AbstractAmqpQueueConsumer.ts'
66
import type { AMQPConsumerDependencies } from '../../lib/AbstractAmqpService.ts'
77

8+
/**
9+
* Message type used for the catch-all handler in CustomFakeConsumer.
10+
*/
11+
const CUSTOM_FAKE_MESSAGE_TYPE = 'custom.fake.message'
12+
813
export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<
914
PublisherBaseMessageType,
1015
unknown
@@ -22,9 +27,12 @@ export class CustomFakeConsumer extends AbstractAmqpQueueConsumer<
2227
},
2328
},
2429
handlerSpy: true,
25-
messageTypeField: 'messageType',
30+
// Use literal resolver so all messages route to the same handler
31+
messageTypeResolver: { literal: CUSTOM_FAKE_MESSAGE_TYPE },
2632
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
27-
.addConfig(schema, () => Promise.resolve({ result: 'success' }))
33+
.addConfig(schema, () => Promise.resolve({ result: 'success' }), {
34+
messageType: CUSTOM_FAKE_MESSAGE_TYPE,
35+
})
2836
.build(),
2937
},
3038
{},

packages/amqp/test/fakes/FakeQueueConsumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class FakeQueueConsumer extends AbstractAmqpQueueConsumer<
2525
deleteIfExists: true,
2626
},
2727
handlerSpy: true,
28-
messageTypeField: 'type',
28+
messageTypeResolver: { messageTypePath: 'type' },
2929
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
3030
.addConfig(eventDefinition.consumerSchema, () => Promise.resolve({ result: 'success' }))
3131
.build(),

packages/amqp/test/fakes/FakeTopicConsumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export class FakeTopicConsumer extends AbstractAmqpTopicConsumer<
4141
deleteIfExists: true,
4242
},
4343
handlerSpy: true,
44-
messageTypeField: 'type',
44+
messageTypeResolver: { messageTypePath: 'type' },
4545
handlers: new MessageHandlerConfigBuilder<PublisherBaseMessageType, unknown>()
4646
.addConfig(eventDefinition.consumerSchema, () => {
4747
this.messageCounter++

0 commit comments

Comments
 (0)