Skip to content

Commit b2f83f7

Browse files
committed
process the queue on nack
1 parent c9ed990 commit b2f83f7

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

internal-packages/run-engine/src/engine/tests/heartbeats.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,8 @@ describe("RunEngine heartbeats", () => {
350350
assertNonNullable(executionData2);
351351
expect(executionData2.snapshot.executionStatus).toBe("QUEUED");
352352

353+
await setTimeout(1_000);
354+
353355
//have to dequeue again
354356
const dequeued2 = await engine.dequeueFromWorkerQueue({
355357
consumerId: "test_12345",

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,11 +610,13 @@ export class RunQueue {
610610
messageId,
611611
retryAt,
612612
incrementAttemptCount = true,
613+
skipDequeueProcessing = false,
613614
}: {
614615
orgId: string;
615616
messageId: string;
616617
retryAt?: number;
617618
incrementAttemptCount?: boolean;
619+
skipDequeueProcessing?: boolean;
618620
}) {
619621
return this.#trace(
620622
"nackMessage",
@@ -648,6 +650,21 @@ export class RunQueue {
648650
}
649651
}
650652

653+
if (!skipDequeueProcessing) {
654+
// This will move the message to the worker queue so it can be dequeued
655+
await this.worker.enqueueOnce({
656+
id: message.queue, // dedupe by environment, queue, and concurrency key
657+
job: "processQueueForWorkerQueue",
658+
payload: {
659+
queueKey: message.queue,
660+
environmentId: message.environmentId,
661+
},
662+
// Add a small delay to dedupe messages so at most one of these will processed,
663+
// every 500ms per queue, concurrency key, and environment
664+
availableAt: new Date(Date.now() + (this.options.processWorkerQueueDebounceMs ?? 500)), // 500ms from now
665+
});
666+
}
667+
651668
await this.#callNackMessage({ message, retryAt });
652669

653670
return true;

0 commit comments

Comments
 (0)