Skip to content

Commit f68fc0a

Browse files
authored
Avoid starting consumers prematurely (#392)
1 parent dbe8ce7 commit f68fc0a

File tree

5 files changed

+255
-8
lines changed

5 files changed

+255
-8
lines changed

packages/sns/lib/sns/AbstractSnsSqsConsumer.ts

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ export abstract class AbstractSnsSqsConsumer<
5555
private readonly snsClient: SNSClient
5656
private readonly stsClient: STSClient
5757

58+
/**
59+
* Tracks whether resources (SNS topic, SQS queue, subscription) are ready.
60+
* In non-blocking polling mode, this may be false initially and become true
61+
* when the onResourcesReady callback fires.
62+
*/
63+
private resourcesReady: boolean = false
64+
65+
/**
66+
* Tracks whether start() has been called but consumers couldn't be started
67+
* because resources weren't ready yet. When resources become ready and this
68+
* is true, consumers will be started automatically.
69+
*/
70+
private startRequested: boolean = false
71+
5872
// @ts-expect-error
5973
protected topicArn: string
6074
// @ts-expect-error
@@ -112,21 +126,42 @@ export abstract class AbstractSnsSqsConsumer<
112126
this.queueUrl = result.queueUrl
113127
this.subscriptionArn = result.subscriptionArn
114128
this.queueName = result.queueName
129+
this.resourcesReady = true
130+
115131
// Initialize DLQ now that resources are ready (this is mutually exclusive
116132
// with the synchronous initDeadLetterQueue call below)
117-
this.initDeadLetterQueue().catch((err) => {
118-
this.logger.error({
119-
message: 'Failed to initialize dead letter queue after resources became ready',
120-
error: err,
133+
this.initDeadLetterQueue()
134+
.catch((err) => {
135+
this.logger.error({
136+
message: 'Failed to initialize dead letter queue after resources became ready',
137+
error: err,
138+
})
139+
})
140+
.then(() => {
141+
// If start() was called while resources weren't ready, start consumers now
142+
if (this.startRequested) {
143+
this.logger.info({
144+
message: 'Resources now ready, starting consumers',
145+
queueName: this.queueName,
146+
topicArn: this.topicArn,
147+
})
148+
return this.startConsumers()
149+
}
150+
})
151+
.catch((err) => {
152+
this.logger.error({
153+
message: 'Failed to start consumers after resources became ready',
154+
error: err,
155+
})
121156
})
122-
})
123157
},
124158
},
125159
)
126160

127161
// Always assign topicArn and queueName (always valid in both blocking and non-blocking modes)
128162
this.topicArn = initSnsSqsResult.topicArn
129163
this.queueName = initSnsSqsResult.queueName
164+
this.resourcesReady = initSnsSqsResult.resourcesReady
130165

131166
// Only assign queueUrl and subscriptionArn if resources are ready,
132167
// or if they have valid values (non-blocking mode with locatorConfig provides valid values)
@@ -141,6 +176,31 @@ export abstract class AbstractSnsSqsConsumer<
141176
}
142177
}
143178

