Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@ jobs:
git config user.name "github-actions[bot]"
git config user.email "github-actions[bot]@users.noreply.github.com"

- name: Pull latest changes
run: git pull --ff-only origin main

- name: Setup Node
uses: actions/setup-node@v6
with:
Expand Down
2 changes: 1 addition & 1 deletion packages/core/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# @message-queue-toolkit/core

Core library for message-queue-toolkit. Provides foundational abstractions, utilities, and base classes for building message queue publishers and consumers.

## Table of Contents
Expand Down
39 changes: 34 additions & 5 deletions packages/sns/lib/sns/AbstractSnsSqsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,43 @@ export abstract class AbstractSnsSqsConsumer<
this.locatorConfig,
this.creationConfig,
this.subscriptionConfig,
{ logger: this.logger },
{
logger: this.logger,
// This callback is only invoked in non-blocking mode when resources were NOT
// immediately available. It will NOT be called if resourcesReady is true.
onResourcesReady: (result) => {
// Update values that were empty when resourcesReady was false
this.topicArn = result.topicArn
this.queueUrl = result.queueUrl
this.subscriptionArn = result.subscriptionArn
this.queueName = result.queueName
// Initialize DLQ now that resources are ready (this is mutually exclusive
// with the synchronous initDeadLetterQueue call below)
this.initDeadLetterQueue().catch((err) => {
this.logger.error({
message: 'Failed to initialize dead letter queue after resources became ready',
error: err,
})
})
},
},
)
this.queueName = initSnsSqsResult.queueName
this.queueUrl = initSnsSqsResult.queueUrl

// Always assign topicArn and queueName (always valid in both blocking and non-blocking modes)
this.topicArn = initSnsSqsResult.topicArn
this.subscriptionArn = initSnsSqsResult.subscriptionArn
this.queueName = initSnsSqsResult.queueName

// Only assign queueUrl and subscriptionArn if resources are ready,
// or if they have valid values (non-blocking mode with locatorConfig provides valid values)
if (initSnsSqsResult.resourcesReady || initSnsSqsResult.queueUrl) {
this.queueUrl = initSnsSqsResult.queueUrl
this.subscriptionArn = initSnsSqsResult.subscriptionArn
}

await this.initDeadLetterQueue()
// Only initialize DLQ if resources are ready and queueUrl is available
if (initSnsSqsResult.resourcesReady && initSnsSqsResult.queueUrl) {
await this.initDeadLetterQueue()
}
}

protected override resolveMessage(message: SQSMessage) {
Expand Down
40 changes: 29 additions & 11 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async function createSubscriptionWithPolling(
resourcesReady: boolean
}> {
const nonBlocking = startupResourcePolling.nonBlocking === true
// biome-ignore lint/style/noNonNullAssertion: QueueName is validated in initSnsSqs before calling this function
const queueName = creationConfig.queue.QueueName!

const onTopicReady = async () => {
Expand All @@ -124,6 +125,8 @@ async function createSubscriptionWithPolling(
extraParams?.onResourcesReady?.({
topicArn: result.topicArn,
queueUrl: result.queueUrl,
subscriptionArn: result.subscriptionArn,
queueName,
})
} catch (err) {
const error = isError(err) ? err : new Error(String(err))
Expand Down Expand Up @@ -207,7 +210,12 @@ export type InitSnsSqsExtraParams = ExtraParams & {
* Callback invoked when resources become available in non-blocking mode.
* Only called when startupResourcePolling.nonBlocking is true and resources were not immediately available.
*/
onResourcesReady?: (result: { topicArn: string; queueUrl: string }) => void
onResourcesReady?: (result: {
topicArn: string
queueUrl: string
subscriptionArn: string
queueName: string
}) => void
/**
* Callback invoked when background resource polling or subscription creation fails in non-blocking mode.
* This can happen due to polling timeout or subscription creation failure.
Expand Down Expand Up @@ -273,9 +281,17 @@ export async function initSnsSqs(
isStartupResourcePollingEnabled(startupResourcePolling) &&
!isCreateTopicCommand(topicResolutionOptions)
) {
// Validate that we have either topicArn or topicName to build the ARN
if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) {
throw new Error(
'When startup resource polling is enabled and topic is not being created, either topicArn or topicName must be provided in locatorConfig to identify the topic to poll for',
)
}

// biome-ignore lint/style/noNonNullAssertion: Validated above that at least one of topicArn or topicName is present
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
(await buildTopicArn(stsClient, topicResolutionOptions.topicName!))

return await createSubscriptionWithPolling(
sqsClient,
Expand Down Expand Up @@ -321,13 +337,23 @@ export async function initSnsSqs(
if (isStartupResourcePollingEnabled(startupResourcePolling)) {
const nonBlocking = startupResourcePolling.nonBlocking === true

// Extract queueName early for use in callbacks
const splitUrl = queueUrl.split('/')
// biome-ignore lint/style/noNonNullAssertion: It's ok
const queueName = splitUrl[splitUrl.length - 1]!

// Track availability for non-blocking mode coordination
let topicAvailable = false
let queueAvailable = false

const notifyIfBothReady = () => {
if (nonBlocking && topicAvailable && queueAvailable) {
extraParams?.onResourcesReady?.({ topicArn: subscriptionTopicArn, queueUrl })
extraParams?.onResourcesReady?.({
topicArn: subscriptionTopicArn,
queueUrl,
subscriptionArn: locatorConfig.subscriptionArn!,
queueName,
})
}
}

Expand All @@ -354,10 +380,6 @@ export async function initSnsSqs(
// If non-blocking and topic wasn't immediately available, return early
// Background polling will continue and call notifyIfBothReady when topic is available
if (nonBlocking && topicResult === undefined) {
const splitUrl = queueUrl.split('/')
// biome-ignore lint/style/noNonNullAssertion: It's ok
const queueName = splitUrl[splitUrl.length - 1]!

// Also start polling for queue in background so we can notify when both are ready
pollForQueue(
sqsClient,
Expand Down Expand Up @@ -419,10 +441,6 @@ export async function initSnsSqs(

// If non-blocking and queue wasn't immediately available, return early
if (nonBlocking && queueResult === undefined) {
const splitUrl = queueUrl.split('/')
// biome-ignore lint/style/noNonNullAssertion: It's ok
const queueName = splitUrl[splitUrl.length - 1]!

return {
subscriptionArn: locatorConfig.subscriptionArn,
topicArn: subscriptionTopicArn,
Expand Down
Loading