Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 65 additions & 5 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ export abstract class AbstractSnsSqsConsumer<
private readonly snsClient: SNSClient
private readonly stsClient: STSClient

/**
* Tracks whether resources (SNS topic, SQS queue, subscription) are ready.
* In non-blocking polling mode, this may be false initially and become true
* when the onResourcesReady callback fires.
*/
private resourcesReady: boolean = false

/**
* Tracks whether start() has been called but consumers couldn't be started
* because resources weren't ready yet. When resources become ready and this
* is true, consumers will be started automatically.
*/
private startRequested: boolean = false

// @ts-expect-error
protected topicArn: string
// @ts-expect-error
Expand Down Expand Up @@ -112,21 +126,42 @@ export abstract class AbstractSnsSqsConsumer<
this.queueUrl = result.queueUrl
this.subscriptionArn = result.subscriptionArn
this.queueName = result.queueName
this.resourcesReady = true

// Initialize DLQ now that resources are ready (this is mutually exclusive
// with the synchronous initDeadLetterQueue call below)
this.initDeadLetterQueue().catch((err) => {
this.logger.error({
message: 'Failed to initialize dead letter queue after resources became ready',
error: err,
this.initDeadLetterQueue()
.catch((err) => {
this.logger.error({
message: 'Failed to initialize dead letter queue after resources became ready',
error: err,
})
})
.then(() => {
// If start() was called while resources weren't ready, start consumers now
if (this.startRequested) {
this.logger.info({
message: 'Resources now ready, starting consumers',
queueName: this.queueName,
topicArn: this.topicArn,
})
return this.startConsumers()
}
})
.catch((err) => {
this.logger.error({
message: 'Failed to start consumers after resources became ready',
error: err,
})
})
})
},
},
)

// Always assign topicArn and queueName (always valid in both blocking and non-blocking modes)
this.topicArn = initSnsSqsResult.topicArn
this.queueName = initSnsSqsResult.queueName
this.resourcesReady = initSnsSqsResult.resourcesReady

// Only assign queueUrl and subscriptionArn if resources are ready,
// or if they have valid values (non-blocking mode with locatorConfig provides valid values)
Expand All @@ -141,6 +176,31 @@ export abstract class AbstractSnsSqsConsumer<
}
}

/**
* Starts the consumer. In non-blocking polling mode, if resources aren't ready yet,
* this method will return immediately and consumers will start automatically once
* resources become available.
*/
public override async start() {
await this.init()

if (!this.resourcesReady) {
// Resources not ready yet (non-blocking polling mode), mark that start was requested.
// Consumers will be started automatically when onResourcesReady callback fires.
this.startRequested = true
this.logger.info({
message:
'Start requested but resources not ready yet, will start when resources become available',
queueName: this.queueName,
topicArn: this.topicArn,
})
return
}

// Resources are ready, start consumers immediately
await this.startConsumers()
}

protected override resolveMessage(message: SQSMessage) {
const result = readSnsMessage(message, this.errorResolver)
if (result.result) {
Expand Down
7 changes: 4 additions & 3 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ export async function initSnsSqs(
)
}

// biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present
// topicName is guaranteed to be defined here because we validated above that at least one of topicArn or topicName is present
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName!))
(await buildTopicArn(stsClient, topicResolutionOptions.topicName as string))

return await createSubscriptionWithPolling(
sqsClient,
Expand Down Expand Up @@ -351,7 +351,8 @@ export async function initSnsSqs(
extraParams?.onResourcesReady?.({
topicArn: subscriptionTopicArn,
queueUrl,
subscriptionArn: locatorConfig.subscriptionArn!,
// subscriptionArn is guaranteed to be defined here because we're in the branch where locatorConfig.subscriptionArn exists
subscriptionArn: locatorConfig.subscriptionArn as string,
queueName,
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,83 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => {
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
expect(consumer.subscriptionProps.queueName).toBe(queueName)
})

it('start() waits for resources and starts consumers (blocking mode)', async () => {
// Create queue first, but not the topic
await assertQueue(sqsClient, { QueueName: queueName })

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicName,
queueUrl,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 50,
timeoutMs: 5000,
},
},
creationConfig: {
queue: { QueueName: queueName },
},
})

// Consumer should not be running before start
expect(consumer.isRunning).toBe(false)

// Start in background (blocking mode waits for resources)
const startPromise = consumer.start()

// Wait a bit then create the topic
await setTimeout(200)
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })

