Improve error handling, introduce retry logic for GCP PubSub #370
Walkthrough
Adds subscription-level retry and reinitialization with exponential backoff to the GCP Pub/Sub consumer, introduces typed gRPC and subscription-not-found errors, removes a Pub/Sub utils module, updates exports, and adds/expands tests and README docs for deduplication, payload offloading, and subscription retry behavior.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Consumer as PubSubConsumer
participant Initter as initWithRetry()
participant Subscription as Subscription
participant Handlers as EventHandlers
participant Reinit as reinitializeWithRetry()
Consumer->>Initter: start()
Initter->>Subscription: attempt to initialize subscription
alt transient init error (NOT_FOUND / PERMISSION_DENIED / retryable gRPC)
Subscription-->>Initter: error
Initter->>Initter: wait (exponential backoff) and retry
Initter->>Subscription: retry init (loop until maxRetries)
else initialized
Subscription-->>Initter: success
end
Initter->>Handlers: setupSubscriptionEventHandlers()
Handlers->>Subscription: attach message / error / close listeners
Subscription->>Handlers: message
Handlers->>Consumer: deliver -> process -> ack/nack
Handlers->>Consumer: deduplicate on success
alt runtime subscription error (retryable gRPC code)
Subscription->>Handlers: error (with code)
Handlers->>Handlers: handleSubscriptionError()
Handlers->>Reinit: trigger reinitializeWithRetry()
Reinit->>Subscription: remove listeners & close
Reinit->>Initter: re-init with backoff (loop until success or max)
Reinit->>Handlers: reattach handlers after success
else non-retryable
Handlers-->>Consumer: emit error to consumer error handler
end
alt subscription closed
Subscription->>Handlers: close
Handlers->>Handlers: handleSubscriptionClose()
alt still consuming
Handlers->>Reinit: trigger reinitializeWithRetry()
end
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (2)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts (2)
45-47: Consider isolating Redis cleanup to test-specific keys.
Using redis.flushall() clears all Redis data, which could interfere with parallel tests if they share the same Redis instance. Consider using key patterns with test-specific prefixes and SCAN/DEL instead.

     async function cleanRedis() {
    -  await redis.flushall()
    +  // Use SCAN to delete only keys with our test prefix
    +  const pattern = `consumer:*`
    +  let cursor = '0'
    +  do {
    +    const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100)
    +    cursor = nextCursor
    +    if (keys.length > 0) {
    +      await redis.del(...keys)
    +    }
    +  } while (cursor !== '0')
     }

200-231: Arbitrary timeout in lock acquisition test could cause flakiness.
The 2000ms wait at line 219 assumes the message will be nacked and redelivered within that window. Consider using a more deterministic approach like waiting for a spy event or increasing the timeout with explanation.

     await publisher.publish(message)
    -// Wait a bit for processing attempts - message should be nacked and redelivered
    -await new Promise((resolve) => setTimeout(resolve, 2000))
    +// Wait for the nack to be processed - we can't directly observe nacks,
    +// so we wait for processing attempts. If this becomes flaky, increase timeout.
    +await new Promise((resolve) => setTimeout(resolve, 3000))

Alternatively, if the spy tracks nack events, use that for a more reliable wait.
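The "more deterministic approach" mentioned in the second nitpick could look roughly like the sketch below: poll a condition until it holds instead of sleeping for a fixed time. The helper name `waitFor` and its parameters are hypothetical and not part of the PR or the toolkit; the condition would be whatever the test can observe (for example, a handler counter).

```typescript
// Hypothetical test helper (not from the PR): poll until a condition holds,
// failing with a clear error if the deadline passes.
async function waitFor(
  condition: () => boolean | Promise<boolean>,
  timeoutMs = 5000,
  intervalMs = 50,
): Promise<void> {
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    // Return as soon as the observable state matches the expectation
    if (await condition()) return
    await new Promise((resolve) => setTimeout(resolve, intervalMs))
  }
  throw new Error(`Condition not met within ${timeoutMs}ms`)
}
```

A test would then wait on an observable side effect rather than on wall-clock time, so it finishes as early as possible and fails with a descriptive message when the effect never occurs.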
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- packages/gcp-pubsub/README.md (1 hunks)
- packages/gcp-pubsub/lib/index.ts (0 hunks)
- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (9 hunks)
- packages/gcp-pubsub/lib/utils/pubSubUtils.ts (0 hunks)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts (1 hunks)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts (2 hunks)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (1 hunks)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts (1 hunks)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1 hunks)
- packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts (1 hunks)
- packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts (1 hunks)
- packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (3 hunks)
- packages/gcp-pubsub/vitest.config.ts (1 hunks)
💤 Files with no reviewable changes (2)
- packages/gcp-pubsub/lib/index.ts
- packages/gcp-pubsub/lib/utils/pubSubUtils.ts
🧰 Additional context used
🧬 Code graph analysis (5)
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts (3)
- packages/redis-message-deduplication-store/lib/RedisMessageDeduplicationStore.ts (1): RedisMessageDeduplicationStore (15-66)
- packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1): PubSubPermissionPublisher (34-62)
- packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1): deletePubSubTopic (3-9)
packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts (1)
- packages/gcp-pubsub/lib/errors/PubSubConsumerErrorResolver.ts (1): PubSubConsumerErrorResolver (6-34)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts (3)
- packages/gcp-pubsub/test/utils/cleanupPubSub.ts (2): deletePubSubTopicAndSubscription (27-42), deletePubSubSubscription (11-25)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1): PubSubPermissionConsumer (62-142)
- packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1): PubSubPermissionPublisher (34-62)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts (5)
- packages/redis-message-deduplication-store/lib/RedisMessageDeduplicationStore.ts (1): RedisMessageDeduplicationStore (15-66)
- packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1): PubSubPermissionConsumer (62-142)
- packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (1): PubSubPermissionPublisher (34-62)
- packages/gcp-pubsub/test/utils/cleanupPubSub.ts (1): deletePubSubTopicAndSubscription (27-42)
- packages/core/lib/message-deduplication/AcquireLockTimeoutError.ts (1): AcquireLockTimeoutError (4-11)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (3)
- packages/gcp-pubsub/test/fakes/FakeLogger.ts (1): error (23-25)
- packages/gcp-pubsub/lib/types/MessageTypes.ts (1): PubSubMessage (4-4)
- packages/core/lib/index.ts (1): DeduplicationRequesterEnum (17-17)
🪛 Gitleaks (8.30.0)
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts
[high] 278-278: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.
(generic-api-key)
🪛 markdownlint-cli2 (0.18.1)
packages/gcp-pubsub/README.md
1564-1564: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (26)
packages/gcp-pubsub/vitest.config.ts (2)
17-23: LGTM! Exclusions are appropriate.
The added exclusions follow best practices:
- FakeConsumerErrorResolver.ts is a test utility that doesn't need coverage
- MessageTypes.ts contains TypeScript type definitions with no runtime code
- pubSubSchemas.ts likely contains declarative Zod schemas that are better tested indirectly through integration tests
24-29: Excellent improvement to coverage thresholds!
The increased thresholds (+1-5%) align well with the PR's goal of adding comprehensive test coverage. These modest increases are achievable and demonstrate the value of the new tests added in this PR.
packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts (5)
1-5: LGTM on imports and setup.
The imports correctly use zod/v4 and relevant error types from the toolkit packages.
7-17: LGTM on SyntaxError test.
The test correctly verifies that SyntaxError maps to MessageInvalidFormatError with the original message preserved.
19-33: LGTM on ZodError test.
The test correctly constructs a Zod v4 ZodError with the expected issue structure and validates mapping to MessageValidationError.
35-48: LGTM on StandardizedError test.
The test correctly sets up the Symbol.for('StandardizedErrorSymbol') property and validates that errorCode is propagated.
50-68: LGTM on fallback error handling tests.
Both tests verify that unknown/regular errors correctly map to InternalError with the generic message and INTERNAL_ERROR code.
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (4)
37-63: Well-documented error codes with rationale.
The documentation clearly explains why PERMISSION_DENIED and NOT_FOUND are included for eventual consistency scenarios after Terraform deployments. The reference links to GCP docs and the related GitHub issue are helpful.
65-96: LGTM on retry options type and defaults.
The default values (5 retries, 1s base delay, 30s max delay) provide reasonable exponential backoff behavior.
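For readers who want to see what those defaults imply, here is a minimal sketch of the capped exponential backoff. The option names and the `Math.pow(2, attempt - 1)` formula are taken from the diff quoted elsewhere in this review; the function name `computeRetryDelay` and the constant name `DEFAULT_RETRY_OPTIONS` are hypothetical and may not match the actual source.

```typescript
// Option names mirror those quoted in the review; the constant and function
// names are illustrative assumptions, not the PR's identifiers.
type SubscriptionRetryOptions = {
  maxRetries: number
  baseRetryDelayMs: number
  maxRetryDelayMs: number
}

const DEFAULT_RETRY_OPTIONS: SubscriptionRetryOptions = {
  maxRetries: 5,
  baseRetryDelayMs: 1000,
  maxRetryDelayMs: 30000,
}

// Delay before a given attempt (1-based): base * 2^(attempt - 1), capped at the max.
function computeRetryDelay(
  attempt: number,
  options: SubscriptionRetryOptions = DEFAULT_RETRY_OPTIONS,
): number {
  return Math.min(
    options.baseRetryDelayMs * Math.pow(2, attempt - 1),
    options.maxRetryDelayMs,
  )
}

// With the defaults, attempts 1..5 wait 1000, 2000, 4000, 8000 and 16000 ms;
// the 30000 ms cap would only apply from a sixth attempt onwards.
const schedule = [1, 2, 3, 4, 5].map((attempt) => computeRetryDelay(attempt))
```

Under these defaults the cap is never reached within five attempts, so it only matters when `maxRetries` is raised or the base delay is increased.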
280-326: LGTM on initWithRetry implementation.
The initialization retry logic correctly:
- Uses string matching for retryable error detection
- Implements exponential backoff with cap
- Logs attempts with relevant context
- Throws non-retryable errors immediately
571-579: Good defensive cleanup in close().
Removing listeners before closing prevents the close handler from triggering unintended reinitialization.
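The ordering praised here can be illustrated with a small stand-alone sketch built on Node's `EventEmitter`. `FakeSubscription` and `SketchConsumer` are hypothetical stand-ins, not the Pub/Sub client or the PR's consumer class; the point is only that a `close` listener never fires if it is removed before the subscription is closed.

```typescript
import { EventEmitter } from 'node:events'

// Stand-in for a subscription that emits 'close' when closed (an assumption
// made for this sketch).
class FakeSubscription extends EventEmitter {
  close(): void {
    this.emit('close')
  }
}

class SketchConsumer {
  closeHandlerCalls = 0
  private isConsuming = true
  private readonly subscription: FakeSubscription

  constructor(subscription: FakeSubscription) {
    this.subscription = subscription
    // In the real consumer this handler would trigger reinitialization
    this.subscription.on('close', () => {
      this.closeHandlerCalls++
    })
  }

  close(): void {
    this.isConsuming = false
    // Detach handlers first so the intentional close is not seen as a failure
    this.subscription.removeAllListeners()
    this.subscription.close()
  }

  get consuming(): boolean {
    return this.isConsuming
  }
}
```

Calling `close()` on the sketch consumer leaves `closeHandlerCalls` at zero, whereas closing the subscription without removing listeners would invoke the handler once.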
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts (2)
52-52: LGTM on deduplication ID field configuration.
Setting messageDeduplicationIdField: 'id' correctly configures the publisher to use the message's id field for deduplication purposes.
31-31: No action needed.
The messageDeduplicationConfig option is properly passed to the base class via the ...options spread operator at line 48. It does not need to be explicitly listed in the super() call.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts (1)
267-323: LGTM on logMessages option test.
The test validates that the logMessages=true code path executes without errors. The comment acknowledging the inability to easily verify log output is appropriate - the focus on ensuring the path doesn't crash is sufficient for this integration test.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts (1)
34-35: LGTM on options expansion.
Adding subscriptionRetryOptions and messageDeduplicationIdField to the picked options correctly exposes the new retry and deduplication configuration for tests.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts (2)
1-9: LGTM on new imports.
The OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE import is needed for the new error test case.
213-297: LGTM on payload retrieval error test.
The test correctly validates the error path when an offloaded payload cannot be retrieved from GCS. The static analysis hint about an "API key" at line 278 is a false positive - non-existent-key-12345 is clearly a test fixture key referencing a non-existent GCS object, not actual credentials.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts (4)
1-17: LGTM on imports and setup.
Imports correctly include AcquireLockTimeoutError for mocking lock timeout scenarios and RedisMessageDeduplicationStore for the deduplication store.
87-100: LGTM on normal deduplication processing test.
The test correctly validates that a message is consumed normally when deduplication is enabled and no duplicate exists.
102-134: LGTM on duplicate message skipping test.
The test properly validates the deduplication flow:
- First message is processed normally
- Second identical message is skipped with skippedAsDuplicate: true
- Handler counter remains at 1
233-259: LGTM on non-timeout error handling test.
The test correctly validates that non-timeout errors from lock acquisition are swallowed and the message is still processed. This is important to ensure Redis connection errors don't block message processing.
packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts (1)
1-190: LGTM! Comprehensive deduplication test coverage.
The test suite thoroughly validates publisher-level deduplication behavior:
- Verifies deduplication key storage in Redis
- Tests duplicate message detection with skippedAsDuplicate flag
- Confirms independent messages publish separately
- Validates that deduplication can be disabled
The test structure follows best practices with proper setup/teardown, DI container usage, and handlerSpy-based assertions.
packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts (4)
41-109: Comprehensive configuration test coverage.
The tests thoroughly validate the subscriptionRetryOptions configuration behavior:
- Default values are correctly applied
- Custom values override defaults
- Partial configurations merge with defaults
The use of @ts-expect-error to access private fields is acceptable for testing purposes.
154-182: Excellent verification of cleanup behavior.
The test properly verifies that close() removes all event listeners, preventing reconnection attempts after shutdown. Checking listener counts is a robust way to ensure cleanup happens correctly.
292-354: Solid integration test for runtime reconnection.
The test effectively validates the auto-reconnection flow:
- Consumer processes messages successfully
- Subscription is deleted mid-operation
- Consumer detects the error and reconnects
- Consumer resumes processing
The 5-second wait (line 336) allows time for error detection and reinitialization. While fixed delays can be flaky, this is reasonable given the 30-second timeout and the eventual consistency nature of the scenario.
356-396: Good test for initialization retry with eventual consistency.
The test simulates a real-world scenario where a subscription doesn't exist initially (e.g., Terraform propagation delay) and is created asynchronously. The consumer's retry logic correctly waits and succeeds once the subscription becomes available.
The use of globalThis.setTimeout (line 382) is appropriate for scheduling the delayed subscription creation outside the test's control flow.
Actionable comments posted: 1
♻️ Duplicate comments (2)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (2)
396-405: Unhandled promise rejection in error handler.
The throw at line 404 within the .catch() block will cause an unhandled promise rejection since this is called from a synchronous event handler context. The error is logged but the throw has no catch handler.
Remove the throw and handle the failure state synchronously:

     this.reinitializeWithRetry(1).catch((reinitError) => {
       this.logger.error({
         msg: 'Failed to reinitialize subscription after retryable error',
         subscriptionName: this.subscriptionName,
         topicName: this.topicName,
         error: reinitError,
       })
    -  // Re-throw to surface the error - consumer is now in a failed state
    -  throw reinitError
    +  // Consumer is now in a failed state - surface the error through the standard path
    +  this.handleError(reinitError)
     })
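The mechanics behind this comment can be shown in isolation. In the sketch below, `reinitializeWithRetry` and `handleError` are simplified stand-ins (assumptions, not the consumer's real methods): a handler passed to `.catch()` that rethrows produces a new rejected promise, while a handler that forwards the error to a sink produces a fulfilled one.

```typescript
// Simplified stand-ins for the consumer's methods (assumptions for this sketch)
const reportedErrors: Error[] = []

function handleError(error: Error): void {
  reportedErrors.push(error)
}

function reinitializeWithRetry(): Promise<void> {
  return Promise.reject(new Error('reinitialization failed'))
}

// Anti-pattern, left commented out: rethrowing inside .catch() rejects the
// promise that .catch() returns, and nothing awaits it in an event handler.
// void reinitializeWithRetry().catch((error) => { throw error })

// Safer variant: forward the failure to the error sink; the chain settles
// as fulfilled, so no unhandled rejection is produced.
const settled: Promise<void> = reinitializeWithRetry().catch((error: unknown) => {
  handleError(error instanceof Error ? error : new Error(String(error)))
})
```

Because `settled` fulfills, the failure is visible only through `reportedErrors` (or, in the real consumer, through whatever its error path does), which is the behavior the review asks for.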
459-552: Race condition: isReinitializing reset allows concurrent reinitializations.
The flag is set to false at line 545 before the recursive call at line 548, and again in the finally block at line 550. This creates a window where concurrent callers can bypass the guard at line 461 during the retry sequence.
Replace the recursive approach with an iterative loop to maintain the isReinitializing lock for the entire retry sequence:

     private async reinitializeWithRetry(attempt: number): Promise<void> {
       if (this.isReinitializing) {
         this.logger.debug({
           msg: 'Reinitialization already in progress, skipping',
           subscriptionName: this.subscriptionName,
         })
         return
       }

       this.isReinitializing = true

       try {
    +    while (attempt <= this.subscriptionRetryOptions.maxRetries) {
         // Calculate delay with exponential backoff
         const delay = Math.min(
           this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1),
           this.subscriptionRetryOptions.maxRetryDelayMs,
         )

         this.logger.info({
           msg: `Reinitialization attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`,
           subscriptionName: this.subscriptionName,
           topicName: this.topicName,
           attempt,
           delayMs: delay,
         })

         // Wait before retry
         await new Promise((resolve) => setTimeout(resolve, delay))

         // Don't continue if we've been stopped during the wait
         if (!this.isConsuming) {
           this.logger.info({
             msg: 'Consumer stopped during reinitialization wait, aborting',
             subscriptionName: this.subscriptionName,
           })
           return
         }

    +    try {
         // Close existing subscription to remove old event handlers
         if (this.subscription) {
           try {
             this.subscription.removeAllListeners()
             await this.subscription.close()
           } catch {
             // Ignore close errors - subscription may already be closed
           }
         }

         // Reinitialize
         await this.init()

         if (!this.subscription) {
           throw new Error('Subscription not initialized after init()')
         }

         // Wait for subscription to be ready
         await this.waitForSubscriptionReady()

         // Reattach event handlers
         this.setupSubscriptionEventHandlers()

         this.logger.info({
           msg: 'Successfully reinitialized subscription',
           subscriptionName: this.subscriptionName,
           topicName: this.topicName,
           attempt,
         })
    +    return // Success - exit early
    +    } catch (error) {
         this.logger.warn({
           msg: `Reinitialization attempt ${attempt} failed, will retry`,
           subscriptionName: this.subscriptionName,
           topicName: this.topicName,
           attempt,
           error,
         })
    +      attempt++
    +    }
    +    }
    -    this.isReinitializing = false
    -
    -    // Retry with incremented attempt count
    -    await this.reinitializeWithRetry(attempt + 1)
    +    // Exceeded max retries
    +    const error = new Error(
    +      `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`,
    +    )
    +    this.handleError(error)
    +    throw error
       } finally {
         this.isReinitializing = false
       }
     }
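Stripped of logging and Pub/Sub specifics, the iterative shape suggested above reduces to the sketch below. `createReinitializer`, its parameters, and the `'done' | 'skipped'` return values are hypothetical names chosen for illustration; a fixed delay replaces the exponential backoff to keep the example short. The guard flag is set once, held across every attempt, and cleared only in `finally`.

```typescript
type InitFn = () => Promise<void>

// Hypothetical factory (not from the PR): returns a reinitialize function
// that holds a single in-progress flag for the whole retry sequence.
function createReinitializer(init: InitFn, maxRetries: number, delayMs: number) {
  let inProgress = false

  return async function reinitialize(): Promise<'done' | 'skipped'> {
    if (inProgress) return 'skipped' // a retry sequence is already running
    inProgress = true
    try {
      for (let attempt = 1; attempt <= maxRetries; attempt++) {
        // Fixed delay for brevity; the PR uses capped exponential backoff
        await new Promise((resolve) => setTimeout(resolve, delayMs))
        try {
          await init()
          return 'done'
        } catch {
          // Swallow and retry; the flag stays set between attempts
        }
      }
      throw new Error(`Reinitialization failed after ${maxRetries} attempts`)
    } finally {
      // Released exactly once, whether the sequence succeeded or failed
      inProgress = false
    }
  }
}
```

Two overlapping calls then behave predictably: the first runs the full sequence and the second returns `'skipped'` immediately, because the flag is never released between attempts.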
🧹 Nitpick comments (3)
packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (2)
1-9: Consider setting prototype for safer instanceof behavior on custom Error
Extending Error can be fragile across transpilation targets; explicitly setting the prototype makes instanceof SubscriptionDoesNotExistError more reliable (especially with ES5 targets or unusual runtimes).
You could do:

     export class SubscriptionDoesNotExistError extends Error {
       public readonly subscriptionName: string

       constructor(subscriptionName: string) {
         super(`Subscription ${subscriptionName} does not exist`)
         this.name = 'SubscriptionDoesNotExistError'
         this.subscriptionName = subscriptionName
    +
    +    // Ensure proper prototype chain for instanceof checks across targets
    +    Object.setPrototypeOf(this, new.target.prototype)
       }
     }

11-20: Tighten the type guard to avoid false positives and ensure shape
The guard currently only checks name, so any object with { name: 'SubscriptionDoesNotExistError' } will be treated as SubscriptionDoesNotExistError, even if it lacks subscriptionName. That’s slightly unsound for a type guard.
A simpler and safer option is to lean on instanceof:

    -export function isSubscriptionDoesNotExistError(
    -  error: unknown,
    -): error is SubscriptionDoesNotExistError {
    -  return (
    -    typeof error === 'object' &&
    -    error !== null &&
    -    'name' in error &&
    -    error.name === 'SubscriptionDoesNotExistError'
    -  )
    -}
    +export function isSubscriptionDoesNotExistError(
    +  error: unknown,
    +): error is SubscriptionDoesNotExistError {
    +  return error instanceof SubscriptionDoesNotExistError
    +}

This guarantees the full shape (including subscriptionName) without widening the accepted set of values.
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (1)
4-4: Typed subscription-missing error is a nice improvement; consider aligning other missing-resource paths
The switch from a generic Error to SubscriptionDoesNotExistError for the locatorConfig.subscriptionName path makes initialization failures more distinguishable and easier to handle in retry logic and error resolvers.
For consistency, you might later introduce similar typed errors (e.g., for missing topics or DLQ topics) so callers can uniformly branch on specific failure modes instead of relying on message parsing in those other paths; not a blocker, just a potential follow-up for API polish.
Also applies to: 78-96
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1 hunks)
- packages/gcp-pubsub/lib/index.ts (1 hunks)
- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (10 hunks)
- packages/gcp-pubsub/lib/utils/pubSubInitter.ts (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/gcp-pubsub/lib/utils/pubSubInitter.ts (1)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1): SubscriptionDoesNotExistError (1-9)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (3)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1): isSubscriptionDoesNotExistError (11-20)
- packages/gcp-pubsub/lib/types/MessageTypes.ts (1): PubSubMessage (4-4)
- packages/core/lib/index.ts (1): DeduplicationRequesterEnum (17-17)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: general (20.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (11)
packages/gcp-pubsub/lib/index.ts (1)
1-3: Export wiring for SubscriptionDoesNotExistError looks good
Re-exporting the new error from the package entry point keeps the public API consistent with other errors and makes typed handling ergonomic.
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (10)
21-21: LGTM: Import is correctly used in retry logic.
The imported isSubscriptionDoesNotExistError type guard is appropriately used in initWithRetry to determine if initialization errors should be retried.
38-64: Well-documented retry logic for eventual consistency.
The inclusion of NOT_FOUND and PERMISSION_DENIED error codes is well-justified for handling GCP's eventual consistency after Terraform deployments. The comprehensive documentation with references is excellent.
66-97: LGTM: Reasonable retry defaults and well-documented types.
The default retry configuration (5 attempts, 1s base delay, 30s max delay) provides a good balance between resilience and avoiding excessive wait times. The exponential backoff formula is clearly documented.
141-146: LGTM: Clean API extension.
The optional subscriptionRetryOptions field allows users to customize retry behavior while providing sensible defaults.
189-189: LGTM: Clean initialization with proper defaults.
The subscriptionRetryOptions is correctly stored as Required<SubscriptionRetryOptions> to eliminate optional checks, and the constructor properly merges user configuration with defaults.
Also applies to: 192-192, 213-223
253-271: LGTM: Start flow properly integrated with retry logic.
The startup sequence correctly uses initWithRetry() and extracts event handler setup into a separate method for reattachment after reinitialization.
329-361: LGTM: Event handlers properly extracted for reuse.
Extracting event handler setup into a separate method enables clean reattachment after reinitialization. The method correctly guards against null subscription.
572-580: LGTM: Prevents unintended reinitialization during shutdown.
Removing listeners before closing the subscription prevents the close event handler from triggering reinitialization during intentional shutdown. The order (set isConsuming = false, remove listeners, close) is correct.
733-733: LGTM: Proper deduplication on successful processing.
The deduplicateMessage call is correctly placed after successful message processing and uses the appropriate DeduplicationRequesterEnum.Consumer requester type.
281-327: No action needed.
The string matching for NOT_FOUND and PERMISSION_DENIED in initWithRetry is appropriate and necessary.
These errors originate from GCP SDK's gRPC layer during initialization, where errors may not have the structured error.code property available (which is used in handleSubscriptionError for operational-time errors). String matching is the correct approach here, not a code smell. GCP maintains error message stability as part of its SDK contract, and the codebase is well-documented explaining why these specific error types are retryable after Terraform deployments due to eventual consistency (see lines 39-56 and the RETRYABLE_SUBSCRIPTION_ERROR_CODES constant).
Likely an incorrect or invalid review comment.
Actionable comments posted: 0
♻️ Duplicate comments (3)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (3)
383-408: Consider surfacing reinitialization failures from close handler through handleError.
When handleSubscriptionClose triggers a reinitialization, failures are logged at error level but not propagated via the normal error-handling channel. For consistency with other failure paths, you may want to also call this.handleError(reinitError as Error) in the .catch block (taking care not to rethrow, to avoid unhandled rejections):

     this.reinitializeWithRetry(1).catch((reinitError) => {
       this.logger.error({
         msg: 'Failed to reinitialize subscription after unexpected close',
         subscriptionName: this.subscriptionName,
         topicName: this.topicName,
         error: reinitError,
       })
    +  // Optionally surface via the standard error path
    +  // void this.handleError(reinitError as Error)
     })

This would make it easier for consumers to observe and react to fatal subscription failures.
421-514: Reinitialization retry loop is functionally correct; consider iterative form and early abort on non‑transient errors.
The reinitializeWithRetry logic correctly:
- Prevents concurrent reinitializations via isReinitializing.
- Caps attempts with subscriptionRetryOptions.maxRetries.
- Applies exponential backoff and aborts if the consumer is stopped during the wait.
- Recreates the subscription and reattaches handlers.
Two non‑blocking refinements you might consider:
- Iterative loop instead of recursion. The current recursion is bounded, but an explicit loop (e.g., for (let attempt = startAttempt; attempt <= maxRetries; attempt++)) would simplify the isReinitializing lifecycle and remove the need to reset the flag both in catch and finally, making the control flow easier to reason about.
- Reuse retryability checks. Unlike initWithRetry, this method treats all errors from init() as retryable up to maxRetries. If there are clearly non‑transient failure modes (e.g., invalid configuration), you might want to reuse isRetryableGrpcError / isSubscriptionDoesNotExistError here too and bail out early for non‑retryable errors to avoid needless retries.
341-372: Avoid throwing inside .catch callback to prevent unhandled promise rejections.
In handleSubscriptionError, this.reinitializeWithRetry(1).catch(...) is called from a synchronous event handler, and the .catch callback currently re‑throws reinitError. That new thrown error rejects the returned Promise from .catch, but since that Promise isn’t awaited or chained, it will surface as an unhandled rejection at runtime.
Instead, log and optionally surface via your normal error path without re‑throwing here. For example:

     // Trigger reinitialization with retry
     this.reinitializeWithRetry(1).catch((reinitError) => {
       this.logger.error({
         msg: 'Failed to reinitialize subscription after retryable error',
         subscriptionName: this.subscriptionName,
         topicName: this.topicName,
         error: reinitError,
       })
    -  // Re-throw to surface the error - consumer is now in a failed state
    -  throw reinitError
    +  // Consumer is now in a failed state - error has been logged
    +  // Optionally surface via the standard error path:
    +  // void this.handleError(reinitError as Error)
     })
🧹 Nitpick comments (3)
packages/gcp-pubsub/lib/errors/grpcErrors.ts (1)
22-39: Centralized gRPC error helpers look good; optional type tightening for the retryable set.
The definition of retryable codes and the GrpcError/type guards is clean and matches typical @grpc/grpc-js error shape. If you want to tighten types a bit, you could declare the set as Set<RetryableGrpcStatusCode> instead of Set<number> and cast on insertion, but that's a minor nicety, not required.
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (2)
36-43: Retry options wiring is sound; minor doc/semantics clarifications.
The SubscriptionRetryOptions shape, default constant, and subscriptionRetryOptions construction look good and give a clear, bounded retry configuration. Two small clarity points you might consider:
- The JSDoc says “Maximum number of retry attempts before giving up”, but both initWithRetry and reinitializeWithRetry treat maxRetries as “maximum total attempts (including the first)” rather than “extra retries”. If you intend “retries after the first attempt”, you’d need to adjust the comparison logic; otherwise, consider rephrasing the doc to “Maximum number of attempts (including the first)”.
- The exponential backoff comment uses 2^attempt, whereas the implementation uses 2^(attempt - 1) (so the first delay is exactly baseRetryDelayMs). Updating the comment to match the actual formula would avoid confusion.
Also applies to: 50-67, 111-117, 159-160, 183-193
302-320: Event handler setup is correct; consider reusing GrpcError type for consistency.
Using a dedicated setupSubscriptionEventHandlers and arrow functions to preserve this is good. For type consistency with the new helpers, you could optionally type the error parameter as your shared GrpcError (or Error & { code?: number } | GrpcError) and centralize narrowing there, but this is cosmetic.
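As a rough illustration of the type tightening mentioned in the grpcErrors.ts nitpick, the sketch below declares the retryable set over a literal union. The names `GrpcError`, `RetryableGrpcStatusCode`, and `isRetryableGrpcError` appear in the review; the exact membership of the set is an assumption here. `NOT_FOUND` and `PERMISSION_DENIED` are named in the review, while `DEADLINE_EXCEEDED` and `UNAVAILABLE` are included only for illustration. The numeric values are the standard gRPC status codes.

```typescript
// Standard gRPC status code numbers; which codes the PR treats as retryable
// beyond NOT_FOUND and PERMISSION_DENIED is an assumption for this sketch.
const RETRYABLE_GRPC_STATUS_CODES = {
  DEADLINE_EXCEEDED: 4,
  NOT_FOUND: 5,
  PERMISSION_DENIED: 7,
  UNAVAILABLE: 14,
} as const

type RetryableGrpcStatusCode =
  (typeof RETRYABLE_GRPC_STATUS_CODES)[keyof typeof RETRYABLE_GRPC_STATUS_CODES]

// Typed over the literal union instead of plain number
const retryableCodes: ReadonlySet<RetryableGrpcStatusCode> = new Set<RetryableGrpcStatusCode>(
  Object.values(RETRYABLE_GRPC_STATUS_CODES),
)

type GrpcError = Error & { code: number }

function isGrpcError(error: unknown): error is GrpcError {
  return error instanceof Error && typeof (error as { code?: unknown }).code === 'number'
}

function isRetryableGrpcError(
  error: unknown,
): error is GrpcError & { code: RetryableGrpcStatusCode } {
  // The cast is needed because Set<RetryableGrpcStatusCode>.has() rejects a plain number
  return isGrpcError(error) && retryableCodes.has(error.code as RetryableGrpcStatusCode)
}
```

The trade-off is the one the reviewer hints at: the narrower set type documents intent and narrows `error.code` for callers, at the cost of a cast at the lookup site.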
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- packages/gcp-pubsub/lib/errors/grpcErrors.ts (1 hunks)
- packages/gcp-pubsub/lib/index.ts (1 hunks)
- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (10 hunks)
- packages/gcp-pubsub/package.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/gcp-pubsub/lib/index.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: general (22.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/core) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (24.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/core) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: kafka (24.x) / build
- GitHub Check: kafka (22.x) / build
🔇 Additional comments (3)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (3)
223-241: Initialization retry flow is robust and well-bounded.
`start()` delegating to `initWithRetry()` and the logic in `initWithRetry` (retry only for `SubscriptionDoesNotExistError`/retryable gRPC codes, with capped exponential backoff and structured logging) looks correct and aligns with the new retry config. The recursive depth is small and bounded by `maxRetries`, so stack usage is fine here.
Also applies to: 251-295
535-540: Listener removal before close is a good safeguard.
Setting `isConsuming = false` and removing all listeners before closing the subscription cleanly prevents the `close` handler from kicking off a reinitialization during an intentional shutdown. This makes the lifecycle behavior much more predictable.
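The shutdown ordering described above can be sketched with a small stand-in. `FakeSubscription` and `SketchConsumer` are hypothetical and do not reflect the real Pub/Sub client or the consumer class; they only model a `close` event and the `isConsuming` flag.

```typescript
type Listener = () => void

// Hypothetical stand-in for a subscription that emits a single 'close' event.
class FakeSubscription {
  private listeners: Listener[] = []

  onClose(listener: Listener): void {
    this.listeners.push(listener)
  }

  removeAllListeners(): void {
    this.listeners = []
  }

  close(): void {
    // Notify whoever is still subscribed at the moment of closing.
    for (const listener of [...this.listeners]) listener()
  }
}

class SketchConsumer {
  isConsuming = false
  reinitCount = 0
  private subscription: FakeSubscription

  constructor(subscription: FakeSubscription) {
    this.subscription = subscription
  }

  start(): void {
    this.isConsuming = true
    // Arrow function keeps `this` bound to the consumer.
    this.subscription.onClose(() => {
      // An unexpected close while consuming would trigger reinitialization.
      if (this.isConsuming) this.reinitCount++
    })
  }

  stop(): void {
    // Order matters: flag first, then detach handlers, then close.
    this.isConsuming = false
    this.subscription.removeAllListeners()
    this.subscription.close()
  }
}
```

In this sketch either safeguard alone (the flag or the listener removal) stops the handler from reacting to an intentional close; using both keeps shutdown safe even if a handler is registered later without the flag check.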
659-703: New deduplication mark on successful processing looks correct; confirm behavior when dedup is disabled.
Marking the message as deduplicated on the success path via `deduplicateMessage(..., DeduplicationRequesterEnum.Consumer)` before `ack()` integrates well with the earlier duplication checks and lock acquisition/release.
One thing to double‑check: if `enableConsumerDeduplication` is false or the message isn’t eligible for deduplication, ensure that `deduplicateMessage` is effectively a no‑op (or at least doesn’t throw) so this unconditional call can’t introduce failures in non‑deduplicated setups. If that’s already guaranteed by the base implementation, this change is solid as‑is.
Actionable comments posted: 1
📒 Files selected for processing (2)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1 hunks)
- packages/gcp-pubsub/lib/errors/grpcErrors.ts (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (8)
packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1)
1-9: LGTM! Clean error class implementation.
The custom error class is well-structured with a descriptive message, proper name assignment, and a typed property for the subscription name.
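For readers without the diff open, an error class with those three traits might look roughly like the sketch below. The message wording and the body of the type guard are assumptions; only the class name, the `isSubscriptionDoesNotExistError` helper name, and the presence of a typed subscription-name property are taken from this review.

```typescript
// Sketch only: the message text and guard logic are illustrative assumptions.
class SubscriptionDoesNotExistError extends Error {
  readonly subscriptionName: string

  constructor(subscriptionName: string) {
    super(`Subscription ${subscriptionName} does not exist`)
    this.name = 'SubscriptionDoesNotExistError'
    this.subscriptionName = subscriptionName
  }
}

// Structural guard, so errors crossing package boundaries are still recognized.
function isSubscriptionDoesNotExistError(
  error: unknown,
): error is SubscriptionDoesNotExistError {
  if (typeof error !== 'object' || error === null) return false
  const candidate = error as { name?: unknown; subscriptionName?: unknown }
  return (
    candidate.name === 'SubscriptionDoesNotExistError' &&
    typeof candidate.subscriptionName === 'string'
  )
}
```

Checking `name` and the property shape rather than `instanceof` is one possible design; it keeps the guard working when two copies of the package are loaded.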
packages/gcp-pubsub/lib/errors/grpcErrors.ts (7)
3-29: Excellent documentation for eventual consistency handling.
The comprehensive explanation of why PERMISSION_DENIED and NOT_FOUND are included in the retryable codes is valuable. The references to GCP documentation and GitHub issues provide strong justification for this design decision.
31-33: Good use of type extraction and Set optimization.
The type extraction from the const array ensures type safety, and the Set provides O(1) lookup performance for retry checks.
50-59: LGTM! Duck typing approach is appropriate here.
The duck-typing approach correctly avoids fragile instanceof checks and validates the essential properties (numeric `code` and string `message`) needed to identify gRPC errors.
67-69: LGTM! Clean composition of type guards.
The function correctly composes the `isGrpcError` check with the retryable codes lookup.
77-82: LGTM! Simple and correct helper.
The function provides a clean way to extract gRPC status codes with proper undefined handling.
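The helpers praised above follow a common pattern that can be sketched as follows. This is not the file's actual content: the numeric codes are the standard gRPC status values (NOT_FOUND = 5, PERMISSION_DENIED = 7, UNAVAILABLE = 14), the inclusion of UNAVAILABLE is an assumption, and `getGrpcErrorCode` is a hypothetical name for the code-extraction helper. The sketch also avoids importing `@grpc/grpc-js` so it stays self-contained.

```typescript
// Standard gRPC status values; only NOT_FOUND and PERMISSION_DENIED are
// confirmed as retryable by the review, UNAVAILABLE is an assumption.
const RETRYABLE_GRPC_CODES = [5, 7, 14] as const

// Type extracted from the const array: 5 | 7 | 14.
type RetryableGrpcCode = (typeof RETRYABLE_GRPC_CODES)[number]

// Set gives O(1) membership checks.
const RETRYABLE_CODE_SET: ReadonlySet<number> = new Set<number>(RETRYABLE_GRPC_CODES)

type GrpcError = { code: number; message: string }

// Duck typing: any object with a numeric `code` and a string `message` qualifies.
function isGrpcError(error: unknown): error is GrpcError {
  if (typeof error !== 'object' || error === null) return false
  const candidate = error as { code?: unknown; message?: unknown }
  return typeof candidate.code === 'number' && typeof candidate.message === 'string'
}

// Composes the shape check with the retryable-code lookup.
function isRetryableGrpcError(
  error: unknown,
): error is GrpcError & { code: RetryableGrpcCode } {
  return isGrpcError(error) && RETRYABLE_CODE_SET.has(error.code)
}

// Hypothetical helper name: returns the status code, or undefined for non-gRPC errors.
function getGrpcErrorCode(error: unknown): number | undefined {
  return isGrpcError(error) ? error.code : undefined
}
```

Because the guard narrows to an object with `code` and `message`, callers can read `error.code` in log statements without casts once `isRetryableGrpcError(error)` has returned true.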
84-85: LGTM! Convenient re-export.
Re-exporting GrpcStatus simplifies imports for consumers of this module.
1-1: @grpc/grpc-js dependency version and status export are correctly specified.
The version constraint "^1.14.2" in packages/gcp-pubsub/package.json is properly specified, and the `status` export is available in this version and all compatible versions. The import statement is valid.
Actionable comments posted: 1
♻️ Duplicate comments (1)
packages/gcp-pubsub/README.md (1)
1564-1567: Language identifier already present.
The past review flagged a missing language identifier on the code block, but the current code on line 1565 correctly includes `text`. No action needed.
🧹 Nitpick comments (1)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (1)
249-249: Consider making `initWithRetry` iterative rather than recursive
The logic in `initWithRetry` is correct, but because `maxRetries` is externally configurable, using recursion for retries can, in edge cases, lead to deeper stacks than necessary and is a bit inconsistent with the iterative approach in `reinitializeWithRetry`. A small loop refactor would make this more robust and symmetric with the reinit path:
```diff
-  private async initWithRetry(attempt = 1): Promise<void> {
-    try {
-      await this.init()
-    } catch (error) {
-      // Check if we should retry
-      if (attempt >= this.subscriptionRetryOptions.maxRetries) {
-        this.logger.error({
-          msg: `Failed to initialize subscription after ${attempt} attempts`,
-          subscriptionName:
-            this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name,
-          topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name,
-          error,
-        })
-        throw error
-      }
-
-      // Check if error is retryable using gRPC status codes
-      const isRetryableSubscriptionError = isSubscriptionDoesNotExistError(error)
-      const isRetryableGrpc = isRetryableGrpcError(error)
-
-      if (!isRetryableSubscriptionError && !isRetryableGrpc) {
-        throw error
-      }
-
-      // Calculate delay with exponential backoff
-      const delay = Math.min(
-        this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1),
-        this.subscriptionRetryOptions.maxRetryDelayMs,
-      )
-
-      this.logger.warn({
-        msg: `Retryable error during initialization, attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`,
-        subscriptionName:
-          this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name,
-        topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name,
-        errorCode: isRetryableGrpc ? error.code : undefined,
-        errorMessage:
-          isRetryableGrpc || isRetryableSubscriptionError ? error.message : String(error),
-        attempt,
-        delayMs: delay,
-      })
-
-      await new Promise((resolve) => setTimeout(resolve, delay))
-      await this.initWithRetry(attempt + 1)
-    }
-  }
+  private async initWithRetry(startAttempt = 1): Promise<void> {
+    for (
+      let attempt = startAttempt;
+      attempt <= this.subscriptionRetryOptions.maxRetries;
+      attempt++
+    ) {
+      try {
+        await this.init()
+        return
+      } catch (error) {
+        // Check if we should retry
+        if (attempt >= this.subscriptionRetryOptions.maxRetries) {
+          this.logger.error({
+            msg: `Failed to initialize subscription after ${attempt} attempts`,
+            subscriptionName:
+              this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name,
+            topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name,
+            error,
+          })
+          throw error
+        }
+
+        // Check if error is retryable using gRPC status codes
+        const isRetryableSubscriptionError = isSubscriptionDoesNotExistError(error)
+        const isRetryableGrpc = isRetryableGrpcError(error)
+
+        if (!isRetryableSubscriptionError && !isRetryableGrpc) {
+          throw error
+        }
+
+        // Calculate delay with exponential backoff
+        const delay = Math.min(
+          this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1),
+          this.subscriptionRetryOptions.maxRetryDelayMs,
+        )
+
+        this.logger.warn({
+          msg: `Retryable error during initialization, attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`,
+          subscriptionName:
+            this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name,
+          topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name,
+          errorCode: isRetryableGrpc ? (error as { code?: number }).code : undefined,
+          errorMessage:
+            isRetryableGrpc || isRetryableSubscriptionError
+              ? (error as { message: string }).message
+              : String(error),
+          attempt,
+          delayMs: delay,
+        })
+
+        await new Promise((resolve) => setTimeout(resolve, delay))
+      }
+    }
+  }
```
Also applies to: 271-316
📒 Files selected for processing (3)
- packages/gcp-pubsub/README.md (3 hunks)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1 hunks)
- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (11 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts
🧰 Additional context used
🧬 Code graph analysis (1)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (3)
packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts (1)
isSubscriptionDoesNotExistError (11-24)
packages/gcp-pubsub/lib/errors/grpcErrors.ts (1)
isRetryableGrpcError (67-69)
packages/gcp-pubsub/lib/types/MessageTypes.ts (1)
PubSubMessage (4-4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: general (22.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (20.x, @message-queue-toolkit/metrics) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcs-payload-store) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (22.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sns) / build
- GitHub Check: general (24.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/outbox-core) / build
- GitHub Check: general (20.x, @message-queue-toolkit/redis-message-deduplication-store) / build
- GitHub Check: general (22.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (20.x, @message-queue-toolkit/sqs) / build
- GitHub Check: general (22.x, @message-queue-toolkit/s3-payload-store) / build
- GitHub Check: general (24.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: general (20.x, @message-queue-toolkit/amqp) / build
- GitHub Check: general (22.x, @message-queue-toolkit/gcp-pubsub) / build
- GitHub Check: kafka (22.x) / build
- GitHub Check: kafka (24.x) / build
🔇 Additional comments (4)
packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts (4)
36-67: Subscription retry configuration & wiring look solid
The `DEFAULT_SUBSCRIPTION_RETRY_OPTIONS`, `SubscriptionRetryOptions` type, and the constructor wiring into a `Required<SubscriptionRetryOptions>` field are consistent and easy to reason about. The options surface on `PubSubConsumerOptions` is also clear and well-documented. No changes needed here.
Also applies to: 111-117, 159-160, 203-213
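As a rough illustration of that wiring, the sketch below resolves partial user options against defaults into a `Required<...>` value. The default numbers and the `resolveRetryOptions` helper are assumptions made for the example; only the type and constant names come from the review.

```typescript
// Optional fields as exposed to users; names follow the review.
type SubscriptionRetryOptions = {
  maxRetries?: number
  baseRetryDelayMs?: number
  maxRetryDelayMs?: number
}

// Illustrative defaults; the real values are not shown in this review.
const DEFAULT_SUBSCRIPTION_RETRY_OPTIONS: Required<SubscriptionRetryOptions> = {
  maxRetries: 5,
  baseRetryDelayMs: 1000,
  maxRetryDelayMs: 30000,
}

// Hypothetical helper: every field ends up defined, so later code needs no fallbacks.
function resolveRetryOptions(
  overrides?: SubscriptionRetryOptions,
): Required<SubscriptionRetryOptions> {
  return {
    maxRetries: overrides?.maxRetries ?? DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetries,
    baseRetryDelayMs:
      overrides?.baseRetryDelayMs ?? DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.baseRetryDelayMs,
    maxRetryDelayMs:
      overrides?.maxRetryDelayMs ?? DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetryDelayMs,
  }
}
```

Using `??` per field, rather than an object spread, means an explicitly passed `undefined` still falls back to the default instead of overwriting it.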
318-350: Event handler extraction improves reinit story
Pulling subscription event wiring into `setupSubscriptionEventHandlers()` and reusing it from both `start()` and `reinitializeWithRetry()` makes the reinitialization path much cleaner and less error-prone. Guarding on `!this.subscription` at the top is also a good safety net.
557-562: Listener removal before close is a good shutdown safeguard
Removing all listeners before calling `subscription.close()` is a nice touch to ensure `close` events during intentional shutdown do not accidentally trigger reinitialization logic. This matches the `isConsuming` flag semantics cleanly.
717-719: Deduplication on successful consumption is wired correctly
Calling `deduplicateMessage(validatedMessage, DeduplicationRequesterEnum.Consumer)` on the success path before emitting `consumed` and acking ensures that future deliveries of the same message can be recognized and skipped. The ordering with respect to `message.ack()` and lock release looks correct.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests
Chores