-
Notifications
You must be signed in to change notification settings - Fork 7
Add flexible type resolution #373
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughAdds a resolver-based message type system ( Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Msg as Incoming Message
participant Resolver as MessageTypeResolver
participant Container as HandlerContainer
participant Schema as MessageSchemaContainer
participant Handler as Message Handler
Msg->>Resolver: resolveMessageType(messageData, messageAttributes)
alt explicit per-handler configured
Resolver-->>Container: (explicit) handlerKey
else literal config
Resolver-->>Container: (literal) handlerKey
else path config
Resolver->>Resolver: extract path from messageData or attributes
Resolver-->>Container: handlerKey or error
else custom fn
Resolver->>Resolver: call custom resolver(messageData, attributes)
Resolver-->>Container: handlerKey or error
end
Container->>Container: resolveHandler(handlerKey)
Container->>Schema: resolveSchema(messageData, messageAttributes)
Schema-->>Handler: validated payload
Handler->>Handler: process message
Handler->>Container: addProcessedMessage(result, messageId, resolvedType)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/core/lib/queues/MessageSchemaContainer.ts (1)
54-70: Consider wrapping resolver call in try-catch to maintain Either return type.The
resolveSchemamethod returnsEither<Error, ZodSchema>, butresolveMessageTypeFromDatacan throw when usingresolveMessageType(e.g., when a required field is missing). This would propagate as an uncaught exception rather than returning{ error: ... }.public resolveSchema( // biome-ignore lint/suspicious/noExplicitAny: This is expected message: Record<string, any>, attributes?: Record<string, unknown>, ): Either<Error, ZodSchema<MessagePayloadSchemas>> { - const messageType = this.resolveMessageTypeFromData(message, attributes) + let messageType: string | undefined + try { + messageType = this.resolveMessageTypeFromData(message, attributes) + } catch (error) { + return { error: error instanceof Error ? error : new Error(String(error)) } + } const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY]
🧹 Nitpick comments (3)
README.md (1)
34-38: Clear documentation of the new messageTypeResolver feature.The documentation effectively introduces the three resolution modes with concise examples and appropriately references the core package README for detailed information.
The static analysis tool flagged minor indentation inconsistencies (expected 4-6 spaces, actual 8-12 spaces). Consider adjusting the indentation to align with markdown best practices:
- * `messageTypeResolver` - alternative to `messageTypeField` for flexible message type resolution. Supports three modes: - * `{ messageTypePath: 'type' }` - extract type from a field (equivalent to `messageTypeField`) - * `{ literal: 'my.message.type' }` - use a constant type for all messages - * `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function - See `@message-queue-toolkit/core` README for detailed documentation and examples. + * `messageTypeResolver` - alternative to `messageTypeField` for flexible message type resolution. Supports three modes: + * `{ messageTypePath: 'type' }` - extract type from a field (equivalent to `messageTypeField`) + * `{ literal: 'my.message.type' }` - use a constant type for all messages + * `{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }` - custom resolver function + See `@message-queue-toolkit/core` README for detailed documentation and examples.packages/core/lib/queues/MessageTypeResolver.ts (1)
164-173: Consider validating the type of the extracted field value.When extracting from
messageTypePath, the code assumes the field value is a string. If the field exists but contains a non-string value (e.g., a number or object), this will silently pass through and could cause issues downstream.if (isMessageTypePathConfig(config)) { const data = context.messageData as Record<string, unknown> | undefined const messageType = data?.[config.messageTypePath] as string | undefined - if (messageType === undefined) { + if (messageType === undefined || typeof messageType !== 'string') { throw new Error( - `Unable to resolve message type: field '${config.messageTypePath}' not found in message data`, + `Unable to resolve message type: field '${config.messageTypePath}' not found or not a string in message data`, ) } return messageType }packages/core/lib/queues/HandlerContainer.ts (1)
367-408: Potential issue with literal resolver and multiple handlers.When using a
literalresolver, all handlers without explicitmessageTypewill resolve to the same literal value. This will cause a "Duplicate handler for message type" error on the second handler. This behavior may be intentional (only one handler for literal mode), but it's worth documenting or providing a clearer error message.Consider adding a check to provide a more specific error message when literal mode is used with multiple handlers:
const literalType = this.getLiteralMessageType() const messageTypePath = this.getMessageTypePathForSchema() + // When using literal mode with multiple handlers, each handler must specify its own messageType + if (literalType && supportedHandlers.length > 1) { + const handlersWithoutType = supportedHandlers.filter(h => !h.messageType) + if (handlersWithoutType.length > 1) { + throw new Error( + 'When using literal resolver with multiple handlers, each handler must provide an explicit messageType option.', + ) + } + } + return supportedHandlers.reduce(
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (17)
README.md(2 hunks)UPGRADING.md(1 hunks)packages/amqp/test/fakes/CustomFakeConsumer.ts(2 hunks)packages/core/README.md(5 hunks)packages/core/lib/index.ts(1 hunks)packages/core/lib/queues/AbstractQueueService.ts(5 hunks)packages/core/lib/queues/HandlerContainer.ts(7 hunks)packages/core/lib/queues/MessageSchemaContainer.ts(3 hunks)packages/core/lib/queues/MessageTypeResolver.ts(1 hunks)packages/core/lib/types/queueOptionsTypes.ts(2 hunks)packages/core/test/queues/HandlerContainer.spec.ts(1 hunks)packages/core/test/queues/HandlerContainer.types.spec.ts(1 hunks)packages/core/test/queues/MessageTypeResolver.spec.ts(1 hunks)packages/gcp-pubsub/README.md(2 hunks)packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts(4 hunks)packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts(5 hunks)packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
packages/core/test/queues/HandlerContainer.types.spec.ts (1)
packages/core/lib/queues/HandlerContainer.ts (4)
MessageHandlerConfig(82-133)HandlerConfigOptions(60-80)HandlerContainerOptions(239-254)HandlerContainer(256-409)
packages/core/lib/queues/MessageTypeResolver.ts (2)
packages/core/lib/index.ts (8)
MessageTypeResolverContext(77-77)MessageTypeResolverFn(78-78)MessageTypeResolverConfig(76-76)isMessageTypePathConfig(83-83)isMessageTypeLiteralConfig(82-82)isMessageTypeResolverFnConfig(84-84)resolveMessageType(85-85)extractMessageTypeFromSchema(81-81)packages/core/lib/queues/HandlerContainer.ts (1)
resolveMessageType(306-330)
packages/amqp/test/fakes/CustomFakeConsumer.ts (3)
packages/core/lib/queues/HandlerContainer.ts (1)
MessageHandlerConfigBuilder(135-225)packages/core/lib/messages/baseMessageSchemas.ts (1)
PublisherBaseMessageType(10-10)packages/schemas/lib/messages/baseMessageSchemas.ts (1)
PublisherBaseMessageType(25-25)
packages/core/lib/queues/MessageSchemaContainer.ts (1)
packages/core/lib/queues/MessageTypeResolver.ts (6)
MessageTypeResolverConfig(98-119)MessageTypeResolverContext(5-15)resolveMessageType(156-177)isMessageTypePathConfig(124-128)isMessageTypeLiteralConfig(133-137)extractMessageTypeFromSchema(187-196)
packages/core/lib/queues/HandlerContainer.ts (1)
packages/core/lib/queues/MessageTypeResolver.ts (6)
MessageTypeResolverConfig(98-119)MessageTypeResolverContext(5-15)resolveMessageType(156-177)isMessageTypePathConfig(124-128)isMessageTypeLiteralConfig(133-137)extractMessageTypeFromSchema(187-196)
🪛 markdownlint-cli2 (0.18.1)
README.md
34-34: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
35-35: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
36-36: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
37-37: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
103-103: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
🔇 Additional comments (45)
packages/metrics/lib/prometheus/metrics/message-time/PrometheusMessageQueueTimeMetric.spec.ts (1)
210-210: LGTM! Improves test determinism.Using a precomputed
timestampvariable instead ofDate.now()inside the loop makes the test deterministic and prevents potential flakiness from time-dependent test data.packages/amqp/test/fakes/CustomFakeConsumer.ts (1)
8-35: LGTM! Clean migration to literal message type resolver.The changes demonstrate the new resolver-based routing pattern:
CUSTOM_FAKE_MESSAGE_TYPEconstant provides a clear, reusable literal typemessageTypeResolver: { literal: ... }replaces field-based routing for catch-all behavior- Explicit
messageTypein handler options aligns with the new API requirementsThis is a good example of the migration path from legacy routing to the new flexible resolver system.
packages/gcp-pubsub/README.md (2)
787-791: LGTM! Clear documentation of DLQ_MESSAGE_TYPE export.The updated import statement helpfully documents the new
DLQ_MESSAGE_TYPEconstant with its value, making it easy for users to understand the DLQ message type convention.
827-827: LGTM! Accurate explanation of DLQ resolver behavior.The documentation correctly describes that DLQ consumers use a literal message type resolver, treating all messages uniformly. This aligns with the DLQ use case where message structure is less predictable.
packages/core/lib/index.ts (1)
75-86: LGTM! Well-structured public API for the new MessageTypeResolver feature.The exports provide a complete and type-safe API surface:
- Type exports for configuration and context
- Type guards for discriminating resolver modes
- Utility functions for resolution and schema extraction
This aligns with the documented migration from
NO_MESSAGE_TYPE_FIELDto the new resolver-based approach.UPGRADING.md (1)
3-70: LGTM! Comprehensive and well-structured migration guide.The upgrading documentation provides:
- Clear explanation of breaking changes
- Multiple migration scenarios with before/after code examples
- Explicit guidance on the
messageTyperequirement for custom resolvers- Platform-specific notes for GCP Pub/Sub DLQ consumers
This will significantly ease the migration burden for users upgrading to the new version.
packages/core/test/queues/MessageTypeResolver.spec.ts (1)
1-212: LGTM! Comprehensive test coverage for MessageTypeResolver.The test suite provides excellent coverage:
- ✅ Type guard validation for all three resolver modes
- ✅ Literal mode behavior
- ✅ Path-based resolution with edge cases (kebab-case field names)
- ✅ Custom resolver with messageAttributes propagation and event mapping
- ✅ Error handling for missing fields and null/undefined data
- ✅ Schema extraction with various edge cases
The tests are well-organized and demonstrate practical use cases, making them valuable both for validation and as documentation.
packages/core/test/queues/HandlerContainer.spec.ts (1)
162-397: LGTM! Thorough test coverage for message type resolution.The test suite comprehensively validates the new resolution system:
- ✅ Legacy
messageTypeFieldbackward compatibility- ✅ All three
messageTypeResolvermodes (literal, path, custom function)- ✅
messageAttributespropagation to custom resolvers- ✅ Explicit
messageTypein handler options with correct precedence- ✅ Error cases: missing configuration, unresolvable types, duplicate types
- ✅ Practical patterns (e.g., GCS event type mapping:
OBJECT_FINALIZE→storage.object.created)The tests effectively document the API behavior and validate both registration-time and runtime resolution logic.
packages/core/lib/types/queueOptionsTypes.ts (2)
7-7: LGTM!Import for the new
MessageTypeResolverConfigtype is correctly added to support the new resolver-based routing option.
79-125: Well-documented API addition with clear precedence rules.The documentation clearly explains:
- The three resolver modes (field path, literal, custom function)
- Precedence behavior (
messageTypeResolverovermessageTypeField)- Migration path with deprecation notice on the legacy field
The examples are practical and cover common use cases including Cloud Storage notifications.
packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts (3)
11-14: LGTM!Good practice to export the DLQ message type as a constant. This allows tests and other code to reference the same type string consistently.
39-53: Correct type update to omit both resolver options.Properly omitting both
messageTypeFieldandmessageTypeResolverfromPubSubDlqConsumerOptionsprevents users from accidentally overriding the DLQ's uniform type handling behavior.
119-120: Clean migration to literal resolver.Using
messageTypeResolver: { literal: DLQ_MESSAGE_TYPE }is the correct approach for DLQ consumers. This aligns with the new resolver system and provides clear semantics that all DLQ messages are treated as the same type.packages/core/test/queues/HandlerContainer.types.spec.ts (4)
1-53: LGTM! Comprehensive type test setup.Good approach using vitest's
expectTypeOffor compile-time type validation. The test schemas correctly usez.literal()for the type field, which is essential for the schema-based type extraction to work.
54-105: Thorough type inference tests for the builder.Tests correctly validate that:
- Handler receives the correct narrowed message type from the schema
- Context type is properly propagated
- Optional
messageTypein handler options is correctly typed
143-192: Good runtime behavior documentation in type tests.The comments on lines 185-188 clearly explain the design decision that
resolveHandleraccepts any string at compile time with runtime validation. This is appropriate since the message type comes fromresolveMessageType()which extracts it from validated messages.
194-228: Complete coverage of MessageTypeResolverConfig variants.Tests validate all three resolver modes and correctly verify:
messageDatais typed asunknownmessageAttributesis typed asRecord<string, unknown> | undefined- Resolver function must return
stringpackages/core/lib/queues/MessageSchemaContainer.ts (4)
4-11: LGTM!Imports are correctly added for the new MessageTypeResolver utilities and types.
16-24: Clear deprecation and documentation.Good documentation explaining precedence and deprecation of the legacy field.
72-91: Logic is correct but note exception behavior.The method correctly prioritizes
messageTypeResolverovermessageTypeField. Note that when a resolver is configured and fails (throws), the exception propagates up. This is consistent with the behavior inHandlerContainer.resolveMessageType.
118-150: Well-structured priority system for schema registration.The resolution order is clear:
- Literal type from resolver config (same for all schemas)
- Extract from schema using messageTypePath
- Fall back to DEFAULT_SCHEMA_KEY for custom resolvers
This correctly handles all three resolver modes.
packages/core/README.md (4)
73-126: Excellent documentation on message type resolution.The introduction clearly explains:
- What message type is and why it matters
- The two configuration approaches with clear guidance on when to use each
- All three resolver modes with examples
This will significantly help users understand and adopt the new flexible resolver system.
127-366: Comprehensive platform-specific examples.The real-world examples cover major platforms and scenarios:
- AWS SQS, EventBridge, SNS
- Apache Kafka (including CloudEvents headers)
- GCP Pub/Sub (own events, Cloud Storage notifications, Eventarc/CloudEvents)
- Single-type queues pattern
The Cloud Storage notification example is particularly valuable as it demonstrates why custom resolvers are needed when type info is in attributes rather than the message body.
464-512: Clear handler configuration guidance.Good documentation on:
- The
messageTypeoption requirement for custom resolvers- Priority order for determining handler message type
- Complete example with handler registration and container setup
655-669: API reference properly updated.The type definitions for
MessageTypeResolverContext,MessageTypeResolverFn, andMessageTypeResolverConfigare clear and accurate.packages/core/lib/queues/MessageTypeResolver.ts (4)
1-15: LGTM!The
MessageTypeResolverContexttype is well-documented and appropriately flexible for handling messages from different sources (PubSub, SQS, etc.).
17-43: LGTM!The
MessageTypeResolverFntype is well-documented with clear examples demonstrating both attribute-based and nested data extraction patterns.
45-119: LGTM!The
MessageTypeResolverConfigdiscriminated union is well-designed with comprehensive documentation and practical examples for Cloud Storage notifications and CloudEvents format.
121-146: LGTM!The type guard functions correctly narrow the discriminated union types using the
inoperator.packages/core/lib/queues/AbstractQueueService.ts (5)
64-68: LGTM!The new imports are correctly added to support the flexible message type resolution feature.
119-126: LGTM!The deprecation annotation on
messageTypeFieldprovides a clear migration path while maintaining backwards compatibility withmessageTypeResolver.
158-160: LGTM!The constructor correctly initializes both the legacy
messageTypeField(with empty string default for backwards compatibility) and the newmessageTypeResolver.
181-212: LGTM!Both
resolveConsumerMessageSchemaContainerandresolvePublisherMessageSchemaContainerare consistently updated to accept and propagate themessageTypeResolverconfiguration.
214-238: Note: Different error handling vs. HandlerContainer.This method returns
undefinedwhen resolution fails, whileHandlerContainer.resolveMessageTypethrows an error. This difference appears intentional based on usage context, but ensure consumers handle theundefinedreturn appropriately.packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (4)
145-149: LGTM!The
HandlerContaineris correctly initialized with the newmessageTypeResolveroption.
286-289: LGTM!The
resolveSchemacall correctly passes message attributes to support attribute-based type resolution.
359-380: LGTM!The error handling for message type resolution is comprehensive: the lock is properly released, the error is reported, and the message is correctly marked as invalid before terminal handling.
474-479: LGTM!The
resolveSchemaoverride correctly accepts and propagatesmessageAttributesto the underlying schema container.packages/core/lib/queues/HandlerContainer.ts (7)
8-15: LGTM!The necessary types and functions are correctly imported from
MessageTypeResolver.ts.
66-71: LGTM!The
messageTypeoption is well-documented, clearly explaining when explicit type specification is required (custom resolver) vs. optional (path or literal modes).
90-94: LGTM!The
messageTypeproperty is correctly added toMessageHandlerConfigwith clear documentation about its routing purpose.
245-253: LGTM!The deprecation of
messageTypeFieldand addition ofmessageTypeResolverwith clear precedence documentation provides a smooth migration path.
265-276: LGTM!The class properties and constructor correctly initialize both the new
messageTypeResolverand the deprecatedmessageTypeFieldfor backwards compatibility.
306-330: LGTM!The
resolveMessageTypemethod correctly implements the priority-based resolution:messageTypeResolverfirst, thenmessageTypeField, with clear error messages when resolution fails.
332-355: LGTM!The private helper methods correctly abstract the resolution logic, with
getMessageTypePathForSchemareturningundefinedfor literal/custom resolver modes where schema extraction isn't applicable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (7)
packages/sqs/README.md (2)
542-592: Custom field names section conflicts with later examples usingmessage.messageType.
Here the schema useseventType(Line 550) and the resolver points toeventType(Line 570), so handlers will observemessage.eventTypeunless the toolkit normalizes it. Later examples (e.g., logger/predicates) refer tomessage.messageType, which is confusing in the context of this section. Either (a) explicitly state normalization behavior, or (b) adjust later generic snippets to use the field name from the schema being discussed.
873-895: Replacemessage.messageTypein generic snippets with the actual type field for that example (or use a neutraltypeaccessor).
Multiple snippets usemessage.messageType(e.g., Line 887, Line 1524, Line 1558) while earlier in the README the discriminator field is frequentlymessageType, sometimeseventType, and for EventBridge it’sdetail-type. If the toolkit does not normalize this, these snippets are misleading.Also applies to: 1522-1527, 1556-1561
packages/sns/test/consumers/SnsSqsPermissionConsumer.ts (1)
154-170: Consider removingcreationConfig: options.creationConfig as anyor refactoring the type narrowing.The
as anyescape hatch obscures type mismatches in thecreationConfigshape, making it harder to catch configuration errors during test setup. While this is test code (reducing the practical risk), the pattern appears consistently across multiple test consumers, suggesting a TypeScript limitation withPick<Union>narrowing. Either eliminate the cast by improving the type definition, or add an inline type assertion comment explaining why it's necessary.packages/core/lib/queues/MessageSchemaContainer.ts (2)
43-59:resolveSchema()can throw even though it returnsEither— catch resolver errors and return{ error }.
Right nowresolveMessageType(...)may throw (missing path / custom resolver), which escapes as an exception and breaks theEithercontract.public resolveSchema( // biome-ignore lint/suspicious/noExplicitAny: This is expected message: Record<string, any>, attributes?: Record<string, unknown>, ): Either<Error, ZodSchema<MessagePayloadSchemas>> { - const messageType = this.resolveMessageTypeFromData(message, attributes) + let messageType: string | undefined + try { + messageType = this.resolveMessageTypeFromData(message, attributes) + } catch (e) { + return { error: e instanceof Error ? e : new Error(String(e)) } + } const schema = this.messageSchemas[messageType ?? DEFAULT_SCHEMA_KEY] if (!schema) { return { error: new Error( `Unsupported message type: ${messageType ?? DEFAULT_SCHEMA_KEY.toString()}`, ), } } return { result: schema } }Also applies to: 64-74
98-127: Add validation to throw an actionable error when custom resolver is used with multiple schemas.When
messageTypeResolveris configured as a resolver function with multiple schemas, all schemas get mapped toSymbol(NO_MESSAGE_TYPE), triggering a cryptic "Duplicate schema for type: Symbol(NO_MESSAGE_TYPE)" error. Since custom resolver mode cannot extract types at schema registration time, validate in theresolveMapmethod that whenisMessageTypeResolverFnConfig(messageTypeResolver)is true andarray.length > 1, throw a descriptive error directing users to use either a single schema,messageTypePath, orliteralconfig instead.packages/core/lib/queues/HandlerSpy.ts (1)
176-224:addProcessedMessage(...)breaks ID/type observability whenmessageis null (wrong id) and resolves waiters with the unsynthesized object.
This makeswaitForMessageWithId(...)unreliable for failed parses and can resolve promises withmessage: nulleven though you store a synthesized message in the buffer.addProcessedMessage( processingResult: SpyResultInput<MessagePayloadSchemas>, messageId: string | undefined, messageType: string | typeof TYPE_NOT_RESOLVED, ) { const resolvedMessageId = processingResult.message?.[this.messageIdField] ?? messageId ?? randomUUID() @@ const resolvedProcessingResult = processingResult.message ? (processingResult as SpyResultOutput<MessagePayloadSchemas>) : ({ ...processingResult, message: { - [this.messageIdField]: messageId, + [this.messageIdField]: resolvedMessageId, type: resolvedMessageType === 'TYPE_NOT_RESOLVED' ? 'FAILED_TO_RESOLVE' : resolvedMessageType, }, } as SpyResultOutput<MessagePayloadSchemas>) @@ if (foundPromise) { - foundPromise.resolve(processingResult) + foundPromise.resolve(resolvedProcessingResult)packages/core/lib/queues/AbstractQueueService.ts (1)
657-684: Custom resolvers accessing messageData fields are incompatible with payload offloading.The code only preserves the message type field for
messageTypePathresolvers during payload offloading. Customresolverfunctions that extract type frommessageDatafields (as shown in the README examples likemessageData['detail-type']for EventBridge) will fail when the message is consumed with an offloaded payload, sincemessageDatais not preserved in the offloaded pointer.This incompatibility is not documented and there is no validation preventing users from combining custom resolvers with payload offloading. Either add validation to prevent this combination or extend the offloading logic to support custom resolvers.
🧹 Nitpick comments (13)
packages/amqp/test/utils/testContext.ts (1)
144-204: DRY: extract the sharedmessageTypeResolverinto a constant to avoid divergence.
The same{ messageTypePath: 'type' }object is repeated in three publisher configs; consider a singleconstused by all three.+const TEST_MESSAGE_TYPE_RESOLVER = { messageTypePath: 'type' } as const + // ... newPublisherOptions: { isLazyInitEnabled: true, handlerSpy: true, messageIdField: 'id', - messageTypeResolver: { messageTypePath: 'type' }, + messageTypeResolver: TEST_MESSAGE_TYPE_RESOLVER, }, // ... newPublisherOptions: { isLazyInitEnabled: false, handlerSpy: true, messageIdField: 'id', - messageTypeResolver: { messageTypePath: 'type' }, + messageTypeResolver: TEST_MESSAGE_TYPE_RESOLVER, }, // ... newPublisherOptions: { handlerSpy: true, messageIdField: 'id', - messageTypeResolver: { messageTypePath: 'type' }, + messageTypeResolver: TEST_MESSAGE_TYPE_RESOLVER, },packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts (1)
101-167: Allow overridingmessageTypeResolverviaoptions(don’t hard-code).
Right nowmessageTypeResolveris forced to{ messageTypePath: 'messageType' }, preventing callers from testing other resolution modes in this reusable test consumer.- messageTypeResolver: { messageTypePath: 'messageType' }, + messageTypeResolver: options.messageTypeResolver ?? { messageTypePath: 'messageType' },packages/sqs/README.md (4)
88-93: Clarify thatmessageTypeResolverselects “where to read type”, not that the message must contain a “unique message type field”.
Line 89 implies the message necessarily has a single discriminator “field”, but the PR’s goal is that type may come from nested paths / attributes / custom resolver. Consider rewording to avoid contradicting the new flexibility.- - A unique message type field (discriminator for routing) - configurable via `messageTypeResolver` (required) + - A message type discriminator (used for routing) - configurable via `messageTypeResolver` (required)
125-146: Examples only showmessageTypePath; consider adding (or linking to) literal/custom resolver examples for SQS docs.
Given the PR introduces “literal / path / custom fn” resolution, the SQS README would be more complete if it briefly mentions the other resolver modes (or links to the core docs section that does), so readers don’t assume path is the only supported format.Also applies to: 193-213, 239-268, 309-347, 423-469, 471-538, 560-592
793-796: Handler routing description should align with “resolver” terminology.
Line 795 says “using the discriminator field”, but withmessageTypeResolverit’s not always a field (could be derived / from metadata). Suggest wording tweak.-Handlers process messages based on their type. Messages are routed to the appropriate handler using the discriminator field (configurable via `messageTypeResolver`): +Handlers process messages based on their type. Messages are routed to the appropriate handler using the resolved message type (configurable via `messageTypeResolver`):
1972-1977:messageTypeResolverbullet could be more concrete about supported resolver shapes.
Line 1974 currently says “Configuration containing the message type for routing” but doesn’t indicate accepted forms (path/literal/custom). A short hint here would reduce back-and-forth.packages/sns/lib/sns/AbstractSnsPublisher.ts (1)
122-132: Resolver-basedmessageTypein logs/errors is the right direction; consider reusing + logging the parsed message
Small follow-ups to reduce surprises:
- Prefer
this.resolveMessageLog(parsedMessage, messageType)(validated) instead ofmessage.- Compute
messageTypeonce per publish attempt and reuse for both logging andInternalError.details(avoid double resolution / inconsistent values if a custom resolver is non-deterministic).Also applies to: 167-179
packages/amqp/lib/AbstractAmqpConsumer.ts (1)
102-110: Good resolver adoption; sanitize/capmessageTypebefore embedding intotransactionSpanId
SincemessageTypecomes from message content, consider normalizing (allowed charset) and truncating to prevent odd span IDs / cardinality blow-ups when type resolution yields unexpected values.Also applies to: 154-163
packages/core/test/queues/HandlerContainer.spec.ts (1)
162-197: Misleading describe name: “with messageTypeField (legacy)” is actuallymessageTypePath.
Consider renaming the block to reduce confusion when reading failures (e.g. “with messageTypePath (legacy field-based)”).packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts (1)
11-15: DLQ single-type routing via literal resolver is clean; docs should match behavior.
The implementation does use routing (just a single type), so consider rewording the class docs bullet to “does not require type in payload; uses a literal resolver”.Also applies to: 37-60, 115-126
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)
365-379: Normalize non-Errorthrows before callinghandleError(...).
resolveErrorisunknown; if someone throws a string/object from a custom resolver, downstream error handling/logging may break.- } catch (resolveError) { + } catch (resolveError) { await releaseLock.release() - this.handleError(resolveError) + this.handleError(resolveError instanceof Error ? resolveError : new Error(String(resolveError))) this.handleMessageProcessed({ message: validatedMessage, processingResult: { status: 'error', errorReason: 'invalidMessage', }, messageProcessingStartTimestamp, queueName: this.subscriptionName ?? this.topicName, }) this.handleTerminalError(message, 'invalidMessage') return }packages/core/lib/queues/HandlerSpy.ts (1)
98-115:ANY_MESSAGE_TYPEfiltering only applies at the top level — docs/readme should reflect that.
If nested matching is a use-case, you’ll need recursive stripping; otherwise clarify “top-level fields”.packages/core/lib/queues/HandlerContainer.ts (1)
361-371: Consider clarifying error message for literal mode.When using literal resolver mode, all handlers without explicit
messageTypewill be assigned the same literal value. If a developer attempts to register multiple handlers in literal mode, the error "Duplicate handler for message type: X" might be confusing since they didn't explicitly create duplicates—the literal resolver assigned the same type to all handlers.Consider enhancing the error message to indicate when the duplicate is due to literal resolver configuration:
if (acc[messageType]) { - throw new Error(`Duplicate handler for message type: ${messageType}`) + const duplicateSource = literalType === messageType + ? ' (all handlers receive the same literal type from resolver)' + : '' + throw new Error(`Duplicate handler for message type: ${messageType}${duplicateSource}`) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (61)
README.md(2 hunks)UPGRADING.md(1 hunks)examples/sns-sqs/lib/common/TestPublisherManager.ts(1 hunks)examples/sns-sqs/lib/common/UserConsumer.ts(1 hunks)packages/amqp/lib/AbstractAmqpConsumer.ts(2 hunks)packages/amqp/lib/AbstractAmqpPublisher.ts(2 hunks)packages/amqp/test/consumers/AmqpPermissionConsumer.ts(1 hunks)packages/amqp/test/fakes/FakeQueueConsumer.ts(1 hunks)packages/amqp/test/fakes/FakeTopicConsumer.ts(1 hunks)packages/amqp/test/publishers/AmqpPermissionPublisher.ts(1 hunks)packages/amqp/test/utils/testContext.ts(3 hunks)packages/core/README.md(7 hunks)packages/core/lib/events/DomainEventEmitter.ts(1 hunks)packages/core/lib/index.ts(1 hunks)packages/core/lib/queues/AbstractQueueService.ts(8 hunks)packages/core/lib/queues/HandlerContainer.ts(7 hunks)packages/core/lib/queues/HandlerSpy.ts(3 hunks)packages/core/lib/queues/MessageSchemaContainer.ts(3 hunks)packages/core/lib/queues/MessageTypeResolver.ts(1 hunks)packages/core/lib/types/queueOptionsTypes.ts(3 hunks)packages/core/package.json(1 hunks)packages/core/test/queues/HandlerContainer.spec.ts(4 hunks)packages/core/test/queues/HandlerContainer.types.spec.ts(1 hunks)packages/core/test/queues/HandlerSpy.spec.ts(13 hunks)packages/core/vitest.config.ts(1 hunks)packages/gcp-pubsub/README.md(12 hunks)packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts(4 hunks)packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts(6 hunks)packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts(2 hunks)packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts(1 hunks)packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts(1 hunks)packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts(2 hunks)packages/gcs-payload-store/README.md(2 hunks)packages/kafka/lib/AbstractKafkaService.ts(2 hunks)packages/metrics/package.json(1 hunks)packages/sns/README.md(7 hunks)packages/sns/lib/sns/AbstractSnsPublisher.ts(2 hunks)packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts(1 hunks)packages/sns/lib/sns/SnsPublisherManager.spec.ts(1 hunks)packages/sns/lib/sns/fakes/FakeConsumer.ts(1 hunks)packages/sns/lib/utils/snsAttributeUtils.ts(1 hunks)packages/sns/package.json(1 hunks)packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts(1 hunks)packages/sns/test/consumers/SnsSqsEntityConsumer.ts(1 hunks)packages/sns/test/consumers/SnsSqsPermissionConsumer.ts(1 hunks)packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts(1 hunks)packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts(1 hunks)packages/sns/test/publishers/SnsPermissionPublisher.ts(1 hunks)packages/sns/test/publishers/SnsPermissionPublisherFifo.ts(1 hunks)packages/sns/test/utils/testContext.ts(1 hunks)packages/sqs/README.md(14 hunks)packages/sqs/examples/eventbridge-consumer-example.ts(1 hunks)packages/sqs/lib/sqs/AbstractSqsConsumer.ts(3 hunks)packages/sqs/lib/sqs/AbstractSqsPublisher.ts(2 hunks)packages/sqs/package.json(1 hunks)packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts(1 hunks)packages/sqs/test/consumers/SqsEventBridgeConsumer.ts(2 hunks)packages/sqs/test/consumers/SqsPermissionConsumer.ts(1 hunks)packages/sqs/test/consumers/SqsPermissionConsumerFifo.ts(1 hunks)packages/sqs/test/publishers/SqsPermissionPublisher.ts(1 hunks)packages/sqs/test/publishers/SqsPermissionPublisherFifo.ts(1 hunks)
✅ Files skipped from review due to trivial changes (3)
- packages/gcs-payload-store/README.md
- packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts
- packages/sqs/test/consumers/SqsEventBridgeConsumer.ts
🚧 Files skipped from review as they are similar to previous changes (3)
- README.md
- packages/gcp-pubsub/README.md
- packages/core/lib/queues/MessageTypeResolver.ts
🧰 Additional context used
🧬 Code graph analysis (7)
packages/core/lib/queues/AbstractQueueService.ts (3)
packages/core/lib/queues/MessageTypeResolver.ts (4)
MessageTypeResolverConfig(97-117)MessageTypeResolverContext(5-15)resolveMessageType(154-175)isMessageTypePathConfig(122-126)packages/core/lib/queues/MessageSchemaContainer.ts (1)
MessageSchemaContainer(24-131)packages/core/lib/queues/HandlerSpy.ts (1)
TYPE_NOT_RESOLVED(31-31)
packages/core/lib/queues/HandlerSpy.ts (2)
packages/core/lib/index.ts (3)
ANY_MESSAGE_TYPE(67-67)TYPE_NOT_RESOLVED(73-73)objectMatches(101-101)packages/core/lib/utils/matchUtils.ts (1)
objectMatches(39-65)
packages/core/lib/queues/HandlerContainer.ts (1)
packages/core/lib/queues/MessageTypeResolver.ts (6)
MessageTypeResolverConfig(97-117)MessageTypeResolverContext(5-15)resolveMessageType(154-175)isMessageTypePathConfig(122-126)isMessageTypeLiteralConfig(131-135)extractMessageTypeFromSchema(185-194)
packages/kafka/lib/AbstractKafkaService.ts (2)
packages/core/lib/index.ts (1)
TYPE_NOT_RESOLVED(73-73)packages/core/lib/queues/HandlerSpy.ts (1)
TYPE_NOT_RESOLVED(31-31)
packages/core/lib/types/queueOptionsTypes.ts (1)
packages/core/lib/queues/MessageTypeResolver.ts (1)
MessageTypeResolverConfig(97-117)
packages/core/test/queues/HandlerContainer.types.spec.ts (2)
packages/core/lib/queues/HandlerContainer.ts (5)
MessageHandlerConfigBuilder(135-225)MessageHandlerConfig(82-133)HandlerConfigOptions(60-80)HandlerContainerOptions(239-249)HandlerContainer(251-382)packages/core/lib/queues/MessageTypeResolver.ts (1)
MessageTypeResolverConfig(97-117)
packages/core/lib/queues/MessageSchemaContainer.ts (3)
packages/schemas/lib/events/eventTypes.ts (1)
CommonEventDefinition(22-33)packages/core/lib/queues/MessageTypeResolver.ts (6)
MessageTypeResolverConfig(97-117)MessageTypeResolverContext(5-15)resolveMessageType(154-175)isMessageTypePathConfig(122-126)isMessageTypeLiteralConfig(131-135)extractMessageTypeFromSchema(185-194)packages/core/lib/queues/HandlerContainer.ts (1)
resolveMessageType(296-306)
🪛 markdownlint-cli2 (0.18.1)
UPGRADING.md
11-11: Spaces inside code span elements
(MD038, no-space-in-code)
29-29: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
30-30: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
31-31: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
32-32: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
33-33: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
34-34: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
35-35: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
36-36: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
38-38: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
39-39: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
40-40: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
41-41: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
42-42: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
43-43: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
44-44: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
45-45: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
46-46: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
47-47: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
48-48: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
49-49: Unordered list indentation
Expected: 6; Actual: 12
(MD007, ul-indent)
53-53: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
54-54: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
98-98: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
99-99: Unordered list indentation
Expected: 2; Actual: 4
(MD007, ul-indent)
100-100: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
101-101: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
102-102: Unordered list indentation
Expected: 4; Actual: 8
(MD007, ul-indent)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (57)
packages/amqp/test/utils/testContext.ts (1)
151-156: Config migration looks correct: resolver path'type'preserves legacy behavior.
Switching tomessageTypeResolver: { messageTypePath: 'type' }keeps the message-type lookup equivalent to the previous rootmessageTypeField: 'type'while aligning tests with the new resolver-based API.Also applies to: 172-177, 193-197
packages/sns/test/consumers/SnsSqsPermissionConsumerFifo.ts (1)
168-168: LGTM!The migration from
messageTypeFieldto the newmessageTypeResolverpattern is correct. Using{ messageTypePath: 'messageType' }maintains the same behavior as before while adopting the new flexible type resolution API.packages/amqp/test/fakes/FakeTopicConsumer.ts (1)
44-44: LGTM! Clean migration to the new resolver API.The change correctly migrates from the legacy
messageTypeFieldto the newmessageTypeResolverAPI while maintaining the same behavior (resolving message type from the 'type' field).packages/amqp/test/consumers/AmqpPermissionConsumer.ts (1)
78-78: LGTM! Migration to resolver-based API is correct.The change correctly migrates from the legacy
messageTypeFieldto the newmessageTypeResolverwithmessageTypePath. This maintains equivalent functionality while adopting the flexible resolver-based approach.packages/sqs/README.md (1)
1680-1763: EventBridge sections:messageTypeResolver: { messageTypePath: 'detail-type' }looks consistent; verify escaping/typing expectations.
The examples correctly treatdetail-typeas a top-level key. Just ensure your published types/docs mention that hyphenated keys require bracket access in TS when referenced directly (e.g.,message['detail-type'])—though in these snippets you wisely don’t access it directly.Also applies to: 1810-1840, 1872-1898
packages/amqp/test/publishers/AmqpPermissionPublisher.ts (1)
37-52: Verify resolver config shape is correct (messageTypePath).This migration looks consistent, but please double-check the new
messageTypeResolvercontract accepts{ messageTypePath: 'messageType' }(key name + nesting), since this is easy to mismatch during refactors.packages/amqp/lib/AbstractAmqpPublisher.ts (1)
90-96: Resolve message type once and reuse (consistency + avoid recomputation).Right now it’s resolved once for logging (Line 91) and again for error details (Line 125), potentially against a mutated
message(Line 96). Consider computingmessageTypeonce per call and reusing it in both places.+ const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' const resolvedLogMessage = this.resolveMessageLog(message, messageType) this.logMessage(resolvedLogMessage) } message = this.updateInternalProperties(message) try { this.publishInternal(objectToBuffer(message), options) this.handleMessageProcessed({ message, messageProcessingStartTimestamp, // @ts-expect-error it will throw an error before if id field is not set messageId: message[this.messageIdField], processingResult: { status: 'published' }, queueName: this.resolveTopicOrQueue(), }) } catch (err) { ... details: copyWithoutUndefined({ publisher: this.constructor.name, // @ts-expect-error queueName: this.queueName, exchange: this.exchange, - messageType: this.resolveMessageTypeFromMessage(message) ?? 'unknown', + messageType, }),[uggest_recommended_refactor]
Also applies to: 117-128
packages/sqs/package.json (2)
55-55: TypeScript patch bump looks good.The TypeScript version bump from ^5.9.2 to ^5.9.3 is a minor patch release and should be safe.
57-57: No Zod API compatibility issues detected.The SQS package uses only standard, stable Zod 4.x APIs that are fully compatible with version 4.1.13. All schema definitions use basic patterns (z.object, z.string, z.array, z.enum, z.literal, z.optional, z.nullable) with no deprecated error customization syntax or advanced features that would be affected by the version bump within the 4.x series. The explicit
zod/v4import path confirms the code is already aligned with Zod 4.x conventions, and no breaking changes are anticipated for this minor version update.packages/gcp-pubsub/test/pubsub/PubSubPublisherManager.spec.ts (1)
67-67: Configuration updated correctly.Both occurrences at lines 67 and 211 properly migrate from
messageTypeField: 'type'tomessageTypeResolver: { messageTypePath: 'type' }. The oldmessageTypeFieldAPI has been removed from production code; remaining references inHandlerContainer.spec.tsare intentional test coverage for legacy behavior and error cases.packages/sns/test/utils/testContext.ts (2)
154-154: Flag inconsistency between AI summary and actual code.The AI summary claims three publisher configurations were updated (
queuePublisherManager.newPublisherOptions,queuePublisherManagerNoLazy.newPublisherOptions,topicPublisherManager.newPublisherOptions), but this file contains only onepublisherManagerconfiguration. The summary appears to conflate changes across multiple files or contains inaccurate information about this specific file.
154-154: Migration to resolver-based type resolution is complete and consistent across the codebase.The change from
messageTypeField: 'type'tomessageTypeResolver: { messageTypePath: 'type' }correctly implements the new flexible type resolution API. Verification confirms no legacymessageTypeFieldpattern remains in any test file, and all test contexts across sns, sqs, amqp, gcp-pubsub, and core packages have been consistently updated to use the newmessageTypeResolverpattern.packages/sqs/examples/eventbridge-consumer-example.ts (1)
112-112: Configuration change is correct and properly handles hyphenated field names.The migration from
messageTypeFieldtomessageTypeResolverwithmessageTypePath: 'detail-type'correctly aligns with the PR's flexible type resolution objective. The implementation uses bracket notation for property access, which natively handles hyphenated field names without requiring any special handling.packages/sns/lib/utils/snsAttributeUtils.ts (1)
46-50: No action required. The parameter namemessageTypePathrefers to the field name at the root level of the message (e.g.,'type','messageType','detail-type'), not a nested path like'payload.type'. The implementation at line 53 correctly accessesschema.shape[messageTypePath].value, and all call sites throughout the codebase confirm this usage pattern. This is consistent with the documented behavior inqueueOptionsTypes.tswhich describes the option as "field name at the root of the message."packages/sns/package.json (1)
62-62: No action needed. Zod 4.1.13 is a safe patch bump containing only bug fixes (regex error message accuracy, partial record handling, recursive tuple recursion prevention, catchall iteration correctness) with no breaking changes. The SNS package tests do not snapshot or explicitly validate Zod error message formatting—they only assert on error structure via.issuesand pattern-match on error types like/invalid_type/. The bump is within the existing peer dependency constraint (>=3.25.76 <5.0.0) and poses no risk to test stability.packages/sns/test/consumers/SnsSqsEntityConsumer.ts (1)
74-87: Migration tomessageTypeResolveris complete in SNS test suite.The SNS tests have been fully migrated away from the legacy
messageTypeFieldconfiguration. The newmessageTypeResolver: { messageTypePath: 'type' }format at line 82 is correct and consistent with the codebase requirements. No additional changes needed.packages/sns/test/consumers/CreateLocateConfigMixConsumer.ts (1)
31-41: This looks good—themessageTypeResolverconfiguration is correct.The
messageTypePath: 'type'matches theMessageTypeResolverConfigtype definition and aligns with the corresponding publisher configuration inCreateLocateConfigMixPublisher.ts. The test messages are correctly configured to extract the type discriminator from the root-leveltypefield.packages/core/vitest.config.ts (1)
10-13: LGTM!Enabling type checking for
*.types.spec.tsfiles is a good practice and aligns well with the introduction of the resolver-based message type system.packages/sns/lib/sns/fakes/FakeConsumer.ts (1)
39-39: LGTM!The migration from
messageTypeField: 'type'tomessageTypeResolver: { messageTypePath: 'type' }is consistent with the PR's flexible type resolution objectives.packages/sns/test/publishers/SnsPermissionPublisherFifo.ts (1)
45-45: LGTM!The migration to
messageTypeResolver: { messageTypePath: 'messageType' }is consistent with the flexible type resolution pattern.packages/core/lib/events/DomainEventEmitter.ts (1)
183-183: LGTM!Adding
event.typeas the third parameter toaddProcessedMessageis consistent with the updated HandlerSpy API and enables proper message type tracking for background handlers.packages/amqp/test/fakes/FakeQueueConsumer.ts (1)
28-28: LGTM!The migration to
messageTypeResolver: { messageTypePath: 'type' }is consistent with the project-wide adoption of the flexible type resolution pattern.packages/sns/lib/sns/SnsPublisherManager.messageDeduplication.spec.ts (1)
44-44: LGTM!The test configuration correctly adopts the new
messageTypeResolverpattern, ensuring the deduplication tests exercise the resolver-based message type resolution.packages/core/test/queues/HandlerSpy.spec.ts (2)
3-3: LGTM!Adding the
TYPE_NOT_RESOLVEDimport is necessary for testing scenarios where message type cannot be resolved.
90-97: LGTM!All
addProcessedMessagecalls have been correctly updated to include the message type as the third parameter. The use of'test.type'for regular test cases andTYPE_NOT_RESOLVEDfor invalid message scenarios (line 416) is appropriate and consistent with the new API signature.Also applies to: 110-126, 138-154, 166-173, 187-203, 228-235, 251-267, 283-299, 322-342, 354-372, 384-400, 410-417
packages/core/package.json (1)
51-51: Zod version 4.1.13 is valid and secure.The upgrade from
^4.0.17to^4.1.13is confirmed valid. Version 4.1.13 exists on npm as the current latest stable release, and no known security vulnerabilities affect this version. No CVEs apply to zod 4.1.13 (CVE-2023-4316 only affected versions ≤3.22.3).packages/sqs/lib/sqs/AbstractSqsPublisher.ts (2)
121-122: LGTM! Consistent resolver-based type resolution for logging.The change correctly uses the new
resolveMessageTypeFromMessagehelper with an appropriate'unknown'fallback, aligning with the flexible type resolution approach introduced in this PR.
167-167: Consistent with logging path.Error handling correctly mirrors the logging path by using the same resolver-based approach for determining message type in error details.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1)
112-112: LGTM! Correctly migrated to resolver-based configuration.The change from
messageTypeField: 'messageType'tomessageTypeResolver: { messageTypePath: 'messageType' }correctly adopts the new flexible type resolution API while maintaining equivalent behavior for root-level field access.packages/sqs/test/consumers/SqsPermissionConsumer.ts (1)
119-119: LGTM! Consistent with the resolver-based migration pattern.The SQS test consumer correctly adopts the new
messageTypeResolverconfiguration, matching the migration approach used across other transport implementations.packages/sns/test/publishers/CreateLocateConfigMixPublisher.ts (1)
27-27: LGTM! Correctly migrated with preserved field path.The migration maintains the
'type'path (as opposed to'messageType'used elsewhere), correctly reflecting this publisher's specific message schema structure.packages/kafka/lib/AbstractKafkaService.ts (2)
16-16: LGTM! Import added for the new type resolution sentinel.The
TYPE_NOT_RESOLVEDsymbol is correctly imported to indicate when message type cannot be determined.
95-100: LGTM! Clear explanation and correct usage of TYPE_NOT_RESOLVED.The comment clearly documents why Kafka uses
TYPE_NOT_RESOLVED(lacking unified message type resolution), and theaddProcessedMessagecall correctly passes it as the third argument for the handler spy.Note that
messageMetricsManageron line 120 uses the string'unknown'formessageType. This is intentional since metrics typically need string labels, but consider whether Kafka should eventually support proper type resolution for consistency with other transports.packages/sns/lib/sns/SnsPublisherManager.spec.ts (1)
132-141: Test config correctly migrated to resolver-based type resolution
messageTypeResolver: { messageTypePath: 'type' }matches the message shape used in this spec and aligns with the new API.examples/sns-sqs/lib/common/UserConsumer.ts (1)
21-33: Example updated correctly tomessageTypeResolver
This keeps the example aligned with the new resolver-based routing while preserving the existing handler setup.packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1)
45-50: Publisher fixture correctly migrated tomessageTypeResolver
Path-based resolver preserves existing message type extraction semantics.packages/sqs/test/publishers/SqsPermissionPublisherFifo.ts (1)
22-51: FIFO publisher config migration looks correct
Type resolution update is isolated and shouldn’t affect FIFO behavior.packages/sqs/test/publishers/SqsPermissionPublisher.ts (1)
36-58: Test publisher correctly moved tomessageTypeResolver
This should keep routing/type extraction identical to the oldmessageTypeFieldconfig.examples/sns-sqs/lib/common/TestPublisherManager.ts (1)
24-41: Example publisher manager config migration looks correct
messageTypeResolver: { messageTypePath: 'type' }matches the sample message shapes and new API.UPGRADING.md (1)
3-102: LGTM! Comprehensive migration documentation.The upgrade guide clearly documents the breaking changes and provides actionable migration steps with before/after examples. The three-mode resolver approach (messageTypePath, literal, custom resolver) is well-explained with practical examples for common scenarios including EventBridge DLQ handling.
packages/sqs/test/consumers/SqsEventBridgeConsumer.spec.ts (1)
164-164: LGTM! Consistent migration to resolver-based type resolution.The change correctly replaces
messageTypeFieldwithmessageTypeResolver: { messageTypePath: 'detail-type' }, aligning with the EventBridge event structure wheredetail-typeis the discriminator field.packages/sns/test/publishers/SnsPermissionPublisher.ts (1)
45-45: LGTM! Standard migration to resolver configuration.The change correctly adopts the new
messageTypeResolver: { messageTypePath: 'messageType' }pattern for the SNS publisher.packages/sns/README.md (1)
97-99: LGTM! Thorough documentation updates.The README comprehensively documents the migration to
messageTypeResolver, with consistent examples throughout. The explanation of the three resolver modes (messageTypePath, literal, custom resolver) provides clear guidance for users.Also applies to: 146-146, 217-217, 290-290, 377-377, 498-498, 544-544
packages/core/lib/index.ts (1)
67-88: LGTM! Public API updates align with resolver-based architecture.The exports correctly reflect the migration strategy:
- Removed
NO_MESSAGE_TYPE_FIELD(documented breaking change)- Added
MessageTypeResolvertypes and utilities- Added
HandlerSpyconstants (ANY_MESSAGE_TYPE,TYPE_NOT_RESOLVED) for flexible message handlingpackages/sqs/lib/sqs/AbstractSqsConsumer.ts (2)
232-234: LGTM! Correct migration to resolver-based configuration.The HandlerContainer initialization now uses
messageTypeResolverinstead of the legacymessageTypeField, aligning with the new flexible resolution mechanism.
381-382: Good defensive coding with 'unknown' fallback.The fallback to
'unknown'whenresolveMessageTypeFromMessagereturns undefined ensures thattransactionSpanIdis always defined, preventing runtime errors whenmessageTypeResolveris not configured or resolution fails.Also applies to: 528-528
packages/core/lib/types/queueOptionsTypes.ts (1)
7-7: LGTM! Well-documented type updates.The changes correctly reflect the new resolver-based architecture:
messageTypeinProcessedMessageMetadatais now optional, supporting cases where resolution may fail or is not configuredCommonQueueOptions.messageTypeResolveris thoroughly documented with three modes (Field Path, Literal, Custom Resolver) and practical examplesAlso applies to: 28-28, 80-111
packages/core/test/queues/HandlerContainer.types.spec.ts (1)
1-241: LGTM! Comprehensive type coverage for resolver-based architecture.The type tests thoroughly validate the new
MessageTypeResolvertypes andHandlerContainerinference logic, covering all three resolver modes (messageTypePath, literal, custom resolver) and handler return types. Type assertions will run in CI as vitest's typecheck is enabled in the configuration with the pattern**/*.types.spec.ts.packages/core/test/queues/HandlerContainer.spec.ts (2)
134-137: Good update tomessageTypeResolverin existing routing/execution tests.
Keeps routing behavior coverage while aligning with the new API.Also applies to: 151-155, 412-415
198-397: Nice coverage for literal/path/custom resolver + precedence + failure modes.
The matrix here should catch most regressions in resolver behavior (especially attributes-vs-data precedence).packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)
145-149: Good propagation of Pub/Sub attributes into schema + type resolution, with correct lock release on failure.
This makes attribute-based routing viable and avoids leaking dedupe locks when type resolution fails.Also applies to: 285-289, 358-380, 473-478
packages/core/lib/queues/HandlerSpy.ts (1)
12-32: NewANY_MESSAGE_TYPE/TYPE_NOT_RESOLVEDexports are useful for resolver-based routing tests.
Good ergonomics for tests that can’t know the payloadtypefield.packages/core/README.md (1)
73-365: Excellent comprehensive documentation for message type resolution.The new "Message Type Resolution" section is thorough and well-structured, providing clear guidance across multiple platforms and use cases. The real-world examples effectively demonstrate when and how to use each resolver mode (field path, literal, custom function).
packages/core/lib/queues/AbstractQueueService.ts (2)
205-222: Clean helper method for centralized type resolution.The
resolveMessageTypeFromMessagemethod properly encapsulates type resolution logic and returnsundefinedwhen no resolver is configured, allowing callers to handle the absence of type information gracefully.
267-278: Good use of TYPE_NOT_RESOLVED for consistent spy logging.Resolving the message type once and using
TYPE_NOT_RESOLVEDfor cases where type cannot be determined ensures consistent behavior in handler spy tracking and metrics.packages/core/lib/queues/HandlerContainer.ts (2)
288-306: Well-designed public API for type resolution.The
resolveMessageTypemethod provides a clean public interface for resolving message types before handler lookup. Clear error handling when resolver is not configured.
330-381: Solid priority-based handler registration logic.The three-level priority system for determining handler message types is well-implemented:
- Explicit
messageTypein handler options (highest)- Literal type from resolver config
- Schema extraction via
messageTypePathClear error handling for unresolved and duplicate types.
| import { MessageHandlerConfigBuilder } from '@message-queue-toolkit/core' | ||
| import { z } from 'zod/v4' | ||
| import { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the specific file mentioned
cat -n packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts | head -20Repository: kibertoad/message-queue-toolkit
Length of output: 927
🏁 Script executed:
# Search for all Zod imports across the codebase
rg "import.*zod" -A 2 -B 2Repository: kibertoad/message-queue-toolkit
Length of output: 28560
🌐 Web query:
zod v4 ESM export structure default import vs named import
💡 Result:
Short answer: Zod v4 exposes a namespace-style ESM API — use namespace imports (import * as z from "zod" or "zod/v4") or named imports for specific symbols; there is no stable default-export you should rely on (import z from "zod" is unsupported/unsafe and tooling/Node resolution can break). [1][2][3]
Details (very brief):
- Official docs show the intended pattern: import * as z from "zod" (or import * as z from "zod/v4"). Use named imports only for non-namespace exports (e.g., import { ZodError } from "zod"). [1]
- Community tooling and linters enforce namespace imports for Zod v4 to ensure correct bundling and compatibility. [2]
- Practical reasons: Zod’s packaging uses ESM/conditional exports; relying on a default CJS-style export or
import z from "zod/v4"can fail under certain module resolutions (Node/tooling) — prefer the namespace form. [3]
References:
[1] Zod docs: zod import examples.
[2] eslint-plugin-import-zod (enforces namespace imports).
[3] zod GitHub issue discussing 'zod/v4' import resolution problems.
Fix Zod import to use namespace import pattern (import * as z from 'zod/v4').
The codebase has mixed Zod import styles: named imports ({ z }), default imports (z), and type imports. Per official Zod v4 documentation, the default import (import z from 'zod/v4') is unsupported and unsafe under certain module resolutions. The recommended pattern is namespace import: import * as z from 'zod/v4'. Update this file and enforce consistency across the codebase using a linter rule.
🤖 Prompt for AI Agents
In packages/gcp-pubsub/lib/pubsub/AbstractPubSubDlqConsumer.ts around lines 2 to
4 the Zod import uses a named import (`{ z } from 'zod/v4'`) which is unsafe;
change it to the namespace pattern `import * as z from 'zod/v4'`, update any
local references that expect `z` to use the new namespace import (no other code
changes needed if identifiers remain `z`), and add/update the repository
linter/ESLint rule (or a codemod) to enforce `import * as z from 'zod/v4'`
across the codebase for consistency.
| "@vitest/coverage-v8": "^3.2.4", | ||
| "rimraf": "^6.0.1", | ||
| "typescript": "^5.9.2", | ||
| "typescript": "^5.9.3", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all package.json files and extract TypeScript versions
echo "=== TypeScript versions in all package.json files ==="
fd -e package.json -x sh -c 'echo "File: $1"; grep -A0 -B0 "\"typescript\"" "$1" 2>/dev/null' _ {} \;Repository: kibertoad/message-queue-toolkit
Length of output: 128
🏁 Script executed:
#!/bin/bash
# First, find all package.json files
echo "=== All package.json files in repository ==="
fd -e package.json
echo ""
echo "=== Content of packages/metrics/package.json ==="
cat packages/metrics/package.jsonRepository: kibertoad/message-queue-toolkit
Length of output: 2027
🏁 Script executed:
#!/bin/bash
# Use git ls-files to find all package.json files
echo "=== package.json files via git ls-files ==="
git ls-files | grep package.json
echo ""
echo "=== Checking packages/ directory structure ==="
ls -la packages/ 2>/dev/null | head -20 || echo "packages/ directory not found or inaccessible"
echo ""
echo "=== Git diff to see what changed in this commit ==="
git diff HEAD~1 HEAD -- '**/package.json' | grep -A5 -B5 'typescript' | head -50Repository: kibertoad/message-queue-toolkit
Length of output: 1535
🏁 Script executed:
#!/bin/bash
# Extract TypeScript versions from all package.json files
echo "=== TypeScript versions across all package.json files ==="
for file in $(git ls-files | grep package.json); do
version=$(grep -o '"typescript"[[:space:]]*:[[:space:]]*"[^"]*"' "$file" 2>/dev/null)
if [ -n "$version" ]; then
echo "$file: $version"
fi
done
echo ""
echo "=== Root package.json TypeScript version ==="
grep '"typescript"' package.jsonRepository: kibertoad/message-queue-toolkit
Length of output: 916
Inconsistent TypeScript version across workspace packages.
The TypeScript dev dependency in packages/metrics/package.json was bumped to ^5.9.3, but this creates version drift across the monorepo. Only four packages (gcp-pubsub, metrics, sns, sqs) use ^5.9.3, while eight others (amqp, core, gcs-payload-store, kafka, outbox-core, redis-message-deduplication-store, s3-payload-store, schemas) remain at ^5.9.2. Either update all workspace packages to ^5.9.3 for consistency, or revert metrics to ^5.9.2 to align with the majority.
| const messageTypes = messageSchemas.map((schema) => { | ||
| // @ts-expect-error | ||
| return schema.shape[messageTypeField].value as string | ||
| return schema.shape[messageTypePath].value as string | ||
| }) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for missing or invalid schema fields.
The function accesses schema.shape[messageTypePath].value without validation, which could cause runtime errors if:
- A schema doesn't have the specified field
- The field exists but isn't a literal schema (no
.valueproperty) messageTypePathis undefined or empty
The @ts-expect-error suppresses these legitimate concerns. Consider adding validation:
const messageTypes = messageSchemas.map((schema) => {
- // @ts-expect-error
- return schema.shape[messageTypePath].value as string
+ const field = schema.shape?.[messageTypePath]
+ if (!field || !('value' in field)) {
+ throw new Error(`Schema missing required field "${messageTypePath}" or field is not a literal`)
+ }
+ return field.value as string
})📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const messageTypes = messageSchemas.map((schema) => { | |
| // @ts-expect-error | |
| return schema.shape[messageTypeField].value as string | |
| return schema.shape[messageTypePath].value as string | |
| }) | |
| const messageTypes = messageSchemas.map((schema) => { | |
| const field = schema.shape?.[messageTypePath] | |
| if (!field || !('value' in field)) { | |
| throw new Error(`Schema missing required field "${messageTypePath}" or field is not a literal`) | |
| } | |
| return field.value as string | |
| }) |
Our current type resolution logic is very rigid - it assumes that type is a field on a root level of the message.
This may not always be the case - sometimes type might be a part of payload, or, in case of Google PubSub, may even be coming from attributes metadata.
This provides a more flexible mechanism for type resolution.
Will release it separately to avoid publishing semver major packages that do not need a version bump (metrics that had a flaky test which is located inside /lib)
Summary by CodeRabbit
New Features
Breaking Changes
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.