diff --git a/packages/gcp-pubsub/README.md b/packages/gcp-pubsub/README.md index 644a4f2a..982325da 100644 --- a/packages/gcp-pubsub/README.md +++ b/packages/gcp-pubsub/README.md @@ -33,6 +33,7 @@ Google Cloud Pub/Sub implementation for the message-queue-toolkit. Provides a ro - [Consumer Flow Control](#consumer-flow-control) - [Multiple Message Types](#multiple-message-types) - [Error Handling](#error-handling) + - [Subscription-Level Error Handling](#subscription-level-error-handling) - [Testing](#testing) - [TestPubSubPublisher](#testpubsubpublisher) - [Integration Tests with Emulator](#integration-tests-with-emulator) @@ -1490,6 +1491,116 @@ When a message cannot be processed (invalid format, schema validation failure, h **Best Practice:** Always configure a DLQ in production to capture and analyze failed messages. +### Subscription-Level Error Handling + +The consumer provides a higher-level error recovery mechanism that complements the SDK's built-in gRPC retry logic. While the `@google-cloud/pubsub` SDK automatically retries some transient errors at the gRPC level, there are scenarios where the SDK does not recover automatically: + +1. **Eventual consistency errors** (`NOT_FOUND`, `PERMISSION_DENIED`) are not in the SDK's default retry codes +2. **Subscription stream disconnections** may not automatically reconnect in all cases +3. **Infrastructure changes** (e.g., after Terraform deployments) may require full subscription reinitialization + +#### Initialization Retry + +When calling `start()`, the consumer will automatically retry initialization if it encounters retryable errors. 
This is particularly useful when: + +- Using `locatorConfig` and the subscription doesn't exist yet due to eventual consistency +- Services start in parallel and the subscription is being created by another process +- Terraform deployments are still propagating + +Initialization is retried for: +- `SubscriptionDoesNotExistError` (message ends with `does not exist`) - Resource not yet visible +- `NOT_FOUND` - gRPC error code 5 +- `PERMISSION_DENIED` - gRPC error code 7 (IAM propagation delay) +- Any other retryable gRPC status code listed under Runtime Reconnection below + +#### Runtime Reconnection + +**Retryable Error Codes:** + +*Errors the consumer handles via reinitialization during runtime:* +- `DEADLINE_EXCEEDED` (4): Request timeout that SDK retry couldn't resolve +- `NOT_FOUND` (5): Subscription may not be propagated yet (eventual consistency) +- `PERMISSION_DENIED` (7): IAM permissions may not be propagated yet (eventual consistency) +- `RESOURCE_EXHAUSTED` (8): Quota exceeded, retry with backoff +- `INTERNAL` (13): Server error, should be transient +- `UNAVAILABLE` (14): Service temporarily unable to process + +When these errors reach the `subscription.on('error')` handler (meaning SDK's built-in retry couldn't resolve them), the consumer will: +1. Log a warning with error details +2. Close the existing subscription and remove event listeners +3. Reinitialize the subscription with exponential backoff +4. Reattach event handlers and continue consuming + +**Why `NOT_FOUND` and `PERMISSION_DENIED`?** + +After Terraform deployments, GCP resources and IAM permissions can take several minutes to propagate across GCP's distributed infrastructure. During this window, the subscription may report these errors even though the configuration is correct. The consumer retries with exponential backoff to handle this eventual consistency. + +**Note:** For most transient errors, the SDK's built-in retry will handle recovery automatically. The consumer's reinitialization logic is a safety net for cases where SDK retry is exhausted or not applicable. 
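The retry decision described above can be sketched in a few lines. This is an illustrative, self-contained sketch rather than the package's own code (the actual check lives in `lib/errors/grpcErrors.ts` and uses the `@grpc/grpc-js` status enum); the names `RETRYABLE_CODES` and `classifySubscriptionError` are hypothetical and exist only for this example.

```typescript
// Numeric gRPC status codes taken from the "Retryable Error Codes" list above.
const RETRYABLE_CODES: ReadonlySet<number> = new Set([4, 5, 7, 8, 13, 14])

type SubscriptionErrorAction = 'reinitialize' | 'report'

// Hypothetical helper: decides how an error seen on subscription.on('error')
// would be treated. Duck-typed, so any object with a numeric `code` qualifies.
function classifySubscriptionError(error: unknown): SubscriptionErrorAction {
  if (
    typeof error === 'object' &&
    error !== null &&
    'code' in error &&
    typeof (error as { code: unknown }).code === 'number' &&
    RETRYABLE_CODES.has((error as { code: number }).code)
  ) {
    return 'reinitialize'
  }
  // Anything else (no code, or a non-retryable code) is only logged and reported.
  return 'report'
}
```

For example, an error with `code: 14` (`UNAVAILABLE`) would be classified as `'reinitialize'`, while `code: 3` (`INVALID_ARGUMENT`) or a plain `Error` without a code would be classified as `'report'`.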
+ +**Configuration:** + +The same retry options apply to both initialization and runtime reconnection: + +```typescript +class MyConsumer extends AbstractPubSubConsumer { + constructor(dependencies: PubSubConsumerDependencies) { + super( + dependencies, + { + // ... other options ... + + // Optional: Configure retry behavior for both init and runtime errors + subscriptionRetryOptions: { + maxRetries: 5, // Maximum retry attempts (default: 5) + baseRetryDelayMs: 1000, // Base delay for exponential backoff (default: 1000ms) + maxRetryDelayMs: 30000, // Maximum delay between retries (default: 30000ms) + }, + }, + executionContext, + ) + } +} +``` + +**Exponential Backoff Formula:** +```text +delay = min(baseRetryDelayMs * 2^(attempt-1), maxRetryDelayMs) +``` + +With default settings, delays are: 1s, 2s, 4s, 8s, 16s (capped at 30s). + +**Unexpected Subscription Closure:** + +The consumer also handles unexpected subscription closures (e.g., network issues, GCP service restarts). If the subscription closes while the consumer is still supposed to be consuming, it will automatically attempt reinitialization. + +**Health Check Integration:** + +If all retry attempts are exhausted, the consumer enters a failed state. You can detect this via the `fatalError` getter for health check integration: + +```typescript +// In your health check endpoint +app.get('/health', (req, res) => { + const error = consumer.fatalError + if (error) { + return res.status(503).json({ + status: 'unhealthy', + error: error.message, + }) + } + return res.status(200).json({ status: 'healthy' }) +}) +``` + +The `fatalError` property returns: +- `null` when the consumer is healthy +- `Error` when the consumer has permanently failed (e.g., after exhausting all retry attempts) + +This allows your application to properly report unhealthy status to orchestration systems (Kubernetes, etc.) and trigger appropriate remediation (pod restart, alerting, etc.). 
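For reference, the exponential backoff formula given earlier in this section can be checked with a short sketch. The helper name `backoffDelayMs` is hypothetical (it is not part of the package API); its defaults simply mirror the documented `baseRetryDelayMs` and `maxRetryDelayMs` defaults.

```typescript
// Hypothetical helper mirroring the documented formula:
// delay = min(baseRetryDelayMs * 2^(attempt - 1), maxRetryDelayMs), with 1-based attempts.
function backoffDelayMs(
  attempt: number,
  baseRetryDelayMs = 1000,
  maxRetryDelayMs = 30000,
): number {
  return Math.min(baseRetryDelayMs * 2 ** (attempt - 1), maxRetryDelayMs)
}

// Delays for attempts 1..5 with the default options.
const defaultDelays = [1, 2, 3, 4, 5].map((attempt) => backoffDelayMs(attempt))
// → [1000, 2000, 4000, 8000, 16000]
```

With the defaults, the 30s cap only takes effect from a sixth attempt onward (`backoffDelayMs(6)` is 30000), so it matters mainly when `maxRetries` is raised above 5 or `baseRetryDelayMs` is increased.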
+ +**References:** +- [GCP Pub/Sub Error Codes](https://cloud.google.com/pubsub/docs/reference/error-codes) +- [GCP Pub/Sub Troubleshooting](https://cloud.google.com/pubsub/docs/troubleshooting) +- [Node.js Pub/Sub Subscription Reconnection Issues](https://github.com/googleapis/nodejs-pubsub/issues/979) + ### Error Resolver ```typescript @@ -1775,6 +1886,7 @@ it('publishes message', async () => { - `deadLetterQueue`: DLQ configuration - `maxRetryDuration`: Max retry time in seconds - `consumerOverrides`: Flow control settings +- `subscriptionRetryOptions`: Retry configuration for subscription errors (see [Subscription-Level Error Handling](#subscription-level-error-handling)) **Methods:** - `init()`: Initialize consumer (create/locate resources) @@ -1782,6 +1894,9 @@ it('publishes message', async () => { - `close()`: Stop consumer and close connections - `handlerSpy`: Access spy for testing +**Properties:** +- `fatalError`: Returns `Error` if consumer has permanently failed, `null` otherwise (for health checks) + ## Best Practices 1. 
**Use message ordering** for related events (same user, same entity) diff --git a/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts new file mode 100644 index 00000000..2eb83afa --- /dev/null +++ b/packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts @@ -0,0 +1,24 @@ +export class SubscriptionDoesNotExistError extends Error { + public readonly subscriptionName: string + + constructor(subscriptionName: string) { + super(`Subscription ${subscriptionName} does not exist`) + this.name = 'SubscriptionDoesNotExistError' + this.subscriptionName = subscriptionName + } +} + +export function isSubscriptionDoesNotExistError( + error: unknown, +): error is SubscriptionDoesNotExistError { + return ( + typeof error === 'object' && + error !== null && + 'name' in error && + 'message' in error && + 'subscriptionName' in error && + error.name === 'SubscriptionDoesNotExistError' && + typeof error.message === 'string' && + typeof error.subscriptionName === 'string' + ) +} diff --git a/packages/gcp-pubsub/lib/errors/grpcErrors.ts b/packages/gcp-pubsub/lib/errors/grpcErrors.ts new file mode 100644 index 00000000..3db41410 --- /dev/null +++ b/packages/gcp-pubsub/lib/errors/grpcErrors.ts @@ -0,0 +1,85 @@ +import { status as GrpcStatus } from '@grpc/grpc-js' + +/** + * gRPC status codes for which subscription operations should be retried. + * + * Includes both: + * 1. GCP-documented retryable errors (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, INTERNAL, UNAVAILABLE) + * 2. Eventual consistency errors common after Terraform deployments (NOT_FOUND, PERMISSION_DENIED) + * + * **Why PERMISSION_DENIED is included:** + * After Terraform deployments, IAM permissions can take several minutes to propagate across + * GCP's distributed infrastructure. During this window, the subscription may report + * PERMISSION_DENIED even though permissions are correctly configured. 
+ * + * **Why NOT_FOUND is included:** + * Similar to PERMISSION_DENIED, newly created subscriptions may not be immediately visible + * across all GCP endpoints due to eventual consistency. + * + * @see https://cloud.google.com/pubsub/docs/reference/error-codes + * @see https://github.com/googleapis/nodejs-pubsub/issues/979 + */ +export const RETRYABLE_GRPC_STATUS_CODES = [ + GrpcStatus.DEADLINE_EXCEEDED, + GrpcStatus.NOT_FOUND, + GrpcStatus.PERMISSION_DENIED, + GrpcStatus.RESOURCE_EXHAUSTED, + GrpcStatus.INTERNAL, + GrpcStatus.UNAVAILABLE, +] as const + +export type RetryableGrpcStatusCode = (typeof RETRYABLE_GRPC_STATUS_CODES)[number] + +const RETRYABLE_CODES_SET = new Set(RETRYABLE_GRPC_STATUS_CODES) + +/** + * Type for errors with a numeric gRPC status code and message. + */ +export type GrpcError = { + code: number + message: string +} + +/** + * Checks if an error has gRPC error properties (code and message). + * Uses duck typing to avoid fragile instanceof checks. + * + * @param error - The error to check + * @returns true if the error has numeric `code` and string `message` properties + */ +export function isGrpcError(error: unknown): error is GrpcError { + return ( + typeof error === 'object' && + error !== null && + 'code' in error && + 'message' in error && + typeof (error as GrpcError).code === 'number' && + typeof (error as GrpcError).message === 'string' + ) +} + +/** + * Checks if an error is a gRPC error with a retryable status code. + * + * @param error - The error to check + * @returns true if the error has a retryable gRPC status code + */ +export function isRetryableGrpcError(error: unknown): error is GrpcError { + return isGrpcError(error) && RETRYABLE_CODES_SET.has(error.code) +} + +/** + * Gets the gRPC status code from an error if it has one. 
+ * + * @param error - The error to extract the code from + * @returns The gRPC status code, or undefined if not a gRPC error + */ +export function getGrpcStatusCode(error: unknown): number | undefined { + if (isGrpcError(error)) { + return error.code + } + return undefined +} + +// Re-export for convenience +export { GrpcStatus } diff --git a/packages/gcp-pubsub/lib/index.ts b/packages/gcp-pubsub/lib/index.ts index d7ed559d..a6dbd4a6 100644 --- a/packages/gcp-pubsub/lib/index.ts +++ b/packages/gcp-pubsub/lib/index.ts @@ -1,4 +1,6 @@ +export * from './errors/grpcErrors.ts' export * from './errors/PubSubConsumerErrorResolver.ts' +export * from './errors/SubscriptionDoesNotExistError.ts' export * from './fakes/FakeConsumerErrorResolver.ts' export * from './fakes/TestPubSubPublisher.ts' export * from './pubsub/AbstractPubSubConsumer.ts' @@ -9,4 +11,3 @@ export * from './pubsub/PubSubPublisherManager.ts' export * from './schemas/pubSubSchemas.ts' export * from './types/MessageTypes.ts' export * from './utils/pubSubInitter.ts' -export * from './utils/pubSubUtils.ts' diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 222804e7..035efe83 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -1,4 +1,4 @@ -import type { Either, ErrorResolver } from '@lokalise/node-core' +import { type Either, type ErrorResolver, isError } from '@lokalise/node-core' import type { MessageInvalidFormatError, MessageValidationError, @@ -18,6 +18,8 @@ import { type QueueConsumerOptions, type TransactionObservabilityManager, } from '@message-queue-toolkit/core' +import { isRetryableGrpcError } from '../errors/grpcErrors.ts' +import { isSubscriptionDoesNotExistError } from '../errors/SubscriptionDoesNotExistError.ts' import type { PubSubMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from 
'../utils/messageUtils.ts' import { deletePubSub, initPubSub } from '../utils/pubSubInitter.ts' @@ -29,11 +31,41 @@ import type { } from './AbstractPubSubService.ts' import { AbstractPubSubService } from './AbstractPubSubService.ts' -const _ABORT_EARLY_EITHER: Either<'abort', never> = { - error: 'abort', -} const DEFAULT_MAX_RETRY_DURATION = 4 * 24 * 60 * 60 // 4 days in seconds +/** + * Default configuration for subscription error retry behavior. + */ +const DEFAULT_SUBSCRIPTION_RETRY_OPTIONS = { + maxRetries: 5, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, +} as const + +/** + * Configuration options for subscription-level error retry behavior. + * This handles transient errors that occur at the subscription level + * (e.g., NOT_FOUND, PERMISSION_DENIED after Terraform deployments). + */ +export type SubscriptionRetryOptions = { + /** + * Maximum number of retry attempts before giving up. + * @default 5 + */ + maxRetries?: number + /** + * Base delay in milliseconds for exponential backoff. + * Actual delay = min(baseRetryDelayMs * 2^(attempt - 1), maxRetryDelayMs), where attempt is 1-based + * @default 1000 + */ + baseRetryDelayMs?: number + /** + * Maximum delay in milliseconds between retries. + * @default 30000 + */ + maxRetryDelayMs?: number +} + export type PubSubDeadLetterQueueOptions = { deadLetterPolicy: { maxDeliveryAttempts: number @@ -76,6 +108,12 @@ export type PubSubConsumerOptions< maxMilliseconds?: number } } + /** + * Configuration for subscription-level error retry behavior. + * Handles transient errors like NOT_FOUND and PERMISSION_DENIED + * that can occur after Terraform deployments due to GCP's eventual consistency. 
+ */ + subscriptionRetryOptions?: SubscriptionRetryOptions } export abstract class AbstractPubSubConsumer< @@ -118,8 +156,11 @@ export abstract class AbstractPubSubConsumer< > private readonly deadLetterQueueOptions?: PubSubDeadLetterQueueOptions private readonly isDeduplicationEnabled: boolean + private readonly subscriptionRetryOptions: Required<SubscriptionRetryOptions> private maxRetryDuration: number private isConsuming = false + private isReinitializing = false + private _fatalError: Error | null = null protected readonly errorResolver: ErrorResolver protected readonly executionContext: ExecutionContext @@ -127,6 +168,25 @@ export abstract class AbstractPubSubConsumer< public dlqTopicName?: string public readonly _messageSchemaContainer: MessageSchemaContainer + /** + * Returns the fatal error that caused the consumer to stop, or null if healthy. + * Use this in health checks to detect permanent subscription failures. + * + * @example + * ```typescript + * app.get('/health', (req, res) => { + * const error = consumer.fatalError + * if (error) { + * return res.status(503).json({ status: 'unhealthy', error: error.message }) + * } + * return res.status(200).json({ status: 'healthy' }) + * }) + * ``` + */ + public get fatalError(): Error | null { + return this._fatalError + } + protected constructor( dependencies: PubSubConsumerDependencies, options: ConsumerOptionsType, @@ -140,6 +200,17 @@ export abstract class AbstractPubSubConsumer< this.maxRetryDuration = options.maxRetryDuration ?? DEFAULT_MAX_RETRY_DURATION this.executionContext = executionContext this.isDeduplicationEnabled = !!options.enableConsumerDeduplication + this.subscriptionRetryOptions = { + maxRetries: + options.subscriptionRetryOptions?.maxRetries ?? + DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetries, + baseRetryDelayMs: + options.subscriptionRetryOptions?.baseRetryDelayMs ?? + DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.baseRetryDelayMs, + maxRetryDelayMs: + options.subscriptionRetryOptions?.maxRetryDelayMs ?? 
+ DEFAULT_SUBSCRIPTION_RETRY_OPTIONS.maxRetryDelayMs, + } this._messageSchemaContainer = this.resolveConsumerMessageSchemaContainer(options) this.handlerContainer = new HandlerContainer({ @@ -175,7 +246,7 @@ export abstract class AbstractPubSubConsumer< return } - await this.init() + await this.initWithRetry() if (!this.subscription) { throw new Error('Subscription not initialized after init()') @@ -186,14 +257,86 @@ export abstract class AbstractPubSubConsumer< this.isConsuming = true + this.setupSubscriptionEventHandlers() + } + + /** + * Initializes the consumer with retry logic for transient errors. + * + * This handles eventual consistency issues that can occur after Terraform deployments + * where topics/subscriptions may not be immediately visible across all GCP endpoints. + * + * @param attempt - Current retry attempt number (1-based) + */ + private async initWithRetry(attempt = 1): Promise<void> { + try { + await this.init() + } catch (error) { + // Check if we should retry + if (attempt >= this.subscriptionRetryOptions.maxRetries) { + this.logger.error({ + msg: `Failed to initialize subscription after ${attempt} attempts`, + subscriptionName: + this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name, + topicName: this.locatorConfig?.topicName ?? 
this.creationConfig?.topic.name, + error, + }) + throw error + } + + // Check if error is retryable using gRPC status codes + const isRetryableSubscriptionError = isSubscriptionDoesNotExistError(error) + const isRetryableGrpc = isRetryableGrpcError(error) + + if (!isRetryableSubscriptionError && !isRetryableGrpc) { + throw error + } + + // Calculate delay with exponential backoff + const delay = Math.min( + this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), + this.subscriptionRetryOptions.maxRetryDelayMs, + ) + + this.logger.warn({ + msg: `Retryable error during initialization, attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, + subscriptionName: + this.locatorConfig?.subscriptionName ?? this.creationConfig?.subscription?.name, + topicName: this.locatorConfig?.topicName ?? this.creationConfig?.topic.name, + errorCode: isRetryableGrpc ? error.code : undefined, + errorMessage: + isRetryableGrpc || isRetryableSubscriptionError ? error.message : String(error), + attempt, + delayMs: delay, + }) + + await new Promise((resolve) => setTimeout(resolve, delay)) + await this.initWithRetry(attempt + 1) + } + } + + /** + * Sets up event handlers for the subscription. + * Extracted to allow reattachment after reinitialization. 
+ */ + private setupSubscriptionEventHandlers(): void { + if (!this.subscription) { + return + } + // Configure message handler this.subscription.on('message', async (message: PubSubMessage) => { await this.handleMessage(message) }) - // Configure error handler - this.subscription.on('error', (error) => { - this.handleError(error) + // Configure error handler with retry logic for transient errors + this.subscription.on('error', (error: Error & { code?: number }) => { + this.handleSubscriptionError(error) + }) + + // Configure close handler to detect unexpected disconnections + this.subscription.on('close', () => { + this.handleSubscriptionClose() }) // Configure flow control if provided @@ -206,6 +349,191 @@ export abstract class AbstractPubSubConsumer< } } + /** + * Handles subscription-level errors. + * + * For retryable errors (NOT_FOUND, PERMISSION_DENIED), attempts to reinitialize + * the subscription with exponential backoff. These errors commonly occur after + * Terraform deployments due to GCP's eventual consistency. + * + * @see https://cloud.google.com/pubsub/docs/reference/error-codes + */ + private handleSubscriptionError(error: Error & { code?: number }): void { + // Don't handle errors during shutdown + if (!this.isConsuming) { + return + } + + // Check if this is a retryable subscription error using gRPC status codes + if (isRetryableGrpcError(error)) { + this.logger.warn({ + msg: 'Retryable subscription error occurred, attempting to reinitialize', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + errorCode: error.code, + errorMessage: error.message, + }) + + // Trigger reinitialization with retry + this.reinitializeWithRetry(1).catch((reinitError) => { + // Mark consumer as failed - this will be visible via fatalError getter for health checks + this._fatalError = isError(reinitError) ? 
reinitError : new Error(String(reinitError)) + this.isConsuming = false + this.handleError(reinitError) + }) + } else { + // Non-retryable error - log and report + this.handleError(error) + } + } + + /** + * Handles unexpected subscription close events. + * + * If the subscription closes while we're still supposed to be consuming, + * attempts to reinitialize. This can happen due to: + * - Network issues + * - GCP service restarts + * - Subscription deletion/recreation + */ + private handleSubscriptionClose(): void { + this.logger.info({ + msg: 'PubSub subscription closed', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + isConsuming: this.isConsuming, + }) + + // If we're still supposed to be consuming, try to reinitialize + if (this.isConsuming && !this.isReinitializing) { + this.logger.warn({ + msg: 'Subscription closed unexpectedly while consuming, attempting to reinitialize', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + }) + + this.reinitializeWithRetry(1).catch((reinitError) => { + // Mark consumer as failed - this will be visible via fatalError getter for health checks + this._fatalError = isError(reinitError) ? reinitError : new Error(String(reinitError)) + this.isConsuming = false + this.handleError(reinitError) + }) + } + } + + /** + * Reinitializes the subscription with exponential backoff retry. + * + * This method: + * 1. Closes the existing subscription (if any) + * 2. Waits with exponential backoff + * 3. Reinitializes the subscription + * 4. Reattaches event handlers + * + * Uses an iterative loop to keep isReinitializing true for the entire + * retry sequence, preventing concurrent callers from starting their own + * reinitialization attempts. 
+ * + * @param startAttempt - Starting retry attempt number (1-based) + */ + private async reinitializeWithRetry(startAttempt: number): Promise<void> { + // Prevent concurrent reinitializations + if (this.isReinitializing) { + this.logger.debug({ + msg: 'Reinitialization already in progress, skipping', + subscriptionName: this.subscriptionName, + }) + return + } + + this.isReinitializing = true + + try { + for ( + let attempt = startAttempt; + attempt <= this.subscriptionRetryOptions.maxRetries; + attempt++ + ) { + // Calculate delay with exponential backoff + const delay = Math.min( + this.subscriptionRetryOptions.baseRetryDelayMs * Math.pow(2, attempt - 1), + this.subscriptionRetryOptions.maxRetryDelayMs, + ) + + this.logger.info({ + msg: `Reinitialization attempt ${attempt}/${this.subscriptionRetryOptions.maxRetries}, waiting ${delay}ms`, + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + delayMs: delay, + }) + + // Wait before retry + await new Promise((resolve) => setTimeout(resolve, delay)) + + // Don't continue if we've been stopped during the wait + if (!this.isConsuming) { + this.logger.info({ + msg: 'Consumer stopped during reinitialization wait, aborting', + subscriptionName: this.subscriptionName, + }) + return + } + + try { + // Close existing subscription to remove old event handlers + if (this.subscription) { + try { + this.subscription.removeAllListeners() + await this.subscription.close() + } catch { + // Ignore close errors - subscription may already be closed + } + } + + // Reinitialize + await this.init() + + if (!this.subscription) { + throw new Error('Subscription not initialized after init()') + } + + // Wait for subscription to be ready + await this.waitForSubscriptionReady() + + // Reattach event handlers + this.setupSubscriptionEventHandlers() + + this.logger.info({ + msg: 'Successfully reinitialized subscription', + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + }) + + // 
Success - exit the retry loop + return + } catch (error) { + this.logger.warn({ + msg: `Reinitialization attempt ${attempt} failed, will retry`, + subscriptionName: this.subscriptionName, + topicName: this.topicName, + attempt, + error, + }) + // Continue to next iteration + } + } + + // All retries exhausted - throw error to be handled by caller's catch block + throw new Error( + `Failed to reinitialize subscription ${this.subscriptionName} after ${this.subscriptionRetryOptions.maxRetries} attempts`, + ) + } finally { + this.isReinitializing = false + } + } + private async waitForSubscriptionReady(maxAttempts = 100, delayMs = 20): Promise<void> { if (!this.subscription) { throw new Error('Subscription not initialized') @@ -227,6 +555,8 @@ export abstract class AbstractPubSubConsumer< public override async close(): Promise<void> { this.isConsuming = false if (this.subscription) { + // Remove listeners first to prevent close handler from triggering reinitialization + this.subscription.removeAllListeners() await this.subscription.close() } await super.close() @@ -383,6 +713,7 @@ export abstract class AbstractPubSubConsumer< } // Success + await this.deduplicateMessage(validatedMessage, DeduplicationRequesterEnum.Consumer) this.handleMessageProcessed({ message: validatedMessage, processingResult: { status: 'consumed' }, diff --git a/packages/gcp-pubsub/lib/utils/pubSubInitter.ts b/packages/gcp-pubsub/lib/utils/pubSubInitter.ts index 6bdab4fa..fb7ad1e7 100644 --- a/packages/gcp-pubsub/lib/utils/pubSubInitter.ts +++ b/packages/gcp-pubsub/lib/utils/pubSubInitter.ts @@ -1,6 +1,7 @@ import type { PubSub, Subscription, Topic } from '@google-cloud/pubsub' import type { DeletionConfig } from '@message-queue-toolkit/core' import { isProduction, waitAndRetry } from '@message-queue-toolkit/core' +import { SubscriptionDoesNotExistError } from '../errors/SubscriptionDoesNotExistError.ts' import type { PubSubCreationConfig, PubSubQueueLocatorType, @@ -90,7 +91,7 @@ export async function 
initPubSub( const [subscriptionExists] = await subscription.exists() if (!subscriptionExists) { - throw new Error(`Subscription ${subscriptionName} does not exist`) + throw new SubscriptionDoesNotExistError(subscriptionName) } } } else if (creationConfig) { diff --git a/packages/gcp-pubsub/lib/utils/pubSubUtils.ts b/packages/gcp-pubsub/lib/utils/pubSubUtils.ts deleted file mode 100644 index 99840ad9..00000000 --- a/packages/gcp-pubsub/lib/utils/pubSubUtils.ts +++ /dev/null @@ -1,18 +0,0 @@ -import type { PubSub } from '@google-cloud/pubsub' - -export type PubSubConfig = { - projectId: string - emulatorHost?: string -} - -export function createPubSubClient(config: PubSubConfig): PubSub { - const { PubSub } = require('@google-cloud/pubsub') - - if (config.emulatorHost) { - process.env.PUBSUB_EMULATOR_HOST = config.emulatorHost - } - - return new PubSub({ - projectId: config.projectId, - }) -} diff --git a/packages/gcp-pubsub/package.json b/packages/gcp-pubsub/package.json index 5b42a4fd..f06a3d60 100644 --- a/packages/gcp-pubsub/package.json +++ b/packages/gcp-pubsub/package.json @@ -27,6 +27,7 @@ "prepublishOnly": "npm run lint && npm run build" }, "dependencies": { + "@grpc/grpc-js": "^1.14.2", "@lokalise/node-core": "^14.6.1" }, "peerDependencies": { diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts new file mode 100644 index 00000000..50cc00f2 --- /dev/null +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.deduplication.spec.ts @@ -0,0 +1,261 @@ +import { randomUUID } from 'node:crypto' +import type { PubSub } from '@google-cloud/pubsub' +import { + AcquireLockTimeoutError, + type MessageDeduplicationConfig, +} from '@message-queue-toolkit/core' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import type { AwilixContainer } from 'awilix' +import { asValue } from 
'awilix' +import type { Redis } from 'ioredis' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' +import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionConsumer } from './PubSubPermissionConsumer.ts' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from './userConsumerSchemas.ts' + +describe('PubSubPermissionConsumer - Deduplication', () => { + const TOPIC_NAME = 'user_permissions_dedup_test' + const SUBSCRIPTION_NAME = 'user_permissions_dedup_test_sub' + + let diContainer: AwilixContainer<Dependencies> + let pubSubClient: PubSub + let redis: Redis + let messageDeduplicationStore: RedisMessageDeduplicationStore + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + redis = diContainer.cradle.redis + messageDeduplicationStore = new RedisMessageDeduplicationStore({ + redis, + }) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + async function cleanRedis() { + await redis.flushall() + } + + describe('consumer deduplication', () => { + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let messageDeduplicationConfig: MessageDeduplicationConfig + + beforeEach(async () => { + messageDeduplicationConfig = { + deduplicationStore: messageDeduplicationStore, + } + + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + enableConsumerDeduplication: true, + messageDeduplicationConfig, + // Use 'id' field 
as deduplication ID since our schema doesn't have deduplicationId + messageDeduplicationIdField: 'id', + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await cleanRedis() + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await consumer.close() + await publisher.close() + await cleanRedis() + }) + + it('processes message normally when deduplication is enabled', async () => { + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(message.id, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }) + + it('skips duplicate message when already processed', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + + // Message is successfully processed during the first consumption + const firstConsumptionResult = await consumer.handlerSpy.waitForMessageWithId(messageId) + expect(firstConsumptionResult.processingResult).toEqual({ status: 'consumed' }) + expect(consumer.addCounter).toBe(1) + + // Clear the spy, so we can check subsequent call + consumer.handlerSpy.clear() + + // Publish again - should be skipped as duplicate + await publisher.publish(message) + + // Message is not processed due to deduplication + const secondConsumptionResult = await consumer.handlerSpy.waitForMessageWithId(messageId) + expect(secondConsumptionResult.processingResult).toEqual({ + status: 'consumed', + skippedAsDuplicate: true, + }) 
+ + // Handler should only have been called once + expect(consumer.addCounter).toBe(1) + }) + }) + + describe('lock acquisition', () => { + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let messageDeduplicationConfig: MessageDeduplicationConfig + + beforeEach(async () => { + messageDeduplicationConfig = { + deduplicationStore: messageDeduplicationStore, + } + + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + enableConsumerDeduplication: true, + messageDeduplicationConfig, + // Use 'id' field as deduplication ID since our schema doesn't have deduplicationId + messageDeduplicationIdField: 'id', + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await cleanRedis() + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + vi.restoreAllMocks() + await consumer.close() + await publisher.close() + await cleanRedis() + }) + + it('acquires and releases lock during message processing', { timeout: 15000 }, async () => { + const messageId = randomUUID() + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + + // After successful processing, the lock should be released + // but the deduplication key should still exist + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `consumer:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(true) + }) + + it('nacks message when lock 
acquisition times out', { timeout: 15000 }, async () => { + const messageId = randomUUID() + + // Mock acquireLock to simulate timeout (another consumer holds the lock) + // Only AcquireLockTimeoutError causes retry - regular errors are swallowed and message is processed + vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ + error: new AcquireLockTimeoutError('Lock acquisition timeout'), + }) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + // Wait a bit for processing attempts - message should be nacked and redelivered + await new Promise((resolve) => setTimeout(resolve, 2000)) + + // Handler should not have been called because lock couldn't be acquired + expect(consumer.addCounter).toBe(0) + + // Restore the mock so the message can be processed + vi.restoreAllMocks() + + // Now the message should be processed on redelivery + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }) + + it( + 'processes message when lock acquisition has non-timeout error', + { timeout: 15000 }, + async () => { + const messageId = randomUUID() + + // Mock acquireLock to simulate a non-timeout error (e.g., Redis connection error) + // Non-timeout errors are swallowed and message is processed normally + vi.spyOn(messageDeduplicationStore, 'acquireLock').mockResolvedValue({ + error: new Error('Redis connection error'), + }) + + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + // Message should be processed even though lock acquisition failed + const result = await consumer.handlerSpy.waitForMessageWithId(messageId, 'consumed') + 
expect(result.processingResult.status).toBe('consumed') + expect(consumer.addCounter).toBe(1) + }, + ) + }) +}) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts index 1b5ea296..3905edf9 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.payloadOffloading.spec.ts @@ -1,11 +1,12 @@ +import type { PubSub } from '@google-cloud/pubsub' import type { Storage } from '@google-cloud/storage' import type { PayloadStoreConfig } from '@message-queue-toolkit/core' import { GCSPayloadStore } from '@message-queue-toolkit/gcs-payload-store' import type { AwilixContainer } from 'awilix' import { asValue } from 'awilix' import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' - import { PUBSUB_MESSAGE_MAX_SIZE } from '../../lib/pubsub/AbstractPubSubService.ts' +import { OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE } from '../../lib/utils/messageUtils.ts' import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' import { deletePubSubTopicAndSubscription } from '../utils/cleanupPubSub.ts' import { assertBucket, emptyBucket } from '../utils/gcsUtils.ts' @@ -208,4 +209,90 @@ describe('PubSubPermissionConsumer - Payload Offloading', () => { }, ) }) + + describe('payload retrieval errors', () => { + const largeMessageSizeThreshold = PUBSUB_MESSAGE_MAX_SIZE + const gcsBucketName = 'test-bucket' + + let diContainer: AwilixContainer + let pubSubClient: PubSub + let gcsStorage: Storage + let payloadStoreConfig: PayloadStoreConfig + let consumer: PubSubPermissionConsumer + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + gcsStorage = diContainer.cradle.gcsStorage + 
pubSubClient = diContainer.cradle.pubSubClient + + await assertBucket(gcsStorage, gcsBucketName) + payloadStoreConfig = { + messageSizeThreshold: largeMessageSizeThreshold, + store: new GCSPayloadStore(diContainer.cradle, { bucketName: gcsBucketName }), + storeName: 'gcs', + } + }) + + beforeEach(async () => { + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: PubSubPermissionConsumer.TOPIC_NAME }, + subscription: { name: PubSubPermissionConsumer.SUBSCRIPTION_NAME }, + }, + payloadStoreConfig, + }) + + await deletePubSubTopicAndSubscription( + pubSubClient, + PubSubPermissionConsumer.TOPIC_NAME, + PubSubPermissionConsumer.SUBSCRIPTION_NAME, + ) + + await consumer.start() + }) + + afterEach(async () => { + await consumer.close() + }) + + afterAll(async () => { + await emptyBucket(gcsStorage, gcsBucketName) + + const { awilixManager } = diContainer.cradle + await awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('handles error when offloaded payload cannot be retrieved', { timeout: 10000 }, async () => { + // Create a message that looks like it has offloaded payload but the payload doesn't exist + const topic = pubSubClient.topic(PubSubPermissionConsumer.TOPIC_NAME) + + const messageWithFakeOffload = { + id: 'fake-offload-1', + messageType: 'add', + timestamp: new Date().toISOString(), + // Reference to a non-existent GCS object + _payloadKey: 'non-existent-key-12345', + } + + // Publish with the offload attribute to trigger retrieval + await topic.publishMessage({ + data: Buffer.from(JSON.stringify(messageWithFakeOffload)), + attributes: { + [OFFLOADED_PAYLOAD_SIZE_ATTRIBUTE]: '12345', + }, + }) + + // Wait for the error to be tracked + const spyResult = await consumer.handlerSpy.waitForMessage({ id: 'fake-offload-1' }, 'error') + + expect(spyResult.processingResult.status).toBe('error') + // @ts-expect-error field exists + expect(spyResult.processingResult.errorReason).toBe('invalidMessage') + 
expect(consumer.addCounter).toBe(0) + }) + }) }) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts index dd6de237..4be81672 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.spec.ts @@ -263,4 +263,62 @@ describe('PubSubPermissionConsumer', () => { }) }) }) + + describe('logMessages option', () => { + let diContainer: AwilixContainer + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('logs messages when logMessages is enabled', { timeout: 10000 }, async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: 'log_messages_test' }, + subscription: { name: 'log_messages_test_sub' }, + }, + logMessages: true, + }) + const publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: 'log_messages_test' }, + }, + }) + + await deletePubSubTopicAndSubscription( + pubSubClient, + 'log_messages_test', + 'log_messages_test_sub', + ) + await consumer.start() + await publisher.init() + + const message = { + id: 'log-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + await consumer.handlerSpy.waitForMessageWithId('log-test-1', 'consumed') + + // The message should be logged (we can't easily verify log output, but we verify + // that the code path with logMessages=true doesn't crash) + expect(consumer.addCounter).toBe(1) + + await consumer.close() + await 
publisher.close() + }) + }) }) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts new file mode 100644 index 00000000..f3839190 --- /dev/null +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.subscriptionRetry.spec.ts @@ -0,0 +1,441 @@ +import { setTimeout } from 'node:timers/promises' + +import type { PubSub } from '@google-cloud/pubsub' +import type { AwilixContainer } from 'awilix' +import { asValue } from 'awilix' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from 'vitest' +import { PubSubPermissionPublisher } from '../publishers/PubSubPermissionPublisher.ts' +import { + deletePubSubSubscription, + deletePubSubTopicAndSubscription, +} from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionConsumer } from './PubSubPermissionConsumer.ts' + +describe('PubSubPermissionConsumer - Subscription Retry', () => { + const TOPIC_NAME = 'user_permissions_retry_test' + const SUBSCRIPTION_NAME = 'user_permissions_retry_test_sub' + + describe('subscriptionRetryOptions configuration', () => { + let diContainer: AwilixContainer<Dependencies> + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it('uses default retry options when not specified', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + 
topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + + // Access private field via type assertion for testing + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 5, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, + }) + + await consumer.close() + }) + + it('accepts custom retry options', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 10, + baseRetryDelayMs: 500, + maxRetryDelayMs: 60000, + }, + }) + + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 10, + baseRetryDelayMs: 500, + maxRetryDelayMs: 60000, + }) + + await consumer.close() + }) + + it('merges partial retry options with defaults', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 3, + // baseRetryDelayMs and maxRetryDelayMs should use defaults + }, + }) + + // @ts-expect-error - accessing private field for testing + const retryOptions = consumer.subscriptionRetryOptions + + expect(retryOptions).toEqual({ + maxRetries: 3, + baseRetryDelayMs: 1000, + maxRetryDelayMs: 30000, + }) + + await consumer.close() + }) + }) + + describe('close behavior', () => { + let diContainer: AwilixContainer<Dependencies> + let consumer: PubSubPermissionConsumer + let publisher: PubSubPermissionPublisher + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = 
diContainer.cradle.pubSubClient + }) + + beforeEach(async () => { + consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + await consumer.start() + await publisher.init() + }) + + afterEach(async () => { + await consumer.close() + await publisher.close() + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + it('sets isConsuming to false on close', async () => { + // Verify consumer is running + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + + await consumer.close() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + }) + + it('removes all listeners on close to prevent reconnection', async () => { + // Get the subscription reference before close + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + + // Verify subscription exists and has listeners + expect(subscription).toBeDefined() + expect(subscription?.listenerCount('message')).toBeGreaterThan(0) + expect(subscription?.listenerCount('error')).toBeGreaterThan(0) + expect(subscription?.listenerCount('close')).toBeGreaterThan(0) + + await consumer.close() + + // After close, listeners should be removed + expect(subscription?.listenerCount('message')).toBe(0) + expect(subscription?.listenerCount('error')).toBe(0) + expect(subscription?.listenerCount('close')).toBe(0) + }) + + it('nacks messages received during shutdown', { timeout: 10000 }, async () => { + // First verify consumer is working + const message1 = { + id: 'shutdown-test-1', + messageType: 'add' 
as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message1) + await consumer.handlerSpy.waitForMessageWithId('shutdown-test-1', 'consumed') + expect(consumer.addCounter).toBe(1) + + // Now close the consumer + await consumer.close() + + // Messages published after close should not be processed + // (they'll be nacked and redelivered when consumer restarts) + // This test just verifies the consumer doesn't crash when receiving messages during shutdown + }) + }) + + describe('start behavior', () => { + let diContainer: AwilixContainer<Dependencies> + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it('does not start multiple times if already consuming', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + }) + + await consumer.start() + + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + const initialListenerCount = subscription?.listenerCount('message') + + // Call start again - should be a no-op + await consumer.start() + + // Listener count should not have increased (no duplicate handlers) + expect(subscription?.listenerCount('message')).toBe(initialListenerCount) + + await consumer.close() + }) + + it('sets up all required event handlers', async () => { + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: 
SUBSCRIPTION_NAME }, + }, + }) + + await consumer.start() + + // @ts-expect-error - accessing protected field for testing + const subscription = consumer.subscription + + expect(subscription?.listenerCount('message')).toBe(1) + expect(subscription?.listenerCount('error')).toBe(1) + expect(subscription?.listenerCount('close')).toBe(1) + + await consumer.close() + }) + }) + + describe('reconnection behavior', () => { + let diContainer: AwilixContainer<Dependencies> + let pubSubClient: PubSub + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionConsumer: asValue(() => undefined), + permissionPublisher: asValue(() => undefined), + }) + pubSubClient = diContainer.cradle.pubSubClient + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() + await diContainer.dispose() + }) + + afterEach(async () => { + await deletePubSubTopicAndSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + }) + + it( + 'resubscribes after subscription is temporarily unavailable', + { timeout: 30000 }, + async () => { + expect.assertions(2) + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + const publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + }) + + try { + await consumer.start() + await publisher.init() + + // Verify consumer is working initially + const message1 = { + id: 'reconnect-test-1', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message1) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-1', 'consumed') + expect(consumer.addCounter).toBe(1) + + // Delete the subscription while consumer is running + await 
deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + + // Wait for consumer to detect the error and reconnect + // The consumer should automatically recreate the subscription via creationConfig + await setTimeout(5000) + + // Verify consumer can process messages after reconnection + const message2 = { + id: 'reconnect-test-2', + messageType: 'add' as const, + timestamp: new Date().toISOString(), + userIds: ['user2'], + } + + await publisher.publish(message2) + await consumer.handlerSpy.waitForMessageWithId('reconnect-test-2', 'consumed') + expect(consumer.addCounter).toBe(2) + } finally { + await consumer.close() + await publisher.close() + } + }, + ) + + it( + 'retries initialization when subscription does not exist initially', + { timeout: 20000 }, + async () => { + expect.assertions(1) + + // First create the topic only (no subscription) + const topic = pubSubClient.topic(TOPIC_NAME) + const [topicExists] = await topic.exists() + if (!topicExists) { + await topic.create() + } + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + locatorConfig: { + topicName: TOPIC_NAME, + subscriptionName: SUBSCRIPTION_NAME, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + + // Create subscription after a delay (simulating eventual consistency) + globalThis.setTimeout(async () => { + await topic.createSubscription(SUBSCRIPTION_NAME) + }, 1500) + + try { + // This should retry and eventually succeed when subscription is created + await consumer.start() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + } finally { + await consumer.close() + } + }, + ) + + it('does not attempt reconnection after close is called', { timeout: 15000 }, async () => { + expect.assertions(4) + + const consumer = new PubSubPermissionConsumer(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + subscription: { name: 
SUBSCRIPTION_NAME }, + }, + subscriptionRetryOptions: { + maxRetries: 5, + baseRetryDelayMs: 500, + maxRetryDelayMs: 2000, + }, + }) + + try { + await consumer.start() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(true) + + // Close the consumer + await consumer.close() + + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + + // Delete subscription - this should NOT trigger reconnection since consumer is closed + await deletePubSubSubscription(pubSubClient, TOPIC_NAME, SUBSCRIPTION_NAME) + + // Wait a bit to ensure no reconnection attempt happens + await setTimeout(2000) + + // Verify consumer is still closed and not attempting to reconnect + // @ts-expect-error - accessing private field for testing + expect(consumer.isConsuming).toBe(false) + // @ts-expect-error - accessing private field for testing + expect(consumer.isReinitializing).toBe(false) + } finally { + await consumer.close() + } + }) + }) +}) diff --git a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts index 68762907..0433dd47 100644 --- a/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts +++ b/packages/gcp-pubsub/test/consumers/PubSubPermissionConsumer.ts @@ -31,6 +31,8 @@ type PubSubPermissionConsumerOptions = Pick< | 'payloadStoreConfig' | 'messageDeduplicationConfig' | 'enableConsumerDeduplication' + | 'subscriptionRetryOptions' + | 'messageDeduplicationIdField' > & { addPreHandlerBarrier?: ( message: SupportedMessages, diff --git a/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts b/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts new file mode 100644 index 00000000..8674bb8b --- /dev/null +++ b/packages/gcp-pubsub/test/errors/PubSubConsumerErrorResolver.spec.ts @@ -0,0 +1,69 @@ +import { InternalError } from '@lokalise/node-core' +import { MessageInvalidFormatError, 
MessageValidationError } from '@message-queue-toolkit/core' +import { describe, expect, it } from 'vitest' +import { ZodError } from 'zod/v4' +import { PubSubConsumerErrorResolver } from '../../lib/errors/PubSubConsumerErrorResolver.ts' + +describe('PubSubConsumerErrorResolver', () => { + const errorResolver = new PubSubConsumerErrorResolver() + + it('returns MessageInvalidFormatError for SyntaxError', () => { + const syntaxError = new SyntaxError('Unexpected token') + + const result = errorResolver.processError(syntaxError) + + expect(result).toBeInstanceOf(MessageInvalidFormatError) + expect(result.message).toBe('Unexpected token') + }) + + it('returns MessageValidationError for ZodError', () => { + const zodError = new ZodError([ + { + code: 'invalid_type', + expected: 'string', + path: ['field'], + message: 'Expected string, received number', + }, + ]) + + const result = errorResolver.processError(zodError) + + expect(result).toBeInstanceOf(MessageValidationError) + expect(result.message).toContain('invalid_type') + }) + + it('returns InternalError for StandardizedError', () => { + // Create an error that matches the StandardizedError interface + // StandardizedError requires: name, message, code, and Symbol.for('StandardizedErrorSymbol') = true + const standardizedError = Object.assign(new Error('Standardized error'), { + code: 'CUSTOM_CODE', + [Symbol.for('StandardizedErrorSymbol')]: true, + }) + + const result = errorResolver.processError(standardizedError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Standardized error') + expect(result.errorCode).toBe('CUSTOM_CODE') + }) + + it('returns InternalError for unknown errors', () => { + const unknownError = { message: 'Unknown error' } + + const result = errorResolver.processError(unknownError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) + + it('returns 
InternalError for regular Error', () => { + const regularError = new Error('Regular error') + + const result = errorResolver.processError(regularError) + + expect(result).toBeInstanceOf(InternalError) + expect(result.message).toBe('Error processing message') + expect(result.errorCode).toBe('INTERNAL_ERROR') + }) +}) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts new file mode 100644 index 00000000..32e1a6ae --- /dev/null +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.messageDeduplication.spec.ts @@ -0,0 +1,190 @@ +import { randomUUID } from 'node:crypto' +import { RedisMessageDeduplicationStore } from '@message-queue-toolkit/redis-message-deduplication-store' +import { type AwilixContainer, asValue } from 'awilix' +import type { Redis } from 'ioredis' +import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest' +import type { PERMISSIONS_ADD_MESSAGE_TYPE } from '../consumers/userConsumerSchemas.ts' +import { deletePubSubTopic } from '../utils/cleanupPubSub.ts' +import type { Dependencies } from '../utils/testContext.ts' +import { registerDependencies } from '../utils/testContext.ts' +import { PubSubPermissionPublisher } from './PubSubPermissionPublisher.ts' + +describe('PubSubPermissionPublisher - Message Deduplication', () => { + const TOPIC_NAME = 'publisher_dedup_test' + + let diContainer: AwilixContainer<Dependencies> + let redis: Redis + let messageDeduplicationStore: RedisMessageDeduplicationStore + + beforeAll(async () => { + diContainer = await registerDependencies({ + permissionPublisher: asValue(() => undefined), + permissionConsumer: asValue(() => undefined), + }) + redis = diContainer.cradle.redis + messageDeduplicationStore = new RedisMessageDeduplicationStore({ + redis, + }) + }) + + afterAll(async () => { + await diContainer.cradle.awilixManager.executeDispose() 
+ await diContainer.dispose() + }) + + async function cleanRedis() { + await redis.flushall() + } + + describe('publisher deduplication', () => { + let publisher: PubSubPermissionPublisher + + beforeEach(async () => { + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + messageDeduplicationConfig: { + deduplicationStore: messageDeduplicationStore, + }, + enablePublisherDeduplication: true, + }) + + await deletePubSubTopic(diContainer.cradle.pubSubClient, TOPIC_NAME) + await cleanRedis() + await publisher.init() + }) + + afterEach(async () => { + vi.restoreAllMocks() + await publisher.close() + await cleanRedis() + }) + + it('publishes message and stores deduplication key', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + await publisher.publish(message) + + const spy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(spy.processingResult).toEqual({ status: 'published' }) + + // Verify deduplication key was stored + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `publisher:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(true) + }) + + it('skips duplicate message when already published', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + + const firstSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(firstSpy.processingResult).toEqual({ status: 'published' }) + + // Clear spy for subsequent call + publisher.handlerSpy.clear() + + // Second publish - should be skipped as duplicate + await publisher.publish(message) + + const secondSpy = 
await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(secondSpy.processingResult).toEqual({ + status: 'published', + skippedAsDuplicate: true, + }) + }) + + it('publishes messages with different IDs independently', async () => { + const message1: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + const message2: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: randomUUID(), + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user2'], + } + + // First message + await publisher.publish(message1) + const spy1 = await publisher.handlerSpy.waitForMessageWithId(message1.id, 'published') + expect(spy1.processingResult).toEqual({ status: 'published' }) + + publisher.handlerSpy.clear() + + // Second message with different ID + await publisher.publish(message2) + const spy2 = await publisher.handlerSpy.waitForMessageWithId(message2.id, 'published') + expect(spy2.processingResult).toEqual({ status: 'published' }) + }) + }) + + describe('publisher without deduplication', () => { + let publisher: PubSubPermissionPublisher + + beforeEach(async () => { + publisher = new PubSubPermissionPublisher(diContainer.cradle, { + creationConfig: { + topic: { name: TOPIC_NAME }, + }, + // No deduplication config + }) + + await deletePubSubTopic(diContainer.cradle.pubSubClient, TOPIC_NAME) + await cleanRedis() + await publisher.init() + }) + + afterEach(async () => { + await publisher.close() + await cleanRedis() + }) + + it('publishes duplicate messages when deduplication is disabled', async () => { + const messageId = randomUUID() + const message: PERMISSIONS_ADD_MESSAGE_TYPE = { + id: messageId, + messageType: 'add', + timestamp: new Date().toISOString(), + userIds: ['user1'], + } + + // First publish + await publisher.publish(message) + const firstSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + 
expect(firstSpy.processingResult).toEqual({ status: 'published' }) + + publisher.handlerSpy.clear() + + // Second publish - should also be published (no deduplication) + await publisher.publish(message) + const secondSpy = await publisher.handlerSpy.waitForMessageWithId(messageId, 'published') + expect(secondSpy.processingResult).toEqual({ status: 'published' }) + + // Verify no deduplication key was stored + const deduplicationKeyExists = await messageDeduplicationStore.keyExists( + `publisher:${messageId}`, + ) + expect(deduplicationKeyExists).toBe(false) + }) + }) +}) diff --git a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts index c5db7763..ec46cf47 100644 --- a/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts +++ b/packages/gcp-pubsub/test/publishers/PubSubPermissionPublisher.ts @@ -1,3 +1,4 @@ +import type { MessageDeduplicationConfig } from '@message-queue-toolkit/core' import type { PubSubMessageOptions } from '../../lib/pubsub/AbstractPubSubPublisher.ts' import { AbstractPubSubPublisher } from '../../lib/pubsub/AbstractPubSubPublisher.ts' import type { PubSubDependencies } from '../../lib/pubsub/AbstractPubSubService.ts' @@ -27,6 +28,7 @@ type PubSubPermissionPublisherOptions = { } payloadStoreConfig?: any enablePublisherDeduplication?: boolean + messageDeduplicationConfig?: MessageDeduplicationConfig } export class PubSubPermissionPublisher extends AbstractPubSubPublisher { @@ -47,6 +49,7 @@ export class PubSubPermissionPublisher extends AbstractPubSubPublisher