Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 107 additions & 45 deletions packages/sns/lib/utils/snsInitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,62 @@ import { subscribeToTopic } from './snsSubscriber.ts'
import { assertTopic, deleteSubscription, deleteTopic, getTopicAttributes } from './snsUtils.ts'
import { buildTopicArn } from './stsUtils.ts'

// Helper type for topic polling result
type TopicPollingResult = {
topicResult: unknown | undefined
topicArn: string
}

// Helper function to poll for SNS topic availability
async function pollForTopic(
snsClient: SNSClient,
topicArn: string,
startupResourcePolling: NonNullable<SNSSQSQueueLocatorType['startupResourcePolling']>,
extraParams?: ExtraParams,
onResourceAvailable?: () => void,
): Promise<TopicPollingResult> {
const topicResult = await waitForResource({
config: startupResourcePolling,
resourceName: `SNS topic ${topicArn}`,
logger: extraParams?.logger,
errorReporter: extraParams?.errorReporter,
onResourceAvailable,
checkFn: async () => {
const result = await getTopicAttributes(snsClient, topicArn)
if (result.error === 'not_found') {
return { isAvailable: false }
}
return { isAvailable: true, result: result.result }
},
})

return { topicResult, topicArn }
}

// Helper function to poll for SQS queue availability
async function pollForQueue(
sqsClient: SQSClient,
queueUrl: string,
startupResourcePolling: NonNullable<SNSSQSQueueLocatorType['startupResourcePolling']>,
extraParams?: ExtraParams,
onResourceAvailable?: () => void,
): Promise<unknown | undefined> {
return await waitForResource({
config: startupResourcePolling,
resourceName: `SQS queue ${queueUrl}`,
logger: extraParams?.logger,
errorReporter: extraParams?.errorReporter,
onResourceAvailable,
checkFn: async () => {
const result = await getQueueAttributes(sqsClient, queueUrl)
if (result.error === 'not_found') {
return { isAvailable: false }
}
return { isAvailable: true, result: result.result }
},
})
}

