Skip to content

Commit 43d03ad

Browse files
authored
Merge pull request #873 from useautumn/fix/worker-backlog
fix: worker backlog
2 parents 466a655 + 29b208b commit 43d03ad

File tree

2 files changed

+60
-5
lines changed

2 files changed

+60
-5
lines changed

server/src/queue/initWorkers.ts

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,13 @@ let totalMessagesProcessed = 0;
2929
let lastStatsTime = Date.now();
3030

3131
// Process recycling — exit after processing this many messages to prevent memory leaks
32-
const MAX_MESSAGES_BEFORE_RECYCLE = 500_000;
32+
const MAX_MESSAGES_BEFORE_RECYCLE = 50_000;
33+
34+
// Idle self-kill — exit if worker processes 0 messages for this many consecutive intervals
35+
const IDLE_SELF_KILL_THRESHOLD = 5; // ~5 min of 0 messages (5 * 60s)
36+
37+
// Per-message processing timeout — must be under VisibilityTimeout (30s)
38+
const MESSAGE_TIMEOUT_MS = 25_000;
3339

3440
// Stale connection detection
3541
let consecutiveEmptyPolls = 0;
@@ -43,6 +49,17 @@ const ZERO_MESSAGE_ALERT_THRESHOLD = 20; // ~20 min of 0 messages
4349

4450
// ============ Helper Functions ============
4551

52+
const withTimeout = <T>(promise: Promise<T>, timeoutMs: number): Promise<T> =>
53+
Promise.race([
54+
promise,
55+
new Promise<never>((_, reject) =>
56+
setTimeout(
57+
() => reject(new Error(`Processing timed out after ${timeoutMs}ms`)),
58+
timeoutMs,
59+
),
60+
),
61+
]);
62+
4663
const logPrefix = () => `[SQS Worker ${process.pid}]`;
4764

4865
const alertZeroMessages = () => {
@@ -67,6 +84,17 @@ const logStatsAndCheckZeroMessages = () => {
6784

6885
if (messagesProcessed === 0) {
6986
consecutiveZeroMessageIntervals++;
87+
88+
if (
89+
consecutiveZeroMessageIntervals >= IDLE_SELF_KILL_THRESHOLD &&
90+
totalMessagesProcessed > 0
91+
) {
92+
console.log(
93+
`${logPrefix()} Idle self-kill: 0 messages for ${consecutiveZeroMessageIntervals} intervals after processing ${totalMessagesProcessed} total. Exiting for cluster respawn.`,
94+
);
95+
process.exit(0);
96+
}
97+
7098
if (consecutiveZeroMessageIntervals >= ZERO_MESSAGE_ALERT_THRESHOLD) {
7199
alertZeroMessages();
72100
consecutiveZeroMessageIntervals = 0;
@@ -126,7 +154,13 @@ const handleSingleMessage = async ({
126154
await deleteMigrationJobImmediately({ sqs, message, job });
127155
}
128156

129-
await processMessage({ message, db });
157+
const isMigration = job.name === JobName.Migration;
158+
if (isMigration) {
159+
await processMessage({ message, db });
160+
} else {
161+
await withTimeout(processMessage({ message, db }), MESSAGE_TIMEOUT_MS);
162+
}
163+
130164
messagesProcessed++;
131165
totalMessagesProcessed++;
132166

@@ -230,8 +264,30 @@ const startPollingLoop = async ({ db }: { db: DrizzleCli }) => {
230264
if (messages.length > 0) {
231265
consecutiveEmptyPolls = 0;
232266

267+
// Separate migration jobs — they're long-running and already deleted
268+
// from the queue before processing, so fire-and-forget to avoid
269+
// blocking the polling loop
270+
const regularMessages: Message[] = [];
271+
for (const message of messages) {
272+
if (!message.Body) continue;
273+
const job: SqsJob = JSON.parse(message.Body);
274+
if (job.name === JobName.Migration) {
275+
handleSingleMessage({ sqs, message, db }).catch((error) => {
276+
console.error(
277+
`${logPrefix()} Migration job failed:`,
278+
error instanceof Error ? error.message : error,
279+
);
280+
Sentry.captureException(error);
281+
});
282+
} else {
283+
regularMessages.push(message);
284+
}
285+
}
286+
233287
const results = await Promise.allSettled(
234-
messages.map((message) => handleSingleMessage({ sqs, message, db })),
288+
regularMessages.map((message) =>
289+
handleSingleMessage({ sqs, message, db }),
290+
),
235291
);
236292

237293
const toDelete = results

server/tests/integration/billing/migrations/migrate-paid.test.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ test.concurrent(`${chalk.yellowBright("migrate-paid-2: allocated seats with pric
156156

157157
// Verify initial state
158158
let customer = await autumnV1.customers.get<ApiCustomerV3>(customerId);
159-
const invoiceCountBefore = customer.invoices?.length ?? 0;
160159

161160
expectCustomerFeatureCorrect({
162161
customer,
@@ -202,7 +201,7 @@ test.concurrent(`${chalk.yellowBright("migrate-paid-2: allocated seats with pric
202201
// CRITICAL: No new invoice created
203202
await expectCustomerInvoiceCorrect({
204203
customer,
205-
count: invoiceCountBefore,
204+
count: 2,
206205
});
207206

208207
// Verify Stripe subscription is correct

0 commit comments

Comments
 (0)