-
Notifications
You must be signed in to change notification settings - Fork 7
Correctly handle scenario when subscriptionArn is not passed #390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…admittedly, is the usual case)
📝 WalkthroughWalkthroughAdds asynchronous polling helpers for SNS topics and SQS queues, integrates them into initSnsSqs to support blocking and non-blocking startupResourcePolling, exposes polling error callback types, expands tests for polling/subscription scenarios, and adds (duplicated) README docs for startup polling. Changes
Sequence Diagram(s)sequenceDiagram
participant App as Application
participant Init as initSnsSqs
participant PollTopic as pollForTopic
participant PollQueue as pollForQueue
participant SNS as SNS Topic
participant SQS as SQS Queue
App->>Init: call initSnsSqs({ startupResourcePolling, nonBlocking?, ... })
alt blocking or topic exists
Init->>PollTopic: poll/check topic
PollTopic->>SNS: query topic
SNS-->>PollTopic: topic found
PollTopic-->>Init: topicArn
Init->>PollQueue: poll/check queue
PollQueue->>SQS: query queue
SQS-->>PollQueue: queue found
PollQueue-->>Init: queueUrl
Init-->>App: return ready result (blocking)
else non-blocking & topic missing
Init->>PollTopic: start background pollForTopic (returns early)
Init-->>App: return partial result (resourcesReady: false)
PollTopic->>SNS: periodic checks
SNS-->>PollTopic: becomes available
PollTopic-->>Init: onResourceAvailable(topic)
Init->>PollQueue: start background pollForQueue (if not already)
PollQueue->>SQS: periodic checks
SQS-->>PollQueue: becomes available
PollQueue-->>Init: onResourceAvailable(queue)
Init->>App: invoke onResourcesReady callback
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~40 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/sns/lib/utils/snsInitter.ts (2)
57-79: Consider using a consistent return type withpollForTopic.The
pollForQueuefunction returnsunknown | undefinedwhilepollForTopicreturns a structuredTopicPollingResult. For consistency and maintainability, you might want to define a similarQueuePollingResulttype that includes both the result and the queue URL.♻️ Optional: Use consistent structured return type
+// Helper type for queue polling result +type QueuePollingResult = { + queueResult: unknown | undefined + queueUrl: string +} + // Helper function to poll for SQS queue availability async function pollForQueue( sqsClient: SQSClient, queueUrl: string, startupResourcePolling: NonNullable<SNSSQSQueueLocatorType['startupResourcePolling']>, extraParams?: ExtraParams, onResourceAvailable?: () => void, -): Promise<unknown | undefined> { - return await waitForResource({ +): Promise<QueuePollingResult> { + const queueResult = await waitForResource({ config: startupResourcePolling, ... }) + + return { queueResult, queueUrl } }
140-171: Consider documenting the non-blocking behavior limitation for subscription creation mode.In the subscription creation path (no subscriptionArn), when non-blocking mode is enabled and the topic isn't immediately available, the function returns early without starting background polling. This means the
onResourcesReadycallback will never be invoked in this scenario, unlike the existing subscription locator path (lines 249-267) which does start background polling.This is likely intentional since subscription creation can't proceed until the topic exists anyway, but it may be worth adding a comment to clarify this design choice for future maintainers.
📝 Optional: Add clarifying comment
// If non-blocking and topic wasn't immediately available, return early with resourcesReady: false // The caller will need to handle this case (e.g., retry later or use onResourcesReady callback) + // Note: Unlike the locator path, we don't start background polling here since subscription + // creation cannot proceed until the topic exists. The caller must retry init() entirely. if (nonBlocking && topicResult === undefined) {
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/sns/lib/utils/snsInitter.tspackages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (3)
packages/sns/lib/utils/snsUtils.ts (1)
assertTopic(79-142)packages/core/lib/utils/startupResourcePollingUtils.ts (1)
StartupResourcePollingTimeoutError(53-66)packages/sns/lib/utils/snsInitter.ts (1)
initSnsSqs(98-345)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: general (24.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (6)
packages/sns/lib/utils/snsInitter.ts (3)
25-55: LGTM! Clean extraction of polling helpers.The
pollForTopichelper nicely encapsulates the topic availability polling logic with proper error handling and callback support. TheTopicPollingResulttype provides a clear contract for the return value.
225-234: LGTM! Clean integration of polling helpers.The polling helpers are properly integrated with the existing availability tracking mechanism. The callbacks correctly update state and trigger
notifyIfBothReady().Also applies to: 279-288
249-267: The code comment is accurate; there is no double notification risk.The
.then()handler and theonResourceAvailablecallback are mutually exclusive by design. ThewaitForResourcefunction does not callonResourceAvailablewhen a resource is immediately available (it returns the result directly at line 282-285 of startupResourcePollingUtils.ts). The callback is only invoked during background polling when the resource becomes available later. Therefore,notifyIfBothReady()will be called exactly once in all scenarios—either from the.then()handler or the callback, never both.packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (3)
361-393: LGTM! Well-structured test for topic polling in subscription creation mode.The test properly:
- Sets up the scenario without subscriptionArn to trigger subscription creation path
- Uses background init with delayed topic creation
- Verifies all expected properties including that
subscriptionArnis defined after completion
395-415: LGTM! Timeout error scenario is properly tested.Good coverage of the timeout behavior with a short timeout (200ms) to ensure the test runs quickly while still validating the error path.
417-445: LGTM! Non-blocking mode test correctly validates early return behavior.The test properly verifies:
resourcesReady: falseis returnedsubscriptionArnis empty string (not undefined)queueNameis preserved even when not yet createdTesting at the
initSnsSqslevel is appropriate here since it gives direct access to the return value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @packages/sns/lib/utils/snsInitter.ts:
- Around line 392-401: The background pollForQueue call is missing an onError
handler so failures during non-blocking queue polling aren't forwarded to
onResourcesError; modify the pollForQueue invocation (the call that passes
sqsClient, queueUrl, startupResourcePolling, extraParams, and the success
callback that sets queueAvailable and calls notifyIfBothReady) to include an
onError callback which forwards the error to onResourcesError (and/or sets
queueAvailable=false if needed) so any timeout or failure in the background poll
is propagated to the existing resource error handling.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
packages/core/lib/index.tspackages/core/lib/utils/startupResourcePollingUtils.tspackages/sns/README.mdpackages/sns/lib/utils/snsInitter.tspackages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (3)
packages/sns/lib/utils/snsInitter.ts (1)
initSnsSqs(226-458)packages/sns/lib/utils/snsUtils.ts (1)
assertTopic(79-142)packages/core/lib/utils/startupResourcePollingUtils.ts (1)
StartupResourcePollingTimeoutError(76-89)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (24.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (22.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (22.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (8)
packages/core/lib/utils/startupResourcePollingUtils.ts (2)
7-21: LGTM! Well-designed error context types.The
PollingErrorContextandPollingErrorCallbacktypes provide a clean way to distinguish between transient and final errors. TheisFinalflag is particularly useful for consumers to determine whether to retry or handle the error definitively.
326-336: LGTM! Proper error normalization and propagation.The catch block correctly:
- Normalizes errors using
isErrorto ensure Error instances- Marks background polling failures as final (
isFinal: true)- Invokes the error callback with proper context
packages/core/lib/index.ts (1)
110-111: LGTM! Proper public API exposure.The new error callback types are correctly exported alongside the existing startup resource polling utilities, making them available for consumers that need advanced error handling.
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (2)
360-414: LGTM! Comprehensive error callback testing.This test properly verifies that
onResourcesErroris invoked when background queue polling times out. The test:
- Uses appropriate timeouts for LocalStack
- Verifies the error message contains "Timeout"
- Confirms
isFinal: truein the context- Uses
vi.waitForfor reliable async assertions
417-553: LGTM! Thorough coverage of subscription creation mode.The new test suite comprehensively covers the scenario where
subscriptionArnis not provided:
- Blocking mode with topic polling
- Timeout handling when topic never appears
- Non-blocking mode with immediate return
- Background subscription creation with callbacks
This aligns well with the documented behavior in the README.
packages/sns/lib/utils/snsInitter.ts (3)
25-57: LGTM! Well-structured topic polling helper.The
pollForTopicfunction properly encapsulates topic availability checking with polling configuration, callbacks, and error handling. The return typeTopicPollingResultis clear and useful.
92-176: LGTM! Comprehensive subscription creation with polling.The
createSubscriptionWithPollingfunction handles the complex scenario of creating a subscription when the topic may not exist yet. The logic correctly:
- Handles both blocking and non-blocking modes
- Creates subscription after topic becomes available
- Invokes callbacks appropriately
- Catches and reports subscription creation errors
210-214: LGTM! Callback type properly integrated.The
onResourcesErrorcallback is correctly typed usingPollingErrorCallbackand well-documented with JSDoc explaining when it's invoked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @packages/sns/lib/utils/snsInitter.ts:
- Around line 362-381: pollForQueue is invoked without handling rejected
promises; attach a .catch handler to the promise returned by pollForQueue (the
same chain that currently uses .then(...)) to prevent unhandled rejections and
forward unexpected errors to the existing error callback: in the chain after
.then(result => { ... }), add .catch((err) => {
extraParams?.onResourcesError?.(err, { resource: 'queue', queueUrl }); }) (or
similar context), so any unexpected exception gets passed to
extraParams.onResourcesError and avoids unhandled promise rejections while
preserving the current success handling that sets queueAvailable and calls
notifyIfBothReady.
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/sns/lib/utils/snsInitter.tspackages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sns/lib/utils/snsInitter.ts (10)
packages/sns/lib/index.ts (3)
SNSSQSQueueLocatorType(17-17)getTopicAttributes(44-44)SNSCreationConfig(5-5)packages/core/lib/index.ts (4)
ExtraParams(95-95)PollingErrorCallback(110-110)waitForResource(115-115)isStartupResourcePollingEnabled(109-109)packages/core/lib/utils/startupResourcePollingUtils.ts (3)
PollingErrorCallback(21-21)waitForResource(275-344)isStartupResourcePollingEnabled(350-354)packages/sns/lib/utils/snsUtils.ts (1)
getTopicAttributes(25-50)packages/sns/lib/sns/AbstractSnsService.ts (1)
SNSCreationConfig(42-45)packages/sqs/lib/index.ts (2)
SQSCreationConfig(17-17)getQueueAttributes(56-56)packages/sns/lib/types/TopicTypes.ts (2)
TopicResolutionOptions(4-4)isCreateTopicCommand(6-8)packages/sns/lib/utils/snsSubscriber.ts (1)
SNSSubscriptionOptions(18-21)packages/sqs/lib/utils/sqsUtils.ts (1)
getQueueAttributes(62-88)packages/sns/lib/utils/stsUtils.ts (1)
buildTopicArn(15-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/core) / build
- GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/core) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (8)
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (3)
18-18: LGTM!Good improvement replacing a dynamic import with a static import for
initSnsSqs. This provides better type checking and cleaner test code.
360-467: LGTM!Good test coverage for
onResourcesErrorcallback scenarios. The tests properly verify:
- Callback invocation with timeout errors
- The
{ isFinal: true }context indicating terminal failure- Both queue and topic timeout paths
470-688: LGTM!Comprehensive test coverage for the subscription creation mode when
subscriptionArnis not provided. The tests properly cover:
- Blocking mode waiting for topic before subscription creation
- Timeout handling when topic never appears
- Non-blocking immediate return with partial state
- Background subscription creation with
onResourcesReadycallback- Immediate subscription when topic is already available
- Error callback invocation on polling timeout
packages/sns/lib/utils/snsInitter.ts (5)
25-57: LGTM!Well-designed helper function that encapsulates topic polling logic. The
TopicPollingResulttype cleanly captures both the polling result and resolved topic ARN.
59-90: LGTM!Good extraction of subscription creation logic into a dedicated helper. This reduces duplication and makes the main flow easier to follow.
92-177: LGTM!Solid implementation of the subscription creation with polling. The error handling in
onTopicReady(lines 128-137) properly catches errors and propagates them viaonResourcesError. The conditional callback wiring based onnonBlockingmode is correct.
269-310: LGTM!Good refactoring of the subscription creation path. The logic correctly differentiates between:
- Polling for an existing topic (
!isCreateTopicCommand)- Immediate subscription creation when the topic will be created
The use of
createSubscriptionWithPollinghelper cleanly encapsulates the polling + subscription logic.
211-216: LGTM!Good addition of the
onResourcesErrorcallback toInitSnsSqsExtraParams. The documentation clearly explains the purpose and when it's invoked (polling timeout or subscription creation failure).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/sns/lib/utils/snsInitter.ts (1)
315-316: Missing validation: topicArn or topicName must be provided when subscriptionArn is set.When
locatorConfig.subscriptionArnis provided, the code assumes eithertopicArnortopicNameis also set, but there's no validation. If both are undefined, an empty string is passed tobuildTopicArn, resulting in an invalid ARN:arn:aws:sns:${region}:${account}:.🔒 Add validation to prevent invalid ARN construction
const subscriptionTopicArn = - locatorConfig.topicArn ?? (await buildTopicArn(stsClient, locatorConfig.topicName ?? '')) + locatorConfig.topicArn ?? + (() => { + if (!locatorConfig.topicName) { + throw new Error( + 'Either topicArn or topicName must be specified in locatorConfig when subscriptionArn is provided' + ) + } + return buildTopicArn(stsClient, locatorConfig.topicName) + })()Or more simply:
+ if (!locatorConfig.topicArn && !locatorConfig.topicName) { + throw new Error( + 'Either topicArn or topicName must be specified in locatorConfig when subscriptionArn is provided' + ) + } + const subscriptionTopicArn = locatorConfig.topicArn ?? (await buildTopicArn(stsClient, locatorConfig.topicName ?? ''))
🤖 Fix all issues with AI agents
In @packages/sns/lib/utils/snsInitter.ts:
- Around line 276-278: The assignment to topicArnToWaitFor can pass an empty
string to buildTopicArn when both topicResolutionOptions.topicArn and
topicResolutionOptions.topicName are undefined; add explicit validation before
calling buildTopicArn to ensure at least one of topicResolutionOptions.topicArn
or topicResolutionOptions.topicName is present and non-empty, and if not throw a
clear error (or return a failure) instead of calling buildTopicArn, referencing
the topicResolutionOptions variable and the buildTopicArn function so the check
occurs immediately before computing topicArnToWaitFor.
- Around line 150-157: AbstractSnsSqsConsumer.init() currently assigns values
returned from initSnsSqs() without checking the resourcesReady flag; when
initSnsSqs() returns resourcesReady: false (non-blocking startup) it can return
empty subscriptionArn/queueUrl which the consumer then uses. Update
AbstractSnsSqsConsumer.init() to check the returned object from initSnsSqs()
(the variable holding topicResult/queue info) and if resourcesReady is false
either (a) do not assign subscriptionArn/queueUrl/queueName and register work to
run only in the onResourcesReady callback, or (b) throw/return early so
dependent operations are deferred; ensure any code paths that assume
subscriptionArn/queueUrl only run after resourcesReady === true or inside
onResourcesReady.
🧹 Nitpick comments (1)
packages/sns/lib/utils/snsInitter.ts (1)
86-88: Consider a more descriptive error message.The error message could include context like topic ARN and queue URL to aid debugging.
💬 Suggested improvement
if (!subscriptionArn) { - throw new Error('Failed to subscribe') + throw new Error(`Failed to create subscription for topic ${topicArn} and queue ${queueUrl}`) }
📜 Review details
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/sns/lib/utils/snsInitter.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sns/lib/utils/snsInitter.ts (8)
packages/sns/lib/index.ts (4)
SNSSQSQueueLocatorType(17-17)getTopicAttributes(44-44)SNSCreationConfig(5-5)subscribeToTopic(36-36)packages/core/lib/utils/startupResourcePollingUtils.ts (2)
waitForResource(275-344)isStartupResourcePollingEnabled(350-354)packages/sns/lib/utils/snsUtils.ts (1)
getTopicAttributes(25-50)packages/sns/lib/sns/AbstractSnsService.ts (1)
SNSCreationConfig(42-45)packages/sns/lib/types/TopicTypes.ts (2)
TopicResolutionOptions(4-4)isCreateTopicCommand(6-8)packages/sns/lib/utils/snsSubscriber.ts (1)
subscribeToTopic(49-127)packages/sqs/lib/utils/sqsUtils.ts (1)
getQueueAttributes(62-88)packages/sns/lib/utils/stsUtils.ts (1)
buildTopicArn(15-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/schemas) / build
- GitHub Check: general (24.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (8)
packages/sns/lib/utils/snsInitter.ts (8)
4-5: LGTM!The import changes appropriately support the new polling and error handling functionality.
25-29: LGTM!The type definition clearly represents the polling result structure.
31-57: LGTM!The polling helper correctly delegates to
waitForResourceand properly handles the not_found case fromgetTopicAttributes.
179-203: LGTM!The queue polling helper follows the same pattern as
pollForTopicand correctly handles the not_found case.
211-215: LGTM!The new callback parameter is well-documented and properly typed.
293-309: LGTM!The refactoring to use the
createSubscriptionhelper improves code organization.
362-392: LGTM!The background queue polling implementation correctly handles both success and error cases, with appropriate error logging and callback invocation.
404-413: LGTM!The blocking queue polling correctly awaits the result and handles the availability callback.
| if (nonBlocking && topicResult === undefined) { | ||
| return { | ||
| subscriptionArn: '', | ||
| topicArn, | ||
| queueName, | ||
| queueUrl: '', | ||
| resourcesReady: false, | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find usages of createSubscriptionWithPolling and initSnsSqs to verify resourcesReady is checked
rg -n -B3 -A10 'createSubscriptionWithPolling|initSnsSqs' --type=ts -g '!**/snsInitter.ts' | head -150Repository: kibertoad/message-queue-toolkit
Length of output: 15770
🏁 Script executed:
#!/bin/bash
# Find how initSnsSqs result is used in AbstractSnsSqsConsumer.ts
rg -n -B5 -A30 'initSnsSqs' packages/sns/lib/sns/AbstractSnsSqsConsumer.tsRepository: kibertoad/message-queue-toolkit
Length of output: 2512
🏁 Script executed:
#!/bin/bash
# Get the function signature and return type for initSnsSqs
cat -n packages/sns/lib/utils/snsInitter.ts | head -100Repository: kibertoad/message-queue-toolkit
Length of output: 4429
🏁 Script executed:
#!/bin/bash
# Get more of the snsInitter.ts file, focusing on createSubscriptionWithPolling and initSnsSqs signature
sed -n '92,200p' packages/sns/lib/utils/snsInitter.ts | cat -nRepository: kibertoad/message-queue-toolkit
Length of output: 4053
🏁 Script executed:
#!/bin/bash
# Find the initSnsSqs function signature and return type
sed -n '200,350p' packages/sns/lib/utils/snsInitter.ts | cat -nRepository: kibertoad/message-queue-toolkit
Length of output: 6487
🏁 Script executed:
#!/bin/bash
# Continue reading the initSnsSqs function
sed -n '350,450p' packages/sns/lib/utils/snsInitter.ts | cat -nRepository: kibertoad/message-queue-toolkit
Length of output: 4124
🏁 Script executed:
#!/bin/bash
# Continue reading the rest of initSnsSqs function
sed -n '450,480p' packages/sns/lib/utils/snsInitter.ts | cat -nRepository: kibertoad/message-queue-toolkit
Length of output: 1106
🏁 Script executed:
#!/bin/bash
# Check if AbstractSnsSqsConsumer does anything with resourcesReady after assignment
rg -n 'resourcesReady' packages/sns/lib/sns/AbstractSnsSqsConsumer.tsRepository: kibertoad/message-queue-toolkit
Length of output: 57
🏁 Script executed:
#!/bin/bash
# Check if AbstractSnsSqsConsumer checks for empty queueUrl or subscriptionArn
rg -n 'this\.queueUrl|this\.subscriptionArn|this\.topicArn' packages/sns/lib/sns/AbstractSnsSqsConsumer.ts -A5 -B2Repository: kibertoad/message-queue-toolkit
Length of output: 422
🏁 Script executed:
#!/bin/bash
# Check how the consumer's init method is called and if there's any error handling for empty values
rg -n 'consumer\.init\|\.init\(' packages/sns/lib/sns/AbstractSnsSqsConsumer.ts -B5 -A5 | head -50Repository: kibertoad/message-queue-toolkit
Length of output: 57
🏁 Script executed:
#!/bin/bash
# Let's check what happens after init is called - look for how this.subscriptionArn and this.queueUrl are used
rg -n 'this\.subscriptionArn|this\.queueUrl' packages/sns/lib/sns/ -A3 -B1 | head -100Repository: kibertoad/message-queue-toolkit
Length of output: 673
🏁 Script executed:
#!/bin/bash
# Check if there's any validation or checks in the consumer's init or startup methods that verify resourcesReady
rg -n 'init|startup' packages/sns/lib/sns/AbstractSnsSqsConsumer.ts -B2 -A15 | head -150Repository: kibertoad/message-queue-toolkit
Length of output: 2522
🏁 Script executed:
#!/bin/bash
# Check the test file to see how they handle resourcesReady in non-blocking mode
sed -n '526,600p' packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts | cat -nRepository: kibertoad/message-queue-toolkit
Length of output: 2862
🏁 Script executed:
#!/bin/bash
# Check if the consumer starts consuming messages before resourcesReady is checked
rg -n 'start\|consume\|listen' packages/sns/lib/sns/AbstractSnsSqsConsumer.ts -B3 -A10 | head -80Repository: kibertoad/message-queue-toolkit
Length of output: 57
🏁 Script executed:
#!/bin/bash
# Check the parent AbstractSqsConsumer to see if there's a start method
rg -n 'class AbstractSnsSqsConsumer' packages/sns/lib/sns/AbstractSnsSqsConsumer.ts -A20Repository: kibertoad/message-queue-toolkit
Length of output: 762
🏁 Script executed:
#!/bin/bash
# Check if there's validation that prevents starting before init completes or resourcesReady is true
rg -n 'resourcesReady' packages/sns/lib/ -B5 -A5Repository: kibertoad/message-queue-toolkit
Length of output: 8154
Check resourcesReady in AbstractSnsSqsConsumer.init() before using returned values.
The init() method in AbstractSnsSqsConsumer.ts (lines 98-110) does not check the resourcesReady flag after calling initSnsSqs(). When startup resource polling is enabled with non-blocking mode, initSnsSqs can return with resourcesReady: false and empty strings for subscriptionArn and queueUrl. The consumer currently stores these empty values without validation, which will cause issues when the consumer attempts to use these properties.
Either validate that resourcesReady is true before proceeding, or defer operations that depend on these values until the onResourcesReady callback is invoked.
🤖 Prompt for AI Agents
In @packages/sns/lib/utils/snsInitter.ts around lines 150 - 157,
AbstractSnsSqsConsumer.init() currently assigns values returned from
initSnsSqs() without checking the resourcesReady flag; when initSnsSqs() returns
resourcesReady: false (non-blocking startup) it can return empty
subscriptionArn/queueUrl which the consumer then uses. Update
AbstractSnsSqsConsumer.init() to check the returned object from initSnsSqs()
(the variable holding topicResult/queue info) and if resourcesReady is false
either (a) do not assign subscriptionArn/queueUrl/queueName and register work to
run only in the onResourcesReady callback, or (b) throw/return early so
dependent operations are deferred; ensure any code paths that assume
subscriptionArn/queueUrl only run after resourcesReady === true or inside
onResourcesReady.
| const topicArnToWaitFor = | ||
| topicResolutionOptions.topicArn ?? | ||
| (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add validation for topicArn and topicName before calling buildTopicArn.
If both topicResolutionOptions.topicArn and topicResolutionOptions.topicName are undefined, an empty string is passed to buildTopicArn, which would create an invalid ARN. While the validation at line 243 ensures at least one topic identifier exists, the logic flow doesn't guarantee that the locator config has topicArn or topicName set specifically.
🛡️ Add validation before building topic ARN
+ if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) {
+ throw new Error(
+ 'Either topicArn or topicName must be specified when using startup resource polling without topic creation'
+ )
+ }
+
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const topicArnToWaitFor = | |
| topicResolutionOptions.topicArn ?? | |
| (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) | |
| if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) { | |
| throw new Error( | |
| 'Either topicArn or topicName must be specified when using startup resource polling without topic creation' | |
| ) | |
| } | |
| const topicArnToWaitFor = | |
| topicResolutionOptions.topicArn ?? | |
| (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) |
🤖 Prompt for AI Agents
In @packages/sns/lib/utils/snsInitter.ts around lines 276 - 278, The assignment
to topicArnToWaitFor can pass an empty string to buildTopicArn when both
topicResolutionOptions.topicArn and topicResolutionOptions.topicName are
undefined; add explicit validation before calling buildTopicArn to ensure at
least one of topicResolutionOptions.topicArn or topicResolutionOptions.topicName
is present and non-empty, and if not throw a clear error (or return a failure)
instead of calling buildTopicArn, referencing the topicResolutionOptions
variable and the buildTopicArn function so the check occurs immediately before
computing topicArnToWaitFor.
Summary by CodeRabbit
New Features
Documentation
Tests
✏️ Tip: You can customize this high-level summary in your review settings.