diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 11e4a5d7..9e1113e1 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -55,6 +55,20 @@ export abstract class AbstractSnsSqsConsumer< private readonly snsClient: SNSClient private readonly stsClient: STSClient + /** + * Tracks whether resources (SNS topic, SQS queue, subscription) are ready. + * In non-blocking polling mode, this may be false initially and become true + * when the onResourcesReady callback fires. + */ + private resourcesReady: boolean = false + + /** + * Tracks whether start() has been called but consumers couldn't be started + * because resources weren't ready yet. When resources become ready and this + * is true, consumers will be started automatically. + */ + private startRequested: boolean = false + // @ts-expect-error protected topicArn: string // @ts-expect-error @@ -112,14 +126,34 @@ export abstract class AbstractSnsSqsConsumer< this.queueUrl = result.queueUrl this.subscriptionArn = result.subscriptionArn this.queueName = result.queueName + this.resourcesReady = true + // Initialize DLQ now that resources are ready (this is mutually exclusive // with the synchronous initDeadLetterQueue call below) - this.initDeadLetterQueue().catch((err) => { - this.logger.error({ - message: 'Failed to initialize dead letter queue after resources became ready', - error: err, + this.initDeadLetterQueue() + .catch((err) => { + this.logger.error({ + message: 'Failed to initialize dead letter queue after resources became ready', + error: err, + }) + }) + .then(() => { + // If start() was called while resources weren't ready, start consumers now + if (this.startRequested) { + this.logger.info({ + message: 'Resources now ready, starting consumers', + queueName: this.queueName, + topicArn: this.topicArn, + }) + return this.startConsumers() + } + }) + .catch((err) => { + this.logger.error({ + message: 'Failed to start consumers after resources became ready', + error: err, + }) }) - }) }, }, ) @@ -127,6 +161,7 @@ export abstract class AbstractSnsSqsConsumer< // Always assign topicArn and queueName (always valid in both blocking and non-blocking modes) this.topicArn = initSnsSqsResult.topicArn this.queueName = initSnsSqsResult.queueName + this.resourcesReady = initSnsSqsResult.resourcesReady // Only assign queueUrl and subscriptionArn if resources are ready, // or if they have valid values (non-blocking mode with locatorConfig provides valid values) @@ -141,6 +176,31 @@ export abstract class AbstractSnsSqsConsumer< } } + /** + * Starts the consumer. In non-blocking polling mode, if resources aren't ready yet, + * this method will return immediately and consumers will start automatically once + * resources become available. + */ + public override async start() { + await this.init() + + if (!this.resourcesReady) { + // Resources not ready yet (non-blocking polling mode), mark that start was requested. + // Consumers will be started automatically when onResourcesReady callback fires. + this.startRequested = true + this.logger.info({ + message: + 'Start requested but resources not ready yet, will start when resources become available', + queueName: this.queueName, + topicArn: this.topicArn, + }) + return + } + + // Resources are ready, start consumers immediately + await this.startConsumers() + } + protected override resolveMessage(message: SQSMessage) { const result = readSnsMessage(message, this.errorResolver) if (result.result) { diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index af0f7d6b..c0c825f6 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -288,10 +288,10 @@ export async function initSnsSqs( ) } - // biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present + // topicName is guaranteed to be defined here because we validated above that at least one of topicArn or topicName is present const topicArnToWaitFor = topicResolutionOptions.topicArn ?? - (await buildTopicArn(stsClient, topicResolutionOptions.topicName!)) + (await buildTopicArn(stsClient, topicResolutionOptions.topicName as string)) return await createSubscriptionWithPolling( sqsClient, @@ -351,7 +351,8 @@ export async function initSnsSqs( extraParams?.onResourcesReady?.({ topicArn: subscriptionTopicArn, queueUrl, - subscriptionArn: locatorConfig.subscriptionArn!, + // subscriptionArn is guaranteed to be defined here because we're in the branch where locatorConfig.subscriptionArn exists + subscriptionArn: locatorConfig.subscriptionArn as string, queueName, }) } diff --git a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts index 2c416cb8..4bc82df5 100644 --- a/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts +++ b/packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts @@ -220,6 +220,83 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(consumer.subscriptionProps.topicArn).toBe(topicArn) expect(consumer.subscriptionProps.queueName).toBe(queueName) }) + + it('start() waits for resources and starts consumers (blocking mode)', async () => { + // Create queue first, but not the topic + await assertQueue(sqsClient, { QueueName: queueName }) + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 5000, + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Consumer should not be running before start + expect(consumer.isRunning).toBe(false) + + // Start in background (blocking mode waits for resources) + const startPromise = consumer.start() + + // Wait a bit then create the topic + await setTimeout(200) + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // start() should complete successfully after topic appears + await startPromise + + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl) + // Consumer should be running after start completes + expect(consumer.isRunning).toBe(true) + + // Clean up + await consumer.close() + }) + + it('start() works immediately when resources already exist (blocking mode)', async () => { + // Create both resources before starting + await assertQueue(sqsClient, { QueueName: queueName }) + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicArn, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + }, + }, + }) + + // Consumer should not be running before start + expect(consumer.isRunning).toBe(false) + + // start() should complete immediately since resources exist + await consumer.start() + + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl) + // Consumer should be running after start completes + expect(consumer.isRunning).toBe(true) + + // Clean up + await consumer.close() + }) }) describe('when nonBlocking mode is enabled', () => { @@ -465,6 +542,91 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => { expect(callbackError?.message).toContain('Timeout') expect(callbackContext).toEqual({ isFinal: true }) }) + + it('start() returns immediately when topic is not available and starts consumers when resources become ready', async () => { + // Create queue but not topic + await assertQueue(sqsClient, { QueueName: queueName }) + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicName, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 50, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + creationConfig: { + queue: { QueueName: queueName }, + }, + }) + + // Consumer should not be running before start + expect(consumer.isRunning).toBe(false) + + // start() should return immediately even though topic doesn't exist + await consumer.start() + + // queueName should be set but consumer should NOT be running yet (resources not ready) + expect(consumer.subscriptionProps.queueName).toBe(queueName) + expect(consumer.isRunning).toBe(false) + + // Create topic after start returns + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + // Wait for consumer to start running (happens when resources become ready) + await vi.waitFor( + () => { + expect(consumer.isRunning).toBe(true) + }, + { timeout: 3000, interval: 50 }, + ) + + // Verify topicArn was updated + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + + // Clean up + await consumer.close() + }) + + it('start() works immediately when resources are already available', async () => { + // Create both queue and topic before starting + await assertQueue(sqsClient, { QueueName: queueName }) + const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName }) + + const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, { + locatorConfig: { + topicArn, + queueUrl, + subscriptionArn: + 'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395', + startupResourcePolling: { + enabled: true, + pollingIntervalMs: 100, + timeoutMs: 5000, + nonBlocking: true, + }, + }, + }) + + // Consumer should not be running before start + expect(consumer.isRunning).toBe(false) + + // start() should work immediately since resources exist + await consumer.start() + + expect(consumer.subscriptionProps.topicArn).toBe(topicArn) + expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl) + // Consumer should be running after start completes (resources were available) + expect(consumer.isRunning).toBe(true) + + // Clean up + await consumer.close() + }) }) describe('when subscriptionArn is not provided (subscription creation mode)', () => { diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 42b7b8d7..0437a445 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -200,6 +200,14 @@ export abstract class AbstractSqsConsumer< public readonly _messageSchemaContainer: MessageSchemaContainer + /** + * Returns true if the consumer has active SQS polling consumers running. + * Useful for checking if the consumer has started processing messages. + */ + public get isRunning(): boolean { + return this.consumers.length > 0 + } + protected constructor( dependencies: SQSConsumerDependencies, options: ConsumerOptionsType, @@ -269,6 +277,15 @@ export abstract class AbstractSqsConsumer< public async start() { await this.init() + await this.startConsumers() + } + + /** + * Creates and starts the SQS consumers. + * This method is separated from start() to allow subclasses to defer consumer creation + * until resources are ready (e.g., in non-blocking polling mode). + */ + protected async startConsumers() { await this.stopExistingConsumers() const visibilityTimeout = await this.getQueueVisibilityTimeout() diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 851d764f..7c75a2e5 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -992,8 +992,15 @@ describe('SqsPermissionConsumer', () => { return Promise.resolve({ error: 'retryLater' }) }, }) + + // Consumer should not be running before start + expect(consumer.isRunning).toBe(false) + await consumer.start() + // Consumer should be running after start + expect(consumer.isRunning).toBe(true) + const publisher = new SqsPermissionPublisher(diContainer.cradle, { locatorConfig: { queueUrl: consumer.queueProps.url }, })