Skip to content

Conversation

@kibertoad
Copy link
Owner

@kibertoad kibertoad commented Jan 8, 2026

Summary by CodeRabbit

  • New Features

    • Consumers can defer startup in non-blocking mode and will auto-start once required resources become available; improved runtime readiness tracking and clearer start behavior.
  • Refactor

    • Startup flow reorganized to separate consumer initialization from resource discovery, simplifying lifecycle management and startup sequencing.
  • Tests

    • Expanded coverage for blocking and non-blocking startup scenarios, timing variants, and propagation of topic/queue/subscription information.

✏️ Tip: You can customize this high-level summary in your review settings.

@kibertoad kibertoad requested a review from Drodevbar January 8, 2026 15:15
@kibertoad kibertoad added the patch label Jan 8, 2026
@coderabbitai
Copy link

coderabbitai bot commented Jan 8, 2026

📝 Walkthrough

Walkthrough

Adds resource-readiness tracking and a deferred/non-blocking startup flow for SNS/SQS consumers, extracts consumer startup into a dedicated method, tightens a couple of TypeScript assertions, and adds tests covering blocking and non-blocking startup scenarios.

Changes

Cohort / File(s) Summary
Core SNS-SQS Consumer Readiness
packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Added private resourcesReady and startRequested flags, and a public start() override to defer or trigger startup based on resource readiness; onResourcesReady now sets readiness, updates ARNs/URLs/names, and can trigger startConsumers; DLQ init deferred until resources are ready.
SQS Consumer Lifecycle
packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Added public get isRunning(): boolean and protected async startConsumers(): Promise<void> to encapsulate consumer creation, wiring, and startup; start() now awaits consumer startup via startConsumers().
SNS Init Typing
packages/sns/lib/utils/snsInitter.ts
Replaced non-null assertions with explicit as string casts for topic/subscription identifiers used when building ARNs; no runtime behavior change.
Tests — Startup Polling
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts, packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Added extensive tests for blocking/non-blocking start() flows, timing of resource availability, propagation of topicArn/queueUrl/queueName, and pre/post isRunning assertions in SQS consumer tests.

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Caller
    participant Consumer as AbstractSnsSqsConsumer
    participant Initter as initSnsSqs / snsInitter
    participant AWS as SNS/SQS (AWS)
    participant Callback as onResourcesReady

    Caller->>Consumer: start()
    alt blocking mode or resourcesReady==true
        Consumer->>Initter: initSnsSqs() / init()
        Initter->>AWS: ensure topic & queue exist (may create)
        AWS-->>Initter: topicArn / queueUrl
        Initter->>Callback: onResourcesReady(topicArn, queueUrl, ...)
        Callback->>Consumer: set resourcesReady = true, update ARNs/URLs/names
        Consumer->>Consumer: startConsumers()
        Consumer->>AWS: poll messages (SQS consumers)
    else non-blocking mode & resources not ready
        Consumer->>Initter: initSnsSqs() (returns pending)
        Consumer->>Caller: return immediately (startRequested=true)
        AWS-->>Initter: topicArn / queueUrl (when ready)
        Initter->>Callback: onResourcesReady(...)
        Callback->>Consumer: set resourcesReady = true
        alt startRequested was set
            Consumer->>Consumer: startConsumers()
        end
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Possibly related PRs

Suggested labels

minor

Suggested reviewers

  • Drodevbar
  • kjamrog
  • CarlosGamero

Poem

🐰 I waited by the queue and tree,

until ARNs whispered back to me.
A nimble hop — consumers start,
resources ready, beating heart.
Hooray, the messages run free! 🥕✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Avoid starting consumers prematurely' accurately summarizes the main change: introducing resource readiness tracking and delayed consumer startup in non-blocking mode to prevent premature initialization.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

📜 Recent review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ebc4c75 and 36a3873.

📒 Files selected for processing (3)
  • packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
  • packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/sqs/lib/sqs/AbstractSqsConsumer.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (2)
examples/sns-sqs/lib/common/Dependencies.ts (3)
  • sqsClient (17-17)
  • snsClient (18-18)
  • stsClient (19-19)
packages/sns/lib/utils/snsUtils.ts (1)
  • assertTopic (79-142)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/metrics) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/outbox-core) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
  • GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
  • GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
  • GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
  • GitHub Check: kafka (24.x) / build
  • GitHub Check: kafka (22.x) / build
🔇 Additional comments (2)
packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts (1)

995-1002: LGTM! Clean state verification for consumer lifecycle.

The added assertions properly verify the isRunning state before and after start(), improving test coverage for the consumer startup flow. The explanatory comments enhance readability.

packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts (1)

223-629: Excellent test coverage for startup resource polling!

These four new tests comprehensively cover blocking and non-blocking startup modes across different resource availability scenarios:

  1. Blocking mode with delayed resources (lines 223-265): Properly verifies that start() waits for resources to become available
  2. Blocking mode with immediate availability (lines 267-299): Confirms immediate startup when resources exist
  3. Non-blocking mode with delayed resources (lines 546-594): Validates immediate return from start() and deferred consumer activation
  4. Non-blocking mode with immediate availability (lines 596-629): Ensures correct behavior when resources are ready

The tests follow excellent practices:

  • Use vi.waitFor for async conditions (avoiding flakiness)
  • Include clear state assertions (isRunning before/after operations)
  • Verify proper propagation of topicArn, queueUrl, and other subscription properties
  • Provide proper cleanup with consumer.close()

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@kibertoad kibertoad merged commit f68fc0a into main Jan 8, 2026
37 checks passed
@kibertoad kibertoad deleted the fix/proper-fix branch January 8, 2026 15:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants