Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ export { type ParseMessageResult, parseMessage } from './utils/parseUtils.ts'
export { objectToBuffer } from './utils/queueUtils.ts'
export {
isStartupResourcePollingEnabled,
type PollingErrorCallback,
type PollingErrorContext,
type StartupResourcePollingCheckResult,
StartupResourcePollingTimeoutError,
type WaitForResourceOptions,
Expand Down
32 changes: 30 additions & 2 deletions packages/core/lib/utils/startupResourcePollingUtils.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
import { setTimeout } from 'node:timers/promises'
import type { CommonLogger, ErrorReporter } from '@lokalise/node-core'
import { type CommonLogger, type ErrorReporter, isError } from '@lokalise/node-core'
import { NO_TIMEOUT, type StartupResourcePollingConfig } from '../types/queueOptionsTypes.ts'

const DEFAULT_POLLING_INTERVAL_MS = 5000

/**
* Context passed to error callbacks indicating whether the error is final.
*/
export type PollingErrorContext = {
/**
* If true, the operation has stopped and will not retry.
* If false, this is a transient error and the operation will continue.
*/
isFinal: boolean
}

/**
* Callback invoked when a polling operation fails.
*/
export type PollingErrorCallback = (error: Error, context: PollingErrorContext) => void

export type StartupResourcePollingCheckResult<T> =
| {
isAvailable: true
Expand Down Expand Up @@ -48,6 +64,13 @@ export type WaitForResourceOptions<T> = {
* Only used when config.nonBlocking is true and the resource was not immediately available.
*/
onResourceAvailable?: (result: T) => void

/**
* Callback invoked when background polling fails in non-blocking mode.
* This can happen due to polling timeout or unexpected errors during polling.
* Only used when config.nonBlocking is true.
*/
onError?: PollingErrorCallback
}

export class StartupResourcePollingTimeoutError extends Error {
Expand Down Expand Up @@ -294,17 +317,22 @@ export async function waitForResource<T>(
})

// Fire and forget - start polling in background
const { onError } = options
setTimeout(pollingIntervalMs).then(() => {
pollForResource(options, pollingIntervalMs, hasTimeout, timeoutMs, throwOnTimeout, 1)
.then((result) => {
onResourceAvailable?.(result)
})
.catch((error) => {
.catch((err) => {
const error = isError(err) ? err : new Error(String(err))
logger?.error({
message: `Background polling for resource "${resourceName}" failed`,
resourceName,
error,
})
// isFinal: true because pollForResource only throws when it gives up
// (timeout with throwOnTimeout: true, or unexpected error)
onError?.(error, { isFinal: true })
})
})

Expand Down
158 changes: 158 additions & 0 deletions packages/sns/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,164 @@ When using `locatorConfig`, you connect to an existing topic without creating it
}
```

### Startup Resource Polling

When your SNS topic or SQS queue may not exist at startup (e.g., created by another service or infrastructure-as-code pipeline), you can enable polling to wait for resources to become available instead of failing immediately:

```typescript
{
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid',

// Enable startup resource polling
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000, // Check every 5 seconds (default)
timeoutMs: 60000, // Timeout after 60 seconds
// timeoutMs: 'NO_TIMEOUT', // Or poll indefinitely

// Optional: Non-blocking mode
nonBlocking: false, // Default: false (blocking)
},
},
}
```

#### Blocking Mode (Default)

In blocking mode, `init()` or `start()` will wait until both the topic and queue are available:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
})

// This will block until topic and queue are available (or timeout)
await consumer.start()
```

If the timeout is reached before resources are available, a `StartupResourcePollingTimeoutError` is thrown.

#### Non-Blocking Mode

In non-blocking mode, `init()` returns immediately even if resources aren't available. Background polling continues and you're notified via callbacks:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic',
queueUrl: 'https://sqs...',
subscriptionArn: 'arn:aws:sns:...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true, // Return immediately
},
},
})

// Returns immediately, even if resources don't exist yet
await consumer.start()

// Check if resources are ready
if (!consumer.resourcesReady) {
console.log('Resources not yet available, waiting in background...')
}
```

#### Callbacks for Non-Blocking Mode

When using non-blocking mode, you can provide callbacks to be notified when resources become available or when polling fails:

```typescript
// Using initSnsSqs directly for more control
import { initSnsSqs } from '@message-queue-toolkit/sns'

const result = await initSnsSqs(
sqsClient,
snsClient,
stsClient,
{
topicName: 'my-topic',
queueUrl: 'https://sqs...',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
nonBlocking: true,
},
},
creationConfig,
subscriptionConfig,
{
// Called when all resources become available
onResourcesReady: ({ topicArn, queueUrl }) => {
console.log('Resources are now available!')
console.log('Topic ARN:', topicArn)
console.log('Queue URL:', queueUrl)
},

// Called if background polling fails (e.g., timeout or unexpected error)
onResourcesError: (error, context) => {
if (context.isFinal) {
// Polling has stopped and will not retry
console.error('Failed to wait for resources:', error.message)
// Handle error - maybe alert, retry, or graceful degradation
} else {
// Transient error, polling will continue
console.warn('Transient error while polling:', error.message)
}
},
},
)

if (result.resourcesReady) {
console.log('Resources were immediately available')
} else {
console.log('Waiting for resources in background...')
}
```

#### Subscription Creation Mode

When you want to **create** a subscription (no existing `subscriptionArn`), startup resource polling will wait for the topic to exist before attempting to subscribe:

```typescript
const consumer = new MyConsumer(deps, {
locatorConfig: {
topicName: 'my-topic', // Topic created by another service
// No subscriptionArn - we'll create the subscription
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 5000,
timeoutMs: 60000,
},
},
creationConfig: {
queue: { QueueName: 'my-consumer-queue' }, // Queue will be created
},
})

// This will:
// 1. Poll until the topic exists
// 2. Create the SQS queue
// 3. Subscribe the queue to the topic
// 4. Start consuming
await consumer.start()
```

### Publisher Options

```typescript
Expand Down
Loading
Loading