179+
/**
180+
* Starts the consumer. In non-blocking polling mode, if resources aren't ready yet,
181+
* this method will return immediately and consumers will start automatically once
182+
* resources become available.
183+
*/
184+
public override async start() {
185+
await this.init()
186+
187+
if (!this.resourcesReady) {
188+
// Resources not ready yet (non-blocking polling mode), mark that start was requested.
189+
// Consumers will be started automatically when onResourcesReady callback fires.
190+
this.startRequested = true
191+
this.logger.info({
192+
message:
193+
'Start requested but resources not ready yet, will start when resources become available',
194+
queueName: this.queueName,
195+
topicArn: this.topicArn,
196+
})
197+
return
198+
}
199+
200+
// Resources are ready, start consumers immediately
201+
await this.startConsumers()
202+
}
203+
144204
protected override resolveMessage(message: SQSMessage) {
145205
const result = readSnsMessage(message, this.errorResolver)
146206
if (result.result) {

packages/sns/lib/utils/snsInitter.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,10 +288,10 @@ export async function initSnsSqs(
288288
)
289289
}
290290

291-
// biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present
291+
// topicName is guaranteed to be defined here because we validated above that at least one of topicArn or topicName is present
292292
const topicArnToWaitFor =
293293
topicResolutionOptions.topicArn ??
294-
(await buildTopicArn(stsClient, topicResolutionOptions.topicName!))
294+
(await buildTopicArn(stsClient, topicResolutionOptions.topicName as string))
295295

296296
return await createSubscriptionWithPolling(
297297
sqsClient,
@@ -351,7 +351,8 @@ export async function initSnsSqs(
351351
extraParams?.onResourcesReady?.({
352352
topicArn: subscriptionTopicArn,
353353
queueUrl,
354-
subscriptionArn: locatorConfig.subscriptionArn!,
354+
// subscriptionArn is guaranteed to be defined here because we're in the branch where locatorConfig.subscriptionArn exists
355+
subscriptionArn: locatorConfig.subscriptionArn as string,
355356
queueName,
356357
})
357358
}

packages/sns/test/consumers/SnsSqsPermissionConsumer.startupResourcePolling.spec.ts

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,83 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => {
220220
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
221221
expect(consumer.subscriptionProps.queueName).toBe(queueName)
222222
})
223+
224+
it('start() waits for resources and starts consumers (blocking mode)', async () => {
225+
// Create queue first, but not the topic
226+
await assertQueue(sqsClient, { QueueName: queueName })
227+
228+
const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
229+
locatorConfig: {
230+
topicName,
231+
queueUrl,
232+
subscriptionArn:
233+
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
234+
startupResourcePolling: {
235+
enabled: true,
236+
pollingIntervalMs: 50,
237+
timeoutMs: 5000,
238+
},
239+
},
240+
creationConfig: {
241+
queue: { QueueName: queueName },
242+
},
243+
})
244+
245+
// Consumer should not be running before start
246+
expect(consumer.isRunning).toBe(false)
247+
248+
// Start in background (blocking mode waits for resources)
249+
const startPromise = consumer.start()
250+
251+
// Wait a bit then create the topic
252+
await setTimeout(200)
253+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
254+
255+
// start() should complete successfully after topic appears
256+
await startPromise
257+
258+
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
259+
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
260+
// Consumer should be running after start completes
261+
expect(consumer.isRunning).toBe(true)
262+
263+
// Clean up
264+
await consumer.close()
265+
})
266+
267+
it('start() works immediately when resources already exist (blocking mode)', async () => {
268+
// Create both resources before starting
269+
await assertQueue(sqsClient, { QueueName: queueName })
270+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
271+
272+
const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
273+
locatorConfig: {
274+
topicArn,
275+
queueUrl,
276+
subscriptionArn:
277+
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
278+
startupResourcePolling: {
279+
enabled: true,
280+
pollingIntervalMs: 100,
281+
timeoutMs: 5000,
282+
},
283+
},
284+
})
285+
286+
// Consumer should not be running before start
287+
expect(consumer.isRunning).toBe(false)
288+
289+
// start() should complete immediately since resources exist
290+
await consumer.start()
291+
292+
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
293+
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
294+
// Consumer should be running after start completes
295+
expect(consumer.isRunning).toBe(true)
296+
297+
// Clean up
298+
await consumer.close()
299+
})
223300
})
224301

