Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions packages/gcp-pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Google Cloud Pub/Sub implementation for the message-queue-toolkit. Provides a ro
- [Consumer Flow Control](#consumer-flow-control)
- [Multiple Message Types](#multiple-message-types)
- [Error Handling](#error-handling)
- [Subscription-Level Error Handling](#subscription-level-error-handling)
- [Testing](#testing)
- [TestPubSubPublisher](#testpubsubpublisher)
- [Integration Tests with Emulator](#integration-tests-with-emulator)
Expand Down Expand Up @@ -1490,6 +1491,116 @@ When a message cannot be processed (invalid format, schema validation failure, h

**Best Practice:** Always configure a DLQ in production to capture and analyze failed messages.

### Subscription-Level Error Handling

The consumer provides a higher-level error recovery mechanism that complements the SDK's built-in gRPC retry logic. While the `@google-cloud/pubsub` SDK automatically retries some transient errors at the gRPC level, there are scenarios where the SDK does not recover automatically:

1. **Eventual consistency errors** (`NOT_FOUND`, `PERMISSION_DENIED`) are not in the SDK's default retry codes
2. **Subscription stream disconnections** may not automatically reconnect in all cases
3. **Infrastructure changes** (e.g., after Terraform deployments) may require full subscription reinitialization

#### Initialization Retry

When calling `start()`, the consumer will automatically retry initialization if it encounters retryable errors. This is particularly useful when:

- Using `locatorConfig` and the subscription doesn't exist yet due to eventual consistency
- Services start in parallel and the subscription is being created by another process
- Terraform deployments are still propagating

The retry logic handles errors containing:
- `does not exist` - Resource not yet visible
- `NOT_FOUND` - gRPC error code 5
- `PERMISSION_DENIED` - gRPC error code 7 (IAM propagation delay)

#### Runtime Reconnection

**Retryable Error Codes:**

*Errors the consumer handles via reinitialization during runtime:*
- `DEADLINE_EXCEEDED` (4): Request timeout that SDK retry couldn't resolve
- `NOT_FOUND` (5): Subscription may not be propagated yet (eventual consistency)
- `PERMISSION_DENIED` (7): IAM permissions may not be propagated yet (eventual consistency)
- `RESOURCE_EXHAUSTED` (8): Quota exceeded, retry with backoff
- `INTERNAL` (13): Server error, should be transient
- `UNAVAILABLE` (14): Service temporarily unable to process

When these errors reach the `subscription.on('error')` handler (meaning SDK's built-in retry couldn't resolve them), the consumer will:
1. Log a warning with error details
2. Close the existing subscription and remove event listeners
3. Reinitialize the subscription with exponential backoff
4. Reattach event handlers and continue consuming

**Why `NOT_FOUND` and `PERMISSION_DENIED`?**

After Terraform deployments, GCP resources and IAM permissions can take several minutes to propagate across GCP's distributed infrastructure. During this window, the subscription may report these errors even though the configuration is correct. The consumer retries with exponential backoff to handle this eventual consistency.

**Note:** For most transient errors, the SDK's built-in retry will handle recovery automatically. The consumer's reinitialization logic is a safety net for cases where SDK retry is exhausted or not applicable.

**Configuration:**

The same retry options apply to both initialization and runtime reconnection:

```typescript
class MyConsumer extends AbstractPubSubConsumer<MyMessage, ExecutionContext> {
constructor(dependencies: PubSubConsumerDependencies) {
super(
dependencies,
{
// ... other options ...

// Optional: Configure retry behavior for both init and runtime errors
subscriptionRetryOptions: {
maxRetries: 5, // Maximum retry attempts (default: 5)
baseRetryDelayMs: 1000, // Base delay for exponential backoff (default: 1000ms)
maxRetryDelayMs: 30000, // Maximum delay between retries (default: 30000ms)
},
},
executionContext,
)
}
}
```

**Exponential Backoff Formula:**
```text
delay = min(baseRetryDelayMs * 2^(attempt-1), maxRetryDelayMs)
```

With default settings, delays are: 1s, 2s, 4s, 8s, 16s (capped at 30s).

**Unexpected Subscription Closure:**

The consumer also handles unexpected subscription closures (e.g., network issues, GCP service restarts). If the subscription closes while the consumer is still supposed to be consuming, it will automatically attempt reinitialization.

**Health Check Integration:**

If all retry attempts are exhausted, the consumer enters a failed state. You can detect this via the `fatalError` getter for health check integration:

```typescript
// In your health check endpoint
app.get('/health', (req, res) => {
const error = consumer.fatalError
if (error) {
return res.status(503).json({
status: 'unhealthy',
error: error.message,
})
}
return res.status(200).json({ status: 'healthy' })
})
```

The `fatalError` property returns:
- `null` when the consumer is healthy
- `Error` when the consumer has permanently failed (e.g., after exhausting all retry attempts)

This allows your application to properly report unhealthy status to orchestration systems (Kubernetes, etc.) and trigger appropriate remediation (pod restart, alerting, etc.).

**References:**
- [GCP Pub/Sub Error Codes](https://cloud.google.com/pubsub/docs/reference/error-codes)
- [GCP Pub/Sub Troubleshooting](https://cloud.google.com/pubsub/docs/troubleshooting)
- [Node.js Pub/Sub Subscription Reconnection Issues](https://github.com/googleapis/nodejs-pubsub/issues/979)

### Error Resolver

```typescript
Expand Down Expand Up @@ -1775,13 +1886,17 @@ it('publishes message', async () => {
- `deadLetterQueue`: DLQ configuration
- `maxRetryDuration`: Max retry time in seconds
- `consumerOverrides`: Flow control settings
- `subscriptionRetryOptions`: Retry configuration for subscription errors (see [Subscription-Level Error Handling](#subscription-level-error-handling))

**Methods:**
- `init()`: Initialize consumer (create/locate resources)
- `start()`: Start consuming messages
- `close()`: Stop consumer and close connections
- `handlerSpy`: Access spy for testing

**Properties:**
- `fatalError`: Returns `Error` if consumer has permanently failed, `null` otherwise (for health checks)

## Best Practices

1. **Use message ordering** for related events (same user, same entity)
Expand Down
24 changes: 24 additions & 0 deletions packages/gcp-pubsub/lib/errors/SubscriptionDoesNotExistError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
export class SubscriptionDoesNotExistError extends Error {
public readonly subscriptionName: string

constructor(subscriptionName: string) {
super(`Subscription ${subscriptionName} does not exist`)
this.name = 'SubscriptionDoesNotExistError'
this.subscriptionName = subscriptionName
}
}

export function isSubscriptionDoesNotExistError(
error: unknown,
): error is SubscriptionDoesNotExistError {
return (
typeof error === 'object' &&
error !== null &&
'name' in error &&
'message' in error &&
'subscriptionName' in error &&
error.name === 'SubscriptionDoesNotExistError' &&
typeof error.message === 'string' &&
typeof error.subscriptionName === 'string'
)
}
85 changes: 85 additions & 0 deletions packages/gcp-pubsub/lib/errors/grpcErrors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { status as GrpcStatus } from '@grpc/grpc-js'

/**
* gRPC status codes for which subscription operations should be retried.
*
* Includes both:
* 1. GCP-documented retryable errors (DEADLINE_EXCEEDED, RESOURCE_EXHAUSTED, INTERNAL, UNAVAILABLE)
* 2. Eventual consistency errors common after Terraform deployments (NOT_FOUND, PERMISSION_DENIED)
*
* **Why PERMISSION_DENIED is included:**
* After Terraform deployments, IAM permissions can take several minutes to propagate across
* GCP's distributed infrastructure. During this window, the subscription may report
* PERMISSION_DENIED even though permissions are correctly configured.
*
* **Why NOT_FOUND is included:**
* Similar to PERMISSION_DENIED, newly created subscriptions may not be immediately visible
* across all GCP endpoints due to eventual consistency.
*
* @see https://cloud.google.com/pubsub/docs/reference/error-codes
* @see https://github.com/googleapis/nodejs-pubsub/issues/979
*/
export const RETRYABLE_GRPC_STATUS_CODES = [
GrpcStatus.DEADLINE_EXCEEDED,
GrpcStatus.NOT_FOUND,
GrpcStatus.PERMISSION_DENIED,
GrpcStatus.RESOURCE_EXHAUSTED,
GrpcStatus.INTERNAL,
GrpcStatus.UNAVAILABLE,
] as const

export type RetryableGrpcStatusCode = (typeof RETRYABLE_GRPC_STATUS_CODES)[number]

const RETRYABLE_CODES_SET = new Set<number>(RETRYABLE_GRPC_STATUS_CODES)

/**
* Type for errors with a numeric gRPC status code and message.
*/
export type GrpcError = {
code: number
message: string
}

/**
* Checks if an error has gRPC error properties (code and message).
* Uses duck typing to avoid fragile instanceof checks.
*
* @param error - The error to check
* @returns true if the error has numeric `code` and string `message` properties
*/
export function isGrpcError(error: unknown): error is GrpcError {
return (
typeof error === 'object' &&
error !== null &&
'code' in error &&
'message' in error &&
typeof (error as GrpcError).code === 'number' &&
typeof (error as GrpcError).message === 'string'
)
}

/**
* Checks if an error is a gRPC error with a retryable status code.
*
* @param error - The error to check
* @returns true if the error has a retryable gRPC status code
*/
export function isRetryableGrpcError(error: unknown): error is GrpcError {
return isGrpcError(error) && RETRYABLE_CODES_SET.has(error.code)
}

/**
* Gets the gRPC status code from an error if it has one.
*
* @param error - The error to extract the code from
* @returns The gRPC status code, or undefined if not a gRPC error
*/
export function getGrpcStatusCode(error: unknown): number | undefined {
if (isGrpcError(error)) {
return error.code
}
return undefined
}

// Re-export for convenience
export { GrpcStatus }
3 changes: 2 additions & 1 deletion packages/gcp-pubsub/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
export * from './errors/grpcErrors.ts'
export * from './errors/PubSubConsumerErrorResolver.ts'
export * from './errors/SubscriptionDoesNotExistError.ts'
export * from './fakes/FakeConsumerErrorResolver.ts'
export * from './fakes/TestPubSubPublisher.ts'
export * from './pubsub/AbstractPubSubConsumer.ts'
Expand All @@ -9,4 +11,3 @@ export * from './pubsub/PubSubPublisherManager.ts'
export * from './schemas/pubSubSchemas.ts'
export * from './types/MessageTypes.ts'
export * from './utils/pubSubInitter.ts'
export * from './utils/pubSubUtils.ts'
Loading
Loading