From 7bcafaa1b69be37dd8ef0506d95c93f8aa90753e Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 12:09:54 +0200 Subject: [PATCH 01/11] Improve error handling logic, extra tests, bugfixes for deduplication logic --- packages/gcp-pubsub/README.md | 69 ++++ packages/gcp-pubsub/lib/index.ts | 1 - .../lib/pubsub/AbstractPubSubConsumer.ts | 297 +++++++++++++++++- packages/gcp-pubsub/lib/utils/pubSubUtils.ts | 18 -- ...ubPermissionConsumer.deduplication.spec.ts | 268 ++++++++++++++++ ...rmissionConsumer.payloadOffloading.spec.ts | 89 +++++- ...rmissionConsumer.subscriptionRetry.spec.ts | 286 +++++++++++++++++ .../consumers/PubSubPermissionConsumer.ts | 2 + packages/gcp-pubsub/vitest.config.ts | 2 +- 9 files changed, 1008 insertions(+), 24 deletions(-) delete mode 100644 packages/gcp-pubsub/lib/utils/pubSubUtils.ts create mode 100644 packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts create mode 100644 packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index 644a4f2a..61104bbb 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -1490,6 +1490,75 @@ When a message cannot be processed (invalid format, schema validation failure, h **Best Practice:** Always configure a DLQ in production to capture and analyze failed messages. +### Subscription-Level Error Handling + +The consumer provides a higher-level error recovery mechanism that complements the SDK's built-in gRPC retry logic. While the `@google-cloud/pubsub` SDK automatically retries some transient errors at the gRPC level, there are scenarios where the SDK does not recover automatically: + +1. **Eventual consistency errors** (`NOT_FOUND`, `PERMISSION_DENIED`) are not in the SDK's default retry codes +2. **Subscription stream disconnections** may not automatically reconnect in all cases +3. **Infrastructure changes** (e.g., after Terraform deployments) may require full subscription reinitialization + +**Retryable Error Codes:** + +*Errors the consumer handles via reinitialization:* +- `DEADLINE_EXCEEDED` (4): Request timeout that SDK retry couldn't resolve +- `NOT_FOUND` (5): Subscription may not be propagated yet (eventual consistency) +- `PERMISSION_DENIED` (7): IAM permissions may not be propagated yet (eventual consistency) +- `RESOURCE_EXHAUSTED` (8): Quota exceeded, retry with backoff +- `INTERNAL` (13): Server error, should be transient +- `UNAVAILABLE` (14): Service temporarily unable to process + +When these errors reach the `subscription.on('error')` handler (meaning SDK's built-in retry couldn't resolve them), the consumer will: +1. Log a warning with error details +2. Close the existing subscription and remove event listeners +3. Reinitialize the subscription with exponential backoff +4. Reattach event handlers and continue consuming + +**Why `NOT_FOUND` and `PERMISSION_DENIED`?** + +After Terraform deployments, GCP resources and IAM permissions can take several minutes to propagate across GCP's distributed infrastructure. During this window, the subscription may report these errors even though the configuration is correct. The consumer retries with exponential backoff to handle this eventual consistency. + +**Note:** For most transient errors, the SDK's built-in retry will handle recovery automatically. The consumer's reinitialization logic is a safety net for cases where SDK retry is exhausted or not applicable. + +**Configuration:** + +```typescript +class MyConsumer extends AbstractPubSubConsumer { + constructor(dependencies: PubSubConsumerDependencies) { + super( + dependencies, + { + // ... other options ... + + // Optional: Configure subscription retry behavior + subscriptionRetryOptions: { + maxRetries: 5, // Maximum retry attempts (default: 5) + baseRetryDelayMs: 1000, // Base delay for exponential backoff (default: 1000ms) + maxRetryDelayMs: 30000, // Maximum delay between retries (default: 30000ms) + }, + }, + executionContext, + ) + } +} +``` + +**Exponential Backoff Formula:** +``` +delay = min(baseRetryDelayMs * 2^(attempt-1), maxRetryDelayMs) +``` + +With default settings, delays are: 1s, 2s, 4s, 8s, 16s (capped at 30s). + +**Unexpected Subscription Closure:** + +The consumer also handles unexpected subscription closures (e.g., network issues, GCP service restarts). If the subscription closes while the consumer is still supposed to be consuming, it will automatically attempt reinitialization. + +**References:** +- [GCP Pub/Sub Error Codes](https://cloud.google.com/pubsub/docs/reference/error-codes) +- [GCP Pub/Sub Troubleshooting](https://cloud.google.com/pubsub/docs/troubleshooting) +- [Node.js Pub/Sub Subscription Reconnection Issues](https://github.com/googleapis/nodejs-pubsub/issues/979) + ### Error Resolver ```typescript diff --git a/packages/gcp-pubsub/lib/index.ts b/packages/gcp-pubsub/lib/index.ts index d7ed559d..70c25d88 100644 --- a/packages/gcp-pubsub/lib/index.ts +++ b/packages/gcp-pubsub/lib/index.ts @@ -9,4 +9,3 @@ export * from './pubsub/PubSubPublisherManager.ts' export * from './schemas/pubSubSchemas.ts' export * from './types/MessageTypes.ts' export * from './utils/pubSubInitter.ts' -export * from './utils/pubSubUtils.ts' diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 222804e7..9a2fdce2 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -34,6 +34,67 @@ const _ABORT_EARLY_EITHER: Either<'abort', never> = { } const DEFAULT_MAX_RETRY_DURATION = 4 * 24 * 60 * 60 // 4 days in seconds +/** + * gRPC status codes for which subscription operations should be retried. + * + * Includes both: + * 1. GCP-documented retryable errors (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, INTERNAL, UNAVAILABLE) + * 2. Eventual consistency errors common after Terraform deployments (NOT_FOUND, PERMISSION_DENIED) + * + * **Why PERMISSION_DENIED is included:** + * After Terraform deployments, IAM permissions can take several minutes to propagate across + * GCP's distributed infrastructure. During this window, the subscription may report + * PERMISSION_DENIED even though permissions are correctly configured. + * + * **Why NOT_FOUND is included:** + * Similar to PERMISSION_DENIED, newly created subscriptions may not be immediately visible + * across all GCP endpoints due to eventual consistency. + * + * @see https://cloud.google.com/pubsub/docs/reference/error-codes + * @see https://github.com/googleapis/nodejs-pubsub/issues/979 + */ +const RETRYABLE_SUBSCRIPTION_ERROR_CODES = [ + 4, // DEADLINE_EXCEEDED - Request timeout, may succeed on retry + 5, // NOT_FOUND - Subscription may not be propagated yet (eventual consistency) + 7, // PERMISSION_DENIED - IAM permissions may not be propagated yet (eventual consistency) + 8, // RESOURCE_EXHAUSTED - Quota exceeded, retry with backoff + 13, // INTERNAL - Server error, should be transient + 14, // UNAVAILABLE - Service temporarily unable to process +] as const + +/** + * Default configuration for subscription error retry behavior. + */ +const DEFAULT_SUBSCRIPTION_RETRY_OPTIONS = { + maxRetries: 5, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, +} as const + +/** + * Configuration options for subscription-level error retry behavior. + * This handles transient errors that occur at the subscription level + * (e.g., NOT_FOUND, PERMISSION_DENIED after Terraform deployments). + */ +export type SubscriptionRetryOptions = { + /** + * Maximum number of retry attempts before giving up. + * @default 5 + */ + maxRetries?: number + /** + * Base delay in milliseconds for exponential backoff. + * Actual delay = min(baseRetryDelayMs * 2^attempt, maxRetryDelayMs) + * @default 1000 + */ + baseRetryDelayMs?: number + /** + * Maximum delay in milliseconds between retries. + * @default 30000 + */ + maxRetryDelayMs?: number +} + export type PubSubDeadLetterQueueOptions = { deadLetterPolicy: { maxDeliveryAttempts: number @@ -76,6 +137,12 @@ export type PubSubConsumerOptions< maxMilliseconds?: number } } + /** + * Configuration for subscription-level error retry behavior. + * Handles transient errors like NOT_FOUND and PERMISSION_DENIED + * that can occur after Terraform deployments due to GCP's eventual consistency. + */ + subscriptionRetryOptions?: SubscriptionRetryOptions } export abstract class AbstractPubSubConsumer< @@ -118,8 +185,10 @@ export abstract class AbstractPubSubConsumer< > private readonly deadLetterQueueOptions?: PubSubDeadLetterQueueOptions private readonly isDeduplicationEnabled: boolean + private readonly subscriptionRetryOptions: Required private maxRetryDuration: number private isConsuming = false + private isReinitializing = false protected readonly errorResolver: ErrorResolver protected readonly executionContext: ExecutionContext @@ -140,6 +209,17 @@ export abstract class AbstractPubSubConsumer< this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION this.executionContext = executionContext this.isDeduplicationEnabled = !!options.enableConsumerDeduplication + this.subscriptionRetryOptions = { + maxRetries: + options.subscriptionRetryOptions?.maxRetries ?? + DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetries, + baseRetryDelayMs: + options.subscriptionRetryOptions?.baseRetryDelayMs ?? + DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.baseRetryDelayMs, + maxRetryDelayMs: + options.subscriptionRetryOptions?.maxRetryDelayMs ?? + DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetryDelayMs, + } this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options) this.handlerContainer = new HandlerContainer({ @@ -186,14 +266,31 @@ export abstract class AbstractPubSubConsumer< this.isConsuming = true + this.setupSubscriptionEventHandlers() + } + + /** + * Sets up event handlers for the subscription. + * Extracted to allow reattachment after reinitialization. + */ + private setupSubscriptionEventHandlers(): void { + if (!this.subscription) { + return + } + // Configure message handler this.subscription.on('message', async (message: PubSubMessage) => { await this.handleMessage(message) }) - // Configure error handler - this.subscription.on('error', (error) => { - this.handleError(error) + // Configure error handler with retry logic for transient errors + this.subscription.on('error', (error: Error & { code?: number }) => { + this.handleSubscriptionError(error) + }) + + // Configure close handler to detect unexpected disconnections + this.subscription.on('close', () => { + this.handleSubscriptionClose() }) // Configure flow control if provided @@ -206,6 +303,197 @@ export abstract class AbstractPubSubConsumer< } } + /** + * Handles subscription-level errors. + * + * For retryable errors (NOT_FOUND, PERMISSION_DENIED), attempts to reinitialize + * the subscription with exponential backoff. These errors commonly occur after + * Terraform deployments due to GCP's eventual consistency. + * + * @see https://cloud.google.com/pubsub/docs/reference/error-codes + */ + private handleSubscriptionError(error: Error & { code?: number }): void { + // Don't handle errors during shutdown + if (!this.isConsuming) { + return + } + + const errorCode = error.code + + // Check if this is a retryable subscription error + if ( + errorCode !== undefined && + RETRYABLE_SUBSCRIPTION_ERROR_CODES.includes( + errorCode as (typeof RETRYABLE_SUBSCRIPTION_ERROR_CODES)[number], + ) + ) { + this.logger.warn({ + msg: 'Retryable subscription error occurred, attempting to reinitialize', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + errorCode, + errorMessage: error.message, + }) + + // Trigger reinitialization with retry + this.reinitializeWithRetry(1).catch((reinitError) => { + this.logger.error({ + msg: 'Failed to reinitialize subscription after retryable error', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + error: reinitError, + }) + // Re-throw to surface the error - consumer is now in a failed state + throw reinitError + }) + } else { + // Non-retryable error - log and report + this.handleError(error) + } + } + + /** + * Handles unexpected subscription close events. + * + * If the subscription closes while we're still supposed to be consuming, + * attempts to reinitialize. This can happen due to: + * - Network issues + * - GCP service restarts + * - Subscription deletion/recreation + */ + private handleSubscriptionClose(): void { + this.logger.info({ + msg: 'PubSub subscription closed', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + isConsuming: this.isConsuming, + }) + + // If we're still supposed to be consuming, try to reinitialize + if (this.isConsuming && !this.isReinitializing) { + this.logger.warn({ + msg: 'Subscription closed unexpectedly while consuming, attempting to reinitialize', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + }) + + this.reinitializeWithRetry(1).catch((reinitError) => { + this.logger.error({ + msg: 'Failed to reinitialize subscription after unexpected close', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + error: reinitError, + }) + }) + } + } + + /** + * Reinitializes the subscription with exponential backoff retry. + * + * This method: + * 1. Closes the existing subscription (if any) + * 2. Waits with exponential backoff + * 3. Reinitializes the subscription + * 4. Reattaches event handlers + * + * @param attempt - Current retry attempt number (1-based) + */ + private async reinitializeWithRetry(attempt: number): Promise { + // Prevent concurrent reinitializations + if (this.isReinitializing) { + this.logger.debug({ + msg: 'Reinitialization already in progress, skipping', + subscriptionName: this.subscriptionName, + }) + return + } + + // Check if we've exceeded max retries + if (attempt > this.subscriptionRetryOptions.maxRetries) { + const error = new Error( + `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`, + ) + this.handleError(error) + throw error + } + + this.isReinitializing = true + + try { + // Calculate delay with exponential backoff + const delay = Math.min( + this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), + this.subscriptionRetryOptions.maxRetryDelayMs, + ) + + this.logger.info({ + msg: `Reinitialization attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + delayMs: delay, + }) + + // Wait before retry + await new Promise((resolve) => setTimeout(resolve, delay)) + + // Don't continue if we've been stopped during the wait + if (!this.isConsuming) { + this.logger.info({ + msg: 'Consumer stopped during reinitialization wait, aborting', + subscriptionName: this.subscriptionName, + }) + return + } + + // Close existing subscription to remove old event handlers + if (this.subscription) { + try { + this.subscription.removeAllListeners() + await this.subscription.close() + } catch { + // Ignore close errors - subscription may already be closed + } + } + + // Reinitialize + await this.init() + + if (!this.subscription) { + throw new Error('Subscription not initialized after init()') + } + + // Wait for subscription to be ready + await this.waitForSubscriptionReady() + + // Reattach event handlers + this.setupSubscriptionEventHandlers() + + this.logger.info({ + msg: 'Successfully reinitialized subscription', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + }) + } catch (error) { + this.logger.warn({ + msg: `Reinitialization attempt ${attempt} failed, will retry`, + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + error, + }) + + this.isReinitializing = false + + // Retry with incremented attempt count + await this.reinitializeWithRetry(attempt + 1) + } finally { + this.isReinitializing = false + } + } + private async waitForSubscriptionReady(maxAttempts = 100, delayMs = 20): Promise { if (!this.subscription) { throw new Error('Subscription not initialized') @@ -227,6 +515,8 @@ export abstract class AbstractPubSubConsumer< public override async close(): Promise { this.isConsuming = false if (this.subscription) { + // Remove listeners first to prevent close handler from triggering reinitialization + this.subscription.removeAllListeners() await this.subscription.close() } await super.close() @@ -383,6 +673,7 @@ export abstract class AbstractPubSubConsumer< } // Success + await this.deduplicateMessage(validatedMessage, DeduplicationRequesterEnum.Consumer) this.handleMessageProcessed({ message: validatedMessage, processingResult: { status: 'consumed' }, diff --git a/packages/gcp-pubsub/lib/utils/pubSubUtils.ts b/packages/gcp-pubsub/lib/utils/pubSubUtils.ts deleted file mode 100644 index 99840ad9..00000000 --- a/packages/gcp-pubsub/lib/utils/pubSubUtils.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { PubSub } from '@google-cloud/pubsub' - -export type PubSubConfig = { - projectId: string - emulatorHost?: string -} - -export function createPubSubClient(config: PubSubConfig): PubSub { - const { PubSub } = require('@google-cloud/pubsub') - - if (config.emulatorHost) { - process.env.PUBSUB_EMULATOR_HOST = config.emulatorHost - } - - return new PubSub({ - projectId: config.projectId, - }) -} diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts new file mode 100644 index 00000000..fd0d15e7 --- /dev/null +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts @@ -0,0 +1,268 @@ +import { randomUUID } from 'node:crypto' +import type { PubSub } from '@google-cloud/pubsub' +import { + AcquireLockTimeoutError, + type MessageDeduplicationConfig, +} from '@message-queue-toolkit/core' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import type { Redis } from 'ioredis' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' +import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionConsumer } from './PubSubPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts' + +describe('PubSubPermissionConsumer - Deduplication', () => { + const TOPIC_NAME = 'user_permissions_dedup_test' + const SUBSCRIPTION_NAME = 'user_permissions_dedup_test_sub' + + let diContainer: AwilixContainer + let pubSubClient: PubSub + let redis: Redis + let messageDeduplicationStore: RedisMessageDeduplicationStore + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + redis = diContainer.cradle.redis + messageDeduplicationStore = new RedisMessageDeduplicationStore({ + redis, + }) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + async function cleanRedis() { + const keys = await redis.keys('*consumer*') + if (keys.length > 0) { + await redis.del(...keys) + } + const mutexKeys = await redis.keys('*mutex*') + if (mutexKeys.length > 0) { + await redis.del(...mutexKeys) + } + } + + describe('consumer deduplication', () => { + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let messageDeduplicationConfig: MessageDeduplicationConfig + + beforeEach(async () => { + messageDeduplicationConfig = { + deduplicationStore: messageDeduplicationStore, + } + + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + enableConsumerDeduplication: true, + messageDeduplicationConfig, + // Use 'id' field as deduplication ID since our schema doesn't have deduplicationId + messageDeduplicationIdField: 'id', + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await cleanRedis() + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await consumer.close() + await publisher.close() + await cleanRedis() + }) + + it('processes message normally when deduplication is enabled', async () => { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }) + + it('skips duplicate message when already processed', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + + // Message is successfully processed during the first consumption + const firstConsumptionResult = await consumer.handlerSpy.waitForMessageWithId(messageId) + expect(firstConsumptionResult.processingResult).toEqual({ status: 'consumed' }) + expect(consumer.addCounter).toBe(1) + + // Clear the spy, so we can check subsequent call + consumer.handlerSpy.clear() + + // Publish again - should be skipped as duplicate + await publisher.publish(message) + + // Message is not processed due to deduplication + const secondConsumptionResult = await consumer.handlerSpy.waitForMessageWithId(messageId) + expect(secondConsumptionResult.processingResult).toEqual({ + status: 'consumed', + skippedAsDuplicate: true, + }) + + // Handler should only have been called once + expect(consumer.addCounter).toBe(1) + }) + }) + + describe('lock acquisition', () => { + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let messageDeduplicationConfig: MessageDeduplicationConfig + + beforeEach(async () => { + messageDeduplicationConfig = { + deduplicationStore: messageDeduplicationStore, + } + + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + enableConsumerDeduplication: true, + messageDeduplicationConfig, + // Use 'id' field as deduplication ID since our schema doesn't have deduplicationId + messageDeduplicationIdField: 'id', + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await cleanRedis() + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + vi.restoreAllMocks() + await consumer.close() + await publisher.close() + await cleanRedis() + }) + + it('acquires and releases lock during message processing', { timeout: 15000 }, async () => { + const messageId = randomUUID() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + + // After successful processing, the lock should be released + // but the deduplication key should still exist + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `consumer:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(true) + }) + + it('nacks message when lock acquisition times out', { timeout: 15000 }, async () => { + const messageId = randomUUID() + + // Mock acquireLock to simulate timeout (another consumer holds the lock) + // Only AcquireLockTimeoutError causes retry - regular errors are swallowed and message is processed + vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ + error: new AcquireLockTimeoutError('Lock acquisition timeout'), + }) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + // Wait a bit for processing attempts - message should be nacked and redelivered + await new Promise((resolve) => setTimeout(resolve, 2000)) + + // Handler should not have been called because lock couldn't be acquired + expect(consumer.addCounter).toBe(0) + + // Restore the mock so the message can be processed + vi.restoreAllMocks() + + // Now the message should be processed on redelivery + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }) + + it( + 'processes message when lock acquisition has non-timeout error', + { timeout: 15000 }, + async () => { + const messageId = randomUUID() + + // Mock acquireLock to simulate a non-timeout error (e.g., Redis connection error) + // Non-timeout errors are swallowed and message is processed normally + vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ + error: new Error('Redis connection error'), + }) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + // Message should be processed even though lock acquisition failed + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }, + ) + }) +}) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts index 1b5ea296..3905edf9 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts @@ -1,11 +1,12 @@ +import type { PubSub } from '@google-cloud/pubsub' import type { Storage } from '@google-cloud/storage' import type { PayloadStoreConfig } from '@message-queue-toolkit/core' import { GCSPayloadStore } from '@message-queue-toolkit/gcs-payload-store' import type { AwilixContainer } from 'awilix' import { asValue } from 'awilix' import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' - import { PUBSUB_MESSAGE_MAX_SIZE } from '../../lib/pubsub/AbstractPubSubService.ts' +import { OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE } from '../../lib/utils/messageUtils.ts' import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' import { assertBucket, emptyBucket } from '../utils/gcsUtils.ts' @@ -208,4 +209,90 @@ describe('PubSubPermissionConsumer - Payload Offloading', () => { }, ) }) + + describe('payload retrieval errors', () => { + const largeMessageSizeThreshold = PUBSUB_MESSAGE_MAX_SIZE + const gcsBucketName = 'test-bucket' + + let diContainer: AwilixContainer + let pubSubClient: PubSub + let gcsStorage: Storage + let payloadStoreConfig: PayloadStoreConfig + let consumer: PubSubPermissionConsumer + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + gcsStorage = diContainer.cradle.gcsStorage + pubSubClient = diContainer.cradle.pubSubClient + + await assertBucket(gcsStorage, gcsBucketName) + payloadStoreConfig = { + messageSizeThreshold: largeMessageSizeThreshold, + store: new GCSPayloadStore(diContainer.cradle, { bucketName: gcsBucketName }), + storeName: 'gcs', + } + }) + + beforeEach(async () => { + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: PubSubPermissionConsumer.TOPIC_NAME }, + subscription: { name: PubSubPermissionConsumer.SUBSCRIPTION_NAME }, + }, + payloadStoreConfig, + }) + + await deletePubSubTopicAndSubscription( + pubSubClient, + PubSubPermissionConsumer.TOPIC_NAME, + PubSubPermissionConsumer.SUBSCRIPTION_NAME, + ) + + await consumer.start() + }) + + afterEach(async () => { + await consumer.close() + }) + + afterAll(async () => { + await emptyBucket(gcsStorage, gcsBucketName) + + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('handles error when offloaded payload cannot be retrieved', { timeout: 10000 }, async () => { + // Create a message that looks like it has offloaded payload but the payload doesn't exist + const topic = pubSubClient.topic(PubSubPermissionConsumer.TOPIC_NAME) + + const messageWithFakeOffload = { + id: 'fake-offload-1', + messageType: 'add', + timestamp: new Date().toISOString(), + // Reference to a non-existent GCS object + _payloadKey: 'non-existent-key-12345', + } + + // Publish with the offload attribute to trigger retrieval + await topic.publishMessage({ + data: Buffer.from(JSON.stringify(messageWithFakeOffload)), + attributes: { + [OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE]: '12345', + }, + }) + + // Wait for the error to be tracked + const spyResult = await consumer.handlerSpy.waitForMessage({ id: 'fake-offload-1' }, 'error') + + expect(spyResult.processingResult.status).toBe('error') + // @ts-expect-error field exists + expect(spyResult.processingResult.errorReason).toBe('invalidMessage') + expect(consumer.addCounter).toBe(0) + }) + }) }) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts new file mode 100644 index 00000000..6f2e90f2 --- /dev/null +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts @@ -0,0 +1,286 @@ +import type { PubSub } from '@google-cloud/pubsub' +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' +import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionConsumer } from './PubSubPermissionConsumer.ts' + +describe('PubSubPermissionConsumer - Subscription Retry', () => { + const TOPIC_NAME = 'user_permissions_retry_test' + const SUBSCRIPTION_NAME = 'user_permissions_retry_test_sub' + + // Unit tests that don't require infrastructure + describe('exponential backoff calculation', () => { + it('calculates correct delays for each attempt', () => { + // Test the exponential backoff formula: min(baseDelay * 2^(attempt-1), maxDelay) + const baseDelay = 1000 + const maxDelay = 30000 + + const calculateDelay = (attempt: number) => + Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay) + + expect(calculateDelay(1)).toBe(1000) // 1000 * 2^0 = 1000 + expect(calculateDelay(2)).toBe(2000) // 1000 * 2^1 = 2000 + expect(calculateDelay(3)).toBe(4000) // 1000 * 2^2 = 4000 + expect(calculateDelay(4)).toBe(8000) // 1000 * 2^3 = 8000 + expect(calculateDelay(5)).toBe(16000) // 1000 * 2^4 = 16000 + expect(calculateDelay(6)).toBe(30000) // 1000 * 2^5 = 32000, capped at 30000 + expect(calculateDelay(10)).toBe(30000) // Any higher attempt is capped + }) + }) + + // Integration tests that require PubSub emulator and Redis + describe('subscriptionRetryOptions configuration', () => { + let diContainer: AwilixContainer + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it('uses default retry options when not specified', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + + // Access private field via type assertion for testing + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 5, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, + }) + + await consumer.close() + }) + + it('accepts custom retry options', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 10, + baseRetryDelayMs: 500, + maxRetryDelayMs: 60000, + }, + }) + + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 10, + baseRetryDelayMs: 500, + maxRetryDelayMs: 60000, + }) + + await consumer.close() + }) + + it('merges partial retry options with defaults', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 3, + // baseRetryDelayMs and maxRetryDelayMs should use defaults + }, + }) + + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 3, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, + }) + + await consumer.close() + }) + }) + + describe('close behavior', () => { + let diContainer: AwilixContainer + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + beforeEach(async () => { + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await consumer.close() + await publisher.close() + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('sets isConsuming to false on close', async () => { + // Verify consumer is running + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + + await consumer.close() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + }) + + it('removes all listeners on close to prevent reconnection', async () => { + // Get the subscription reference before close + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + + // Verify subscription exists and has listeners + expect(subscription).toBeDefined() + expect(subscription?.listenerCount('message')).toBeGreaterThan(0) + expect(subscription?.listenerCount('error')).toBeGreaterThan(0) + expect(subscription?.listenerCount('close')).toBeGreaterThan(0) + + await consumer.close() + + // After close, listeners should be removed + expect(subscription?.listenerCount('message')).toBe(0) + expect(subscription?.listenerCount('error')).toBe(0) + expect(subscription?.listenerCount('close')).toBe(0) + }) + + it('nacks messages received during shutdown', { timeout: 10000 }, async () => { + // First verify consumer is working + const message1 = { + id: 'shutdown-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message1) + await consumer.handlerSpy.waitForMessageWithId('shutdown-test-1', 'consumed') + expect(consumer.addCounter).toBe(1) + + // Now close the consumer + await consumer.close() + + // Messages published after close should not be processed + // (they'll be nacked and redelivered when consumer restarts) + // This test just verifies the consumer doesn't crash when receiving messages during shutdown + }) + }) + + describe('start behavior', () => { + let diContainer: AwilixContainer + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it('does not start multiple times if already consuming', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + + await consumer.start() + + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + const initialListenerCount = subscription?.listenerCount('message') + + // Call start again - should be a no-op + await consumer.start() + + // Listener count should not have increased (no duplicate handlers) + expect(subscription?.listenerCount('message')).toBe(initialListenerCount) + + await consumer.close() + }) + + it('sets up all required event handlers', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + + await consumer.start() + + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + + expect(subscription?.listenerCount('message')).toBe(1) + expect(subscription?.listenerCount('error')).toBe(1) + expect(subscription?.listenerCount('close')).toBe(1) + + await consumer.close() + }) + }) +}) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts index 68762907..0433dd47 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts @@ -31,6 +31,8 @@ type PubSubPermissionConsumerOptions = Pick< | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enableConsumerDeduplication' + | 'subscriptionRetryOptions' + | 'messageDeduplicationIdField' > & { addPreHandlerBarrier?: ( message: SupportedMessages, diff --git a/packages/gcp-pubsub/vitest.config.ts b/packages/gcp-pubsub/vitest.config.ts index 703b0e9c..1ef50688 100644 --- a/packages/gcp-pubsub/vitest.config.ts +++ b/packages/gcp-pubsub/vitest.config.ts @@ -14,7 +14,7 @@ export default defineConfig({ coverage: { provider: 'v8', include: ['lib/**/*.ts'], - exclude: ['vitest.config.ts', 'lib/**/index.ts'], + exclude: ['vitest.config.ts', 'lib/**/index.ts', 'lib/fakes/FakeConsumerErrorResolver.ts'], thresholds: { lines: 80, functions: 90, From 6f9e7dc02669055247408b3d8eb8b077fb3aef25 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 12:27:59 +0200 Subject: [PATCH 02/11] More tests --- .../PubSubPermissionConsumer.spec.ts | 58 ++++++ .../PubSubConsumerErrorResolver.spec.ts | 69 +++++++ ...sionPublisher.messageDeduplication.spec.ts | 193 ++++++++++++++++++ .../publishers/PubSubPermissionPublisher.ts | 3 + packages/gcp-pubsub/vitest.config.ts | 14 +- 5 files changed, 333 insertions(+), 4 deletions(-) create mode 100644 packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts create mode 100644 packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts index dd6de237..4be81672 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts @@ -263,4 +263,62 @@ describe('PubSubPermissionConsumer', () => { }) }) }) + + describe('logMessages option', () => { + let diContainer: AwilixContainer + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('logs messages when logMessages is enabled', { timeout: 10000 }, async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: 'log_messages_test' }, + subscription: { name: 'log_messages_test_sub' }, + }, + logMessages: true, + }) + const publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: 'log_messages_test' }, + }, + }) + + await deletePubSubTopicAndSubscription( + pubSubClient, + 'log_messages_test', + 'log_messages_test_sub', + ) + await consumer.start() + await publisher.init() + + const message = { + id: 'log-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + await consumer.handlerSpy.waitForMessageWithId('log-test-1', 'consumed') + + // The message should be logged (we can't easily verify log output, but we verify + // that the code path with logMessages=true doesn't crash) + expect(consumer.addCounter).toBe(1) + + await consumer.close() + await publisher.close() + }) + }) }) diff --git a/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts b/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts new file mode 100644 index 00000000..8674bb8b --- /dev/null +++ b/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts @@ -0,0 +1,69 @@ +import { InternalError } from '@lokalise/node-core' +import { MessageInvalidFormatError, MessageValidationError } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' +import { ZodError } from 'zod/v4' +import { PubSubConsumerErrorResolver } from '../../lib/errors/PubSubConsumerErrorResolver.ts' + +describe('PubSubConsumerErrorResolver', () => { + const errorResolver = new PubSubConsumerErrorResolver() + + it('returns MessageInvalidFormatError for SyntaxError', () => { + const syntaxError = new SyntaxError('Unexpected token') + + const result = errorResolver.processError(syntaxError) + + expect(result).toBeInstanceOf(MessageInvalidFormatError) + expect(result.message).toBe('Unexpected token') + }) + + it('returns MessageValidationError for ZodError', () => { + const zodError = new ZodError([ + { + code: 'invalid_type', + expected: 'string', + path: ['field'], + message: 'Expected string, received number', + }, + ]) + + const result = errorResolver.processError(zodError) + + expect(result).toBeInstanceOf(MessageValidationError) + expect(result.message).toContain('invalid_type') + }) + + it('returns InternalError for StandardizedError', () => { + // Create an error that matches the StandardizedError interface + // StandardizedError requires: name, message, code, and Symbol.for('StandardizedErrorSymbol') = true + const standardizedError = Object.assign(new Error('Standardized error'), { + code: 'CUSTOM_CODE', + [Symbol.for('StandardizedErrorSymbol')]: true, + }) + + const result = errorResolver.processError(standardizedError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Standardized error') + expect(result.errorCode).toBe('CUSTOM_CODE') + }) + + it('returns InternalError for unknown errors', () => { + const unknownError = { message: 'Unknown error' } + + const result = errorResolver.processError(unknownError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) + + it('returns InternalError for regular Error', () => { + const regularError = new Error('Regular error') + + const result = errorResolver.processError(regularError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) +}) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts new file mode 100644 index 00000000..c5a205ca --- /dev/null +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts @@ -0,0 +1,193 @@ +import { randomUUID } from 'node:crypto' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import { type AwilixContainer, asValue } from 'awilix' +import type { Redis } from 'ioredis' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '../consumers/userConsumerSchemas.ts' +import { deletePubSubTopic } from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionPublisher } from './PubSubPermissionPublisher.ts' + +describe('PubSubPermissionPublisher - Message Deduplication', () => { + const TOPIC_NAME = 'publisher_dedup_test' + + let diContainer: AwilixContainer + let redis: Redis + let messageDeduplicationStore: RedisMessageDeduplicationStore + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + redis = diContainer.cradle.redis + messageDeduplicationStore = new RedisMessageDeduplicationStore({ + redis, + }) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + async function cleanRedis() { + const keys = await redis.keys('*publisher*') + if (keys.length > 0) { + await redis.del(...keys) + } + } + + describe('publisher deduplication', () => { + let publisher: PubSubPermissionPublisher + + beforeEach(async () => { + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + messageDeduplicationConfig: { + deduplicationStore: messageDeduplicationStore, + }, + enablePublisherDeduplication: true, + }) + + await deletePubSubTopic(diContainer.cradle.pubSubClient, TOPIC_NAME) + await cleanRedis() + await publisher.init() + }) + + afterEach(async () => { + vi.restoreAllMocks() + await publisher.close() + await cleanRedis() + }) + + it('publishes message and stores deduplication key', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const spy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(spy.processingResult).toEqual({ status: 'published' }) + + // Verify deduplication key was stored + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `publisher:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(true) + }) + + it('skips duplicate message when already published', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + + const firstSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(firstSpy.processingResult).toEqual({ status: 'published' }) + + // Clear spy for subsequent call + publisher.handlerSpy.clear() + + // Second publish - should be skipped as duplicate + await publisher.publish(message) + + const secondSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(secondSpy.processingResult).toEqual({ + status: 'published', + skippedAsDuplicate: true, + }) + }) + + it('publishes messages with different IDs independently', async () => { + const message1: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + const message2: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user2'], + } + + // First message + await publisher.publish(message1) + const spy1 = await publisher.handlerSpy.waitForMessageWithId(message1.id, 'published') + expect(spy1.processingResult).toEqual({ status: 'published' }) + + publisher.handlerSpy.clear() + + // Second message with different ID + await publisher.publish(message2) + const spy2 = await publisher.handlerSpy.waitForMessageWithId(message2.id, 'published') + expect(spy2.processingResult).toEqual({ status: 'published' }) + }) + }) + + describe('publisher without deduplication', () => { + let publisher: PubSubPermissionPublisher + + beforeEach(async () => { + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + // No deduplication config + }) + + await deletePubSubTopic(diContainer.cradle.pubSubClient, TOPIC_NAME) + await cleanRedis() + await publisher.init() + }) + + afterEach(async () => { + await publisher.close() + await cleanRedis() + }) + + it('publishes duplicate messages when deduplication is disabled', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + const firstSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(firstSpy.processingResult).toEqual({ status: 'published' }) + + publisher.handlerSpy.clear() + + // Second publish - should also be published (no deduplication) + await publisher.publish(message) + const secondSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(secondSpy.processingResult).toEqual({ status: 'published' }) + + // Verify no deduplication key was stored + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `publisher:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(false) + }) + }) +}) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts index c5db7763..ec46cf47 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts @@ -1,3 +1,4 @@ +import type { MessageDeduplicationConfig } from '@message-queue-toolkit/core' import type { PubSubMessageOptions } from '../../lib/pubsub/AbstractPubSubPublisher.ts' import { AbstractPubSubPublisher } from '../../lib/pubsub/AbstractPubSubPublisher.ts' import type { PubSubDependencies } from '../../lib/pubsub/AbstractPubSubService.ts' @@ -27,6 +28,7 @@ type PubSubPermissionPublisherOptions = { } payloadStoreConfig?: any enablePublisherDeduplication?: boolean + messageDeduplicationConfig?: MessageDeduplicationConfig } export class PubSubPermissionPublisher extends AbstractPubSubPublisher { @@ -47,6 +49,7 @@ export class PubSubPermissionPublisher extends AbstractPubSubPublisher Date: Thu, 11 Dec 2025 12:33:52 +0200 Subject: [PATCH 03/11] Cleanup --- ...ubPermissionConsumer.deduplication.spec.ts | 9 +------- ...rmissionConsumer.subscriptionRetry.spec.ts | 21 ------------------- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts index fd0d15e7..64b49cab 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts @@ -43,14 +43,7 @@ describe('PubSubPermissionConsumer - Deduplication', () => { }) async function cleanRedis() { - const keys = await redis.keys('*consumer*') - if (keys.length > 0) { - await redis.del(...keys) - } - const mutexKeys = await redis.keys('*mutex*') - if (mutexKeys.length > 0) { - await redis.del(...mutexKeys) - } + await redis.flushAll() } describe('consumer deduplication', () => { diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts index 6f2e90f2..f2734011 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts @@ -12,27 +12,6 @@ describe('PubSubPermissionConsumer - Subscription Retry', () => { const TOPIC_NAME = 'user_permissions_retry_test' const SUBSCRIPTION_NAME = 'user_permissions_retry_test_sub' - // Unit tests that don't require infrastructure - describe('exponential backoff calculation', () => { - it('calculates correct delays for each attempt', () => { - // Test the exponential backoff formula: min(baseDelay * 2^(attempt-1), maxDelay) - const baseDelay = 1000 - const maxDelay = 30000 - - const calculateDelay = (attempt: number) => - Math.min(baseDelay * Math.pow(2, attempt - 1), maxDelay) - - expect(calculateDelay(1)).toBe(1000) // 1000 * 2^0 = 1000 - expect(calculateDelay(2)).toBe(2000) // 1000 * 2^1 = 2000 - expect(calculateDelay(3)).toBe(4000) // 1000 * 2^2 = 4000 - expect(calculateDelay(4)).toBe(8000) // 1000 * 2^3 = 8000 - expect(calculateDelay(5)).toBe(16000) // 1000 * 2^4 = 16000 - expect(calculateDelay(6)).toBe(30000) // 1000 * 2^5 = 32000, capped at 30000 - expect(calculateDelay(10)).toBe(30000) // Any higher attempt is capped - }) - }) - - // Integration tests that require PubSub emulator and Redis describe('subscriptionRetryOptions configuration', () => { let diContainer: AwilixContainer let pubSubClient: PubSub From 8af627ab492d9f01ad8b635db2c4d7380daf455e Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 12:34:36 +0200 Subject: [PATCH 04/11] second cleanup --- .../PubSubPermissionPublisher.messageDeduplication.spec.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts index c5a205ca..a66bdec0 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts @@ -33,10 +33,7 @@ describe('PubSubPermissionPublisher - Message Deduplication', () => { }) async function cleanRedis() { - const keys = await redis.keys('*publisher*') - if (keys.length > 0) { - await redis.del(...keys) - } + await redis.flushAll() } describe('publisher deduplication', () => { From d744f67c186ddcd0802ac7cbe9c8bcac97bf6329 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 12:44:53 +0200 Subject: [PATCH 05/11] Implement reconnect at start --- .../lib/pubsub/AbstractPubSubConsumer.ts | 58 +++++- ...ubPermissionConsumer.deduplication.spec.ts | 2 +- ...rmissionConsumer.subscriptionRetry.spec.ts | 178 +++++++++++++++++- ...sionPublisher.messageDeduplication.spec.ts | 2 +- packages/gcp-pubsub/vitest.config.ts | 8 +- 5 files changed, 240 insertions(+), 8 deletions(-) diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 9a2fdce2..c2ab99d1 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -255,7 +255,7 @@ export abstract class AbstractPubSubConsumer< return } - await this.init() + await this.initWithRetry() if (!this.subscription) { throw new Error('Subscription not initialized after init()') @@ -269,6 +269,62 @@ export abstract class AbstractPubSubConsumer< this.setupSubscriptionEventHandlers() } + /** + * Initializes the consumer with retry logic for transient errors. + * + * This handles eventual consistency issues that can occur after Terraform deployments + * where topics/subscriptions may not be immediately visible across all GCP endpoints. + * + * @param attempt - Current retry attempt number (1-based) + */ + private async initWithRetry(attempt = 1): Promise { + try { + await this.init() + } catch (error) { + // Check if we should retry + if (attempt >= this.subscriptionRetryOptions.maxRetries) { + this.logger.error({ + msg: `Failed to initialize subscription after ${attempt} attempts`, + subscriptionName: + this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name, + topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name, + error, + }) + throw error + } + + // Check if error is retryable (NOT_FOUND type errors from initPubSub) + const errorMessage = error instanceof Error ? error.message : String(error) + const isRetryable = + errorMessage.includes('does not exist') || + errorMessage.includes('NOT_FOUND') || + errorMessage.includes('PERMISSION_DENIED') + + if (!isRetryable) { + throw error + } + + // Calculate delay with exponential backoff + const delay = Math.min( + this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), + this.subscriptionRetryOptions.maxRetryDelayMs, + ) + + this.logger.warn({ + msg: `Retryable error during initialization, attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, + subscriptionName: + this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name, + topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name, + errorMessage, + attempt, + delayMs: delay, + }) + + await new Promise((resolve) => setTimeout(resolve, delay)) + await this.initWithRetry(attempt + 1) + } + } + /** * Sets up event handlers for the subscription. * Extracted to allow reattachment after reinitialization. diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts index 64b49cab..50cc00f2 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts @@ -43,7 +43,7 @@ describe('PubSubPermissionConsumer - Deduplication', () => { }) async function cleanRedis() { - await redis.flushAll() + await redis.flushall() } describe('consumer deduplication', () => { diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts index f2734011..f3839190 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts @@ -1,9 +1,14 @@ +import { setTimeout } from 'node:timers/promises' + import type { PubSub } from '@google-cloud/pubsub' import type { AwilixContainer } from 'awilix' import { asValue } from 'awilix' import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' -import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' +import { + deletePubSubSubscription, + deletePubSubTopicAndSubscription, +} from '../utils/cleanupPubSub.ts' import type { Dependencies } from '../utils/testContext.ts' import { registerDependencies } from '../utils/testContext.ts' import { PubSubPermissionConsumer } from './PubSubPermissionConsumer.ts' @@ -262,4 +267,175 @@ describe('PubSubPermissionConsumer - Subscription Retry', () => { await consumer.close() }) }) + + describe('reconnection behavior', () => { + let diContainer: AwilixContainer + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it( + 'resubscribes after subscription is temporarily unavailable', + { timeout: 30000 }, + async () => { + expect.assertions(2) + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + const publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + try { + await consumer.start() + await publisher.init() + + // Verify consumer is working initially + const message1 = { + id: 'reconnect-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message1) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-1', 'consumed') + expect(consumer.addCounter).toBe(1) + + // Delete the subscription while consumer is running + await deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + + // Wait for consumer to detect the error and reconnect + // The consumer should automatically recreate the subscription via creationConfig + await setTimeout(5000) + + // Verify consumer can process messages after reconnection + const message2 = { + id: 'reconnect-test-2', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user2'], + } + + await publisher.publish(message2) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-2', 'consumed') + expect(consumer.addCounter).toBe(2) + } finally { + await consumer.close() + await publisher.close() + } + }, + ) + + it( + 'retries initialization when subscription does not exist initially', + { timeout: 20000 }, + async () => { + expect.assertions(1) + + // First create the topic only (no subscription) + const topic = pubSubClient.topic(TOPIC_NAME) + const [topicExists] = await topic.exists() + if (!topicExists) { + await topic.create() + } + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + locatorConfig: { + topicName: TOPIC_NAME, + subscriptionName: SUBSCRIPTION_NAME, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + + // Create subscription after a delay (simulating eventual consistency) + globalThis.setTimeout(async () => { + await topic.createSubscription(SUBSCRIPTION_NAME) + }, 1500) + + try { + // This should retry and eventually succeed when subscription is created + await consumer.start() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + } finally { + await consumer.close() + } + }, + ) + + it('does not attempt reconnection after close is called', { timeout: 15000 }, async () => { + expect.assertions(4) + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + + try { + await consumer.start() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + + // Close the consumer + await consumer.close() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + + // Delete subscription - this should NOT trigger reconnection since consumer is closed + await deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + + // Wait a bit to ensure no reconnection attempt happens + await setTimeout(2000) + + // Verify consumer is still closed and not attempting to reconnect + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + // @ts-expect-error - accessing private field for testing + expect(consumer.isReinitializing).toBe(false) + } finally { + await consumer.close() + } + }) + }) }) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts index a66bdec0..32e1a6ae 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts @@ -33,7 +33,7 @@ describe('PubSubPermissionPublisher - Message Deduplication', () => { }) async function cleanRedis() { - await redis.flushAll() + await redis.flushall() } describe('publisher deduplication', () => { diff --git a/packages/gcp-pubsub/vitest.config.ts b/packages/gcp-pubsub/vitest.config.ts index 157a9eb9..ab86aefa 100644 --- a/packages/gcp-pubsub/vitest.config.ts +++ b/packages/gcp-pubsub/vitest.config.ts @@ -22,10 +22,10 @@ export default defineConfig({ 'lib/types/MessageTypes.ts', ], thresholds: { - lines: 79, - functions: 88, - branches: 74, - statements: 79, + lines: 85, + functions: 92, + branches: 75, + statements: 85, }, }, }, From 94e2a6d6860591ee9e7445b5d374e1b85b1f0698 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 12:45:43 +0200 Subject: [PATCH 06/11] Add documentation --- packages/gcp-pubsub/README.md | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index 61104bbb..bb3c327f 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -1498,9 +1498,24 @@ The consumer provides a higher-level error recovery mechanism that complements t 2. **Subscription stream disconnections** may not automatically reconnect in all cases 3. **Infrastructure changes** (e.g., after Terraform deployments) may require full subscription reinitialization +#### Initialization Retry + +When calling `start()`, the consumer will automatically retry initialization if it encounters retryable errors. This is particularly useful when: + +- Using `locatorConfig` and the subscription doesn't exist yet due to eventual consistency +- Services start in parallel and the subscription is being created by another process +- Terraform deployments are still propagating + +The retry logic handles errors containing: +- `does not exist` - Resource not yet visible +- `NOT_FOUND` - gRPC error code 5 +- `PERMISSION_DENIED` - gRPC error code 7 (IAM propagation delay) + +#### Runtime Reconnection + **Retryable Error Codes:** -*Errors the consumer handles via reinitialization:* +*Errors the consumer handles via reinitialization during runtime:* - `DEADLINE_EXCEEDED` (4): Request timeout that SDK retry couldn't resolve - `NOT_FOUND` (5): Subscription may not be propagated yet (eventual consistency) - `PERMISSION_DENIED` (7): IAM permissions may not be propagated yet (eventual consistency) @@ -1522,6 +1537,8 @@ After Terraform deployments, GCP resources and IAM permissions can take several **Configuration:** +The same retry options apply to both initialization and runtime reconnection: + ```typescript class MyConsumer extends AbstractPubSubConsumer { constructor(dependencies: PubSubConsumerDependencies) { @@ -1530,7 +1547,7 @@ class MyConsumer extends AbstractPubSubConsumer { { // ... other options ... - // Optional: Configure subscription retry behavior + // Optional: Configure retry behavior for both init and runtime errors subscriptionRetryOptions: { maxRetries: 5, // Maximum retry attempts (default: 5) baseRetryDelayMs: 1000, // Base delay for exponential backoff (default: 1000ms) From 235b7c416274807d455d39ab6a54aa0031be7aa4 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 13:04:03 +0200 Subject: [PATCH 07/11] Improve error structure --- .../errors/SubscriptionDoesNotExistError.ts | 20 +++++++++++++++++++ packages/gcp-pubsub/lib/index.ts | 1 + .../lib/pubsub/AbstractPubSubConsumer.ts | 3 ++- .../gcp-pubsub/lib/utils/pubSubInitter.ts | 3 ++- 4 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts diff --git a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts new file mode 100644 index 00000000..615e8672 --- /dev/null +++ b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts @@ -0,0 +1,20 @@ +export class SubscriptionDoesNotExistError extends Error { + public readonly subscriptionName: string + + constructor(subscriptionName: string) { + super(`Subscription ${subscriptionName} does not exist`) + this.name = 'SubscriptionDoesNotExistError' + this.subscriptionName = subscriptionName + } +} + +export function isSubscriptionDoesNotExistError( + error: unknown, +): error is SubscriptionDoesNotExistError { + return ( + typeof error === 'object' && + error !== null && + 'name' in error && + error.name === 'SubscriptionDoesNotExistError' + ) +} diff --git a/packages/gcp-pubsub/lib/index.ts b/packages/gcp-pubsub/lib/index.ts index 70c25d88..4a92f5b0 100644 --- a/packages/gcp-pubsub/lib/index.ts +++ b/packages/gcp-pubsub/lib/index.ts @@ -1,4 +1,5 @@ export * from './errors/PubSubConsumerErrorResolver.ts' +export * from './errors/SubscriptionDoesNotExistError.ts' export * from './fakes/FakeConsumerErrorResolver.ts' export * from './fakes/TestPubSubPublisher.ts' export * from './pubsub/AbstractPubSubConsumer.ts' diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index c2ab99d1..1c64eed2 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -18,6 +18,7 @@ import { type QueueConsumerOptions, type TransactionObservabilityManager, } from '@message-queue-toolkit/core' +import { isSubscriptionDoesNotExistError } from '../errors/SubscriptionDoesNotExistError.ts' import type { PubSubMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from '../utils/messageUtils.ts' import { deletePubSub, initPubSub } from '../utils/pubSubInitter.ts' @@ -296,7 +297,7 @@ export abstract class AbstractPubSubConsumer< // Check if error is retryable (NOT_FOUND type errors from initPubSub) const errorMessage = error instanceof Error ? error.message : String(error) const isRetryable = - errorMessage.includes('does not exist') || + isSubscriptionDoesNotExistError(error) || errorMessage.includes('NOT_FOUND') || errorMessage.includes('PERMISSION_DENIED') diff --git a/packages/gcp-pubsub/lib/utils/pubSubInitter.ts b/packages/gcp-pubsub/lib/utils/pubSubInitter.ts index 6bdab4fa..fb7ad1e7 100644 --- a/packages/gcp-pubsub/lib/utils/pubSubInitter.ts +++ b/packages/gcp-pubsub/lib/utils/pubSubInitter.ts @@ -1,6 +1,7 @@ import type { PubSub, Subscription, Topic } from '@google-cloud/pubsub' import type { DeletionConfig } from '@message-queue-toolkit/core' import { isProduction, waitAndRetry } from '@message-queue-toolkit/core' +import { SubscriptionDoesNotExistError } from '../errors/SubscriptionDoesNotExistError.ts' import type { PubSubCreationConfig, PubSubQueueLocatorType, @@ -90,7 +91,7 @@ export async function initPubSub( const [subscriptionExists] = await subscription.exists() if (!subscriptionExists) { - throw new Error(`Subscription ${subscriptionName} does not exist`) + throw new SubscriptionDoesNotExistError(subscriptionName) } } } else if (creationConfig) { From 046d8fb4d09d93b74310e56dcea2e3dd81d29e43 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 13:40:47 +0200 Subject: [PATCH 08/11] Status code management cleanup --- packages/gcp-pubsub/lib/errors/grpcErrors.ts | 74 +++++++++++++++++++ packages/gcp-pubsub/lib/index.ts | 1 + .../lib/pubsub/AbstractPubSubConsumer.ts | 60 +++------------ packages/gcp-pubsub/package.json | 1 + 4 files changed, 87 insertions(+), 49 deletions(-) create mode 100644 packages/gcp-pubsub/lib/errors/grpcErrors.ts diff --git a/packages/gcp-pubsub/lib/errors/grpcErrors.ts b/packages/gcp-pubsub/lib/errors/grpcErrors.ts new file mode 100644 index 00000000..7dd3b2a6 --- /dev/null +++ b/packages/gcp-pubsub/lib/errors/grpcErrors.ts @@ -0,0 +1,74 @@ +import { status as GrpcStatus } from '@grpc/grpc-js' + +/** + * gRPC status codes for which subscription operations should be retried. + * + * Includes both: + * 1. GCP-documented retryable errors (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, INTERNAL, UNAVAILABLE) + * 2. Eventual consistency errors common after Terraform deployments (NOT_FOUND, PERMISSION_DENIED) + * + * **Why PERMISSION_DENIED is included:** + * After Terraform deployments, IAM permissions can take several minutes to propagate across + * GCP's distributed infrastructure. During this window, the subscription may report + * PERMISSION_DENIED even though permissions are correctly configured. + * + * **Why NOT_FOUND is included:** + * Similar to PERMISSION_DENIED, newly created subscriptions may not be immediately visible + * across all GCP endpoints due to eventual consistency. + * + * @see https://cloud.google.com/pubsub/docs/reference/error-codes + * @see https://github.com/googleapis/nodejs-pubsub/issues/979 + */ +export const RETRYABLE_GRPC_STATUS_CODES = [ + GrpcStatus.DEADLINE_EXCEEDED, + GrpcStatus.NOT_FOUND, + GrpcStatus.PERMISSION_DENIED, + GrpcStatus.RESOURCE_EXHAUSTED, + GrpcStatus.INTERNAL, + GrpcStatus.UNAVAILABLE, +] as const + +export type RetryableGrpcStatusCode = (typeof RETRYABLE_GRPC_STATUS_CODES)[number] + +const RETRYABLE_CODES_SET = new Set(RETRYABLE_GRPC_STATUS_CODES) + +/** + * Type for errors with a numeric gRPC status code. + */ +export type GrpcError = Error & { code: number } + +/** + * Checks if an error has a gRPC status code property. + * + * @param error - The error to check + * @returns true if the error has a numeric `code` property + */ +export function isGrpcError(error: unknown): error is GrpcError { + return error instanceof Error && 'code' in error && typeof (error as GrpcError).code === 'number' +} + +/** + * Checks if an error is a gRPC error with a retryable status code. + * + * @param error - The error to check + * @returns true if the error has a retryable gRPC status code + */ +export function isRetryableGrpcError(error: unknown): error is GrpcError { + return isGrpcError(error) && RETRYABLE_CODES_SET.has(error.code) +} + +/** + * Gets the gRPC status code from an error if it has one. + * + * @param error - The error to extract the code from + * @returns The gRPC status code, or undefined if not a gRPC error + */ +export function getGrpcStatusCode(error: unknown): number | undefined { + if (isGrpcError(error)) { + return error.code + } + return undefined +} + +// Re-export for convenience +export { GrpcStatus } diff --git a/packages/gcp-pubsub/lib/index.ts b/packages/gcp-pubsub/lib/index.ts index 4a92f5b0..a6dbd4a6 100644 --- a/packages/gcp-pubsub/lib/index.ts +++ b/packages/gcp-pubsub/lib/index.ts @@ -1,3 +1,4 @@ +export * from './errors/grpcErrors.ts' export * from './errors/PubSubConsumerErrorResolver.ts' export * from './errors/SubscriptionDoesNotExistError.ts' export * from './fakes/FakeConsumerErrorResolver.ts' diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 1c64eed2..d54e5148 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -18,6 +18,7 @@ import { type QueueConsumerOptions, type TransactionObservabilityManager, } from '@message-queue-toolkit/core' +import { isRetryableGrpcError } from '../errors/grpcErrors.ts' import { isSubscriptionDoesNotExistError } from '../errors/SubscriptionDoesNotExistError.ts' import type { PubSubMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from '../utils/messageUtils.ts' @@ -30,39 +31,8 @@ import type { } from './AbstractPubSubService.ts' import { AbstractPubSubService } from './AbstractPubSubService.ts' -const _ABORT_EARLY_EITHER: Either<'abort', never> = { - error: 'abort', -} const DEFAULT_MAX_RETRY_DURATION = 4 * 24 * 60 * 60 // 4 days in seconds -/** - * gRPC status codes for which subscription operations should be retried. - * - * Includes both: - * 1. GCP-documented retryable errors (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, INTERNAL, UNAVAILABLE) - * 2. Eventual consistency errors common after Terraform deployments (NOT_FOUND, PERMISSION_DENIED) - * - * **Why PERMISSION_DENIED is included:** - * After Terraform deployments, IAM permissions can take several minutes to propagate across - * GCP's distributed infrastructure. During this window, the subscription may report - * PERMISSION_DENIED even though permissions are correctly configured. - * - * **Why NOT_FOUND is included:** - * Similar to PERMISSION_DENIED, newly created subscriptions may not be immediately visible - * across all GCP endpoints due to eventual consistency. - * - * @see https://cloud.google.com/pubsub/docs/reference/error-codes - * @see https://github.com/googleapis/nodejs-pubsub/issues/979 - */ -const RETRYABLE_SUBSCRIPTION_ERROR_CODES = [ - 4, // DEADLINE_EXCEEDED - Request timeout, may succeed on retry - 5, // NOT_FOUND - Subscription may not be propagated yet (eventual consistency) - 7, // PERMISSION_DENIED - IAM permissions may not be propagated yet (eventual consistency) - 8, // RESOURCE_EXHAUSTED - Quota exceeded, retry with backoff - 13, // INTERNAL - Server error, should be transient - 14, // UNAVAILABLE - Service temporarily unable to process -] as const - /** * Default configuration for subscription error retry behavior. */ @@ -294,14 +264,11 @@ export abstract class AbstractPubSubConsumer< throw error } - // Check if error is retryable (NOT_FOUND type errors from initPubSub) - const errorMessage = error instanceof Error ? error.message : String(error) - const isRetryable = - isSubscriptionDoesNotExistError(error) || - errorMessage.includes('NOT_FOUND') || - errorMessage.includes('PERMISSION_DENIED') + // Check if error is retryable using gRPC status codes + const isRetryableSubscriptionError = isSubscriptionDoesNotExistError(error) + const isRetryableGrpc = isRetryableGrpcError(error) - if (!isRetryable) { + if (!isRetryableSubscriptionError && !isRetryableGrpc) { throw error } @@ -316,7 +283,9 @@ export abstract class AbstractPubSubConsumer< subscriptionName: this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name, topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name, - errorMessage, + errorCode: isRetryableGrpc ? error.code : undefined, + errorMessage: + isRetryableGrpc || isRetryableSubscriptionError ? error.message : String(error), attempt, delayMs: delay, }) @@ -375,20 +344,13 @@ export abstract class AbstractPubSubConsumer< return } - const errorCode = error.code - - // Check if this is a retryable subscription error - if ( - errorCode !== undefined && - RETRYABLE_SUBSCRIPTION_ERROR_CODES.includes( - errorCode as (typeof RETRYABLE_SUBSCRIPTION_ERROR_CODES)[number], - ) - ) { + // Check if this is a retryable subscription error using gRPC status codes + if (isRetryableGrpcError(error)) { this.logger.warn({ msg: 'Retryable subscription error occurred, attempting to reinitialize', subscriptionName: this.subscriptionName, topicName: this.topicName, - errorCode, + errorCode: error.code, errorMessage: error.message, }) diff --git a/packages/gcp-pubsub/package.json b/packages/gcp-pubsub/package.json index 5b42a4fd..f06a3d60 100644 --- a/packages/gcp-pubsub/package.json +++ b/packages/gcp-pubsub/package.json @@ -27,6 +27,7 @@ "prepublishOnly": "npm run lint && npm run build" }, "dependencies": { + "@grpc/grpc-js": "^1.14.2", "@lokalise/node-core": "^14.6.1" }, "peerDependencies": { From 196dbd6549d806b9b5af6e48dc5fa8a8ad10da84 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 13:45:08 +0200 Subject: [PATCH 09/11] minor cleanup --- .../errors/SubscriptionDoesNotExistError.ts | 4 +++- packages/gcp-pubsub/lib/errors/grpcErrors.ts | 21 ++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts index 615e8672..5f134142 100644 --- a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts +++ b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts @@ -15,6 +15,8 @@ export function isSubscriptionDoesNotExistError( typeof error === 'object' && error !== null && 'name' in error && - error.name === 'SubscriptionDoesNotExistError' + 'message' in error && + error.name === 'SubscriptionDoesNotExistError' && + typeof error.message === 'string' ) } diff --git a/packages/gcp-pubsub/lib/errors/grpcErrors.ts b/packages/gcp-pubsub/lib/errors/grpcErrors.ts index 7dd3b2a6..3db41410 100644 --- a/packages/gcp-pubsub/lib/errors/grpcErrors.ts +++ b/packages/gcp-pubsub/lib/errors/grpcErrors.ts @@ -33,18 +33,29 @@ export type RetryableGrpcStatusCode = (typeof RETRYABLE_GRPC_STATUS_CODES)[numbe const RETRYABLE_CODES_SET = new Set(RETRYABLE_GRPC_STATUS_CODES) /** - * Type for errors with a numeric gRPC status code. + * Type for errors with a numeric gRPC status code and message. */ -export type GrpcError = Error & { code: number } +export type GrpcError = { + code: number + message: string +} /** - * Checks if an error has a gRPC status code property. + * Checks if an error has gRPC error properties (code and message). + * Uses duck typing to avoid fragile instanceof checks. * * @param error - The error to check - * @returns true if the error has a numeric `code` property + * @returns true if the error has numeric `code` and string `message` properties */ export function isGrpcError(error: unknown): error is GrpcError { - return error instanceof Error && 'code' in error && typeof (error as GrpcError).code === 'number' + return ( + typeof error === 'object' && + error !== null && + 'code' in error && + 'message' in error && + typeof (error as GrpcError).code === 'number' && + typeof (error as GrpcError).message === 'string' + ) } /** From 9dccfe35fc975f75a86bfaaac4f1af90dcf78ce3 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 13:57:50 +0200 Subject: [PATCH 10/11] Address CodeRabbit concerns --- packages/gcp-pubsub/README.md | 31 ++- .../errors/SubscriptionDoesNotExistError.ts | 4 +- .../lib/pubsub/AbstractPubSubConsumer.ts | 185 ++++++++++-------- 3 files changed, 137 insertions(+), 83 deletions(-) diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index bb3c327f..982325da 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -33,6 +33,7 @@ Google Cloud Pub/Sub implementation for the message-queue-toolkit. Provides a ro - [Consumer Flow Control](#consumer-flow-control) - [Multiple Message Types](#multiple-message-types) - [Error Handling](#error-handling) + - [Subscription-Level Error Handling](#subscription-level-error-handling) - [Testing](#testing) - [TestPubSubPublisher](#testpubsubpublisher) - [Integration Tests with Emulator](#integration-tests-with-emulator) @@ -1561,7 +1562,7 @@ class MyConsumer extends AbstractPubSubConsumer { ``` **Exponential Backoff Formula:** -``` +```text delay = min(baseRetryDelayMs * 2^(attempt-1), maxRetryDelayMs) ``` @@ -1571,6 +1572,30 @@ With default settings, delays are: 1s, 2s, 4s, 8s, 16s (capped at 30s). The consumer also handles unexpected subscription closures (e.g., network issues, GCP service restarts). If the subscription closes while the consumer is still supposed to be consuming, it will automatically attempt reinitialization. +**Health Check Integration:** + +If all retry attempts are exhausted, the consumer enters a failed state. You can detect this via the `fatalError` getter for health check integration: + +```typescript +// In your health check endpoint +app.get('/health', (req, res) => { + const error = consumer.fatalError + if (error) { + return res.status(503).json({ + status: 'unhealthy', + error: error.message, + }) + } + return res.status(200).json({ status: 'healthy' }) +}) +``` + +The `fatalError` property returns: +- `null` when the consumer is healthy +- `Error` when the consumer has permanently failed (e.g., after exhausting all retry attempts) + +This allows your application to properly report unhealthy status to orchestration systems (Kubernetes, etc.) and trigger appropriate remediation (pod restart, alerting, etc.). + **References:** - [GCP Pub/Sub Error Codes](https://cloud.google.com/pubsub/docs/reference/error-codes) - [GCP Pub/Sub Troubleshooting](https://cloud.google.com/pubsub/docs/troubleshooting) @@ -1861,6 +1886,7 @@ it('publishes message', async () => { - `deadLetterQueue`: DLQ configuration - `maxRetryDuration`: Max retry time in seconds - `consumerOverrides`: Flow control settings +- `subscriptionRetryOptions`: Retry configuration for subscription errors (see [Subscription-Level Error Handling](#subscription-level-error-handling)) **Methods:** - `init()`: Initialize consumer (create/locate resources) @@ -1868,6 +1894,9 @@ it('publishes message', async () => { - `close()`: Stop consumer and close connections - `handlerSpy`: Access spy for testing +**Properties:** +- `fatalError`: Returns `Error` if consumer has permanently failed, `null` otherwise (for health checks) + ## Best Practices 1. **Use message ordering** for related events (same user, same entity) diff --git a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts index 5f134142..2eb83afa 100644 --- a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts +++ b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts @@ -16,7 +16,9 @@ export function isSubscriptionDoesNotExistError( error !== null && 'name' in error && 'message' in error && + 'subscriptionName' in error && error.name === 'SubscriptionDoesNotExistError' && - typeof error.message === 'string' + typeof error.message === 'string' && + typeof error.subscriptionName === 'string' ) } diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index d54e5148..2e4c34a7 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -1,4 +1,4 @@ -import type { Either, ErrorResolver } from '@lokalise/node-core' +import { type Either, type ErrorResolver, isError } from '@lokalise/node-core' import type { MessageInvalidFormatError, MessageValidationError, @@ -160,6 +160,7 @@ export abstract class AbstractPubSubConsumer< private maxRetryDuration: number private isConsuming = false private isReinitializing = false + private _fatalError: Error | null = null protected readonly errorResolver: ErrorResolver protected readonly executionContext: ExecutionContext @@ -167,6 +168,25 @@ export abstract class AbstractPubSubConsumer< public dlqTopicName?: string public readonly _messageSchemaContainer: MessageSchemaContainer + /** + * Returns the fatal error that caused the consumer to stop, or null if healthy. + * Use this in health checks to detect permanent subscription failures. + * + * @example + * ```typescript + * app.get('/health', (req, res) => { + * const error = consumer.fatalError + * if (error) { + * return res.status(503).json({ status: 'unhealthy', error: error.message }) + * } + * return res.status(200).json({ status: 'healthy' }) + * }) + * ``` + */ + public get fatalError(): Error | null { + return this._fatalError + } + protected constructor( dependencies: PubSubConsumerDependencies, options: ConsumerOptionsType, @@ -356,14 +376,10 @@ export abstract class AbstractPubSubConsumer< // Trigger reinitialization with retry this.reinitializeWithRetry(1).catch((reinitError) => { - this.logger.error({ - msg: 'Failed to reinitialize subscription after retryable error', - subscriptionName: this.subscriptionName, - topicName: this.topicName, - error: reinitError, - }) - // Re-throw to surface the error - consumer is now in a failed state - throw reinitError + // Mark consumer as failed - this will be visible via fatalError getter for health checks + this._fatalError = isError(reinitError) ? reinitError : new Error(String(reinitError)) + this.isConsuming = false + this.handleError(reinitError) }) } else { // Non-retryable error - log and report @@ -397,12 +413,10 @@ export abstract class AbstractPubSubConsumer< }) this.reinitializeWithRetry(1).catch((reinitError) => { - this.logger.error({ - msg: 'Failed to reinitialize subscription after unexpected close', - subscriptionName: this.subscriptionName, - topicName: this.topicName, - error: reinitError, - }) + // Mark consumer as failed - this will be visible via fatalError getter for health checks + this._fatalError = isError(reinitError) ? reinitError : new Error(String(reinitError)) + this.isConsuming = false + this.handleError(reinitError) }) } } @@ -416,9 +430,13 @@ export abstract class AbstractPubSubConsumer< * 3. Reinitializes the subscription * 4. Reattaches event handlers * - * @param attempt - Current retry attempt number (1-based) + * Uses an iterative loop to keep isReinitializing true for the entire + * retry sequence, preventing concurrent callers from starting their own + * reinitialization attempts. + * + * @param startAttempt - Starting retry attempt number (1-based) */ - private async reinitializeWithRetry(attempt: number): Promise { + private async reinitializeWithRetry(startAttempt: number): Promise { // Prevent concurrent reinitializations if (this.isReinitializing) { this.logger.debug({ @@ -428,86 +446,91 @@ export abstract class AbstractPubSubConsumer< return } - // Check if we've exceeded max retries - if (attempt > this.subscriptionRetryOptions.maxRetries) { - const error = new Error( - `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`, - ) - this.handleError(error) - throw error - } - this.isReinitializing = true try { - // Calculate delay with exponential backoff - const delay = Math.min( - this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), - this.subscriptionRetryOptions.maxRetryDelayMs, - ) - - this.logger.info({ - msg: `Reinitialization attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, - subscriptionName: this.subscriptionName, - topicName: this.topicName, - attempt, - delayMs: delay, - }) - - // Wait before retry - await new Promise((resolve) => setTimeout(resolve, delay)) + for ( + let attempt = startAttempt; + attempt <= this.subscriptionRetryOptions.maxRetries; + attempt++ + ) { + // Calculate delay with exponential backoff + const delay = Math.min( + this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), + this.subscriptionRetryOptions.maxRetryDelayMs, + ) - // Don't continue if we've been stopped during the wait - if (!this.isConsuming) { this.logger.info({ - msg: 'Consumer stopped during reinitialization wait, aborting', + msg: `Reinitialization attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + delayMs: delay, }) - return - } - // Close existing subscription to remove old event handlers - if (this.subscription) { - try { - this.subscription.removeAllListeners() - await this.subscription.close() - } catch { - // Ignore close errors - subscription may already be closed + // Wait before retry + await new Promise((resolve) => setTimeout(resolve, delay)) + + // Don't continue if we've been stopped during the wait + if (!this.isConsuming) { + this.logger.info({ + msg: 'Consumer stopped during reinitialization wait, aborting', + subscriptionName: this.subscriptionName, + }) + return } - } - // Reinitialize - await this.init() + try { + // Close existing subscription to remove old event handlers + if (this.subscription) { + try { + this.subscription.removeAllListeners() + await this.subscription.close() + } catch { + // Ignore close errors - subscription may already be closed + } + } - if (!this.subscription) { - throw new Error('Subscription not initialized after init()') - } + // Reinitialize + await this.init() - // Wait for subscription to be ready - await this.waitForSubscriptionReady() + if (!this.subscription) { + throw new Error('Subscription not initialized after init()') + } - // Reattach event handlers - this.setupSubscriptionEventHandlers() + // Wait for subscription to be ready + await this.waitForSubscriptionReady() - this.logger.info({ - msg: 'Successfully reinitialized subscription', - subscriptionName: this.subscriptionName, - topicName: this.topicName, - attempt, - }) - } catch (error) { - this.logger.warn({ - msg: `Reinitialization attempt ${attempt} failed, will retry`, - subscriptionName: this.subscriptionName, - topicName: this.topicName, - attempt, - error, - }) + // Reattach event handlers + this.setupSubscriptionEventHandlers() - this.isReinitializing = false + this.logger.info({ + msg: 'Successfully reinitialized subscription', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + }) - // Retry with incremented attempt count - await this.reinitializeWithRetry(attempt + 1) + // Success - exit the retry loop + return + } catch (error) { + this.logger.warn({ + msg: `Reinitialization attempt ${attempt} failed, will retry`, + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + error, + }) + // Continue to next iteration + } + } + + // All retries exhausted + const error = new Error( + `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`, + ) + this.handleError(error) + throw error } finally { this.isReinitializing = false } From 07fd7317bc35b9db10ae937a5a96604f5c3fba45 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 11 Dec 2025 14:16:25 +0200 Subject: [PATCH 11/11] Address CodeRabbit concerns --- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 2e4c34a7..035efe83 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -525,12 +525,10 @@ export abstract class AbstractPubSubConsumer< } } - // All retries exhausted - const error = new Error( + // All retries exhausted - throw error to be handled by caller's catch block + throw new Error( `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`, ) - this.handleError(error) - throw error } finally { this.isReinitializing = false }