// start() should complete successfully after topic appears
await startPromise

expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
// Consumer should be running after start completes
expect(consumer.isRunning).toBe(true)

// Clean up
await consumer.close()
})

it('start() works immediately when resources already exist (blocking mode)', async () => {
// Create both resources before starting
await assertQueue(sqsClient, { QueueName: queueName })
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicArn,
queueUrl,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 100,
timeoutMs: 5000,
},
},
})

// Consumer should not be running before start
expect(consumer.isRunning).toBe(false)

// start() should complete immediately since resources exist
await consumer.start()

expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
// Consumer should be running after start completes
expect(consumer.isRunning).toBe(true)

// Clean up
await consumer.close()
})
})

describe('when nonBlocking mode is enabled', () => {
Expand Down Expand Up @@ -465,6 +542,91 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => {
expect(callbackError?.message).toContain('Timeout')
expect(callbackContext).toEqual({ isFinal: true })
})

it('start() returns immediately when topic is not available and starts consumers when resources become ready', async () => {
// Create queue but not topic
await assertQueue(sqsClient, { QueueName: queueName })

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicName,
queueUrl,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 50,
timeoutMs: 5000,
nonBlocking: true,
},
},
creationConfig: {
queue: { QueueName: queueName },
},
})

// Consumer should not be running before start
expect(consumer.isRunning).toBe(false)

// start() should return immediately even though topic doesn't exist
await consumer.start()

// queueName should be set but consumer should NOT be running yet (resources not ready)
expect(consumer.subscriptionProps.queueName).toBe(queueName)
expect(consumer.isRunning).toBe(false)

// Create topic after start returns
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })

// Wait for consumer to start running (happens when resources become ready)
await vi.waitFor(
() => {
expect(consumer.isRunning).toBe(true)
},
{ timeout: 3000, interval: 50 },
)

// Verify topicArn was updated
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)

// Clean up
await consumer.close()
})

it('start() works immediately when resources are already available', async () => {
// Create both queue and topic before starting
await assertQueue(sqsClient, { QueueName: queueName })
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicArn,
queueUrl,
subscriptionArn:
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 100,
timeoutMs: 5000,
nonBlocking: true,
},
},
})

// Consumer should not be running before start
expect(consumer.isRunning).toBe(false)

// start() should work immediately since resources exist
await consumer.start()

expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
// Consumer should be running after start completes (resources were available)
expect(consumer.isRunning).toBe(true)

// Clean up
await consumer.close()
})
})

describe('when subscriptionArn is not provided (subscription creation mode)', () => {
Expand Down
17 changes: 17 additions & 0 deletions packages/sqs/lib/sqs/AbstractSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,14 @@ export abstract class AbstractSqsConsumer<

public readonly _messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>

/**
* Returns true if the consumer has active SQS polling consumers running.
* Useful for checking if the consumer has started processing messages.
*/
public get isRunning(): boolean {
return this.consumers.length > 0
}

protected constructor(
dependencies: SQSConsumerDependencies,
options: ConsumerOptionsType,
Expand Down Expand Up @@ -269,6 +277,15 @@ export abstract class AbstractSqsConsumer<

public async start() {
await this.init()
await this.startConsumers()
}

/**
* Creates and starts the SQS consumers.
* This method is separated from start() to allow subclasses to defer consumer creation
* until resources are ready (e.g., in non-blocking polling mode).
*/
protected async startConsumers() {
await this.stopExistingConsumers()

const visibilityTimeout = await this.getQueueVisibilityTimeout()
Expand Down
7 changes: 7 additions & 0 deletions packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,15 @@ describe('SqsPermissionConsumer', () => {
return Promise.resolve({ error: 'retryLater' })
},
})

// Consumer should not be running before start
expect(consumer.isRunning).toBe(false)

await consumer.start()

// Consumer should be running after start
expect(consumer.isRunning).toBe(true)

const publisher = new SqsPermissionPublisher(diContainer.cradle, {
locatorConfig: { queueUrl: consumer.queueProps.url },
})
Expand Down
Loading