From c863bb4a0c7c016ddedb1a54198c079d16d8982e Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:29:37 +0200 Subject: [PATCH 1/4] Minor fixes for polling functionality --- .../sns/lib/sns/AbstractSnsSqsConsumer.ts | 36 ++++++++++++++--- packages/sns/lib/utils/snsInitter.ts | 40 ++++++++++++++----- 2 files changed, 60 insertions(+), 16 deletions(-) diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 7aa6a81f..e0e5c2a7 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -102,14 +102,40 @@ export abstract class AbstractSnsSqsConsumer< this.locatorConfig, this.creationConfig, this.subscriptionConfig, - { logger: this.logger }, + { + logger: this.logger, + onResourcesReady: (result) => { + // Update values that may have been empty when resourcesReady was false + this.topicArn = result.topicArn + this.queueUrl = result.queueUrl + this.subscriptionArn = result.subscriptionArn + this.queueName = result.queueName + // Initialize DLQ now that resources are ready + this.initDeadLetterQueue().catch((err) => { + this.logger.error({ + message: 'Failed to initialize dead letter queue after resources became ready', + error: err, + }) + }) + }, + }, ) - this.queueName = initSnsSqsResult.queueName - this.queueUrl = initSnsSqsResult.queueUrl + + // Always assign topicArn and queueName (always valid in both blocking and non-blocking modes) this.topicArn = initSnsSqsResult.topicArn - this.subscriptionArn = initSnsSqsResult.subscriptionArn + this.queueName = initSnsSqsResult.queueName + + // Only assign queueUrl and subscriptionArn if resources are ready, + // or if they have valid values (non-blocking mode with locatorConfig provides valid values) + if (initSnsSqsResult.resourcesReady || initSnsSqsResult.queueUrl) { + this.queueUrl = initSnsSqsResult.queueUrl + this.subscriptionArn = initSnsSqsResult.subscriptionArn + } - await this.initDeadLetterQueue() + // Only initialize DLQ if resources are ready and queueUrl is available + if (initSnsSqsResult.resourcesReady && initSnsSqsResult.queueUrl) { + await this.initDeadLetterQueue() + } } protected override resolveMessage(message: SQSMessage) { diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 9b8eae3f..af0f7d6b 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -108,6 +108,7 @@ async function createSubscriptionWithPolling( resourcesReady: boolean }> { const nonBlocking = startupResourcePolling.nonBlocking === true + // biome-ignore lint/style/noNonNullAssertion: QueueName is validated in initSnsSqs before calling this function const queueName = creationConfig.queue.QueueName! const onTopicReady = async () => { @@ -124,6 +125,8 @@ async function createSubscriptionWithPolling( extraParams?.onResourcesReady?.({ topicArn: result.topicArn, queueUrl: result.queueUrl, + subscriptionArn: result.subscriptionArn, + queueName, }) } catch (err) { const error = isError(err) ? err : new Error(String(err)) @@ -207,7 +210,12 @@ export type InitSnsSqsExtraParams = ExtraParams & { * Callback invoked when resources become available in non-blocking mode. * Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available. */ - onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void + onResourcesReady?: (result: { + topicArn: string + queueUrl: string + subscriptionArn: string + queueName: string + }) => void /** * Callback invoked when background resource polling or subscription creation fails in non-blocking mode. * This can happen due to polling timeout or subscription creation failure. @@ -273,9 +281,17 @@ export async function initSnsSqs( isStartupResourcePollingEnabled(startupResourcePolling) && !isCreateTopicCommand(topicResolutionOptions) ) { + // Validate that we have either topicArn or topicName to build the ARN + if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) { + throw new Error( + 'When startup resource polling is enabled and topic is not being created, either topicArn or topicName must be provided in locatorConfig to identify the topic to poll for', + ) + } + + // biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present const topicArnToWaitFor = topicResolutionOptions.topicArn ?? - (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) + (await buildTopicArn(stsClient, topicResolutionOptions.topicName!)) return await createSubscriptionWithPolling( sqsClient, @@ -321,13 +337,23 @@ export async function initSnsSqs( if (isStartupResourcePollingEnabled(startupResourcePolling)) { const nonBlocking = startupResourcePolling.nonBlocking === true + // Extract queueName early for use in callbacks + const splitUrl = queueUrl.split('/') + // biome-ignore lint/style/noNonNullAssertion: It's ok + const queueName = splitUrl[splitUrl.length - 1]! + // Track availability for non-blocking mode coordination let topicAvailable = false let queueAvailable = false const notifyIfBothReady = () => { if (nonBlocking && topicAvailable && queueAvailable) { - extraParams?.onResourcesReady?.({ topicArn: subscriptionTopicArn, queueUrl }) + extraParams?.onResourcesReady?.({ + topicArn: subscriptionTopicArn, + queueUrl, + subscriptionArn: locatorConfig.subscriptionArn!, + queueName, + }) } } @@ -354,10 +380,6 @@ export async function initSnsSqs( // If non-blocking and topic wasn't immediately available, return early // Background polling will continue and call notifyIfBothReady when topic is available if (nonBlocking && topicResult === undefined) { - const splitUrl = queueUrl.split('/') - // biome-ignore lint/style/noNonNullAssertion: It's ok - const queueName = splitUrl[splitUrl.length - 1]! - // Also start polling for queue in background so we can notify when both are ready pollForQueue( sqsClient, @@ -419,10 +441,6 @@ export async function initSnsSqs( // If non-blocking and queue wasn't immediately available, return early if (nonBlocking && queueResult === undefined) { - const splitUrl = queueUrl.split('/') - // biome-ignore lint/style/noNonNullAssertion: It's ok - const queueName = splitUrl[splitUrl.length - 1]! - return { subscriptionArn: locatorConfig.subscriptionArn, topicArn: subscriptionTopicArn, From a6c9e2c9ea7916c59df7e7d5bab9660ac643d01a Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:31:17 +0200 Subject: [PATCH 2/4] add git pull and trigger core release --- .github/workflows/ci.common.yml | 3 +++ packages/core/README.md | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.common.yml b/.github/workflows/ci.common.yml index c78f3192..6ca897b6 100644 --- a/.github/workflows/ci.common.yml +++ b/.github/workflows/ci.common.yml @@ -21,6 +21,9 @@ jobs: node-version: ${{ inputs.node_version }} package-manager-cache: false + - name: Pull latest changes + run: git pull origin ${{ github.head_ref || github.ref_name }} + - name: Install run: npm install --ignore-scripts diff --git a/packages/core/README.md b/packages/core/README.md index 253b34cf..727a253b 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -1,5 +1,5 @@ # @message-queue-toolkit/core - + Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers. ## Table of Contents From cadfe4e46fa2336a83276315ea696a6fb6fb438a Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:34:01 +0200 Subject: [PATCH 3/4] Extra comments --- packages/sns/lib/sns/AbstractSnsSqsConsumer.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index e0e5c2a7..11e4a5d7 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -104,13 +104,16 @@ export abstract class AbstractSnsSqsConsumer< this.subscriptionConfig, { logger: this.logger, + // This callback is only invoked in non-blocking mode when resources were NOT + // immediately available. It will NOT be called if resourcesReady is true. onResourcesReady: (result) => { - // Update values that may have been empty when resourcesReady was false + // Update values that were empty when resourcesReady was false this.topicArn = result.topicArn this.queueUrl = result.queueUrl this.subscriptionArn = result.subscriptionArn this.queueName = result.queueName - // Initialize DLQ now that resources are ready + // Initialize DLQ now that resources are ready (this is mutually exclusive + // with the synchronous initDeadLetterQueue call below) this.initDeadLetterQueue().catch((err) => { this.logger.error({ message: 'Failed to initialize dead letter queue after resources became ready', From 8a0a90912dbd0a9a517bd2bc47a58bafcd70df03 Mon Sep 17 00:00:00 2001 From: Igor Savin Date: Thu, 8 Jan 2026 16:35:49 +0200 Subject: [PATCH 4/4] fix pipeline --- .github/workflows/ci.common.yml | 3 --- .github/workflows/publish.yml | 3 +++ 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.common.yml b/.github/workflows/ci.common.yml index 6ca897b6..c78f3192 100644 --- a/.github/workflows/ci.common.yml +++ b/.github/workflows/ci.common.yml @@ -21,9 +21,6 @@ jobs: node-version: ${{ inputs.node_version }} package-manager-cache: false - - name: Pull latest changes - run: git pull origin ${{ github.head_ref || github.ref_name }} - - name: Install run: npm install --ignore-scripts diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 29f97df5..f3ad774b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -281,6 +281,9 @@ jobs: git config user.name "github-actions[bot]" git config user.email "github-actions[bot]@users.noreply.github.com" + - name: Pull latest changes + run: git pull --ff-only origin main + - name: Setup Node uses: actions/setup-node@v6 with: