From bad5fb1e78bc1ebcd451414f3017228ae3d935df Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 15:26:31 +0200 Subject: [PATCH 1/7] Correctly handle scenario when subscriptionArn is not passed (which, admittedly, is the usual case) --- packages/sns/lib/utils/snsInitter.ts | 152 ++++++++++++------ ...ionConsumer.startupResourcePolling.spec.ts | 87 ++++++++++ 2 files changed, 194 insertions(+), 45 deletions(-) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 91a43909..a96662a3 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -22,6 +22,62 @@ import { subscribeToTopic } from './snsSubscriber.ts' import { assertTopic, deleteSubscription, deleteTopic, getTopicAttributes } from './snsUtils.ts' import { buildTopicArn } from './stsUtils.ts' +// Helper type for topic polling result +type TopicPollingResult = { + topicResult: unknown | undefined + topicArn: string +} + +// Helper function to poll for SNS topic availability +async function pollForTopic( + snsClient: SNSClient, + topicArn: string, + startupResourcePolling: NonNullable, + extraParams?: ExtraParams, + onResourceAvailable?: () => void, +): Promise { + const topicResult = await waitForResource({ + config: startupResourcePolling, + resourceName: `SNS topic ${topicArn}`, + logger: extraParams?.logger, + errorReporter: extraParams?.errorReporter, + onResourceAvailable, + checkFn: async () => { + const result = await getTopicAttributes(snsClient, topicArn) + if (result.error === 'not_found') { + return { isAvailable: false } + } + return { isAvailable: true, result: result.result } + }, + }) + + return { topicResult, topicArn } +} + +// Helper function to poll for SQS queue availability +async function pollForQueue( + sqsClient: SQSClient, + queueUrl: string, + startupResourcePolling: NonNullable, + extraParams?: ExtraParams, + onResourceAvailable?: () => void, +): Promise { + return await waitForResource({ + config: startupResourcePolling, + resourceName: `SQS queue ${queueUrl}`, + logger: extraParams?.logger, + errorReporter: extraParams?.errorReporter, + onResourceAvailable, + checkFn: async () => { + const result = await getQueueAttributes(sqsClient, queueUrl) + if (result.error === 'not_found') { + return { isAvailable: false } + } + return { isAvailable: true, result: result.result } + }, + }) +} + export type InitSnsSqsExtraParams = ExtraParams & { /** * Callback invoked when resources become available in non-blocking mode. @@ -81,6 +137,39 @@ export async function initSnsSqs( ...creationConfig.topic, } + // If startup resource polling is enabled and we're not creating the topic (just locating it), + // we should poll for the topic to exist before attempting to subscribe + const startupResourcePolling = locatorConfig?.startupResourcePolling + if ( + isStartupResourcePollingEnabled(startupResourcePolling) && + !isCreateTopicCommand(topicResolutionOptions) + ) { + const topicArnToWaitFor = + topicResolutionOptions.topicArn ?? + (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) + + const nonBlocking = startupResourcePolling.nonBlocking === true + + const { topicResult } = await pollForTopic( + snsClient, + topicArnToWaitFor, + startupResourcePolling, + extraParams, + ) + + // If non-blocking and topic wasn't immediately available, return early with resourcesReady: false + // The caller will need to handle this case (e.g., retry later or use onResourcesReady callback) + if (nonBlocking && topicResult === undefined) { + return { + subscriptionArn: '', + topicArn: topicArnToWaitFor, + queueName: creationConfig.queue.QueueName, + queueUrl: '', + resourcesReady: false, + } + } + } + const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( sqsClient, snsClient, @@ -133,23 +222,16 @@ export async function initSnsSqs( } // Wait for topic to become available - const topicResult = await waitForResource({ - config: startupResourcePolling, - resourceName: `SNS topic ${subscriptionTopicArn}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { + const { topicResult } = await pollForTopic( + snsClient, + subscriptionTopicArn, + startupResourcePolling, + extraParams, + () => { topicAvailable = true notifyIfBothReady() }, - checkFn: async () => { - const result = await getTopicAttributes(snsClient, subscriptionTopicArn) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } - }, - }) + ) // If topic was immediately available, mark it if (topicResult !== undefined) { @@ -164,25 +246,12 @@ export async function initSnsSqs( const queueName = splitUrl[splitUrl.length - 1]! // Also start polling for queue in background so we can notify when both are ready - waitForResource({ - config: startupResourcePolling, - resourceName: `SQS queue ${queueUrl}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { - queueAvailable = true - notifyIfBothReady() - }, - checkFn: async () => { - const result = await getQueueAttributes(sqsClient, queueUrl) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } - }, + pollForQueue(sqsClient, queueUrl, startupResourcePolling, extraParams, () => { + queueAvailable = true + notifyIfBothReady() }) .then((result) => { - // If queue was immediately available, waitForResource returns the result + // If queue was immediately available, pollForQueue returns the result // but doesn't call onResourceAvailable, so we handle it here if (result !== undefined) { queueAvailable = true @@ -207,23 +276,16 @@ export async function initSnsSqs( } // Wait for queue to become available - const queueResult = await waitForResource({ - config: startupResourcePolling, - resourceName: `SQS queue ${queueUrl}`, - logger: extraParams?.logger, - errorReporter: extraParams?.errorReporter, - onResourceAvailable: () => { + const queueResult = await pollForQueue( + sqsClient, + queueUrl, + startupResourcePolling, + extraParams, + () => { queueAvailable = true notifyIfBothReady() }, - checkFn: async () => { - const result = await getQueueAttributes(sqsClient, queueUrl) - if (result.error === 'not_found') { - return { isAvailable: false } - } - return { isAvailable: true, result: result.result } - }, - }) + ) // If queue was immediately available, mark it if (queueResult !== undefined) { diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index b47cc412..92dc3f0a 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -358,6 +358,93 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { }) }) + describe('when subscriptionArn is not provided (subscription creation mode)', () => { + it('waits for topic to become available before creating subscription', async () => { + // No topic and no queue exist initially + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Start init in background + const initPromise = consumer.init() + + // Wait a bit then create the topic (queue will be created by assertQueue in subscribeToTopic) + await setTimeout(300) + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Init should complete successfully after topic is created + await initPromise + + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + expect(consumer.subscriptionProps.queueName).toBe(queueName) + expect(consumer.subscriptionProps.subscriptionArn).toBeDefined() + }) + + it('throws StartupResourcePollingTimeoutError when topic never appears', async () => { + // No topic exists + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Should throw timeout error since topic never appears + await expect(consumer.init()).rejects.toThrow(StartupResourcePollingTimeoutError) + }) + + it('returns immediately in non-blocking mode when topic not available', async () => { + // Test at the initter level since consumer.init() sets internal state + const { initSnsSqs } = await import('../../lib/utils/snsInitter.ts') + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + expect(result.subscriptionArn).toBe('') + expect(result.queueName).toBe(queueName) + }) + }) + describe('when startupResourcePolling is disabled', () => { it('throws immediately when topic does not exist', async () => { // Create queue but not topic From 63d7c8ebb40523fdc5fb36e4191585f52d28a332 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 15:35:33 +0200 Subject: [PATCH 2/7] Fixes and refactorings --- packages/sns/lib/utils/snsInitter.ts | 155 ++++++++++++++---- ...ionConsumer.startupResourcePolling.spec.ts | 56 ++++++- 2 files changed, 180 insertions(+), 31 deletions(-) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index a96662a3..e9f69cfa 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -54,6 +54,122 @@ async function pollForTopic( return { topicResult, topicArn } } +// Helper function to create subscription +async function createSubscription( + sqsClient: SQSClient, + snsClient: SNSClient, + stsClient: STSClient, + creationConfig: SNSCreationConfig & SQSCreationConfig, + topicResolutionOptions: TopicResolutionOptions, + subscriptionConfig: SNSSubscriptionOptions, + extraParams?: ExtraParams, +) { + const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( + sqsClient, + snsClient, + stsClient, + creationConfig.queue, + topicResolutionOptions, + subscriptionConfig, + { + updateAttributesIfExists: creationConfig.updateAttributesIfExists, + queueUrlsWithSubscribePermissionsPrefix: + creationConfig.queueUrlsWithSubscribePermissionsPrefix, + allowedSourceOwner: creationConfig.allowedSourceOwner, + topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, + logger: extraParams?.logger, + forceTagUpdate: creationConfig.forceTagUpdate, + }, + ) + if (!subscriptionArn) { + throw new Error('Failed to subscribe') + } + return { subscriptionArn, topicArn, queueUrl } +} + +// Helper to handle subscription creation with optional topic polling (blocking and non-blocking) +async function createSubscriptionWithPolling( + sqsClient: SQSClient, + snsClient: SNSClient, + stsClient: STSClient, + creationConfig: SNSCreationConfig & SQSCreationConfig, + topicResolutionOptions: TopicResolutionOptions, + subscriptionConfig: SNSSubscriptionOptions, + topicArn: string, + startupResourcePolling: NonNullable, + extraParams?: InitSnsSqsExtraParams, +): Promise<{ + subscriptionArn: string + topicArn: string + queueUrl: string + queueName: string + resourcesReady: boolean +}> { + const nonBlocking = startupResourcePolling.nonBlocking === true + const queueName = creationConfig.queue.QueueName! + + const onTopicReady = async () => { + try { + const result = await createSubscription( + sqsClient, + snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, + extraParams, + ) + extraParams?.onResourcesReady?.({ + topicArn: result.topicArn, + queueUrl: result.queueUrl, + }) + } catch (error) { + extraParams?.logger?.error({ + message: 'Background subscription creation failed', + topicArn, + error, + }) + } + } + + const { topicResult } = await pollForTopic( + snsClient, + topicArn, + startupResourcePolling, + extraParams, + nonBlocking ? onTopicReady : undefined, + ) + + // Non-blocking: return early if topic wasn't immediately available + if (nonBlocking && topicResult === undefined) { + return { + subscriptionArn: '', + topicArn, + queueName, + queueUrl: '', + resourcesReady: false, + } + } + + // Blocking: topic is now available, create subscription + const result = await createSubscription( + sqsClient, + snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, + extraParams, + ) + return { + subscriptionArn: result.subscriptionArn, + topicArn: result.topicArn, + queueName, + queueUrl: result.queueUrl, + resourcesReady: true, + } +} + // Helper function to poll for SQS queue availability async function pollForQueue( sqsClient: SQSClient, @@ -148,48 +264,29 @@ export async function initSnsSqs( topicResolutionOptions.topicArn ?? (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) - const nonBlocking = startupResourcePolling.nonBlocking === true - - const { topicResult } = await pollForTopic( + return await createSubscriptionWithPolling( + sqsClient, snsClient, + stsClient, + creationConfig, + topicResolutionOptions, + subscriptionConfig, topicArnToWaitFor, startupResourcePolling, extraParams, ) - - // If non-blocking and topic wasn't immediately available, return early with resourcesReady: false - // The caller will need to handle this case (e.g., retry later or use onResourcesReady callback) - if (nonBlocking && topicResult === undefined) { - return { - subscriptionArn: '', - topicArn: topicArnToWaitFor, - queueName: creationConfig.queue.QueueName, - queueUrl: '', - resourcesReady: false, - } - } } - const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic( + // No polling needed - create subscription immediately + const { subscriptionArn, topicArn, queueUrl } = await createSubscription( sqsClient, snsClient, stsClient, - creationConfig.queue, + creationConfig, topicResolutionOptions, subscriptionConfig, - { - updateAttributesIfExists: creationConfig.updateAttributesIfExists, - queueUrlsWithSubscribePermissionsPrefix: - creationConfig.queueUrlsWithSubscribePermissionsPrefix, - allowedSourceOwner: creationConfig.allowedSourceOwner, - topicArnsWithPublishPermissionsPrefix: creationConfig.topicArnsWithPublishPermissionsPrefix, - logger: extraParams?.logger, - forceTagUpdate: creationConfig.forceTagUpdate, - }, + extraParams, ) - if (!subscriptionArn) { - throw new Error('Failed to subscribe') - } return { subscriptionArn, topicArn, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index 92dc3f0a..cb9c75c5 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -15,6 +15,7 @@ import { type SNSSQSConsumerDependencies, type SNSSQSConsumerOptions, } from '../../lib/sns/AbstractSnsSqsConsumer.ts' +import { initSnsSqs } from '../../lib/utils/snsInitter.ts' import { assertTopic, deleteTopic } from '../../lib/utils/snsUtils.ts' import type { Dependencies } from '../utils/testContext.ts' import { registerDependencies } from '../utils/testContext.ts' @@ -304,7 +305,6 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { it('invokes onResourcesReady callback when both resources become available in background', async () => { // Test at the initter level since AbstractSnsSqsConsumer doesn't expose the callback - const { initSnsSqs } = await import('../../lib/utils/snsInitter.ts') let callbackInvoked = false let callbackTopicArn: string | undefined @@ -416,7 +416,6 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { it('returns immediately in non-blocking mode when topic not available', async () => { // Test at the initter level since consumer.init() sets internal state - const { initSnsSqs } = await import('../../lib/utils/snsInitter.ts') const result = await initSnsSqs( sqsClient, @@ -443,6 +442,59 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(result.subscriptionArn).toBe('') expect(result.queueName).toBe(queueName) }) + + it('creates subscription in background when topic becomes available in non-blocking mode', async () => { + // No topic exists initially + + let callbackInvoked = false + let callbackTopicArn: string | undefined + let callbackQueueUrl: string | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + { + onResourcesReady: ({ topicArn, queueUrl: url }) => { + callbackInvoked = true + callbackTopicArn = topicArn + callbackQueueUrl = url + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + expect(result.subscriptionArn).toBe('') + + // Create topic after init returns + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Wait for callback to be invoked (subscription created in background) + await vi.waitFor( + () => { + expect(callbackInvoked).toBe(true) + }, + { timeout: 3000, interval: 50 }, + ) + + expect(callbackTopicArn).toBe(topicArn) + expect(callbackQueueUrl).toContain(queueName) + }) }) describe('when startupResourcePolling is disabled', () => { From 341ec7dde5ee585cbc9b10d4fc3389930832da35 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 15:56:22 +0200 Subject: [PATCH 3/7] More tests, onError callback --- .../lib/utils/startupResourcePollingUtils.ts | 12 ++++- packages/sns/lib/utils/snsInitter.ts | 53 ++++++++++++------- ...ionConsumer.startupResourcePolling.spec.ts | 53 +++++++++++++++++++ 3 files changed, 98 insertions(+), 20 deletions(-) diff --git a/packages/core/lib/utils/startupResourcePollingUtils.ts b/packages/core/lib/utils/startupResourcePollingUtils.ts index cd37764f..ce48485a 100644 --- a/packages/core/lib/utils/startupResourcePollingUtils.ts +++ b/packages/core/lib/utils/startupResourcePollingUtils.ts @@ -48,6 +48,13 @@ export type WaitForResourceOptions = { * Only used when config.nonBlocking is true and the resource was not immediately available. */ onResourceAvailable?: (result: T) => void + + /** + * Callback invoked when background polling fails in non-blocking mode. + * This can happen due to polling timeout or unexpected errors during polling. + * Only used when config.nonBlocking is true. + */ + onError?: (error: Error) => void } export class StartupResourcePollingTimeoutError extends Error { @@ -294,17 +301,20 @@ export async function waitForResource( }) // Fire and forget - start polling in background + const { onError } = options setTimeout(pollingIntervalMs).then(() => { pollForResource(options, pollingIntervalMs, hasTimeout, timeoutMs, throwOnTimeout, 1) .then((result) => { onResourceAvailable?.(result) }) - .catch((error) => { + .catch((err) => { + const error = err instanceof Error ? err : new Error(String(err)) logger?.error({ message: `Background polling for resource "${resourceName}" failed`, resourceName, error, }) + onError?.(error) }) }) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index e9f69cfa..ae2a091f 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -35,6 +35,7 @@ async function pollForTopic( startupResourcePolling: NonNullable, extraParams?: ExtraParams, onResourceAvailable?: () => void, + onError?: (error: Error) => void, ): Promise { const topicResult = await waitForResource({ config: startupResourcePolling, @@ -42,6 +43,7 @@ async function pollForTopic( logger: extraParams?.logger, errorReporter: extraParams?.errorReporter, onResourceAvailable, + onError, checkFn: async () => { const result = await getTopicAttributes(snsClient, topicArn) if (result.error === 'not_found') { @@ -123,12 +125,14 @@ async function createSubscriptionWithPolling( topicArn: result.topicArn, queueUrl: result.queueUrl, }) - } catch (error) { + } catch (err) { + const error = err instanceof Error ? err : new Error(String(err)) extraParams?.logger?.error({ message: 'Background subscription creation failed', topicArn, error, }) + extraParams?.onResourcesError?.(error) } } @@ -177,6 +181,7 @@ async function pollForQueue( startupResourcePolling: NonNullable, extraParams?: ExtraParams, onResourceAvailable?: () => void, + onError?: (error: Error) => void, ): Promise { return await waitForResource({ config: startupResourcePolling, @@ -184,6 +189,7 @@ async function pollForQueue( logger: extraParams?.logger, errorReporter: extraParams?.errorReporter, onResourceAvailable, + onError, checkFn: async () => { const result = await getQueueAttributes(sqsClient, queueUrl) if (result.error === 'not_found') { @@ -200,6 +206,11 @@ export type InitSnsSqsExtraParams = ExtraParams & { * Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available. */ onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void + /** + * Callback invoked when background resource polling or subscription creation fails in non-blocking mode. + * This can happen due to polling timeout or subscription creation failure. + */ + onResourcesError?: (error: Error) => void } export type InitSnsExtraParams = ExtraParams & { @@ -328,6 +339,9 @@ export async function initSnsSqs( topicAvailable = true notifyIfBothReady() }, + (error) => { + extraParams?.onResourcesError?.(error) + }, ) // If topic was immediately available, mark it @@ -343,25 +357,26 @@ export async function initSnsSqs( const queueName = splitUrl[splitUrl.length - 1]! // Also start polling for queue in background so we can notify when both are ready - pollForQueue(sqsClient, queueUrl, startupResourcePolling, extraParams, () => { - queueAvailable = true - notifyIfBothReady() + pollForQueue( + sqsClient, + queueUrl, + startupResourcePolling, + extraParams, + () => { + queueAvailable = true + notifyIfBothReady() + }, + (error) => { + extraParams?.onResourcesError?.(error) + }, + ).then((result) => { + // If queue was immediately available, pollForQueue returns the result + // but doesn't call onResourceAvailable, so we handle it here + if (result !== undefined) { + queueAvailable = true + notifyIfBothReady() + } }) - .then((result) => { - // If queue was immediately available, pollForQueue returns the result - // but doesn't call onResourceAvailable, so we handle it here - if (result !== undefined) { - queueAvailable = true - notifyIfBothReady() - } - }) - .catch((error) => { - extraParams?.logger?.error({ - message: 'Background polling for SQS queue failed', - queueUrl, - error, - }) - }) return { subscriptionArn: locatorConfig.subscriptionArn, diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index cb9c75c5..41e2e299 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -356,6 +356,59 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackTopicArn).toBeDefined() expect(callbackQueueUrl).toBe(queueUrl) }) + + it('invokes onResourcesError callback when background queue polling times out', async () => { + // Neither topic nor queue exist initially + // Topic will be created after init returns, queue never appears + + let errorCallbackInvoked = false + let callbackError: Error | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + undefined, + undefined, + { + onResourcesError: (error) => { + errorCallbackInvoked = true + callbackError = error + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Create topic so topic polling succeeds, but NOT queue + await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Wait for error callback to be invoked (queue polling timeout) + // Need to wait for: topic to become available (so topic polling succeeds) + // + queue polling timeout (200ms) + some buffer + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 3000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + }) }) describe('when subscriptionArn is not provided (subscription creation mode)', () => { From 9fee5a08a69badb1bd86441d0c8ce14831098e0b Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:00:18 +0200 Subject: [PATCH 4/7] Cleanup --- .../lib/utils/startupResourcePollingUtils.ts | 4 +- packages/sns/README.md | 152 ++++++++++++++++++ packages/sns/lib/utils/snsInitter.ts | 4 +- 3 files changed, 156 insertions(+), 4 deletions(-) diff --git a/packages/core/lib/utils/startupResourcePollingUtils.ts b/packages/core/lib/utils/startupResourcePollingUtils.ts index ce48485a..f9b0e87b 100644 --- a/packages/core/lib/utils/startupResourcePollingUtils.ts +++ b/packages/core/lib/utils/startupResourcePollingUtils.ts @@ -1,5 +1,5 @@ import { setTimeout } from 'node:timers/promises' -import type { CommonLogger, ErrorReporter } from '@lokalise/node-core' +import { type CommonLogger, type ErrorReporter, isError } from '@lokalise/node-core' import { NO_TIMEOUT, type StartupResourcePollingConfig } from '../types/queueOptionsTypes.ts' const DEFAULT_POLLING_INTERVAL_MS = 5000 @@ -308,7 +308,7 @@ export async function waitForResource( onResourceAvailable?.(result) }) .catch((err) => { - const error = err instanceof Error ? err : new Error(String(err)) + const error = isError(err) ? err : new Error(String(err)) logger?.error({ message: `Background polling for resource "${resourceName}" failed`, resourceName, diff --git a/packages/sns/README.md b/packages/sns/README.md index b5eb82e2..947256ef 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -489,6 +489,158 @@ When using `locatorConfig`, you connect to an existing topic without creating it } ``` +### Startup Resource Polling + +When your SNS topic or SQS queue may not exist at startup (e.g., created by another service or infrastructure-as-code pipeline), you can enable polling to wait for resources to become available instead of failing immediately: + +```typescript +{ + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue', + subscriptionArn: 'arn:aws:sns:us-east-1:123456789012:my-topic:uuid', + + // Enable startup resource polling + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, // Check every 5 seconds (default) + timeoutMs: 60000, // Timeout after 60 seconds + // timeoutMs: 'NO_TIMEOUT', // Or poll indefinitely + + // Optional: Non-blocking mode + nonBlocking: false, // Default: false (blocking) + }, + }, +} +``` + +#### Blocking Mode (Default) + +In blocking mode, `init()` or `start()` will wait until both the topic and queue are available: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + subscriptionArn: 'arn:aws:sns:...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + }, + }, +}) + +// This will block until topic and queue are available (or timeout) +await consumer.start() +``` + +If the timeout is reached before resources are available, a `StartupResourcePollingTimeoutError` is thrown. + +#### Non-Blocking Mode + +In non-blocking mode, `init()` returns immediately even if resources aren't available. Background polling continues and you're notified via callbacks: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + subscriptionArn: 'arn:aws:sns:...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + nonBlocking: true, // Return immediately + }, + }, +}) + +// Returns immediately, even if resources don't exist yet +await consumer.start() + +// Check if resources are ready +if (!consumer.resourcesReady) { + console.log('Resources not yet available, waiting in background...') +} +``` + +#### Callbacks for Non-Blocking Mode + +When using non-blocking mode, you can provide callbacks to be notified when resources become available or when polling fails: + +```typescript +// Using initSnsSqs directly for more control +import { initSnsSqs } from '@message-queue-toolkit/sns' + +const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName: 'my-topic', + queueUrl: 'https://sqs...', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + nonBlocking: true, + }, + }, + creationConfig, + subscriptionConfig, + { + // Called when all resources become available + onResourcesReady: ({ topicArn, queueUrl }) => { + console.log('Resources are now available!') + console.log('Topic ARN:', topicArn) + console.log('Queue URL:', queueUrl) + }, + + // Called if background polling fails (e.g., timeout) + onResourcesError: (error) => { + console.error('Failed to wait for resources:', error.message) + // Handle error - maybe alert, retry, or graceful degradation + }, + }, +) + +if (result.resourcesReady) { + console.log('Resources were immediately available') +} else { + console.log('Waiting for resources in background...') +} +``` + +#### Subscription Creation Mode + +When you want to **create** a subscription (no existing `subscriptionArn`), startup resource polling will wait for the topic to exist before attempting to subscribe: + +```typescript +const consumer = new MyConsumer(deps, { + locatorConfig: { + topicName: 'my-topic', // Topic created by another service + // No subscriptionArn - we'll create the subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 5000, + timeoutMs: 60000, + }, + }, + creationConfig: { + queue: { QueueName: 'my-consumer-queue' }, // Queue will be created + }, +}) + +// This will: +// 1. Poll until the topic exists +// 2. Create the SQS queue +// 3. Subscribe the queue to the topic +// 4. Start consuming +await consumer.start() +``` + ### Publisher Options ```typescript diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index ae2a091f..8a6c00bf 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -1,7 +1,7 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns' import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs' import type { STSClient } from '@aws-sdk/client-sts' -import type { Either } from '@lokalise/node-core' +import { type Either, isError } from '@lokalise/node-core' import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core' import { isProduction, @@ -126,7 +126,7 @@ async function createSubscriptionWithPolling( queueUrl: result.queueUrl, }) } catch (err) { - const error = err instanceof Error ? err : new Error(String(err)) + const error = isError(err) ? err : new Error(String(err)) extraParams?.logger?.error({ message: 'Background subscription creation failed', topicArn, From 5ca8733d96ac88afd8da0050b4805780031f9df4 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:06:10 +0200 Subject: [PATCH 5/7] Cleanup --- packages/core/lib/index.ts | 2 ++ .../lib/utils/startupResourcePollingUtils.ts | 22 +++++++++++++++++-- packages/sns/README.md | 14 ++++++++---- packages/sns/lib/utils/snsInitter.ts | 19 ++++++++-------- ...ionConsumer.startupResourcePolling.spec.ts | 5 ++++- 5 files changed, 46 insertions(+), 16 deletions(-) diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index 406712da..b9d29a8d 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -107,6 +107,8 @@ export { type ParseMessageResult, parseMessage } from './utils/parseUtils.ts' export { objectToBuffer } from './utils/queueUtils.ts' export { isStartupResourcePollingEnabled, + type PollingErrorCallback, + type PollingErrorContext, type StartupResourcePollingCheckResult, StartupResourcePollingTimeoutError, type WaitForResourceOptions, diff --git a/packages/core/lib/utils/startupResourcePollingUtils.ts b/packages/core/lib/utils/startupResourcePollingUtils.ts index f9b0e87b..26734199 100644 --- a/packages/core/lib/utils/startupResourcePollingUtils.ts +++ b/packages/core/lib/utils/startupResourcePollingUtils.ts @@ -4,6 +4,22 @@ import { NO_TIMEOUT, type StartupResourcePollingConfig } from '../types/queueOpt const DEFAULT_POLLING_INTERVAL_MS = 5000 +/** + * Context passed to error callbacks indicating whether the error is final. + */ +export type PollingErrorContext = { + /** + * If true, the operation has stopped and will not retry. + * If false, this is a transient error and the operation will continue. + */ + isFinal: boolean +} + +/** + * Callback invoked when a polling operation fails. + */ +export type PollingErrorCallback = (error: Error, context: PollingErrorContext) => void + export type StartupResourcePollingCheckResult = | { isAvailable: true @@ -54,7 +70,7 @@ export type WaitForResourceOptions = { * This can happen due to polling timeout or unexpected errors during polling. * Only used when config.nonBlocking is true. */ - onError?: (error: Error) => void + onError?: PollingErrorCallback } export class StartupResourcePollingTimeoutError extends Error { @@ -314,7 +330,9 @@ export async function waitForResource( resourceName, error, }) - onError?.(error) + // isFinal: true because pollForResource only throws when it gives up + // (timeout with throwOnTimeout: true, or unexpected error) + onError?.(error, { isFinal: true }) }) }) diff --git a/packages/sns/README.md b/packages/sns/README.md index 947256ef..4165a7ed 100644 --- a/packages/sns/README.md +++ b/packages/sns/README.md @@ -598,10 +598,16 @@ const result = await initSnsSqs( console.log('Queue URL:', queueUrl) }, - // Called if background polling fails (e.g., timeout) - onResourcesError: (error) => { - console.error('Failed to wait for resources:', error.message) - // Handle error - maybe alert, retry, or graceful degradation + // Called if background polling fails (e.g., timeout or unexpected error) + onResourcesError: (error, context) => { + if (context.isFinal) { + // Polling has stopped and will not retry + console.error('Failed to wait for resources:', error.message) + // Handle error - maybe alert, retry, or graceful degradation + } else { + // Transient error, polling will continue + console.warn('Transient error while polling:', error.message) + } }, }, ) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 8a6c00bf..61b70042 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -2,7 +2,7 @@ import type { CreateTopicCommandInput, SNSClient } from '@aws-sdk/client-sns' import type { CreateQueueCommandInput, SQSClient } from '@aws-sdk/client-sqs' import type { STSClient } from '@aws-sdk/client-sts' import { type Either, isError } from '@lokalise/node-core' -import type { DeletionConfig, ExtraParams } from '@message-queue-toolkit/core' +import type { DeletionConfig, ExtraParams, PollingErrorCallback } from '@message-queue-toolkit/core' import { isProduction, isStartupResourcePollingEnabled, @@ -35,7 +35,7 @@ async function pollForTopic( startupResourcePolling: NonNullable, extraParams?: ExtraParams, onResourceAvailable?: () => void, - onError?: (error: Error) => void, + onError?: PollingErrorCallback, ): Promise { const topicResult = await waitForResource({ config: startupResourcePolling, @@ -132,7 +132,8 @@ async function createSubscriptionWithPolling( topicArn, error, }) - extraParams?.onResourcesError?.(error) + // Subscription creation failure is final - we don't retry + extraParams?.onResourcesError?.(error, { isFinal: true }) } } @@ -181,7 +182,7 @@ async function pollForQueue( startupResourcePolling: NonNullable, extraParams?: ExtraParams, onResourceAvailable?: () => void, - onError?: (error: Error) => void, + onError?: PollingErrorCallback, ): Promise { return await waitForResource({ config: startupResourcePolling, @@ -210,7 +211,7 @@ export type InitSnsSqsExtraParams = ExtraParams & { * Callback invoked when background resource polling or subscription creation fails in non-blocking mode. * This can happen due to polling timeout or subscription creation failure. */ - onResourcesError?: (error: Error) => void + onResourcesError?: PollingErrorCallback } export type InitSnsExtraParams = ExtraParams & { @@ -339,8 +340,8 @@ export async function initSnsSqs( topicAvailable = true notifyIfBothReady() }, - (error) => { - extraParams?.onResourcesError?.(error) + (error, context) => { + extraParams?.onResourcesError?.(error, context) }, ) @@ -366,8 +367,8 @@ export async function initSnsSqs( queueAvailable = true notifyIfBothReady() }, - (error) => { - extraParams?.onResourcesError?.(error) + (error, context) => { + extraParams?.onResourcesError?.(error, context) }, ).then((result) => { // If queue was immediately available, pollForQueue returns the result diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index 41e2e299..853cf7e9 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -363,6 +363,7 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { let errorCallbackInvoked = false let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined const result = await initSnsSqs( sqsClient, @@ -383,9 +384,10 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { undefined, undefined, { - onResourcesError: (error) => { + onResourcesError: (error, context) => { errorCallbackInvoked = true callbackError = error + callbackContext = context }, }, ) @@ -408,6 +410,7 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackError).toBeDefined() expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) }) }) From b6799ef2522ddbc928537824afe11cf60b7fe79f Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:10:11 +0200 Subject: [PATCH 6/7] Fixes --- packages/sns/lib/utils/snsInitter.ts | 1 + ...ionConsumer.startupResourcePolling.spec.ts | 135 ++++++++++++++++++ 2 files changed, 136 insertions(+) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 61b70042..4f489046 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -143,6 +143,7 @@ async function createSubscriptionWithPolling( startupResourcePolling, extraParams, nonBlocking ? onTopicReady : undefined, + nonBlocking ? extraParams?.onResourcesError : undefined, ) // Non-blocking: return early if topic wasn't immediately available diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index 853cf7e9..2c416cb8 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -412,6 +412,59 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackError?.message).toContain('Timeout') expect(callbackContext).toEqual({ isFinal: true }) }) + + it('invokes onResourcesError callback when background topic polling times out', async () => { + // Neither topic nor queue exist initially + // Topic never appears - will timeout + + let errorCallbackInvoked = false + let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + undefined, + undefined, + { + onResourcesError: (error, context) => { + errorCallbackInvoked = true + callbackError = error + callbackContext = context + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Don't create topic - let it timeout + + // Wait for error callback to be invoked (topic polling timeout) + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 2000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) + }) }) describe('when subscriptionArn is not provided (subscription creation mode)', () => { @@ -551,6 +604,88 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackTopicArn).toBe(topicArn) expect(callbackQueueUrl).toContain(queueName) }) + + it('creates subscription immediately when topic is available in non-blocking mode', async () => { + // Topic exists before init + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + ) + + // Should return immediately with resourcesReady: true since topic was immediately available + expect(result.resourcesReady).toBe(true) + expect(result.subscriptionArn).toBeDefined() + expect(result.subscriptionArn).not.toBe('') + expect(result.topicArn).toBe(topicArn) + expect(result.queueName).toBe(queueName) + }) + + it('invokes onResourcesError callback when topic polling times out in non-blocking mode', async () => { + // No topic exists - will timeout + + let errorCallbackInvoked = false + let callbackError: Error | undefined + let callbackContext: { isFinal: boolean } | undefined + + const result = await initSnsSqs( + sqsClient, + snsClient, + stsClient, + { + topicName, + // No subscriptionArn - will create subscription + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 200, // Short timeout so it fails quickly + nonBlocking: true, + }, + }, + { + queue: { QueueName: queueName }, + }, + { updateAttributesIfExists: false }, + { + onResourcesError: (error, context) => { + errorCallbackInvoked = true + callbackError = error + callbackContext = context + }, + }, + ) + + // Should return immediately with resourcesReady: false + expect(result.resourcesReady).toBe(false) + + // Wait for error callback to be invoked (topic polling timeout) + await vi.waitFor( + () => { + expect(errorCallbackInvoked).toBe(true) + }, + { timeout: 2000, interval: 50 }, + ) + + expect(callbackError).toBeDefined() + expect(callbackError?.message).toContain('Timeout') + expect(callbackContext).toEqual({ isFinal: true }) + }) }) describe('when startupResourcePolling is disabled', () => { From fa3d92863af352c289a3734e3d9055cbbc16cbcb Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:14:00 +0200 Subject: [PATCH 7/7] address comment --- packages/sns/lib/utils/snsInitter.ts | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 4f489046..9b8eae3f 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -371,14 +371,25 @@ export async function initSnsSqs( (error, context) => { extraParams?.onResourcesError?.(error, context) }, - ).then((result) => { - // If queue was immediately available, pollForQueue returns the result - // but doesn't call onResourceAvailable, so we handle it here - if (result !== undefined) { - queueAvailable = true - notifyIfBothReady() - } - }) + ) + .then((result) => { + // If queue was immediately available, pollForQueue returns the result + // but doesn't call onResourceAvailable, so we handle it here + if (result !== undefined) { + queueAvailable = true + notifyIfBothReady() + } + }) + .catch((err) => { + // Handle unexpected errors during background polling + const error = isError(err) ? err : new Error(String(err)) + extraParams?.logger?.error({ + message: 'Background queue polling failed unexpectedly', + queueUrl, + error, + }) + extraParams?.onResourcesError?.(error, { isFinal: true }) + }) return { subscriptionArn: locatorConfig.subscriptionArn,