export type InitSnsSqsExtraParams = ExtraParams & {
/**
* Callback invoked when resources become available in non-blocking mode.
Expand Down Expand Up @@ -81,6 +137,39 @@ export async function initSnsSqs(
...creationConfig.topic,
}

// If startup resource polling is enabled and we're not creating the topic (just locating it),
// we should poll for the topic to exist before attempting to subscribe
const startupResourcePolling = locatorConfig?.startupResourcePolling
if (
isStartupResourcePollingEnabled(startupResourcePolling) &&
!isCreateTopicCommand(topicResolutionOptions)
) {
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
Comment on lines +276 to +278
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add validation for topicArn and topicName before calling buildTopicArn.

If both topicResolutionOptions.topicArn and topicResolutionOptions.topicName are undefined, an empty string is passed to buildTopicArn, which would create an invalid ARN. While the validation at line 243 ensures at least one topic identifier exists, the logic flow doesn't guarantee that the locator config has topicArn or topicName set specifically.

🛡️ Add validation before building topic ARN
+    if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) {
+      throw new Error(
+        'Either topicArn or topicName must be specified when using startup resource polling without topic creation'
+      )
+    }
+
     const topicArnToWaitFor =
       topicResolutionOptions.topicArn ??
       (await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
if (!topicResolutionOptions.topicArn && !topicResolutionOptions.topicName) {
throw new Error(
'Either topicArn or topicName must be specified when using startup resource polling without topic creation'
)
}
const topicArnToWaitFor =
topicResolutionOptions.topicArn ??
(await buildTopicArn(stsClient, topicResolutionOptions.topicName ?? ''))
🤖 Prompt for AI Agents
In @packages/sns/lib/utils/snsInitter.ts around lines 276 - 278, The assignment
to topicArnToWaitFor can pass an empty string to buildTopicArn when both
topicResolutionOptions.topicArn and topicResolutionOptions.topicName are
undefined; add explicit validation before calling buildTopicArn to ensure at
least one of topicResolutionOptions.topicArn or topicResolutionOptions.topicName
is present and non-empty, and if not throw a clear error (or return a failure)
instead of calling buildTopicArn, referencing the topicResolutionOptions
variable and the buildTopicArn function so the check occurs immediately before
computing topicArnToWaitFor.


const nonBlocking = startupResourcePolling.nonBlocking === true

const { topicResult } = await pollForTopic(
snsClient,
topicArnToWaitFor,
startupResourcePolling,
extraParams,
)

// If non-blocking and topic wasn't immediately available, return early with resourcesReady: false
// The caller will need to handle this case (e.g., retry later or use onResourcesReady callback)
if (nonBlocking && topicResult === undefined) {
return {
subscriptionArn: '',
topicArn: topicArnToWaitFor,
queueName: creationConfig.queue.QueueName,
queueUrl: '',
resourcesReady: false,
}
}
}

const { subscriptionArn, topicArn, queueUrl } = await subscribeToTopic(
sqsClient,
snsClient,
Expand Down Expand Up @@ -133,23 +222,16 @@ export async function initSnsSqs(
}

// Wait for topic to become available
const topicResult = await waitForResource({
config: startupResourcePolling,
resourceName: `SNS topic ${subscriptionTopicArn}`,
logger: extraParams?.logger,
errorReporter: extraParams?.errorReporter,
onResourceAvailable: () => {
const { topicResult } = await pollForTopic(
snsClient,
subscriptionTopicArn,
startupResourcePolling,
extraParams,
() => {
topicAvailable = true
notifyIfBothReady()
},
checkFn: async () => {
const result = await getTopicAttributes(snsClient, subscriptionTopicArn)
if (result.error === 'not_found') {
return { isAvailable: false }
}
return { isAvailable: true, result: result.result }
},
})
)

// If topic was immediately available, mark it
if (topicResult !== undefined) {
Expand All @@ -164,25 +246,12 @@ export async function initSnsSqs(
const queueName = splitUrl[splitUrl.length - 1]!

// Also start polling for queue in background so we can notify when both are ready
waitForResource({
config: startupResourcePolling,
resourceName: `SQS queue ${queueUrl}`,
logger: extraParams?.logger,
errorReporter: extraParams?.errorReporter,
onResourceAvailable: () => {
queueAvailable = true
notifyIfBothReady()
},
checkFn: async () => {
const result = await getQueueAttributes(sqsClient, queueUrl)
if (result.error === 'not_found') {
return { isAvailable: false }
}
return { isAvailable: true, result: result.result }
},
pollForQueue(sqsClient, queueUrl, startupResourcePolling, extraParams, () => {
queueAvailable = true
notifyIfBothReady()
})
.then((result) => {
// If queue was immediately available, waitForResource returns the result
// If queue was immediately available, pollForQueue returns the result
// but doesn't call onResourceAvailable, so we handle it here
if (result !== undefined) {
queueAvailable = true
Expand All @@ -207,23 +276,16 @@ export async function initSnsSqs(
}

// Wait for queue to become available
const queueResult = await waitForResource({
config: startupResourcePolling,
resourceName: `SQS queue ${queueUrl}`,
logger: extraParams?.logger,
errorReporter: extraParams?.errorReporter,
onResourceAvailable: () => {
const queueResult = await pollForQueue(
sqsClient,
queueUrl,
startupResourcePolling,
extraParams,
() => {
queueAvailable = true
notifyIfBothReady()
},
checkFn: async () => {
const result = await getQueueAttributes(sqsClient, queueUrl)
if (result.error === 'not_found') {
return { isAvailable: false }
}
return { isAvailable: true, result: result.result }
},
})
)

// If queue was immediately available, mark it
if (queueResult !== undefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,93 @@ describe('SnsSqsPermissionConsumer - startupResourcePollingConfig', () => {
})
})

describe('when subscriptionArn is not provided (subscription creation mode)', () => {
it('waits for topic to become available before creating subscription', async () => {
// No topic and no queue exist initially

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicName,
// No subscriptionArn - will create subscription
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 100,
timeoutMs: 5000,
},
},
creationConfig: {
queue: { QueueName: queueName },
},
})

// Start init in background
const initPromise = consumer.init()

// Wait a bit then create the topic (queue will be created by assertQueue in subscribeToTopic)
await setTimeout(300)
const topicArn = await assertTopic(snsClient, stsClient, { Name: topicName })

// Init should complete successfully after topic is created
await initPromise

expect(consumer.subscriptionProps.topicArn).toBe(topicArn)
expect(consumer.subscriptionProps.queueName).toBe(queueName)
expect(consumer.subscriptionProps.subscriptionArn).toBeDefined()
})

it('throws StartupResourcePollingTimeoutError when topic never appears', async () => {
// No topic exists

const consumer = new TestStartupResourcePollingConsumer(diContainer.cradle, {
locatorConfig: {
topicName,
// No subscriptionArn - will create subscription
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 50,
timeoutMs: 200, // Short timeout
},
},
creationConfig: {
queue: { QueueName: queueName },
},
})

// Should throw timeout error since topic never appears
await expect(consumer.init()).rejects.toThrow(StartupResourcePollingTimeoutError)
})

it('returns immediately in non-blocking mode when topic not available', async () => {
// Test at the initter level since consumer.init() sets internal state
const { initSnsSqs } = await import('../../lib/utils/snsInitter.ts')

const result = await initSnsSqs(
sqsClient,
snsClient,
stsClient,
{
topicName,
// No subscriptionArn
startupResourcePolling: {
enabled: true,
pollingIntervalMs: 100,
timeoutMs: 5000,
nonBlocking: true,
},
},
{
queue: { QueueName: queueName },
},
{ updateAttributesIfExists: false },
)

// Should return immediately with resourcesReady: false
expect(result.resourcesReady).toBe(false)
expect(result.subscriptionArn).toBe('')
expect(result.queueName).toBe(queueName)
})
})

describe('when startupResourcePolling is disabled', () => {
it('throws immediately when topic does not exist', async () => {
// Create queue but not topic
Expand Down
Loading