Skip to content

Commit fd5d482

Browse files
authored
Implement polling for topics (#388)
1 parent 691cd73 commit fd5d482

15 files changed

+2058
-55
lines changed

README.md

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,206 @@ Both publishers and consumers accept a queue name and configuration as parameter
244244

245245
If you do not want to create a new queue/topic, you can set `queueLocator` field for `queueConfiguration`. In that case `message-queue-toolkit` will not attempt to create a new queue or topic, and instead throw an error if they don't already exist.
246246

247+
## Startup Resource Polling (Eventual Consistency Mode)
248+
249+
When using `locatorConfig` to reference existing queues or topics, the default behavior is to fail immediately if the resource doesn't exist. However, in some deployment scenarios (especially with cross-service dependencies), resources may not be available at startup time.
250+
251+
The `startupResourcePolling` option within `locatorConfig` enables "eventual consistency mode" where consumers poll for resources to become available instead of failing immediately.
252+
253+
### Use Case: Cross-Service Dependencies
254+
255+
Consider two services that need to subscribe to each other's topics:
256+
- **Service A** needs to subscribe to **Service B's** topic
257+
- **Service B** needs to subscribe to **Service A's** topic
258+
259+
Without eventual consistency mode, neither service can deploy first because the other's topic doesn't exist yet. With `startupResourcePolling`, both services can start simultaneously and wait for the other's resources to appear.
260+
261+
### Configuration
262+
263+
```typescript
264+
import { NO_TIMEOUT } from '@message-queue-toolkit/core'
265+
266+
const consumer = new MySnsSqsConsumer(dependencies, {
267+
locatorConfig: {
268+
topicArn: 'arn:aws:sns:...',
269+
queueUrl: 'https://sqs...',
270+
subscriptionArn: '...',
271+
// Enable eventual consistency mode
272+
startupResourcePolling: {
273+
enabled: true, // Enable polling for resource availability
274+
pollingIntervalMs: 5000, // Check every 5 seconds (default: 5000)
275+
timeoutMs: 300000, // Fail after 5 minutes (required)
276+
}
277+
}
278+
})
279+
```
280+
281+
### Configuration Options
282+
283+
| Option | Type | Default | Description |
284+
|--------|------|---------|-------------|
285+
| `enabled` | `boolean` | - | Must be set to `true` to enable polling |
286+
| `pollingIntervalMs` | `number` | `5000` | Interval between availability checks (ms) |
287+
| `timeoutMs` | `number \| NO_TIMEOUT` | - (required) | Maximum wait time before throwing `StartupResourcePollingTimeoutError`. Use `NO_TIMEOUT` to poll indefinitely |
288+
| `throwOnTimeout` | `boolean` | `true` | When `true`, throws error on timeout. When `false`, reports error via errorReporter, resets timeout counter, and continues polling |
289+
| `nonBlocking` | `boolean` | `false` | When `true`, `init()` returns immediately if resource is not available, and polling continues in the background. A callback is invoked when the resource becomes available |
290+
291+
### Environment-Specific Configuration
292+
293+
```typescript
294+
import { NO_TIMEOUT } from '@message-queue-toolkit/core'
295+
296+
// Production - with timeout (throws error when timeout reached)
297+
{
298+
locatorConfig: {
299+
queueUrl: '...',
300+
startupResourcePolling: {
301+
enabled: true,
302+
timeoutMs: 5 * 60 * 1000, // 5 minutes
303+
}
304+
}
305+
}
306+
307+
// Production - report timeout but keep trying
308+
// Useful when you want visibility into prolonged unavailability without failing
309+
{
310+
locatorConfig: {
311+
queueUrl: '...',
312+
startupResourcePolling: {
313+
enabled: true,
314+
timeoutMs: 5 * 60 * 1000, // Report every 5 minutes
315+
throwOnTimeout: false, // Don't throw, just report and continue
316+
}
317+
}
318+
}
319+
320+
// Development/Staging - poll indefinitely
321+
{
322+
locatorConfig: {
323+
queueUrl: '...',
324+
startupResourcePolling: {
325+
enabled: true,
326+
timeoutMs: NO_TIMEOUT,
327+
}
328+
}
329+
}
330+
331+
// Custom timeout and interval
332+
{
333+
locatorConfig: {
334+
queueUrl: '...',
335+
startupResourcePolling: {
336+
enabled: true,
337+
pollingIntervalMs: 10000,
338+
timeoutMs: 10 * 60 * 1000, // 10 minutes
339+
}
340+
}
341+
}
342+
343+
// Non-blocking mode - service starts immediately, polling continues in background
344+
// Useful when you want the service to be available even if dependencies are not ready
345+
{
346+
locatorConfig: {
347+
queueUrl: '...',
348+
startupResourcePolling: {
349+
enabled: true,
350+
timeoutMs: 5 * 60 * 1000,
351+
nonBlocking: true, // init() resolves immediately
352+
}
353+
}
354+
}
355+
```
356+
357+
### Non-Blocking Mode
358+
359+
When `nonBlocking: true` is set, `init()` returns immediately if the resource is not available on the first check. Polling continues in the background, and you can use callbacks to be notified when resources become available.
360+
361+
**Important behaviors:**
362+
- If the resource IS immediately available, `init()` returns normally with the resource ready
363+
- If the resource is NOT immediately available, `init()` returns with `resourcesReady: false` (for SNS/SQS) or `queueArn: undefined` (for SQS)
364+
- Background polling continues and invokes the callback when the resource becomes available
365+
- For SNS+SQS consumers, the callback is only invoked when BOTH topic and queue are available
366+
367+
```typescript
368+
// SQS non-blocking with callback
369+
import { initSqs } from '@message-queue-toolkit/sqs'
370+
371+
const result = await initSqs(
372+
sqsClient,
373+
{
374+
queueUrl: '...',
375+
startupResourcePolling: {
376+
enabled: true,
377+
timeoutMs: 5 * 60 * 1000,
378+
nonBlocking: true,
379+
},
380+
},
381+
undefined,
382+
undefined,
383+
{
384+
onQueueReady: ({ queueArn }) => {
385+
console.log(`Queue is now available: ${queueArn}`)
386+
// Start consuming messages, update health checks, etc.
387+
},
388+
},
389+
)
390+
391+
if (result.queueArn) {
392+
console.log('Queue was immediately available')
393+
} else {
394+
console.log('Queue not yet available, will be notified via callback')
395+
}
396+
397+
// SNS+SQS non-blocking with callback
398+
import { initSnsSqs } from '@message-queue-toolkit/sns'
399+
400+
const result = await initSnsSqs(
401+
sqsClient,
402+
snsClient,
403+
stsClient,
404+
{
405+
topicArn: '...',
406+
queueUrl: '...',
407+
subscriptionArn: '...',
408+
startupResourcePolling: {
409+
enabled: true,
410+
timeoutMs: 5 * 60 * 1000,
411+
nonBlocking: true,
412+
},
413+
},
414+
undefined,
415+
undefined,
416+
{
417+
onResourcesReady: ({ topicArn, queueUrl }) => {
418+
// Called only when BOTH topic and queue are available
419+
console.log(`Resources ready: topic=${topicArn}, queue=${queueUrl}`)
420+
},
421+
},
422+
)
423+
424+
if (result.resourcesReady) {
425+
console.log('All resources were immediately available')
426+
} else {
427+
console.log('Resources not yet available, will be notified via callback')
428+
}
429+
```
430+
431+
### Error Handling
432+
433+
When `timeoutMs` is configured and the resource doesn't become available within the specified time, a `StartupResourcePollingTimeoutError` is thrown:
434+
435+
```typescript
436+
import { StartupResourcePollingTimeoutError } from '@message-queue-toolkit/core'
437+
438+
try {
439+
await consumer.init()
440+
} catch (error) {
441+
if (error instanceof StartupResourcePollingTimeoutError) {
442+
console.error(`Resource ${error.resourceName} not available after ${error.timeoutMs}ms`)
443+
}
444+
}
445+
```
446+
247447
## SQS Policy Configuration
248448

249449
SQS queues can be configured with access policies to control who can send messages to and receive messages from the queue. The `policyConfig` parameter allows you to define these policies when creating or updating SQS queues.

packages/core/lib/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,12 @@ export { isProduction, reloadConfig } from './utils/envUtils.ts'
105105
export { isShallowSubset, objectMatches } from './utils/matchUtils.ts'
106106
export { type ParseMessageResult, parseMessage } from './utils/parseUtils.ts'
107107
export { objectToBuffer } from './utils/queueUtils.ts'
108+
export {
109+
isStartupResourcePollingEnabled,
110+
type StartupResourcePollingCheckResult,
111+
StartupResourcePollingTimeoutError,
112+
type WaitForResourceOptions,
113+
waitForResource,
114+
} from './utils/startupResourcePollingUtils.ts'
108115
export { toDatePreprocessor } from './utils/toDateProcessor.ts'
109116
export { waitAndRetry } from './utils/waitUtils.ts'

packages/core/lib/types/MessageQueueTypes.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import type { CommonLogger, TransactionObservabilityManager } from '@lokalise/node-core'
1+
import type {
2+
CommonLogger,
3+
ErrorReporter,
4+
TransactionObservabilityManager,
5+
} from '@lokalise/node-core'
26
import type { ZodSchema } from 'zod/v4'
37

48
import type { PublicHandlerSpy } from '../queues/HandlerSpy.ts'
@@ -38,6 +42,7 @@ export type { TransactionObservabilityManager }
3842

3943
export type ExtraParams = {
4044
logger?: CommonLogger
45+
errorReporter?: ErrorReporter
4146
}
4247

4348
export type SchemaMap<SupportedMessageTypes extends string> = Record<

packages/core/lib/types/queueOptionsTypes.ts

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,126 @@ export type DeletionConfig = {
130130
forceDeleteInProduction?: boolean
131131
}
132132

133+
/**
134+
* Symbol to indicate no timeout - polling will continue indefinitely.
135+
* Use this for dev/staging environments where you want to wait indefinitely for resources.
136+
*
137+
* @example
138+
* {
139+
* locatorConfig: {
140+
* queueUrl: '...',
141+
* startupResourcePolling: {
142+
* enabled: true,
143+
* timeoutMs: NO_TIMEOUT,
144+
* }
145+
* }
146+
* }
147+
*/
148+
export const NO_TIMEOUT = Symbol('NO_TIMEOUT')
149+
150+
/**
151+
* Configuration for startup resource polling mode when resources may not exist at startup.
152+
*
153+
* This is useful in scenarios where services have cross-dependencies:
154+
* - Service A needs to subscribe to Service B's topic
155+
* - Service B needs to subscribe to Service A's topic
156+
* - Neither can deploy first without the other's topic existing
157+
*
158+
* When `startupResourcePolling` is provided with `enabled: true` inside `locatorConfig`,
159+
* the consumer will poll for the topic/queue to become available instead of failing immediately.
160+
*
161+
* @example
162+
* // Enable with 5 minute timeout
163+
* {
164+
* locatorConfig: {
165+
* queueUrl: '...',
166+
* startupResourcePolling: {
167+
* enabled: true,
168+
* timeoutMs: 5 * 60 * 1000,
169+
* }
170+
* }
171+
* }
172+
*
173+
* @example
174+
* // Poll indefinitely (useful for dev/staging environments)
175+
* {
176+
* locatorConfig: {
177+
* queueUrl: '...',
178+
* startupResourcePolling: {
179+
* enabled: true,
180+
* timeoutMs: NO_TIMEOUT,
181+
* }
182+
* }
183+
* }
184+
*
185+
* @example
186+
* // Custom timeout and interval
187+
* {
188+
* locatorConfig: {
189+
* queueUrl: '...',
190+
* startupResourcePolling: {
191+
* enabled: true,
192+
* timeoutMs: 10 * 60 * 1000, // 10 minutes
193+
* pollingIntervalMs: 10000,
194+
* }
195+
* }
196+
* }
197+
*/
198+
export type StartupResourcePollingConfig = {
199+
/**
200+
* Controls whether polling is enabled.
201+
* Must be set to true to enable polling.
202+
*/
203+
enabled?: boolean
204+
205+
/**
206+
* Maximum time in milliseconds to wait for the resource to become available.
207+
* Use `NO_TIMEOUT` to disable timeout and poll indefinitely.
208+
*/
209+
timeoutMs: number | typeof NO_TIMEOUT
210+
211+
/**
212+
* Interval in milliseconds between polling attempts.
213+
* Default: 5000 (5 seconds)
214+
*/
215+
pollingIntervalMs?: number
216+
217+
/**
218+
* Whether to throw an error when timeout is reached.
219+
* - `true` (default): Throws `StartupResourcePollingTimeoutError` when timeout is reached
220+
* - `false`: Reports the error via errorReporter, resets the timeout counter, and continues polling
221+
*
222+
* Use `false` when you want to be notified about prolonged unavailability but don't want to fail.
223+
* Default: true
224+
*/
225+
throwOnTimeout?: boolean
226+
227+
/**
228+
* Whether to run polling in non-blocking mode.
229+
* - `false` (default): init() waits for the resource to become available before resolving
230+
* - `true`: If resource is not immediately available, init() resolves immediately and
231+
* polling continues in the background. When the resource becomes available,
232+
* the `onResourceAvailable` callback is invoked.
233+
*
234+
* Use `true` when you want the service to start quickly without waiting for dependencies.
235+
* Default: false
236+
*/
237+
nonBlocking?: boolean
238+
}
239+
240+
/**
241+
* Base type for queue locator configurations that includes startup resource polling.
242+
* Protocol-specific locator types should extend this.
243+
*/
244+
export type BaseQueueLocatorType = {
245+
/**
246+
* Configuration for startup resource polling mode.
247+
* When enabled, the consumer will poll for the resource to become available
248+
* instead of failing immediately.
249+
*/
250+
startupResourcePolling?: StartupResourcePollingConfig
251+
}
252+
133253
type NewQueueOptions<CreationConfigType extends CommonCreationConfigType> = {
134254
creationConfig?: CreationConfigType
135255
}

0 commit comments

Comments
 (0)