Skip to content

Commit e15feb9

Browse files
authored
Minor fixes for polling functionality (#391)
1 parent e09dfcd commit e15feb9

File tree

4 files changed

+67
-17
lines changed

4 files changed

+67
-17
lines changed

.github/workflows/publish.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,9 @@ jobs:
281281
git config user.name "github-actions[bot]"
282282
git config user.email "github-actions[bot]@users.noreply.github.com"
283283
284+
- name: Pull latest changes
285+
run: git pull --ff-only origin main
286+
284287
- name: Setup Node
285288
uses: actions/setup-node@v6
286289
with:

packages/core/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# @message-queue-toolkit/core
2-
2+
33
Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.
44

55
## Table of Contents

packages/sns/lib/sns/AbstractSnsSqsConsumer.ts

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,43 @@ export abstract class AbstractSnsSqsConsumer<
102102
this.locatorConfig,
103103
this.creationConfig,
104104
this.subscriptionConfig,
105-
{ logger: this.logger },
105+
{
106+
logger: this.logger,
107+
// This callback is only invoked in non-blocking mode when resources were NOT
108+
// immediately available. It will NOT be called if resourcesReady is true.
109+
onResourcesReady: (result) => {
110+
// Update values that were empty when resourcesReady was false
111+
this.topicArn = result.topicArn
112+
this.queueUrl = result.queueUrl
113+
this.subscriptionArn = result.subscriptionArn
114+
this.queueName = result.queueName
115+
// Initialize DLQ now that resources are ready (this is mutually exclusive
116+
// with the synchronous initDeadLetterQueue call below)
117+
this.initDeadLetterQueue().catch((err) => {
118+
this.logger.error({
119+
message: 'Failed to initialize dead letter queue after resources became ready',
120+
error: err,
121+
})
122+
})
123+
},
124+
},
106125
)
107-
this.queueName = initSnsSqsResult.queueName
108-
this.queueUrl = initSnsSqsResult.queueUrl
126+
127+
// Always assign topicArn and queueName (always valid in both blocking and non-blocking modes)
109128
this.topicArn = initSnsSqsResult.topicArn
110-
this.subscriptionArn = initSnsSqsResult.subscriptionArn
129+
this.queueName = initSnsSqsResult.queueName
130+
131+
// Only assign queueUrl and subscriptionArn if resources are ready,
132+
// or if they have valid values (non-blocking mode with locatorConfig provides valid values)
133+
if (initSnsSqsResult.resourcesReady || initSnsSqsResult.queueUrl) {
134+
this.queueUrl = initSnsSqsResult.queueUrl
135+
this.subscriptionArn = initSnsSqsResult.subscriptionArn
136+
}
111137

112-
await this.initDeadLetterQueue()
138+
// Only initialize DLQ if resources are ready and queueUrl is available
139+
if (initSnsSqsResult.resourcesReady && initSnsSqsResult.queueUrl) {
140+
await this.initDeadLetterQueue()
141+
}
113142
}
114143

115144
protected override resolveMessage(message: SQSMessage) {

packages/sns/lib/utils/snsInitter.ts

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ async function createSubscriptionWithPolling(
108108
resourcesReady: boolean
109109
}> {
110110
const nonBlocking = startupResourcePolling.nonBlocking === true
111+
// biome-ignore lint/style/noNonNullAssertion: QueueName is validated in initSnsSqs before calling this function
111112
const queueName = creationConfig.queue.QueueName!
112113

113114
const onTopicReady = async () => {
@@ -124,6 +125,8 @@ async function createSubscriptionWithPolling(
124125
extraParams?.onResourcesReady?.({
125126
topicArn: result.topicArn,
126127
queueUrl: result.queueUrl,
128+
subscriptionArn: result.subscriptionArn,
129+
queueName,
127130
})
128131
} catch (err) {
129132
const error = isError(err) ? err : new Error(String(err))
@@ -207,7 +210,12 @@ export type InitSnsSqsExtraParams = ExtraParams & {
207210
* Callback invoked when resources become available in non-blocking mode.
208211
* Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available.
209212
*/
210-
onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void
213+
onResourcesReady?: (result: {
214+
topicArn: string
215+
queueUrl: string
216+
subscriptionArn: string
217+
queueName: string
218+
}) => void
211219
/**
212220
* Callback invoked when background resource polling or subscription creation fails in non-blocking mode.
213221
* This can happen due to polling timeout or subscription creation failure.
@@ -273,9 +281,17 @@ export async function initSnsSqs(
273281
isStartupResourcePollingEnabled(startupResourcePolling) &&
274282
!isCreateTopicCommand(topicResolutionOptions)
275283
) {
284+
// Validate that we have either topicArn or topicName to build the ARN
285+
if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) {
286+
throw new Error(
287+
'When startup resource polling is enabled and topic is not being created, either topicArn or topicName must be provided in locatorConfig to identify the topic to poll for',
288+
)
289+
}
290+
291+
// biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present
276292
const topicArnToWaitFor =
277293
topicResolutionOptions.topicArn ??
278-
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
294+
(await buildTopicArn(stsClient, topicResolutionOptions.topicName!))
279295

280296
return await createSubscriptionWithPolling(
281297
sqsClient,
@@ -321,13 +337,23 @@ export async function initSnsSqs(
321337
if (isStartupResourcePollingEnabled(startupResourcePolling)) {
322338
const nonBlocking = startupResourcePolling.nonBlocking === true
323339

340+
// Extract queueName early for use in callbacks
341+
const splitUrl = queueUrl.split('/')
342+
// biome-ignore lint/style/noNonNullAssertion: It's ok
343+
const queueName = splitUrl[splitUrl.length - 1]!
344+
324345
// Track availability for non-blocking mode coordination
325346
let topicAvailable = false
326347
let queueAvailable = false
327348

328349
const notifyIfBothReady = () => {
329350
if (nonBlocking && topicAvailable && queueAvailable) {
330-
extraParams?.onResourcesReady?.({ topicArn: subscriptionTopicArn, queueUrl })
351+
extraParams?.onResourcesReady?.({
352+
topicArn: subscriptionTopicArn,
353+
queueUrl,
354+
subscriptionArn: locatorConfig.subscriptionArn!,
355+
queueName,
356+
})
331357
}
332358
}
333359

@@ -354,10 +380,6 @@ export async function initSnsSqs(
354380
// If non-blocking and topic wasn't immediately available, return early
355381
// Background polling will continue and call notifyIfBothReady when topic is available
356382
if (nonBlocking && topicResult === undefined) {
357-
const splitUrl = queueUrl.split('/')
358-
// biome-ignore lint/style/noNonNullAssertion: It's ok
359-
const queueName = splitUrl[splitUrl.length - 1]!
360-
361383
// Also start polling for queue in background so we can notify when both are ready
362384
pollForQueue(
363385
sqsClient,
@@ -419,10 +441,6 @@ export async function initSnsSqs(
419441

420442
// If non-blocking and queue wasn't immediately available, return early
421443
if (nonBlocking && queueResult === undefined) {
422-
const splitUrl = queueUrl.split('/')
423-
// biome-ignore lint/style/noNonNullAssertion: It's ok
424-
const queueName = splitUrl[splitUrl.length - 1]!
425-
426444
return {
427445
subscriptionArn: locatorConfig.subscriptionArn,
428446
topicArn: subscriptionTopicArn,

0 commit comments

Comments
 (0)