Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.

- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.

Comment on lines 7 to 10
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Small wording nit: “outside of” → “outside” (avoids redundancy).

-- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
+- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
🧰 Tools
🪛 LanguageTool

[style] ~9-~9: This phrase is redundant. Consider using “outside”.
Context: ... if you're using HandlerSpy directly (outside of the built-in queue services), you'll ne...

(OUTSIDE_OF)

🤖 Prompt for AI Agents
In UPGRADING.md around lines 7 to 10, change the wording "outside of the
built-in queue services" to "outside the built-in queue services" to remove the
redundant "of"; update the sentence accordingly so it reads that callers using
HandlerSpy directly outside the built-in queue services need to update their
calls.

### Migration Steps

Expand Down Expand Up @@ -39,12 +39,18 @@ super(dependencies, {
- **`NO_MESSAGE_TYPE_FIELD` constant removed**: The `NO_MESSAGE_TYPE_FIELD` constant has been removed from `@message-queue-toolkit/core`. Use `messageTypeResolver` with literal mode instead.

- **New `messageTypeResolver` configuration**: A flexible configuration for message type resolution. Supports three modes:
- `{ messageTypePath: 'type' }` - extract type from a field at the root of the message
- `{ messageTypePath: 'type' }` - extract type from a field in the message (supports dot notation for nested paths like `'metadata.type'`)
- `{ literal: 'my.message.type' }` - use a constant type for all messages
- `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function

- **`MessageSchemaContainer` API changed**: The constructor now accepts `SchemaEntry` and `DefinitionEntry` objects instead of bare schemas. Each entry can include an optional `messageType` for explicit type mapping (required when using custom resolvers).

- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time.

- **Custom resolver validation**: Custom resolver functions (`{ resolver: fn }`) cannot be used with multiple schemas in `MessageSchemaContainer`. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. Use `messageTypePath` for multiple schemas, or register only a single schema with a custom resolver.

- **Improved error handling in `MessageSchemaContainer.resolveSchema`**: The method now properly catches resolver errors and returns them as `Either<Error, Schema>` instead of throwing. This ensures consistent error handling regardless of resolver configuration.

### Migration Steps

#### If using `NO_MESSAGE_TYPE_FIELD`
Expand Down
10 changes: 7 additions & 3 deletions packages/amqp/test/utils/testContext.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
import type { CommonLogger, ErrorReporter, ErrorResolver } from '@lokalise/node-core'
import {
type CommonLogger,
type ErrorReporter,
type ErrorResolver,
globalLogger,
} from '@lokalise/node-core'
import type {
MessageMetricsManager,
TransactionObservabilityManager,
Expand All @@ -11,7 +16,6 @@ import {
import type { NameAndRegistrationPair } from 'awilix'
import { asClass, asFunction, createContainer, Lifetime } from 'awilix'
import { AwilixManager } from 'awilix-manager'
import pino from 'pino'
import { z } from 'zod/v4'
import { AmqpConnectionManager } from '../../lib/AmqpConnectionManager.ts'
import type { AmqpAwareEventDefinition } from '../../lib/AmqpQueuePublisherManager.ts'
Expand Down Expand Up @@ -83,7 +87,7 @@ export const TestEvents = {
export type TestEventsType = (typeof TestEvents)[keyof typeof TestEvents][]
export type TestEventPublishPayloadsType = z.output<TestEventsType[number]['publisherSchema']>

const TestLogger: CommonLogger = pino()
const TestLogger: CommonLogger = globalLogger

export async function registerDependencies(
config: AmqpConfig,
Expand Down
19 changes: 15 additions & 4 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,16 @@ The `messageTypeResolver` configuration supports three modes:

##### Mode 1: Field Path (Simple)

Use when the message type is a field at the root level of the parsed message body:
Use when the message type is a field in the parsed message body. Supports dot notation for nested paths:

```typescript
{
messageTypeResolver: { messageTypePath: 'type' }, // Extracts type from message.type
messageTypeResolver: { messageTypePath: 'type' }, // Extracts from message.type
}

// Nested path example
{
messageTypeResolver: { messageTypePath: 'metadata.eventType' }, // Extracts from message.metadata.eventType
}
```

Expand Down Expand Up @@ -533,11 +538,17 @@ Manages Zod schemas and validates messages:
import { MessageSchemaContainer } from '@message-queue-toolkit/core'

const container = new MessageSchemaContainer({
messageSchemas: [Schema1, Schema2],
messageSchemas: [{ schema: Schema1 }, { schema: Schema2 }],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})

const schema = container.resolveSchema(message.type)
const result = container.resolveSchema(message)
if ('error' in result) {
// Handle error
} else {
const schema = result.result
}
```

### AbstractPublisherManager
Expand Down
6 changes: 5 additions & 1 deletion packages/core/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ export {
type SpyResultInput,
TYPE_NOT_RESOLVED,
} from './queues/HandlerSpy.ts'
export type { MessageSchemaContainerOptions } from './queues/MessageSchemaContainer.ts'
export type {
DefinitionEntry,
MessageSchemaContainerOptions,
SchemaEntry,
} from './queues/MessageSchemaContainer.ts'
export { MessageSchemaContainer } from './queues/MessageSchemaContainer.ts'
export type {
MessageTypeResolverConfig,
Expand Down
27 changes: 18 additions & 9 deletions packages/core/lib/queues/AbstractQueueService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import {
stringValueSerializer,
} from '@lokalise/node-core'
import type { MakeRequired } from '@lokalise/universal-ts-utils/node'
import type { CommonEventDefinition } from '@message-queue-toolkit/schemas'
import {
MESSAGE_DEDUPLICATION_OPTIONS_SCHEMA,
type MessageDeduplicationOptions,
} from '@message-queue-toolkit/schemas'
import { getProperty, setProperty } from 'dot-prop'
import type { ZodSchema, ZodType } from 'zod/v4'
import type { MessageInvalidFormatError, MessageValidationError } from '../errors/Errors.ts'
import {
Expand Down Expand Up @@ -177,10 +177,17 @@ export abstract class AbstractQueueService<
handlers: MessageHandlerConfig<MessagePayloadSchemas, ExecutionContext, PrehandlerOutput>[]
messageTypeResolver?: MessageTypeResolverConfig
}) {
const messageSchemas = options.handlers.map((entry) => entry.schema)
const messageDefinitions: CommonEventDefinition[] = options.handlers
.map((entry) => entry.definition)
.filter((entry) => entry !== undefined)
const messageSchemas = options.handlers.map((entry) => ({
schema: entry.schema,
messageType: entry.messageType,
}))
const messageDefinitions = options.handlers
.filter((entry) => entry.definition !== undefined)
.map((entry) => ({
// biome-ignore lint/style/noNonNullAssertion: filtered above
definition: entry.definition!,
messageType: entry.messageType,
}))

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageTypeResolver: options.messageTypeResolver,
Expand All @@ -193,7 +200,7 @@ export abstract class AbstractQueueService<
messageSchemas: readonly ZodSchema<MessagePayloadSchemas>[]
messageTypeResolver?: MessageTypeResolverConfig
}) {
const messageSchemas = options.messageSchemas
const messageSchemas = options.messageSchemas.map((schema) => ({ schema }))

return new MessageSchemaContainer<MessagePayloadSchemas>({
messageTypeResolver: options.messageTypeResolver,
Expand Down Expand Up @@ -674,11 +681,13 @@ export abstract class AbstractQueueService<
[this.messageDeduplicationOptionsField]: message[this.messageDeduplicationOptionsField],
}

// Preserve message type field if using messageTypePath resolver
// Preserve message type field if using messageTypePath resolver (supports nested paths)
if (this.messageTypeResolver && isMessageTypePathConfig(this.messageTypeResolver)) {
const messageTypePath = this.messageTypeResolver.messageTypePath
// @ts-expect-error
result[messageTypePath] = message[messageTypePath]
const typeValue = getProperty(message, messageTypePath)
if (typeValue !== undefined) {
setProperty(result, messageTypePath, typeValue)
}
}

return result
Expand Down
6 changes: 3 additions & 3 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ export class HandlerContainer<

if (!messageType) {
throw new Error(
'Unable to determine message type for handler. ' +
'Either provide messageType in handler options, use a literal resolver, ' +
'or ensure the schema has a literal type field matching messageTypePath.',
'Unable to determine message type for handler at registration time. ' +
'Either provide explicit messageType in handler options (required for custom resolver functions), ' +
'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.',
)
}

Expand Down
Loading
Loading