Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 59 additions & 3 deletions server/src/queue/initWorkers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ let totalMessagesProcessed = 0;
let lastStatsTime = Date.now();

// Process recycling — exit after processing this many messages to prevent memory leaks
const MAX_MESSAGES_BEFORE_RECYCLE = 500_000;
const MAX_MESSAGES_BEFORE_RECYCLE = 50_000;

// Idle self-kill — exit if worker processes 0 messages for this many consecutive intervals
const IDLE_SELF_KILL_THRESHOLD = 5; // ~5 min of 0 messages (5 * 60s)

// Per-message processing timeout — must be under VisibilityTimeout (30s)
const MESSAGE_TIMEOUT_MS = 25_000;

// Stale connection detection
let consecutiveEmptyPolls = 0;
Expand All @@ -43,6 +49,17 @@ const ZERO_MESSAGE_ALERT_THRESHOLD = 20; // ~20 min of 0 messages

// ============ Helper Functions ============

const withTimeout = <T>(promise: Promise<T>, timeoutMs: number): Promise<T> =>
Promise.race([
promise,
new Promise<never>((_, reject) =>
setTimeout(
() => reject(new Error(`Processing timed out after ${timeoutMs}ms`)),
timeoutMs,
),
),
]);

const logPrefix = () => `[SQS Worker ${process.pid}]`;

