Skip to content

Commit 2c6fe2c

Browse files
committed
fix tests
1 parent f11ea5a commit 2c6fe2c

File tree

2 files changed

+34
-30
lines changed

2 files changed

+34
-30
lines changed

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.test.ts

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,13 @@ describe("RunQueueConsumerPool", () => {
7272
}
7373
});
7474

75-
function advanceTimeAndTriggerBatch(ms: number) {
75+
function advanceTimeAndProcessMetrics(ms: number) {
7676
vi.advanceTimersByTime(ms);
77-
pool.updateQueueLength(0);
77+
78+
// Trigger batch processing if ready (without adding a sample)
79+
if (pool["metricsProcessor"].shouldProcessBatch()) {
80+
pool["processMetricsBatch"]();
81+
}
7882
}
7983

8084
describe("Static mode (strategy='none')", () => {
@@ -124,11 +128,11 @@ describe("RunQueueConsumerPool", () => {
124128
expect(pool.size).toBe(1);
125129

126130
pool.updateQueueLength(5);
127-
advanceTimeAndTriggerBatch(1100);
131+
advanceTimeAndProcessMetrics(1100);
128132
expect(pool.size).toBe(4); // Damped scaling
129133

130134
pool.updateQueueLength(5);
131-
advanceTimeAndTriggerBatch(1100);
135+
advanceTimeAndProcessMetrics(1100);
132136
expect(pool.size).toBe(5); // Gradually approaches target
133137
});
134138

@@ -148,11 +152,11 @@ describe("RunQueueConsumerPool", () => {
148152
expect(pool.size).toBe(1);
149153

150154
pool.updateQueueLength(100);
151-
advanceTimeAndTriggerBatch(1100);
155+
advanceTimeAndProcessMetrics(1100);
152156
expect(pool.size).toBe(5);
153157

154158
pool.updateQueueLength(100);
155-
advanceTimeAndTriggerBatch(1100);
159+
advanceTimeAndProcessMetrics(1100);
156160
expect(pool.size).toBe(5);
157161
});
158162
});
@@ -174,11 +178,11 @@ describe("RunQueueConsumerPool", () => {
174178
expect(pool.size).toBe(2);
175179

176180
pool.updateQueueLength(10);
177-
advanceTimeAndTriggerBatch(1100);
181+
advanceTimeAndProcessMetrics(1100);
178182
expect(pool.size).toBe(3);
179183

180184
pool.updateQueueLength(20);
181-
advanceTimeAndTriggerBatch(1100);
185+
advanceTimeAndProcessMetrics(1100);
182186
expect(pool.size).toBe(4);
183187
});
184188

@@ -199,15 +203,15 @@ describe("RunQueueConsumerPool", () => {
199203
expect(pool.size).toBe(1);
200204

201205
pool.updateQueueLength(10);
202-
advanceTimeAndTriggerBatch(1100);
206+
advanceTimeAndProcessMetrics(1100);
203207
expect(pool.size).toBe(2);
204208

205209
pool.updateQueueLength(0.5);
206-
advanceTimeAndTriggerBatch(1100);
210+
advanceTimeAndProcessMetrics(1100);
207211
expect(pool.size).toBe(3); // EWMA smoothing delays scale down
208212

209213
pool.updateQueueLength(0.5);
210-
advanceTimeAndTriggerBatch(1100);
214+
advanceTimeAndProcessMetrics(1100);
211215
expect(pool.size).toBeGreaterThanOrEqual(3); // Stays in optimal zone
212216
});
213217

@@ -227,11 +231,11 @@ describe("RunQueueConsumerPool", () => {
227231
expect(pool.size).toBe(3);
228232

229233
pool.updateQueueLength(3);
230-
advanceTimeAndTriggerBatch(1100);
234+
advanceTimeAndProcessMetrics(1100);
231235
expect(pool.size).toBe(3);
232236

233237
pool.updateQueueLength(4);
234-
advanceTimeAndTriggerBatch(1100);
238+
advanceTimeAndProcessMetrics(1100);
235239
expect(pool.size).toBe(3);
236240
});
237241
});
@@ -276,12 +280,12 @@ describe("RunQueueConsumerPool", () => {
276280
await pool.start();
277281

278282
pool.updateQueueLength(2);
279-
advanceTimeAndTriggerBatch(1100);
283+
advanceTimeAndProcessMetrics(1100);
280284
const metrics1 = pool.getMetrics();
281285
expect(metrics1.smoothedQueueLength).toBe(2);
282286

283287
pool.updateQueueLength(20);
284-
advanceTimeAndTriggerBatch(1100);
288+
advanceTimeAndProcessMetrics(1100);
285289
const metrics2 = pool.getMetrics();
286290

287291
expect(metrics2.smoothedQueueLength).toBeGreaterThan(2);
@@ -312,7 +316,7 @@ describe("RunQueueConsumerPool", () => {
312316
setTimeout(() => pool.updateQueueLength(length), index * 10);
313317
});
314318

315-
advanceTimeAndTriggerBatch(1100);
319+
advanceTimeAndProcessMetrics(1100);
316320

317321
const metrics = pool.getMetrics();
318322
expect(metrics.queueLength).toBeDefined();
@@ -338,7 +342,7 @@ describe("RunQueueConsumerPool", () => {
338342
}
339343

340344
expect(evaluateScalingSpy).not.toHaveBeenCalled();
341-
advanceTimeAndTriggerBatch(1000);
345+
advanceTimeAndProcessMetrics(1000);
342346
expect(evaluateScalingSpy).toHaveBeenCalledTimes(1);
343347
});
344348

@@ -357,7 +361,7 @@ describe("RunQueueConsumerPool", () => {
357361

358362
const updates = [10, 11, 9, 12, 10, 100, 11, 10, 9, 11, 1];
359363
updates.forEach((length) => pool.updateQueueLength(length));
360-
advanceTimeAndTriggerBatch(1100);
364+
advanceTimeAndProcessMetrics(1100);
361365

362366
const metrics = pool.getMetrics();
363367
expect(metrics.queueLength).toBeGreaterThanOrEqual(9);
@@ -383,12 +387,12 @@ describe("RunQueueConsumerPool", () => {
383387
const scaleToTargetSpy = vi.spyOn(pool as any, "scaleToTarget");
384388

385389
pool.updateQueueLength(10);
386-
advanceTimeAndTriggerBatch(1100);
390+
advanceTimeAndProcessMetrics(1100);
387391
expect(scaleToTargetSpy).not.toHaveBeenCalled();
388392

389393
vi.advanceTimersByTime(10000);
390394
pool.updateQueueLength(20);
391-
advanceTimeAndTriggerBatch(1100);
395+
advanceTimeAndProcessMetrics(1100);
392396
});
393397

394398
it("should respect scale-down cooldown (longer than scale-up)", async () => {
@@ -411,7 +415,7 @@ describe("RunQueueConsumerPool", () => {
411415
pool["metrics"].lastScaleTime = new Date(Date.now() - 70000);
412416

413417
pool.updateQueueLength(1);
414-
advanceTimeAndTriggerBatch(1100);
418+
advanceTimeAndProcessMetrics(1100);
415419

416420
const metrics = pool.getMetrics();
417421
expect(metrics.queueLength).toBe(1);
@@ -443,7 +447,7 @@ describe("RunQueueConsumerPool", () => {
443447
}
444448

445449
pools.forEach((p) => p.updateQueueLength(20));
446-
advanceTimeAndTriggerBatch(1100);
450+
advanceTimeAndProcessMetrics(1100);
447451
vi.advanceTimersByTime(15000);
448452

449453
await Promise.all(pools.map((p) => p.stop()));
@@ -496,7 +500,7 @@ describe("RunQueueConsumerPool", () => {
496500

497501
expect(mockOnDequeue).toHaveBeenCalledWith(messages);
498502

499-
advanceTimeAndTriggerBatch(1100);
503+
advanceTimeAndProcessMetrics(1100);
500504
const metrics = pool.getMetrics();
501505
expect(metrics.queueLength).toBe(15);
502506
});
@@ -553,7 +557,7 @@ describe("RunQueueConsumerPool", () => {
553557
pool.updateQueueLength(10 + i);
554558
}
555559

556-
advanceTimeAndTriggerBatch(1100);
560+
advanceTimeAndProcessMetrics(1100);
557561
const metrics = pool.getMetrics();
558562
expect(metrics.queueLength).toBeDefined();
559563
});
@@ -626,7 +630,7 @@ describe("RunQueueConsumerPool", () => {
626630
expect(pool.size).toBe(2);
627631

628632
pool.updateQueueLength(100);
629-
advanceTimeAndTriggerBatch(1100);
633+
advanceTimeAndProcessMetrics(1100);
630634
expect(pool.size).toBeLessThanOrEqual(5);
631635
});
632636

@@ -647,14 +651,14 @@ describe("RunQueueConsumerPool", () => {
647651
expect(pool.size).toBe(1);
648652

649653
pool.updateQueueLength(10);
650-
advanceTimeAndTriggerBatch(1100);
654+
advanceTimeAndProcessMetrics(1100);
651655

652656
const firstSize = pool.size;
653657
expect(firstSize).toBeGreaterThanOrEqual(1);
654658
expect(firstSize).toBeLessThanOrEqual(2);
655659

656660
pool.updateQueueLength(10);
657-
advanceTimeAndTriggerBatch(1100);
661+
advanceTimeAndProcessMetrics(1100);
658662
expect(pool.size).toBeLessThanOrEqual(2);
659663
});
660664

@@ -675,13 +679,13 @@ describe("RunQueueConsumerPool", () => {
675679
expect(pool.size).toBe(1);
676680

677681
pool.updateQueueLength(20);
678-
advanceTimeAndTriggerBatch(1100);
682+
advanceTimeAndProcessMetrics(1100);
679683

680684
const sizeAfterFirstScale = pool.size;
681685
expect(sizeAfterFirstScale).toBeGreaterThanOrEqual(1);
682686

683687
pool.updateQueueLength(20);
684-
advanceTimeAndTriggerBatch(1100);
688+
advanceTimeAndProcessMetrics(1100);
685689
expect(pool.size).toBeLessThanOrEqual(6);
686690
});
687691
});

packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ export class QueueMetricsProcessor {
8686
}
8787

8888
private calculateMedian(samples: number[]): number | null {
89-
const sortedSamples = [...this.samples].sort((a, b) => a - b);
89+
const sortedSamples = [...samples].sort((a, b) => a - b);
9090
const mid = Math.floor(sortedSamples.length / 2);
9191

9292
if (sortedSamples.length % 2 === 1) {

0 commit comments

Comments
 (0)