Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.

- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.

Comment on lines 7 to 10
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Small wording nit: “outside of” → “outside” (avoids redundancy).

-- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
+- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpyParams.messageTypePath` removed**: The `messageTypePath` option in `HandlerSpyParams` has been removed. Message types are now passed explicitly when adding processed messages. This is handled internally by the library, so most users won't need to make changes.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
- **`messageTypeField` option removed**: The deprecated `messageTypeField` option has been removed from all queue services. Use `messageTypeResolver` instead.
- **`HandlerSpy.addProcessedMessage` signature changed**: The `addProcessedMessage` method now requires a `messageType` parameter. This is an internal API change - if you're using `HandlerSpy` directly (outside the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
🧰 Tools
🪛 LanguageTool

[style] ~9-~9: This phrase is redundant. Consider using “outside”.
Context: ... if you're using HandlerSpy directly (outside of the built-in queue services), you'll ne...

(OUTSIDE_OF)

🤖 Prompt for AI Agents
In UPGRADING.md around lines 7 to 10, change the wording "outside of the
built-in queue services" to "outside the built-in queue services" to remove the
redundant "of"; update the sentence accordingly so it reads that callers using
HandlerSpy directly outside the built-in queue services need to update their
calls.

### Migration Steps

Expand Down Expand Up @@ -45,6 +45,10 @@ super(dependencies, {

- **Explicit `messageType` in handler configuration**: When using a custom resolver function, you must provide an explicit `messageType` in handler options since the type cannot be extracted from schemas at registration time.

- **Custom resolver validation**: Custom resolver functions (`{ resolver: fn }`) cannot be used with multiple schemas in `MessageSchemaContainer`. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. Use `messageTypePath` for multiple schemas, or register only a single schema with a custom resolver.

- **Improved error handling in `MessageSchemaContainer.resolveSchema`**: The method now properly catches resolver errors and returns them as `Either<Error, Schema>` instead of throwing. This ensures consistent error handling regardless of resolver configuration.

### Migration Steps

#### If using `NO_MESSAGE_TYPE_FIELD`
Expand Down
6 changes: 3 additions & 3 deletions packages/core/lib/queues/HandlerContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,9 @@ export class HandlerContainer<

if (!messageType) {
throw new Error(
'Unable to determine message type for handler. ' +
'Either provide messageType in handler options, use a literal resolver, ' +
'or ensure the schema has a literal type field matching messageTypePath.',
'Unable to determine message type for handler at registration time. ' +
'Either provide explicit messageType in handler options (required for custom resolver functions), ' +
'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.',
)
}

Expand Down
67 changes: 53 additions & 14 deletions packages/core/lib/queues/MessageSchemaContainer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
extractMessageTypeFromSchema,
isMessageTypeLiteralConfig,
isMessageTypePathConfig,
isMessageTypeResolverFnConfig,
type MessageTypeResolverConfig,
type MessageTypeResolverContext,
resolveMessageType,
Expand Down Expand Up @@ -45,32 +46,45 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
message: Record<string, any>,
attributes?: Record<string, unknown>,
): Either<Error, ZodSchema<MessagePayloadSchemas>> {
const messageType = this.resolveMessageTypeFromData(message, attributes)
// If no resolver configured, use the single default schema
if (!this.messageTypeResolver) {
const schema = this.messageSchemas[DEFAULT_SCHEMA_KEY]
if (!schema) {
return {
error: new Error('No messageTypeResolver configured and no default schema available'),
}
}
return { result: schema }
}

let messageType: string
try {
messageType = this.resolveMessageTypeFromData(message, attributes)
} catch (e) {
return { error: e instanceof Error ? e : new Error(String(e)) }
}

const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY]
const schema = this.messageSchemas[messageType]
if (!schema) {
return {
error: new Error(
`Unsupported message type: ${messageType ?? DEFAULT_SCHEMA_KEY.toString()}`,
),
error: new Error(`Unsupported message type: ${messageType}`),
}
}
return { result: schema }
}

/**
* Resolves message type from message data and optional attributes.
* Only called when messageTypeResolver is configured.
*/
private resolveMessageTypeFromData(
messageData: unknown,
messageAttributes?: Record<string, unknown>,
): string | undefined {
if (this.messageTypeResolver) {
const context: MessageTypeResolverContext = { messageData, messageAttributes }
return resolveMessageType(this.messageTypeResolver, context)
}

return undefined
): string {
// This method is only called after checking messageTypeResolver exists in resolveSchema
const resolver = this.messageTypeResolver as MessageTypeResolverConfig
const context: MessageTypeResolverContext = { messageData, messageAttributes }
return resolveMessageType(resolver, context)
}

/**
Expand All @@ -95,11 +109,37 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
return undefined
}

/**
* Validates that multiple schemas can be properly mapped at registration time.
*/
private validateMultipleSchemas(schemaCount: number): void {
if (schemaCount <= 1) return

if (!this.messageTypeResolver) {
throw new Error(
'Multiple schemas require messageTypeResolver to be configured. ' +
'Use messageTypePath config (to extract types from schema literals) or literal config.',
)
}
// Custom resolver function cannot be used with multiple schemas because
// we can't know what types it will return until runtime.
if (isMessageTypeResolverFnConfig(this.messageTypeResolver)) {
throw new Error(
'Custom resolver function cannot be used with multiple schemas. ' +
'The resolver works for runtime type resolution, but at registration time ' +
'we cannot determine which schema corresponds to which type. ' +
'Use messageTypePath config (to extract types from schema literals) or register only a single schema.',
)
}
}

private resolveMap<T extends CommonEventDefinition | ZodSchema<MessagePayloadSchemas>>(
array: readonly T[],
): Record<string | symbol, T> {
const result: Record<string | symbol, T> = {}

this.validateMultipleSchemas(array.length)

const literalType = this.getLiteralMessageType()
const messageTypePath = this.getMessageTypePathForSchema()

Expand All @@ -117,8 +157,7 @@ export class MessageSchemaContainer<MessagePayloadSchemas extends object> {
: // @ts-expect-error - ZodSchema has shape property at runtime
extractMessageTypeFromSchema(item, messageTypePath)
}
// For custom resolver without field path, we can't extract from schema
// All schemas will use DEFAULT_SCHEMA_KEY
// For single schema without resolver, use DEFAULT_SCHEMA_KEY

const key = type ?? DEFAULT_SCHEMA_KEY
if (result[key]) throw new Error(`Duplicate schema for type: ${key.toString()}`)
Expand Down
15 changes: 13 additions & 2 deletions packages/core/lib/queues/MessageTypeResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ export function resolveMessageType(
*
* @param schema - Zod schema with shape property
* @param messageTypePath - Field name containing the type literal
* @returns The literal type value from the schema, or undefined
* @returns The literal type value from the schema, or undefined if field doesn't exist or isn't a literal
*/
export function extractMessageTypeFromSchema(
// biome-ignore lint/suspicious/noExplicitAny: Schema shape can be any
Expand All @@ -190,5 +190,16 @@ export function extractMessageTypeFromSchema(
if (!messageTypePath) {
return undefined
}
return schema.shape?.[messageTypePath]?.value as string | undefined

const field = schema.shape?.[messageTypePath]
if (!field) {
return undefined
}

// Check if the field has a literal value (z.literal() creates a field with .value)
if (!('value' in field)) {
return undefined
}

return field.value as string | undefined
}
10 changes: 5 additions & 5 deletions packages/core/test/queues/HandlerContainer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ describe('HandlerContainer', () => {
})

describe('resolveMessageType', () => {
describe('with messageTypeField (legacy)', () => {
describe('with messageTypePath resolver', () => {
it('should extract message type from the specified field', () => {
const configs = new MessageHandlerConfigBuilder<SupportedMessages, TestContext>()
.addConfig(USER_MESSAGE_SCHEMA, () => Promise.resolve({ result: 'success' as const }))
Expand Down Expand Up @@ -280,7 +280,7 @@ describe('HandlerContainer', () => {
})

describe('with no configuration', () => {
it('should throw error when neither messageTypeField nor messageTypeResolver is configured', () => {
it('should throw error when messageTypeResolver is not configured', () => {
const configs = new MessageHandlerConfigBuilder<SupportedMessages, TestContext>()
.addConfig(
USER_MESSAGE_SCHEMA,
Expand Down Expand Up @@ -311,9 +311,9 @@ describe('HandlerContainer', () => {
messageHandlers: configs,
}),
).toThrow(
'Unable to determine message type for handler. ' +
'Either provide messageType in handler options, use a literal resolver, ' +
'or ensure the schema has a literal type field matching messageTypePath.',
'Unable to determine message type for handler at registration time. ' +
'Either provide explicit messageType in handler options (required for custom resolver functions), ' +
'use a literal resolver, or ensure the schema has a literal type field matching messageTypePath.',
)
})

Expand Down
161 changes: 161 additions & 0 deletions packages/core/test/queues/MessageSchemaContainer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import { describe, expect, it } from 'vitest'
import z from 'zod/v4'

import { MessageSchemaContainer } from '../../lib/queues/MessageSchemaContainer.ts'

const MESSAGE_SCHEMA_A = z.object({
type: z.literal('message.a'),
payload: z.string(),
})

const MESSAGE_SCHEMA_B = z.object({
type: z.literal('message.b'),
payload: z.number(),
})

const MESSAGE_SCHEMA_NO_TYPE = z.object({
payload: z.string(),
})

type MessageA = z.infer<typeof MESSAGE_SCHEMA_A>
type MessageB = z.infer<typeof MESSAGE_SCHEMA_B>

describe('MessageSchemaContainer', () => {
describe('resolveSchema', () => {
it('should resolve schema using messageTypePath', () => {
const container = new MessageSchemaContainer<MessageA | MessageB>({
messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})

const resultA = container.resolveSchema({ type: 'message.a', payload: 'test' })
expect('result' in resultA).toBe(true)

const resultB = container.resolveSchema({ type: 'message.b', payload: 123 })
expect('result' in resultB).toBe(true)
})

it('should return error for unknown message type', () => {
const container = new MessageSchemaContainer<MessageA>({
messageSchemas: [MESSAGE_SCHEMA_A],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})

const result = container.resolveSchema({ type: 'unknown.type', payload: 'test' })
expect('error' in result).toBe(true)
if ('error' in result && result.error) {
expect(result.error.message).toContain('Unsupported message type: unknown.type')
}
})

it('should catch resolver errors and return as Either error', () => {
const container = new MessageSchemaContainer<MessageA>({
messageSchemas: [MESSAGE_SCHEMA_A],
messageDefinitions: [],
messageTypeResolver: {
resolver: () => {
throw new Error('Resolver failed')
},
},
})

const result = container.resolveSchema({ payload: 'test' })
expect('error' in result).toBe(true)
if ('error' in result && result.error) {
expect(result.error.message).toBe('Resolver failed')
}
})

it('should resolve schema using literal type', () => {
const container = new MessageSchemaContainer<MessageA>({
messageSchemas: [MESSAGE_SCHEMA_A],
messageDefinitions: [],
messageTypeResolver: { literal: 'message.a' },
})

// Any message resolves to the single schema
const result = container.resolveSchema({ anything: 'works' })
expect('result' in result).toBe(true)
})

it('should resolve schema using attributes with custom resolver', () => {
// Custom resolver extracts type from attributes for runtime resolution.
// For schema registration, we use messageTypePath to map schemas to types.
const container = new MessageSchemaContainer<MessageA | MessageB>({
messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})

// At runtime, we can still use attributes if needed (though this example uses data)
const result = container.resolveSchema({ type: 'message.a', payload: 'test' })
expect('result' in result).toBe(true)
})
})

describe('registration validation', () => {
it('should throw error when custom resolver is used with multiple schemas', () => {
expect(
() =>
new MessageSchemaContainer<MessageA | MessageB>({
messageSchemas: [MESSAGE_SCHEMA_A, MESSAGE_SCHEMA_B],
messageDefinitions: [],
messageTypeResolver: {
resolver: () => 'some.type',
},
}),
).toThrow(
'Custom resolver function cannot be used with multiple schemas. ' +
'The resolver works for runtime type resolution, but at registration time ' +
'we cannot determine which schema corresponds to which type. ' +
'Use messageTypePath config (to extract types from schema literals) or register only a single schema.',
)
})

it('should allow custom resolver with single schema', () => {
expect(
() =>
new MessageSchemaContainer<MessageA>({
messageSchemas: [MESSAGE_SCHEMA_A],
messageDefinitions: [],
messageTypeResolver: {
resolver: () => 'message.a',
},
}),
).not.toThrow()
})

it('should throw error for duplicate schema types', () => {
const DUPLICATE_SCHEMA = z.object({
type: z.literal('message.a'), // Same type as MESSAGE_SCHEMA_A
payload: z.string(), // Same structure to satisfy type
})

expect(
() =>
new MessageSchemaContainer<MessageA>({
messageSchemas: [MESSAGE_SCHEMA_A, DUPLICATE_SCHEMA],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
}),
).toThrow('Duplicate schema for type: message.a')
})

it('should handle schemas without literal type field gracefully', () => {
// When schema doesn't have the expected literal field, it falls back to DEFAULT_SCHEMA_KEY
// With a single schema, this works fine
const container = new MessageSchemaContainer({
messageSchemas: [MESSAGE_SCHEMA_NO_TYPE],
messageDefinitions: [],
messageTypeResolver: { messageTypePath: 'type' },
})

// Since there's no 'type' field in schema, it uses default key
// Any message will fail to match since we're looking for a specific type
const result = container.resolveSchema({ type: 'any.type', payload: 'test' })
expect('error' in result).toBe(true)
})
})
})
4 changes: 2 additions & 2 deletions packages/core/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ export default defineConfig({
include: ['lib/**/*.ts'],
exclude: ['vitest.config.ts', 'lib/**/index.ts'],
thresholds: {
lines: 35,
lines: 42,
functions: 80,
branches: 85,
statements: 35,
statements: 42,
},
},
},
Expand Down
Loading