const alertZeroMessages = () => {
Expand All @@ -67,6 +84,17 @@ const logStatsAndCheckZeroMessages = () => {

if (messagesProcessed === 0) {
consecutiveZeroMessageIntervals++;

if (
consecutiveZeroMessageIntervals >= IDLE_SELF_KILL_THRESHOLD &&
totalMessagesProcessed > 0
) {
console.log(
`${logPrefix()} Idle self-kill: 0 messages for ${consecutiveZeroMessageIntervals} intervals after processing ${totalMessagesProcessed} total. Exiting for cluster respawn.`,
);
process.exit(0);
}
Comment on lines +88 to +96
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idle self-kill can trigger while fire-and-forget migration jobs are still executing.

Migration jobs are dispatched with .catch() without being awaited (line 275), and messagesProcessed only increments when handleSingleMessage completes. If a migration job runs for 5+ minutes and no regular messages are processed in the interim, consecutiveZeroMessageIntervals reaches 5, causing process.exit(0) while the migration is still mid-flight.

Consider tracking active migration job count:

let activeMigrationJobs = 0;

// When dispatching a migration job (line 275):
activeMigrationJobs++;
handleSingleMessage({ sqs, message, db })
  .catch((error) => {
    console.error(...);
    Sentry.captureException(error);
  })
  .finally(() => activeMigrationJobs--);

// In idle self-kill check (line 88):
if (
  consecutiveZeroMessageIntervals >= IDLE_SELF_KILL_THRESHOLD &&
  totalMessagesProcessed > 0 &&
  activeMigrationJobs === 0
) {
  // safe to exit
}
Prompt To Fix With AI
This is a comment left during a code review.
Path: server/src/queue/initWorkers.ts
Line: 88-96

Comment:
Idle self-kill can trigger while fire-and-forget migration jobs are still executing.

Migration jobs are dispatched with `.catch()` without being awaited (line 275), and `messagesProcessed` only increments when `handleSingleMessage` completes. If a migration job runs for 5+ minutes and no regular messages are processed in the interim, `consecutiveZeroMessageIntervals` reaches 5, causing `process.exit(0)` while the migration is still mid-flight.

Consider tracking active migration job count:

```ts
let activeMigrationJobs = 0;

// When dispatching a migration job (line 275):
activeMigrationJobs++;
handleSingleMessage({ sqs, message, db })
  .catch((error) => {
    console.error(...);
    Sentry.captureException(error);
  })
  .finally(() => activeMigrationJobs--);

// In idle self-kill check (line 88):
if (
  consecutiveZeroMessageIntervals >= IDLE_SELF_KILL_THRESHOLD &&
  totalMessagesProcessed > 0 &&
  activeMigrationJobs === 0
) {
  // safe to exit
}
```

How can I resolve this? If you propose a fix, please make it concise.


if (consecutiveZeroMessageIntervals >= ZERO_MESSAGE_ALERT_THRESHOLD) {
alertZeroMessages();
consecutiveZeroMessageIntervals = 0;
Expand Down Expand Up @@ -126,7 +154,13 @@ const handleSingleMessage = async ({
await deleteMigrationJobImmediately({ sqs, message, job });
}

await processMessage({ message, db });
const isMigration = job.name === JobName.Migration;
if (isMigration) {
await processMessage({ message, db });
} else {
await withTimeout(processMessage({ message, db }), MESSAGE_TIMEOUT_MS);
}

messagesProcessed++;
totalMessagesProcessed++;

Expand Down Expand Up @@ -230,8 +264,30 @@ const startPollingLoop = async ({ db }: { db: DrizzleCli }) => {
if (messages.length > 0) {
consecutiveEmptyPolls = 0;

// Separate migration jobs — they're long-running and already deleted
// from the queue before processing, so fire-and-forget to avoid
// blocking the polling loop
const regularMessages: Message[] = [];
for (const message of messages) {
if (!message.Body) continue;
const job: SqsJob = JSON.parse(message.Body);
if (job.name === JobName.Migration) {
handleSingleMessage({ sqs, message, db }).catch((error) => {
console.error(
`${logPrefix()} Migration job failed:`,
error instanceof Error ? error.message : error,
);
Sentry.captureException(error);
});
} else {
regularMessages.push(message);
}
}

const results = await Promise.allSettled(
messages.map((message) => handleSingleMessage({ sqs, message, db })),
regularMessages.map((message) =>
handleSingleMessage({ sqs, message, db }),
),
);

const toDelete = results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ test.concurrent(`${chalk.yellowBright("migrate-paid-2: allocated seats with pric

// Verify initial state
let customer = await autumnV1.customers.get<ApiCustomerV3>(customerId);
const invoiceCountBefore = customer.invoices?.length ?? 0;

expectCustomerFeatureCorrect({
customer,
Expand Down Expand Up @@ -202,7 +201,7 @@ test.concurrent(`${chalk.yellowBright("migrate-paid-2: allocated seats with pric
// CRITICAL: No new invoice created
await expectCustomerInvoiceCorrect({
customer,
count: invoiceCountBefore,
count: 2,
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Hardcoding the invoice count to 2 no longer verifies “no new invoice created” and makes the test brittle if the initial invoice count changes. Capture the pre-migration invoice count and compare against it instead.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At server/tests/integration/billing/migrations/migrate-paid.test.ts, line 204:

<comment>Hardcoding the invoice count to 2 no longer verifies “no new invoice created” and makes the test brittle if the initial invoice count changes. Capture the pre-migration invoice count and compare against it instead.</comment>

<file context>
@@ -202,7 +201,7 @@ test.concurrent(`${chalk.yellowBright("migrate-paid-2: allocated seats with pric
 	await expectCustomerInvoiceCorrect({
 		customer,
-		count: invoiceCountBefore,
+		count: 2,
 	});
 
</file context>
Fix with Cubic

});
Comment on lines 201 to 205
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hardcoded invoice count weakens the migration invariant test.

Previously, this test captured invoiceCountBefore and asserted count: invoiceCountBefore, directly verifying "the migration does not create new invoices." By hardcoding count: 2, the test now only checks "there are exactly 2 invoices at this point" — a different guarantee that couples to test setup state.

This is inconsistent with migrate-paid-1 (line 110), migrate-paid-3 (line 298), and migrate-paid-4 (line 396), which all use the dynamic invoiceCountBefore pattern. If the test setup changes or produces an additional invoice earlier, the assertion will silently pass even if the migration itself created an extra invoice.

Consider reverting to the dynamic pattern to maintain the CRITICAL invariant:

Suggested change
// CRITICAL: No new invoice created
await expectCustomerInvoiceCorrect({
customer,
count: invoiceCountBefore,
count: 2,
});
// Verify initial state
let customer = await autumnV1.customers.get<ApiCustomerV3>(customerId);
const invoiceCountBefore = customer.invoices?.length ?? 0;
// ... later ...
await expectCustomerInvoiceCorrect({
customer,
count: invoiceCountBefore,
});
Prompt To Fix With AI
This is a comment left during a code review.
Path: server/tests/integration/billing/migrations/migrate-paid.test.ts
Line: 201-205

Comment:
Hardcoded invoice count weakens the migration invariant test.

Previously, this test captured `invoiceCountBefore` and asserted `count: invoiceCountBefore`, directly verifying "the migration does not create new invoices." By hardcoding `count: 2`, the test now only checks "there are exactly 2 invoices at this point" — a different guarantee that couples to test setup state.

This is inconsistent with `migrate-paid-1` (line 110), `migrate-paid-3` (line 298), and `migrate-paid-4` (line 396), which all use the dynamic `invoiceCountBefore` pattern. If the test setup changes or produces an additional invoice earlier, the assertion will silently pass even if the migration itself created an extra invoice.

Consider reverting to the dynamic pattern to maintain the CRITICAL invariant:

```suggestion
// Verify initial state
let customer = await autumnV1.customers.get<ApiCustomerV3>(customerId);
const invoiceCountBefore = customer.invoices?.length ?? 0;
// ... later ...
await expectCustomerInvoiceCorrect({
  customer,
  count: invoiceCountBefore,
});
```

How can I resolve this? If you propose a fix, please make it concise.


// Verify Stripe subscription is correct
Expand Down
Loading