diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index 406712da..b9d29a8d 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -107,6 +107,8 @@ export { type ParseMessageResult, parseMessage } from './utils/parseUtils.ts' export { objectToBuffer } from './utils/queueUtils.ts' export { isStartupResourcePollingEnabled, + type PollingErrorCallback, + type PollingErrorContext, type StartupResourcePollingCheckResult, StartupResourcePollingTimeoutError, type WaitForResourceOptions, diff --git a/packages/core/lib/utils/startupResourcePollingUtils.ts b/packages/core/lib/utils/startupResourcePollingUtils.ts index cd37764f..26734199 100644 --- a/packages/core/lib/utils/startupResourcePollingUtils.ts +++ b/packages/core/lib/utils/startupResourcePollingUtils.ts @@ -1,9 +1,25 @@ import { setTimeout } from 'node:timers/promises' -import type { CommonLogger, ErrorReporter } from '@lokalise/node-core' +import { type CommonLogger, type ErrorReporter, isError } from '@lokalise/node-core' import { NO_TIMEOUT, type StartupResourcePollingConfig } from '../types/queueOptionsTypes.ts' const DEFAULT_POLLING_INTERVAL_MS = 5000 +/** + * Context passed to error callbacks indicating whether the error is final. + */ +export type PollingErrorContext = { + /** + * If true, the operation has stopped and will not retry. + * If false, this is a transient error and the operation will continue. + */ + isFinal: boolean +} + +/** + * Callback invoked when a polling operation fails. + */ +export type PollingErrorCallback = (error: Error, context: PollingErrorContext) => void + export type StartupResourcePollingCheckResult = | { isAvailable: true @@ -48,6 +64,13 @@ export type WaitForResourceOptions = { * Only used when config.nonBlocking is true and the resource was not immediately available. */ onResourceAvailable?: (result: T) => void + + /** + * Callback invoked when background polling fails in non-blocking mode. + * This can happen due to polling timeout or unexpected errors during polling. + * Only used when config.nonBlocking is true. + */ + onError?: PollingErrorCallback } export class StartupResourcePollingTimeoutError extends Error { @@ -294,17 +317,22 @@ export async function waitForResource( }) // Fire and forget - start polling in background + const { onError } = options setTimeout(pollingIntervalMs).then(() => { pollForResource(options, pollingIntervalMs, hasTimeout, timeoutMs, throwOnTimeout, 1) .then((result) => { onResourceAvailable?.(result) }) - .catch((error) => { + .catch((err) => { + const error = isError(err) ? err : new Error(String(err)) logger?.error({ message: `Background polling for resource "${resourceName}" failed`, resourceName, error, }) + // isFinal: true because pollForResource only throws when it gives up + // (timeout with throwOnTimeout: true, or unexpected error) + onError?.(error, { isFinal: true }) }) }) diff --git a/packages/sns/README.md b/packages/sns/README.md index b5eb82e2..4165a7ed 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -489,6 +489,164 @@ When using `locatorConfig`, you connect to an existing topic without creating it } ``` +### Startup Resource Polling + +When your SNS topic or SQS queue may not exist at startup (e.g., created by another service or infrastructure-as-code pipeline), you can enable polling to wait for resources to become available instead of failing immediately: + +```typescript +{ + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue', + subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid', + + // Enable startup resource polling + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, // Check every 5 seconds (default) + timeoutMs: 60000, // Timeout after 60 seconds + // timeoutMs: 'NO_TIMEOUT', // Or poll indefinitely + + // Optional: Non-blocking mode + nonBlocking: false, // Default: false (blocking) + }, + }, +} +``` + +#### Blocking Mode (Default) + +In blocking mode, `init()` or `start()` will wait until both the topic and queue are available: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + subscriptionArn: 'arn:aws:sns:...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + }, + }, +}) + +// This will block until topic and queue are available (or timeout) +await consumer.start() +``` + +If the timeout is reached before resources are available, a `StartupResourcePollingTimeoutError` is thrown. + +#### Non-Blocking Mode + +In non-blocking mode, `init()` returns immediately even if resources aren't available. Background polling continues and you're notified via callbacks: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + subscriptionArn: 'arn:aws:sns:...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + nonBlocking: true, // Return immediately + }, + }, +}) + +// Returns immediately, even if resources don't exist yet +await consumer.start() + +// Check if resources are ready +if (!consumer.resourcesReady) { + console.log('Resources not yet available, waiting in background...') +} +``` + +#### Callbacks for Non-Blocking Mode + +When using non-blocking mode, you can provide callbacks to be notified when resources become available or when polling fails: + +```typescript +// Using initSnsSqs directly for more control +import { initSnsSqs } from '@message-queue-toolkit/sns' + +const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + nonBlocking: true, + }, + }, + creationConfig, + subscriptionConfig, + { + // Called when all resources become available + onResourcesReady: ({ topicArn, queueUrl }) => { + console.log('Resources are now available!') + console.log('Topic ARN:', topicArn) + console.log('Queue URL:', queueUrl) + }, + + // Called if background polling fails (e.g., timeout or unexpected error) + onResourcesError: (error, context) => { + if (context.isFinal) { + // Polling has stopped and will not retry + console.error('Failed to wait for resources:', error.message) + // Handle error - maybe alert, retry, or graceful degradation + } else { + // Transient error, polling will continue + console.warn('Transient error while polling:', error.message) + } + }, + }, +) + +if (result.resourcesReady) { + console.log('Resources were immediately available') +} else { + console.log('Waiting for resources in background...') +} +``` + +#### Subscription Creation Mode + +When you want to **create** a subscription (no existing `subscriptionArn`), startup resource polling will wait for the topic to exist before attempting to subscribe: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', // Topic created by another service + // No subscriptionArn - we'll create the subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + }, + }, + creationConfig: { + queue: { QueueName: 'my-consumer-queue' }, // Queue will be created + }, +}) + +// This will: +// 1. Poll until the topic exists +// 2. Create the SQS queue +// 3. Subscribe the queue to the topic +// 4. Start consuming +await consumer.start() +``` + ### Publisher Options ```typescript diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 91a43909..9b8eae3f 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -1,8 +1,8 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns' import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs' import type { STSClient } from '@aws-sdk/client-sts' -import type { Either } from '@lokalise/node-core' -import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core' +import { type Either, isError } from '@lokalise/node-core' +import type { DeletionConfig, ExtraParams, PollingErrorCallback } from '@message-queue-toolkit/core' import { isProduction, isStartupResourcePollingEnabled, @@ -22,12 +22,197 @@ import { subscribeToTopic } from './snsSubscriber.ts' import { assertTopic, deleteSubscription, deleteTopic, getTopicAttributes } from './snsUtils.ts' import { buildTopicArn } from './stsUtils.ts' +// Helper type for topic polling result +type TopicPollingResult = { + topicResult: unknown | undefined + topicArn: string +} + +// Helper function to poll for SNS topic availability +async function pollForTopic( + snsClient: SNSClient, + topicArn: string, + startupResourcePolling: NonNullable, + extraParams?: ExtraParams, + onResourceAvailable?: () => void, + onError?: PollingErrorCallback, +): Promise { + const topicResult = await waitForResource({ + config: startupResourcePolling, + resourceName: `SNS topic ${topicArn}`, + logger: extraParams?.logger, + errorReporter: extraParams?.errorReporter, + onResourceAvailable, + onError, + checkFn: async () => { + const result = await getTopicAttributes(snsClient, topicArn) + if (result.error === 'not_found') { + return { isAvailable: false } + } + return { isAvailable: true, result: result.result } + }, + }) + + return { topicResult, topicArn } +} + +// Helper function to create subscription +async function createSubscription( + sqsClient: SQSClient, + snsClient: SNSClient, + stsClient: STSClient, + creationConfig: SNSCreationConfig & SQSCreationConfig, + topicResolutionOptions: TopicResolutionOptions, + subscriptionConfig: SNSSubscriptionOptions, + extraParams?: ExtraParams, +) { + const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( + sqsClient, + snsClient, + stsClient, + creationConfig.queue, + topicResolutionOptions, + subscriptionConfig, + { + updateAttributesIfExists: creationConfig.updateAttributesIfExists, + queueUrlsWithSubscribePermissionsPrefix: + creationConfig.queueUrlsWithSubscribePermissionsPrefix, + allowedSourceOwner: creationConfig.allowedSourceOwner, + topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, + logger: extraParams?.logger, + forceTagUpdate: creationConfig.forceTagUpdate, + }, + ) + if (!subscriptionArn) { + throw new Error('Failed to subscribe') + } + return { subscriptionArn, topicArn, queueUrl } +} + +// Helper to handle subscription creation with optional topic polling (blocking and non-blocking) +async function createSubscriptionWithPolling( + sqsClient: SQSClient, + snsClient: SNSClient, + stsClient: STSClient, + creationConfig: SNSCreationConfig & SQSCreationConfig, + topicResolutionOptions: TopicResolutionOptions, + subscriptionConfig: SNSSubscriptionOptions, + topicArn: string, + startupResourcePolling: NonNullable, + extraParams?: InitSnsSqsExtraParams, +): Promise<{ + subscriptionArn: string + topicArn: string + queueUrl: string + queueName: string + resourcesReady: boolean +}> { + const nonBlocking = startupResourcePolling.nonBlocking === true + const queueName = creationConfig.queue.QueueName! + + const onTopicReady = async () => { + try { + const result = await createSubscription( + sqsClient, + snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, + extraParams, + ) + extraParams?.onResourcesReady?.({ + topicArn: result.topicArn, + queueUrl: result.queueUrl, + }) + } catch (err) { + const error = isError(err) ? err : new Error(String(err)) + extraParams?.logger?.error({ + message: 'Background subscription creation failed', + topicArn, + error, + }) + // Subscription creation failure is final - we don't retry + extraParams?.onResourcesError?.(error, { isFinal: true }) + } + } + + const { topicResult } = await pollForTopic( + snsClient, + topicArn, + startupResourcePolling, + extraParams, + nonBlocking ? onTopicReady : undefined, + nonBlocking ? extraParams?.onResourcesError : undefined, + ) + + // Non-blocking: return early if topic wasn't immediately available + if (nonBlocking && topicResult === undefined) { + return { + subscriptionArn: '', + topicArn, + queueName, + queueUrl: '', + resourcesReady: false, + } + } + + // Blocking: topic is now available, create subscription + const result = await createSubscription( + sqsClient, + snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, + extraParams, + ) + return { + subscriptionArn: result.subscriptionArn, + topicArn: result.topicArn, + queueName, + queueUrl: result.queueUrl, + resourcesReady: true, + } +} + +// Helper function to poll for SQS queue availability +async function pollForQueue( + sqsClient: SQSClient, + queueUrl: string, + startupResourcePolling: NonNullable, + extraParams?: ExtraParams, + onResourceAvailable?: () => void, + onError?: PollingErrorCallback, +): Promise { + return await waitForResource({ + config: startupResourcePolling, + resourceName: `SQS queue ${queueUrl}`, + logger: extraParams?.logger, + errorReporter: extraParams?.errorReporter, + onResourceAvailable, + onError, + checkFn: async () => { + const result = await getQueueAttributes(sqsClient, queueUrl) + if (result.error === 'not_found') { + return { isAvailable: false } + } + return { isAvailable: true, result: result.result } + }, + }) +} + export type InitSnsSqsExtraParams = ExtraParams & { /** * Callback invoked when resources become available in non-blocking mode. * Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available. */ onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void + /** + * Callback invoked when background resource polling or subscription creation fails in non-blocking mode. + * This can happen due to polling timeout or subscription creation failure. + */ + onResourcesError?: PollingErrorCallback } export type InitSnsExtraParams = ExtraParams & { @@ -81,26 +266,40 @@ export async function initSnsSqs( ...creationConfig.topic, } - const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( + // If startup resource polling is enabled and we're not creating the topic (just locating it), + // we should poll for the topic to exist before attempting to subscribe + const startupResourcePolling = locatorConfig?.startupResourcePolling + if ( + isStartupResourcePollingEnabled(startupResourcePolling) && + !isCreateTopicCommand(topicResolutionOptions) + ) { + const topicArnToWaitFor = + topicResolutionOptions.topicArn ?? + (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) + + return await createSubscriptionWithPolling( + sqsClient, + snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, + topicArnToWaitFor, + startupResourcePolling, + extraParams, + ) + } + + // No polling needed - create subscription immediately + const { subscriptionArn, topicArn, queueUrl } = await createSubscription( sqsClient, snsClient, stsClient, - creationConfig.queue, + creationConfig, topicResolutionOptions, subscriptionConfig, - { - updateAttributesIfExists: creationConfig.updateAttributesIfExists, - queueUrlsWithSubscribePermissionsPrefix: - creationConfig.queueUrlsWithSubscribePermissionsPrefix, - allowedSourceOwner: creationConfig.allowedSourceOwner, - topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, - logger: extraParams?.logger, - forceTagUpdate: creationConfig.forceTagUpdate, - }, + extraParams, ) - if (!subscriptionArn) { - throw new Error('Failed to subscribe') - } return { subscriptionArn, topicArn, @@ -133,23 +332,19 @@ export async function initSnsSqs( } // Wait for topic to become available - const topicResult = await waitForResource({ - config: startupResourcePolling, - resourceName: `SNS topic ${subscriptionTopicArn}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { + const { topicResult } = await pollForTopic( + snsClient, + subscriptionTopicArn, + startupResourcePolling, + extraParams, + () => { topicAvailable = true notifyIfBothReady() }, - checkFn: async () => { - const result = await getTopicAttributes(snsClient, subscriptionTopicArn) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } + (error, context) => { + extraParams?.onResourcesError?.(error, context) }, - }) + ) // If topic was immediately available, mark it if (topicResult !== undefined) { @@ -164,37 +359,36 @@ export async function initSnsSqs( const queueName = splitUrl[splitUrl.length - 1]! // Also start polling for queue in background so we can notify when both are ready - waitForResource({ - config: startupResourcePolling, - resourceName: `SQS queue ${queueUrl}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { + pollForQueue( + sqsClient, + queueUrl, + startupResourcePolling, + extraParams, + () => { queueAvailable = true notifyIfBothReady() }, - checkFn: async () => { - const result = await getQueueAttributes(sqsClient, queueUrl) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } + (error, context) => { + extraParams?.onResourcesError?.(error, context) }, - }) + ) .then((result) => { - // If queue was immediately available, waitForResource returns the result + // If queue was immediately available, pollForQueue returns the result // but doesn't call onResourceAvailable, so we handle it here if (result !== undefined) { queueAvailable = true notifyIfBothReady() } }) - .catch((error) => { + .catch((err) => { + // Handle unexpected errors during background polling + const error = isError(err) ? err : new Error(String(err)) extraParams?.logger?.error({ - message: 'Background polling for SQS queue failed', + message: 'Background queue polling failed unexpectedly', queueUrl, error, }) + extraParams?.onResourcesError?.(error, { isFinal: true }) }) return { @@ -207,23 +401,16 @@ export async function initSnsSqs( } // Wait for queue to become available - const queueResult = await waitForResource({ - config: startupResourcePolling, - resourceName: `SQS queue ${queueUrl}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { + const queueResult = await pollForQueue( + sqsClient, + queueUrl, + startupResourcePolling, + extraParams, + () => { queueAvailable = true notifyIfBothReady() }, - checkFn: async () => { - const result = await getQueueAttributes(sqsClient, queueUrl) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } - }, - }) + ) // If queue was immediately available, mark it if (queueResult !== undefined) { diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index b47cc412..2c416cb8 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -15,6 +15,7 @@ import { type SNSSQSConsumerDependencies, type SNSSQSConsumerOptions, } from '../../lib/sns/AbstractSnsSqsConsumer.ts' +import { initSnsSqs } from '../../lib/utils/snsInitter.ts' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils.ts' import type { Dependencies } from '../utils/testContext.ts' import { registerDependencies } from '../utils/testContext.ts' @@ -304,7 +305,6 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { it('invokes onResourcesReady callback when both resources become available in background', async () => { // Test at the initter level since AbstractSnsSqsConsumer doesn't expose the callback - const { initSnsSqs } = await import('../../lib/utils/snsInitter.ts') let callbackInvoked = false let callbackTopicArn: string | undefined @@ -356,6 +356,336 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackTopicArn).toBeDefined() expect(callbackQueueUrl).toBe(queueUrl) }) + + it('invokes onResourcesError callback when background queue polling times out', async () => { + // Neither topic nor queue exist initially + // Topic will be created after init returns, queue never appears + + let errorCallbackInvoked = false + let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + undefined, + undefined, + { + onResourcesError: (error, context) => { + errorCallbackInvoked = true + callbackError = error + callbackContext = context + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Create topic so topic polling succeeds, but NOT queue + await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Wait for error callback to be invoked (queue polling timeout) + // Need to wait for: topic to become available (so topic polling succeeds) + // + queue polling timeout (200ms) + some buffer + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 3000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) + }) + + it('invokes onResourcesError callback when background topic polling times out', async () => { + // Neither topic nor queue exist initially + // Topic never appears - will timeout + + let errorCallbackInvoked = false + let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + undefined, + undefined, + { + onResourcesError: (error, context) => { + errorCallbackInvoked = true + callbackError = error + callbackContext = context + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Don't create topic - let it timeout + + // Wait for error callback to be invoked (topic polling timeout) + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 2000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) + }) + }) + + describe('when subscriptionArn is not provided (subscription creation mode)', () => { + it('waits for topic to become available before creating subscription', async () => { + // No topic and no queue exist initially + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Start init in background + const initPromise = consumer.init() + + // Wait a bit then create the topic (queue will be created by assertQueue in subscribeToTopic) + await setTimeout(300) + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Init should complete successfully after topic is created + await initPromise + + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + expect(consumer.subscriptionProps.queueName).toBe(queueName) + expect(consumer.subscriptionProps.subscriptionArn).toBeDefined() + }) + + it('throws StartupResourcePollingTimeoutError when topic never appears', async () => { + // No topic exists + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Should throw timeout error since topic never appears + await expect(consumer.init()).rejects.toThrow(StartupResourcePollingTimeoutError) + }) + + it('returns immediately in non-blocking mode when topic not available', async () => { + // Test at the initter level since consumer.init() sets internal state + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + expect(result.subscriptionArn).toBe('') + expect(result.queueName).toBe(queueName) + }) + + it('creates subscription in background when topic becomes available in non-blocking mode', async () => { + // No topic exists initially + + let callbackInvoked = false + let callbackTopicArn: string | undefined + let callbackQueueUrl: string | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + { + onResourcesReady: ({ topicArn, queueUrl: url }) => { + callbackInvoked = true + callbackTopicArn = topicArn + callbackQueueUrl = url + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + expect(result.subscriptionArn).toBe('') + + // Create topic after init returns + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Wait for callback to be invoked (subscription created in background) + await vi.waitFor( + () => { + expect(callbackInvoked).toBe(true) + }, + { timeout: 3000, interval: 50 }, + ) + + expect(callbackTopicArn).toBe(topicArn) + expect(callbackQueueUrl).toContain(queueName) + }) + + it('creates subscription immediately when topic is available in non-blocking mode', async () => { + // Topic exists before init + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + ) + + // Should return immediately with resourcesReady: true since topic was immediately available + expect(result.resourcesReady).toBe(true) + expect(result.subscriptionArn).toBeDefined() + expect(result.subscriptionArn).not.toBe('') + expect(result.topicArn).toBe(topicArn) + expect(result.queueName).toBe(queueName) + }) + + it('invokes onResourcesError callback when topic polling times out in non-blocking mode', async () => { + // No topic exists - will timeout + + let errorCallbackInvoked = false + let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + { + onResourcesError: (error, context) => { + errorCallbackInvoked = true + callbackError = error + callbackContext = context + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Wait for error callback to be invoked (topic polling timeout) + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 2000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) + }) }) describe('when startupResourcePolling is disabled', () => {