Skip to content

Commit d066ece

Browse files
authored
Handle edge cases (#374)
1 parent ea77735 commit d066ece

23 files changed

+1613
-80
lines changed

UPGRADING.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
88

9-
- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
9+
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
1010

1111
### Migration Steps
1212

@@ -39,12 +39,18 @@ super(dependencies, {
3939
- **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead.
4040

4141
- **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes:
42-
- `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
42+
- `{ messageTypePath: 'type' }` - extract type from a field in the message (supports dot notation for nested paths like `'metadata.type'`)
4343
- `{ literal: 'my.message.type' }` - use a constant type for all messages
4444
- `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function
4545

46+
- **`MessageSchemaContainer` API changed**: The constructor now accepts `SchemaEntry` and `DefinitionEntry` objects instead of bare schemas. Each entry can include an optional `messageType` for explicit type mapping (required when using custom resolvers).
47+
4648
- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time.
4749

50+
- **Custom resolver validation**: Custom resolver functions (`{ resolver: fn }`) cannot be used with multiple schemas in `MessageSchemaContainer`. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. Use `messageTypePath` for multiple schemas, or register only a single schema with a custom resolver.
51+
52+
- **Improved error handling in `MessageSchemaContainer.resolveSchema`**: The method now properly catches resolver errors and returns them as `Either<Error, Schema>` instead of throwing. This ensures consistent error handling regardless of resolver configuration.
53+
4854
### Migration Steps
4955

5056
#### If using `NO_MESSAGE_TYPE_FIELD`

packages/amqp/test/utils/testContext.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import type { CommonLogger, ErrorReporter, ErrorResolver } from '@lokalise/node-core'
1+
import {
2+
type CommonLogger,
3+
type ErrorReporter,
4+
type ErrorResolver,
5+
globalLogger,
6+
} from '@lokalise/node-core'
27
import type {
38
MessageMetricsManager,
49
TransactionObservabilityManager,
@@ -11,7 +16,6 @@ import {
1116
import type { NameAndRegistrationPair } from 'awilix'
1217
import { asClass, asFunction, createContainer, Lifetime } from 'awilix'
1318
import { AwilixManager } from 'awilix-manager'
14-
import pino from 'pino'
1519
import { z } from 'zod/v4'
1620
import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager.ts'
1721
import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager.ts'
@@ -83,7 +87,7 @@ export const TestEvents = {
8387
export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][]
8488
export type TestEventPublishPayloadsType = z.output<TestEventsType[number]['publisherSchema']>
8589

86-
const TestLogger: CommonLogger = pino()
90+
const TestLogger: CommonLogger = globalLogger
8791

8892
export async function registerDependencies(
8993
config: AmqpConfig,

packages/core/README.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,16 @@ The `messageTypeResolver` configuration supports three modes:
8888
8989
##### Mode 1: Field Path (Simple)
9090
91-
Use when the message type is a field at the root level of the parsed message body:
91+
Use when the message type is a field in the parsed message body. Supports dot notation for nested paths:
9292
9393
```typescript
9494
{
95-
messageTypeResolver: { messageTypePath: 'type' }, // Extracts type from message.type
95+
messageTypeResolver: { messageTypePath: 'type' }, // Extracts from message.type
96+
}
97+
98+
// Nested path example
99+
{
100+
messageTypeResolver: { messageTypePath: 'metadata.eventType' }, // Extracts from message.metadata.eventType
96101
}
97102
```
98103

@@ -533,11 +538,17 @@ Manages Zod schemas and validates messages:
533538
import { MessageSchemaContainer } from '@message-queue-toolkit/core'
534539

535540
const container = new MessageSchemaContainer({
536-
messageSchemas: [Schema1, Schema2],
541+
messageSchemas: [{ schema: Schema1 }, { schema: Schema2 }],
542+
messageDefinitions: [],
537543
messageTypeResolver: { messageTypePath: 'type' },
538544
})
539545

540-
const schema = container.resolveSchema(message.type)
546+
const result = container.resolveSchema(message)
547+
if ('error' in result) {
548+
// Handle error
549+
} else {
550+
const schema = result.result
551+
}
541552
```
542553

543554
### AbstractPublisherManager

packages/core/lib/index.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ export {
7272
type SpyResultInput,
7373
TYPE_NOT_RESOLVED,
7474
} from './queues/HandlerSpy.ts'
75-
export type { MessageSchemaContainerOptions } from './queues/MessageSchemaContainer.ts'
75+
export type {
76+
DefinitionEntry,
77+
MessageSchemaContainerOptions,
78+
SchemaEntry,
79+
} from './queues/MessageSchemaContainer.ts'
7680
export { MessageSchemaContainer } from './queues/MessageSchemaContainer.ts'
7781
export type {
7882
MessageTypeResolverConfig,

packages/core/lib/queues/AbstractQueueService.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import {
88
stringValueSerializer,
99
} from '@lokalise/node-core'
1010
import type { MakeRequired } from '@lokalise/universal-ts-utils/node'
11-
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
1211
import {
1312
MESSAGE_DEDUPLICATION_OPTIONS_SCHEMA,
1413
type MessageDeduplicationOptions,
1514
} from '@message-queue-toolkit/schemas'
15+
import { getProperty, setProperty } from 'dot-prop'
1616
import type { ZodSchema, ZodType } from 'zod/v4'
1717
import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts'
1818
import {
@@ -177,10 +177,17 @@ export abstract class AbstractQueueService<
177177
handlers: MessageHandlerConfig<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[]
178178
messageTypeResolver?: MessageTypeResolverConfig
179179
}) {
180-
const messageSchemas = options.handlers.map((entry) => entry.schema)
181-
const messageDefinitions: CommonEventDefinition[] = options.handlers
182-
.map((entry) => entry.definition)
183-
.filter((entry) => entry !== undefined)
180+
const messageSchemas = options.handlers.map((entry) => ({
181+
schema: entry.schema,
182+
messageType: entry.messageType,
183+
}))
184+
const messageDefinitions = options.handlers
185+
.filter((entry) => entry.definition !== undefined)
186+
.map((entry) => ({
187+
// biome-ignore lint/style/noNonNullAssertion: filtered above
188+
definition: entry.definition!,
189+
messageType: entry.messageType,
190+
}))
184191

185192
return new MessageSchemaContainer<MessagePayloadSchemas>({
186193
messageTypeResolver: options.messageTypeResolver,
@@ -193,7 +200,7 @@ export abstract class AbstractQueueService<
193200
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
194201
messageTypeResolver?: MessageTypeResolverConfig
195202
}) {
196-
const messageSchemas = options.messageSchemas
203+
const messageSchemas = options.messageSchemas.map((schema) => ({ schema }))
197204

198205
return new MessageSchemaContainer<MessagePayloadSchemas>({
199206
messageTypeResolver: options.messageTypeResolver,
@@ -674,11 +681,13 @@ export abstract class AbstractQueueService<
674681
[this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField],
675682
}
676683

677-
// Preserve message type field if using messageTypePath resolver
684+
// Preserve message type field if using messageTypePath resolver (supports nested paths)
678685
if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
679686
const messageTypePath = this.messageTypeResolver.messageTypePath
680-
// @ts-expect-error
681-
result[messageTypePath] = message[messageTypePath]
687+
const typeValue = getProperty(message, messageTypePath)
688+
if (typeValue !== undefined) {
689+
setProperty(result, messageTypePath, typeValue)
690+
}
682691
}
683692

684693
return result

packages/core/lib/queues/HandlerContainer.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,9 +360,9 @@ export class HandlerContainer<
360360

361361
if (!messageType) {
362362
throw new Error(
363-
'Unable to determine message type for handler. ' +
364-
'Either provide messageType in handler options, use a literal resolver, ' +
365-
'or ensure the schema has a literal type field matching messageTypePath.',
363+
'Unable to determine message type for handler at registration time. ' +
364+
'Either provide explicit messageType in handler options (required for custom resolver functions), ' +
365+
'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.',
366366
)
367367
}
368368

0 commit comments

Comments
 (0)