Skip to content

Commit 16ab40a

Browse files
committed
handle scale down when queue length drops to zero
1 parent 4af3dd6 commit 16ab40a

File tree

2 files changed

+46
-23
lines changed

2 files changed

+46
-23
lines changed

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts

Lines changed: 43 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -589,20 +589,6 @@ describe("RunQueueConsumerPool", () => {
589589
});
590590

591591
describe("Edge cases", () => {
592-
it("should handle undefined queue lengths gracefully", async () => {
593-
pool = new RunQueueConsumerPool({
594-
...defaultOptions,
595-
scaling: { strategy: "smooth" },
596-
});
597-
598-
await pool.start();
599-
600-
expect(() => pool.updateQueueLength(undefined)).not.toThrow();
601-
602-
const metrics = pool.getMetrics();
603-
expect(metrics.queueLength).toBeUndefined();
604-
});
605-
606592
it("should handle empty recent queue lengths", async () => {
607593
pool = new RunQueueConsumerPool({
608594
...defaultOptions,
@@ -688,5 +674,48 @@ describe("RunQueueConsumerPool", () => {
688674
advanceTimeAndProcessMetrics(1100);
689675
expect(pool.size).toBeLessThanOrEqual(6);
690676
});
677+
678+
it("should scale down when no items are dequeued (zero queue length)", async () => {
679+
pool = new RunQueueConsumerPool({
680+
...defaultOptions,
681+
scaling: {
682+
strategy: "smooth",
683+
minConsumerCount: 1,
684+
maxConsumerCount: 10,
685+
scaleUpCooldownMs: 0,
686+
scaleDownCooldownMs: 0,
687+
disableJitter: true,
688+
},
689+
});
690+
691+
await pool.start();
692+
expect(pool.size).toBe(1);
693+
694+
// Scale up first
695+
pool.updateQueueLength(20);
696+
advanceTimeAndProcessMetrics(1100);
697+
expect(pool.size).toBeGreaterThan(1);
698+
const sizeAfterScaleUp = pool.size;
699+
700+
// Now send multiple zero queue lengths to converge EWMA to 0
701+
// The EWMA needs time to converge due to exponential smoothing
702+
for (let i = 0; i < 5; i++) {
703+
pool.updateQueueLength(0);
704+
advanceTimeAndProcessMetrics(1100);
705+
}
706+
707+
// After multiple iterations with zero queue, should scale down but not to minimum yet
708+
expect(pool.size).toBeLessThan(sizeAfterScaleUp);
709+
expect(pool.size).toBeGreaterThan(1);
710+
711+
// Continue until we reach minimum
712+
for (let i = 0; i < 5; i++) {
713+
pool.updateQueueLength(0);
714+
advanceTimeAndProcessMetrics(1100);
715+
}
716+
717+
// Should eventually reach minimum
718+
expect(pool.size).toBe(1);
719+
});
691720
});
692721
});

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,7 @@ export class RunQueueConsumerPool {
182182
* Updates the queue length metric and triggers scaling decisions
183183
* Uses QueueMetricsProcessor for batching and EWMA smoothing
184184
*/
185-
updateQueueLength(queueLength: number | undefined) {
186-
if (queueLength === undefined) {
187-
return;
188-
}
189-
185+
updateQueueLength(queueLength: number) {
190186
// Track queue length update in metrics
191187
this.promMetrics?.recordQueueLengthUpdate();
192188

@@ -356,10 +352,8 @@ export class RunQueueConsumerPool {
356352
const consumer = this.consumerFactory({
357353
...this.consumerOptions,
358354
onDequeue: async (messages) => {
359-
// Update queue length if provided
360-
if (messages.length > 0 && messages[0]?.workerQueueLength !== undefined) {
361-
this.updateQueueLength(messages[0]?.workerQueueLength);
362-
}
355+
// Always update queue length, default to 0 for empty dequeues or missing value
356+
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);
363357

364358
// Forward to the original handler
365359
await this.consumerOptions.onDequeue(messages);

0 commit comments

Comments
 (0)