Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ export { type ParseMessageResult, parseMessage } from './utils/parseUtils.ts'
export { objectToBuffer } from './utils/queueUtils.ts'
export {
isStartupResourcePollingEnabled,
type PollingErrorCallback,
type PollingErrorContext,
type StartupResourcePollingCheckResult,
StartupResourcePollingTimeoutError,
type WaitForResourceOptions,
Expand Down
32 changes: 30 additions & 2 deletions packages/core/lib/utils/startupResourcePollingUtils.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
import { setTimeout } from 'node:timers/promises'
import type { CommonLogger, ErrorReporter } from '@lokalise/node-core'
import { type CommonLogger, type ErrorReporter, isError } from '@lokalise/node-core'
import { NO_TIMEOUT, type StartupResourcePollingConfig } from '../types/queueOptionsTypes.ts'

const DEFAULT_POLLING_INTERVAL_MS = 5000

/**
* Context passed to error callbacks indicating whether the error is final.
*/
export type PollingErrorContext = {
/**
* If true, the operation has stopped and will not retry.
* If false, this is a transient error and the operation will continue.
*/
isFinal: boolean
}

/**
* Callback invoked when a polling operation fails.
*/
export type PollingErrorCallback = (error: Error, context: PollingErrorContext) => void

export type StartupResourcePollingCheckResult<T> =
| {
isAvailable: true
Expand Down Expand Up @@ -48,6 +64,13 @@ export type WaitForResourceOptions<T> = {
* Only used when config.nonBlocking is true and the resource was not immediately available.
*/
onResourceAvailable?: (result: T) => void

/**
* Callback invoked when background polling fails in non-blocking mode.
* This can happen due to polling timeout or unexpected errors during polling.
* Only used when config.nonBlocking is true.
*/
onError?: PollingErrorCallback
}

export class StartupResourcePollingTimeoutError extends Error {
Expand Down Expand Up @@ -294,17 +317,22 @@ export async function waitForResource<T>(
})

// Fire and forget - start polling in background
const { onError } = options
setTimeout(pollingIntervalMs).then(() => {
pollForResource(options, pollingIntervalMs, hasTimeout, timeoutMs, throwOnTimeout, 1)
.then((result) => {
onResourceAvailable?.(result)
})
.catch((error) => {
.catch((err) => {
const error = isError(err) ? err : new Error(String(err))
logger?.error({
message: `Background polling for resource "${resourceName}" failed`,
resourceName,
error,
})
// isFinal: true because pollForResource only throws when it gives up
// (timeout with throwOnTimeout: true, or unexpected error)
onError?.(error, { isFinal: true })
})
})

Expand Down
158 changes: 158 additions & 0 deletions packages/sns/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,164 @@ When using `locatorConfig`, you connect to an existing topic without creating it
}
```

### Startup Resource Polling

When your SNS topic or SQS queue may not exist at startup (e.g., created by another service or infrastructure-as-code pipeline), you can enable polling to wait for resources to become available instead of failing immediately:

```typescript
{
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid',

// Enable startup resource polling
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000, // Check every 5 seconds (default)
timeoutMs: 60000, // Timeout after 60 seconds
// timeoutMs: 'NO_TIMEOUT', // Or poll indefinitely

// Optional: Non-blocking mode
nonBlocking: false, // Default: false (blocking)
},
},
}
```

#### Blocking Mode (Default)

In blocking mode, `init()` or `start()` will wait until both the topic and queue are available:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
})

// This will block until topic and queue are available (or timeout)
await consumer.start()
```

If the timeout is reached before resources are available, a `StartupResourcePollingTimeoutError` is thrown.

#### Non-Blocking Mode

In non-blocking mode, `init()` returns immediately even if resources aren't available. Background polling continues and you're notified via callbacks:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true, // Return immediately
},
},
})

// Returns immediately, even if resources don't exist yet
await consumer.start()

// Check if resources are ready
if (!consumer.resourcesReady) {
console.log('Resources not yet available, waiting in background...')
}
```

#### Callbacks for Non-Blocking Mode

When using non-blocking mode, you can provide callbacks to be notified when resources become available or when polling fails:

```typescript
// Using initSnsSqs directly for more control
import { initSnsSqs } from '@message-queue-toolkit/sns'

const result = await initSnsSqs(
sqsClient,
snsClient,
stsClient,
{
topicName: 'my-topic',
queueUrl: 'https://sqs...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true,
},
},
creationConfig,
subscriptionConfig,
{
// Called when all resources become available
onResourcesReady: ({ topicArn, queueUrl }) => {
console.log('Resources are now available!')
console.log('Topic ARN:', topicArn)
console.log('Queue URL:', queueUrl)
},

// Called if background polling fails (e.g., timeout or unexpected error)
onResourcesError: (error, context) => {
if (context.isFinal) {
// Polling has stopped and will not retry
console.error('Failed to wait for resources:', error.message)
// Handle error - maybe alert, retry, or graceful degradation
} else {
// Transient error, polling will continue
console.warn('Transient error while polling:', error.message)
}
},
},
)

if (result.resourcesReady) {
console.log('Resources were immediately available')
} else {
console.log('Waiting for resources in background...')
}
```

#### Subscription Creation Mode

When you want to **create** a subscription (no existing `subscriptionArn`), startup resource polling will wait for the topic to exist before attempting to subscribe:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic', // Topic created by another service
// No subscriptionArn - we'll create the subscription
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
creationConfig: {
queue: { QueueName: 'my-consumer-queue' }, // Queue will be created
},
})

// This will:
// 1. Poll until the topic exists
// 2. Create the SQS queue
// 3. Subscribe the queue to the topic
// 4. Start consuming
await consumer.start()
```

### Publisher Options

```typescript
Expand Down
Loading
Loading