225302
describe('when nonBlocking mode is enabled', () => {
@@ -465,6 +542,91 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => {
465542
expect(callbackError?.message).toContain('Timeout')
466543
expect(callbackContext).toEqual({ isFinal: true })
467544
})
545+
546+
it('start() returns immediately when topic is not available and starts consumers when resources become ready', async () => {
547+
// Create queue but not topic
548+
await assertQueue(sqsClient, { QueueName: queueName })
549+
550+
const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
551+
locatorConfig: {
552+
topicName,
553+
queueUrl,
554+
subscriptionArn:
555+
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
556+
startupResourcePolling: {
557+
enabled: true,
558+
pollingIntervalMs: 50,
559+
timeoutMs: 5000,
560+
nonBlocking: true,
561+
},
562+
},
563+
creationConfig: {
564+
queue: { QueueName: queueName },
565+
},
566+
})
567+
568+
// Consumer should not be running before start
569+
expect(consumer.isRunning).toBe(false)
570+
571+
// start() should return immediately even though topic doesn't exist
572+
await consumer.start()
573+
574+
// queueName should be set but consumer should NOT be running yet (resources not ready)
575+
expect(consumer.subscriptionProps.queueName).toBe(queueName)
576+
expect(consumer.isRunning).toBe(false)
577+
578+
// Create topic after start returns
579+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
580+
581+
// Wait for consumer to start running (happens when resources become ready)
582+
await vi.waitFor(
583+
() => {
584+
expect(consumer.isRunning).toBe(true)
585+
},
586+
{ timeout: 3000, interval: 50 },
587+
)
588+
589+
// Verify topicArn was updated
590+
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
591+
592+
// Clean up
593+
await consumer.close()
594+
})
595+
596+
it('start() works immediately when resources are already available', async () => {
597+
// Create both queue and topic before starting
598+
await assertQueue(sqsClient, { QueueName: queueName })
599+
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })
600+
601+
const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
602+
locatorConfig: {
603+
topicArn,
604+
queueUrl,
605+
subscriptionArn:
606+
'arn:aws:sns:eu-west-1:000000000000:dummy:bdf640a2-bedf-475a-98b8-758b88c87395',
607+
startupResourcePolling: {
608+
enabled: true,
609+
pollingIntervalMs: 100,
610+
timeoutMs: 5000,
611+
nonBlocking: true,
612+
},
613+
},
614+
})
615+
616+
// Consumer should not be running before start
617+
expect(consumer.isRunning).toBe(false)
618+
619+
// start() should work immediately since resources exist
620+
await consumer.start()
621+
622+
expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
623+
expect(consumer.subscriptionProps.queueUrl).toBe(queueUrl)
624+
// Consumer should be running after start completes (resources were available)
625+
expect(consumer.isRunning).toBe(true)
626+
627+
// Clean up
628+
await consumer.close()
629+
})
468630
})
469631

470632
describe('when subscriptionArn is not provided (subscription creation mode)', () => {

packages/sqs/lib/sqs/AbstractSqsConsumer.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,14 @@ export abstract class AbstractSqsConsumer<
200200

201201
public readonly _messageSchemaContainer: MessageSchemaContainer<MessagePayloadType>
202202

203+
/**
204+
* Returns true if the consumer has active SQS polling consumers running.
205+
* Useful for checking if the consumer has started processing messages.
206+
*/
207+
public get isRunning(): boolean {
208+
return this.consumers.length > 0
209+
}
210+
203211
protected constructor(
204212
dependencies: SQSConsumerDependencies,
205213
options: ConsumerOptionsType,
@@ -269,6 +277,15 @@ export abstract class AbstractSqsConsumer<
269277

270278
public async start() {
271279
await this.init()
280+
await this.startConsumers()
281+
}
282+
283+
/**
284+
* Creates and starts the SQS consumers.
285+
* This method is separated from start() to allow subclasses to defer consumer creation
286+
* until resources are ready (e.g., in non-blocking polling mode).
287+
*/
288+
protected async startConsumers() {
272289
await this.stopExistingConsumers()
273290

274291
const visibilityTimeout = await this.getQueueVisibilityTimeout()

packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -992,8 +992,15 @@ describe('SqsPermissionConsumer', () => {
992992
return Promise.resolve({ error: 'retryLater' })
993993
},
994994
})
995+
996+
// Consumer should not be running before start
997+
expect(consumer.isRunning).toBe(false)
998+
995999
await consumer.start()
9961000

1001+
// Consumer should be running after start
1002+
expect(consumer.isRunning).toBe(true)
1003+
9971004
const publisher = new SqsPermissionPublisher(diContainer.cradle, {
9981005
locatorConfig: { queueUrl: consumer.queueProps.url },
9991006
})

0 commit comments

Comments
 (0)