diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 29f97df5..f3ad774b 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -281,6 +281,9 @@ jobs: git config user.name "github-actions[bot]" git config user.email "github-actions[bot]@users.noreply.github.com" + - name: Pull latest changes + run: git pull --ff-only origin main + - name: Setup Node uses: actions/setup-node@v6 with: diff --git a/packages/core/README.md b/packages/core/README.md index 253b34cf..727a253b 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -1,5 +1,5 @@ # @message-queue-toolkit/core - + Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers. ## Table of Contents diff --git a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts index 7aa6a81f..11e4a5d7 100644 --- a/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts +++ b/packages/sns/lib/sns/AbstractSnsSqsConsumer.ts @@ -102,14 +102,43 @@ export abstract class AbstractSnsSqsConsumer< this.locatorConfig, this.creationConfig, this.subscriptionConfig, - { logger: this.logger }, + { + logger: this.logger, + // This callback is only invoked in non-blocking mode when resources were NOT + // immediately available. It will NOT be called if resourcesReady is true. + onResourcesReady: (result) => { + // Update values that were empty when resourcesReady was false + this.topicArn = result.topicArn + this.queueUrl = result.queueUrl + this.subscriptionArn = result.subscriptionArn + this.queueName = result.queueName + // Initialize DLQ now that resources are ready (this is mutually exclusive + // with the synchronous initDeadLetterQueue call below) + this.initDeadLetterQueue().catch((err) => { + this.logger.error({ + message: 'Failed to initialize dead letter queue after resources became ready', + error: err, + }) + }) + }, + }, ) - this.queueName = initSnsSqsResult.queueName - this.queueUrl = initSnsSqsResult.queueUrl + + // Always assign topicArn and queueName (always valid in both blocking and non-blocking modes) this.topicArn = initSnsSqsResult.topicArn - this.subscriptionArn = initSnsSqsResult.subscriptionArn + this.queueName = initSnsSqsResult.queueName + + // Only assign queueUrl and subscriptionArn if resources are ready, + // or if they have valid values (non-blocking mode with locatorConfig provides valid values) + if (initSnsSqsResult.resourcesReady || initSnsSqsResult.queueUrl) { + this.queueUrl = initSnsSqsResult.queueUrl + this.subscriptionArn = initSnsSqsResult.subscriptionArn + } - await this.initDeadLetterQueue() + // Only initialize DLQ if resources are ready and queueUrl is available + if (initSnsSqsResult.resourcesReady && initSnsSqsResult.queueUrl) { + await this.initDeadLetterQueue() + } } protected override resolveMessage(message: SQSMessage) { diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 9b8eae3f..af0f7d6b 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -108,6 +108,7 @@ async function createSubscriptionWithPolling( resourcesReady: boolean }> { const nonBlocking = startupResourcePolling.nonBlocking === true + // biome-ignore lint/style/noNonNullAssertion: QueueName is validated in initSnsSqs before calling this function const queueName = creationConfig.queue.QueueName! const onTopicReady = async () => { @@ -124,6 +125,8 @@ async function createSubscriptionWithPolling( extraParams?.onResourcesReady?.({ topicArn: result.topicArn, queueUrl: result.queueUrl, + subscriptionArn: result.subscriptionArn, + queueName, }) } catch (err) { const error = isError(err) ? err : new Error(String(err)) @@ -207,7 +210,12 @@ export type InitSnsSqsExtraParams = ExtraParams & { * Callback invoked when resources become available in non-blocking mode. * Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available. */ - onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void + onResourcesReady?: (result: { + topicArn: string + queueUrl: string + subscriptionArn: string + queueName: string + }) => void /** * Callback invoked when background resource polling or subscription creation fails in non-blocking mode. * This can happen due to polling timeout or subscription creation failure. @@ -273,9 +281,17 @@ export async function initSnsSqs( isStartupResourcePollingEnabled(startupResourcePolling) && !isCreateTopicCommand(topicResolutionOptions) ) { + // Validate that we have either topicArn or topicName to build the ARN + if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) { + throw new Error( + 'When startup resource polling is enabled and topic is not being created, either topicArn or topicName must be provided in locatorConfig to identify the topic to poll for', + ) + } + + // biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present const topicArnToWaitFor = topicResolutionOptions.topicArn ?? - (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? '')) + (await buildTopicArn(stsClient, topicResolutionOptions.topicName!)) return await createSubscriptionWithPolling( sqsClient, @@ -321,13 +337,23 @@ export async function initSnsSqs( if (isStartupResourcePollingEnabled(startupResourcePolling)) { const nonBlocking = startupResourcePolling.nonBlocking === true + // Extract queueName early for use in callbacks + const splitUrl = queueUrl.split('/') + // biome-ignore lint/style/noNonNullAssertion: It's ok + const queueName = splitUrl[splitUrl.length - 1]! + // Track availability for non-blocking mode coordination let topicAvailable = false let queueAvailable = false const notifyIfBothReady = () => { if (nonBlocking && topicAvailable && queueAvailable) { - extraParams?.onResourcesReady?.({ topicArn: subscriptionTopicArn, queueUrl }) + extraParams?.onResourcesReady?.({ + topicArn: subscriptionTopicArn, + queueUrl, + subscriptionArn: locatorConfig.subscriptionArn!, + queueName, + }) } } @@ -354,10 +380,6 @@ export async function initSnsSqs( // If non-blocking and topic wasn't immediately available, return early // Background polling will continue and call notifyIfBothReady when topic is available if (nonBlocking && topicResult === undefined) { - const splitUrl = queueUrl.split('/') - // biome-ignore lint/style/noNonNullAssertion: It's ok - const queueName = splitUrl[splitUrl.length - 1]! - // Also start polling for queue in background so we can notify when both are ready pollForQueue( sqsClient, @@ -419,10 +441,6 @@ export async function initSnsSqs( // If non-blocking and queue wasn't immediately available, return early if (nonBlocking && queueResult === undefined) { - const splitUrl = queueUrl.split('/') - // biome-ignore lint/style/noNonNullAssertion: It's ok - const queueName = splitUrl[splitUrl.length - 1]! - return { subscriptionArn: locatorConfig.subscriptionArn, topicArn: subscriptionTopicArn,