Skip to content

Commit 93fc0cd

Browse files
committed
more tests and fixes
1 parent 2c6fe2c commit 93fc0cd

File tree

3 files changed

+201
-106
lines changed

3 files changed

+201
-106
lines changed

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { QueueConsumer, RunQueueConsumer, RunQueueConsumerOptions } from "./queu
33
import { QueueMetricsProcessor } from "./queueMetricsProcessor.js";
44
import {
55
ScalingStrategy,
6-
ScalingContext,
76
ScalingStrategyKind,
87
ScalingStrategyOptions,
98
} from "./scalingStrategies.js";
@@ -65,11 +64,6 @@ export class RunQueueConsumerPool {
6564
private readonly scaleDownCooldownMs: number;
6665
private readonly batchWindowMs: number;
6766

68-
// Target ratio of queue items to consumers
69-
// 1.0 = each consumer handles ~1 item (aggressive scaling)
70-
// Higher values = fewer consumers needed (relaxed scaling)
71-
private readonly targetRatio: number;
72-
7367
constructor(opts: ConsumerPoolOptions) {
7468
this.consumerOptions = opts.consumer;
7569

@@ -84,7 +78,6 @@ export class RunQueueConsumerPool {
8478
this.maxConsumerCount = Math.max(this.minConsumerCount, opts.scaling.maxConsumerCount ?? 10);
8579
this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default
8680
this.scaleDownCooldownMs = opts.scaling.scaleDownCooldownMs ?? 60000; // 60 seconds default
87-
this.targetRatio = opts.scaling.targetRatio ?? 1.0;
8881
this.disableJitter = opts.scaling.disableJitter ?? false;
8982

9083
// Configure EWMA parameters from options
@@ -105,10 +98,16 @@ export class RunQueueConsumerPool {
10598
batchWindowMs: this.batchWindowMs,
10699
});
107100

101+
const targetRatio = opts.scaling.targetRatio ?? 1.0;
102+
const dampingFactor = opts.scaling.strategyOptions?.dampingFactor;
103+
108104
// Create scaling strategy with metrics processor injected
109105
this.scalingStrategy = ScalingStrategy.create(opts.scaling.strategy ?? "none", {
110-
...opts.scaling.strategyOptions,
111106
metricsProcessor: this.metricsProcessor,
107+
dampingFactor,
108+
targetRatio,
109+
minConsumerCount: this.minConsumerCount,
110+
maxConsumerCount: this.maxConsumerCount,
112111
});
113112

114113
// Use provided factory or default to RunQueueConsumer
@@ -318,16 +317,7 @@ export class RunQueueConsumerPool {
318317
}
319318

320319
private calculateTargetConsumerCount(): number {
321-
const context: ScalingContext = {
322-
currentConsumerCount: this.consumers.size,
323-
minConsumerCount: this.minConsumerCount,
324-
maxConsumerCount: this.maxConsumerCount,
325-
targetRatio: this.targetRatio,
326-
};
327-
328-
const target = this.scalingStrategy.calculateTargetCount(context);
329-
330-
return target;
320+
return this.scalingStrategy.calculateTargetCount(this.consumers.size);
331321
}
332322

333323
private scaleToTarget(targetCount: number) {

packages/core/src/v3/runEngineWorker/supervisor/scalingStrategies.test.ts

Lines changed: 141 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
import { describe, it, expect } from "vitest";
22
import {
3-
ScalingContext,
43
NoneScalingStrategy,
54
SmoothScalingStrategy,
65
AggressiveScalingStrategy,
6+
ScalingStrategyOptions,
77
} from "./scalingStrategies.js";
88
import { QueueMetricsProcessor } from "./queueMetricsProcessor.js";
99

1010
describe("Scaling Strategies", () => {
11-
const baseContext: ScalingContext = {
12-
currentConsumerCount: 5,
11+
const baseOptions: ScalingStrategyOptions = {
1312
minConsumerCount: 1,
1413
maxConsumerCount: 20,
1514
targetRatio: 1.0,
@@ -24,62 +23,114 @@ describe("Scaling Strategies", () => {
2423
}
2524

2625
describe("NoneScalingStrategy", () => {
27-
const strategy = new NoneScalingStrategy();
28-
29-
it("should always return maxConsumerCount", () => {
30-
expect(strategy.calculateTargetCount(baseContext)).toBe(20);
31-
expect(strategy.calculateTargetCount({ ...baseContext, currentConsumerCount: 1 })).toBe(20);
32-
expect(strategy.calculateTargetCount({ ...baseContext, maxConsumerCount: 25 })).toBe(25);
26+
const strategy = new NoneScalingStrategy(baseOptions);
27+
28+
it("should always return current count (static mode)", () => {
29+
expect(strategy.calculateTargetCount(5)).toBe(5);
30+
expect(strategy.calculateTargetCount(1)).toBe(1);
31+
expect(strategy.calculateTargetCount(10)).toBe(10);
32+
// Clamping still applies
33+
expect(strategy.calculateTargetCount(25)).toBe(20); // Clamped to max
34+
expect(strategy.calculateTargetCount(0)).toBe(1); // Clamped to min
3335
});
3436

3537
it("should have correct name", () => {
3638
expect(strategy.name).toBe("none");
3739
});
40+
41+
it("should handle zero current count", () => {
42+
// Should clamp to minConsumerCount
43+
const result = strategy.calculateTargetCount(0);
44+
expect(result).toBe(1);
45+
});
3846
});
3947

4048
describe("SmoothScalingStrategy", () => {
4149
it("should calculate target based on smoothed queue length", () => {
4250
const metricsProcessor = createMetricsProcessor(10); // smoothed value = 10
43-
const strategy = new SmoothScalingStrategy({ metricsProcessor });
51+
const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor });
4452

4553
// With targetRatio=1.0, target consumers = ceil(10/1.0) = 10
4654
// With dampingFactor=0.7 and currentCount=5:
4755
// dampedTarget = 5 + (10 - 5) * 0.7 = 5 + 3.5 = 8.5 → 9
48-
const result = strategy.calculateTargetCount(baseContext);
56+
const result = strategy.calculateTargetCount(5);
4957
expect(result).toBe(9);
5058
});
5159

5260
it("should apply damping factor correctly", () => {
5361
const metricsProcessor = createMetricsProcessor(20); // smoothed value = 20
54-
const strategy = new SmoothScalingStrategy({ metricsProcessor, dampingFactor: 0.5 }); // 50% damping
62+
const strategy = new SmoothScalingStrategy({
63+
...baseOptions,
64+
metricsProcessor,
65+
dampingFactor: 0.5,
66+
}); // 50% damping
5567

5668
// With targetRatio=1.0, target consumers = ceil(20/1.0) = 20
5769
// With dampingFactor=0.5 and currentCount=5:
5870
// dampedTarget = 5 + (20 - 5) * 0.5 = 5 + 7.5 = 12.5 → 13
59-
const result = strategy.calculateTargetCount(baseContext);
71+
const result = strategy.calculateTargetCount(5);
6072
expect(result).toBe(13);
6173
});
6274

6375
it("should handle zero current count", () => {
6476
const metricsProcessor = createMetricsProcessor(5);
65-
const strategy = new SmoothScalingStrategy({ metricsProcessor });
66-
const context = { ...baseContext, currentConsumerCount: 0 };
77+
const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor });
6778

68-
// Should use minConsumerCount when currentCount is 0
69-
const result = strategy.calculateTargetCount(context);
70-
expect(result).toBeGreaterThan(0);
79+
// With smoothedQueueLength=5, targetRatio=1.0:
80+
// targetConsumers = ceil(5/1.0) = 5
81+
// dampedTarget = 0 + (5 - 0) * 0.7 = 3.5 → 4
82+
const result = strategy.calculateTargetCount(0);
83+
expect(result).toBe(4);
7184
});
7285

7386
it("should validate damping factor", () => {
7487
const metricsProcessor = createMetricsProcessor(10);
75-
expect(() => new SmoothScalingStrategy({ metricsProcessor, dampingFactor: -0.1 })).toThrow(
76-
"dampingFactor must be between 0 and 1"
77-
);
78-
expect(() => new SmoothScalingStrategy({ metricsProcessor, dampingFactor: 1.1 })).toThrow(
79-
"dampingFactor must be between 0 and 1"
80-
);
81-
expect(() => new SmoothScalingStrategy({ metricsProcessor, dampingFactor: 0 })).not.toThrow();
82-
expect(() => new SmoothScalingStrategy({ metricsProcessor, dampingFactor: 1 })).not.toThrow();
88+
expect(
89+
() =>
90+
new SmoothScalingStrategy({
91+
...baseOptions,
92+
metricsProcessor,
93+
dampingFactor: -0.1,
94+
})
95+
).toThrow("dampingFactor must be between 0 and 1");
96+
97+
expect(
98+
() =>
99+
new SmoothScalingStrategy({
100+
...baseOptions,
101+
metricsProcessor,
102+
dampingFactor: 1.1,
103+
})
104+
).toThrow("dampingFactor must be between 0 and 1");
105+
106+
expect(
107+
() =>
108+
new SmoothScalingStrategy({
109+
...baseOptions,
110+
metricsProcessor,
111+
dampingFactor: 0,
112+
})
113+
).not.toThrow();
114+
115+
expect(
116+
() =>
117+
new SmoothScalingStrategy({
118+
...baseOptions,
119+
metricsProcessor,
120+
dampingFactor: 1,
121+
})
122+
).not.toThrow();
123+
});
124+
125+
it("should handle zero current count", () => {
126+
const metricsProcessor = createMetricsProcessor(10);
127+
const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor });
128+
129+
// With smoothedQueueLength=10, targetRatio=1.0:
130+
// targetConsumers = ceil(10/1.0) = 10
131+
// dampedTarget = 0 + (10 - 0) * 0.7 = 7
132+
const result = strategy.calculateTargetCount(0);
133+
expect(result).toBe(7);
83134
});
84135
});
85136

@@ -88,66 +139,72 @@ describe("Scaling Strategies", () => {
88139
// queuePerConsumer = 2/5 = 0.4, scaleDownThreshold = 1.0 * 0.5 = 0.5
89140
// Under-utilized since 0.4 < 0.5
90141
const metricsProcessor = createMetricsProcessor(2);
91-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
142+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
92143

93-
const result = strategy.calculateTargetCount(baseContext);
94-
expect(result).toBeLessThan(baseContext.currentConsumerCount);
95-
expect(result).toBeGreaterThanOrEqual(baseContext.minConsumerCount);
144+
const result = strategy.calculateTargetCount(5);
145+
expect(result).toBeLessThan(5);
146+
expect(result).toBeGreaterThanOrEqual(baseOptions.minConsumerCount);
96147
});
97148

98149
it("should maintain count when in optimal zone", () => {
99150
// queuePerConsumer = 5/5 = 1.0
100151
// Optimal zone: 0.5 < 1.0 < 2.0
101152
const metricsProcessor = createMetricsProcessor(5);
102-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
153+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
103154

104-
const result = strategy.calculateTargetCount(baseContext);
105-
expect(result).toBe(baseContext.currentConsumerCount);
155+
const result = strategy.calculateTargetCount(5);
156+
expect(result).toBe(5);
106157
});
107158

108159
it("should scale up when over-utilized", () => {
109160
// queuePerConsumer = 15/5 = 3.0, scaleUpThreshold = 1.0 * 2.0 = 2.0
110161
// Over-utilized since 3.0 > 2.0
111162
const metricsProcessor = createMetricsProcessor(15);
112-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
163+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
113164

114-
const result = strategy.calculateTargetCount(baseContext);
115-
expect(result).toBeGreaterThan(baseContext.currentConsumerCount);
116-
expect(result).toBeLessThanOrEqual(baseContext.maxConsumerCount);
165+
const result = strategy.calculateTargetCount(5);
166+
expect(result).toBeGreaterThan(5);
167+
expect(result).toBeLessThanOrEqual(baseOptions.maxConsumerCount);
117168
});
118169

119170
it("should scale aggressively for critical load", () => {
120171
// queuePerConsumer = 25/5 = 5.0 (critical: 5x target ratio)
121172
const metricsProcessor = createMetricsProcessor(25);
122-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
173+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
123174

124-
const result = strategy.calculateTargetCount(baseContext);
175+
const result = strategy.calculateTargetCount(5);
125176
// Should apply 50% scale factor: ceil(5 * 1.5) = 8
126177
// But capped by 50% max increment: 5 + ceil(5 * 0.5) = 5 + 3 = 8
127178
expect(result).toBe(8);
128179
});
129180

130181
it("should respect max consumer count", () => {
131-
const context = { ...baseContext, maxConsumerCount: 6 };
132182
const metricsProcessor = createMetricsProcessor(50); // Very high load
133-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
183+
const strategy = new AggressiveScalingStrategy({
184+
...baseOptions,
185+
maxConsumerCount: 6,
186+
metricsProcessor,
187+
});
134188

135-
const result = strategy.calculateTargetCount(context);
189+
const result = strategy.calculateTargetCount(5);
136190
expect(result).toBeLessThanOrEqual(6);
137191
});
138192

139193
it("should respect min consumer count", () => {
140-
const context = { ...baseContext, minConsumerCount: 3 };
141194
const metricsProcessor = createMetricsProcessor(0.1); // Very low load
142-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
195+
const strategy = new AggressiveScalingStrategy({
196+
...baseOptions,
197+
minConsumerCount: 3,
198+
metricsProcessor,
199+
});
143200

144-
const result = strategy.calculateTargetCount(context);
201+
const result = strategy.calculateTargetCount(5);
145202
expect(result).toBeGreaterThanOrEqual(3);
146203
});
147204

148205
it("should return thresholds", () => {
149206
const metricsProcessor = createMetricsProcessor(10);
150-
const strategy = new AggressiveScalingStrategy({ metricsProcessor });
207+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
151208
const thresholds = strategy.getThresholds(1.0);
152209
expect(thresholds).toEqual({
153210
scaleDownThreshold: 0.5,
@@ -156,13 +213,36 @@ describe("Scaling Strategies", () => {
156213
highThreshold: 3.0,
157214
});
158215
});
216+
217+
it("should handle zero current count without division by zero", () => {
218+
const metricsProcessor = createMetricsProcessor(10);
219+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
220+
221+
// Should use (currentCount || 1) to prevent division by zero
222+
// queuePerConsumer = 10 / 1 = 10 (not 10 / 0)
223+
// This is over-utilized (10 > 2.0), should scale up
224+
const result = strategy.calculateTargetCount(0);
225+
expect(result).toBeGreaterThan(0);
226+
expect(result).toBeLessThanOrEqual(baseOptions.maxConsumerCount);
227+
});
228+
229+
it("should handle zero queue with zero consumers", () => {
230+
const metricsProcessor = createMetricsProcessor(0);
231+
const strategy = new AggressiveScalingStrategy({ ...baseOptions, metricsProcessor });
232+
233+
// queuePerConsumer = 0 / 1 = 0
234+
// This is under-utilized (0 < 0.5), should scale down
235+
// But already at 0, so should return minConsumerCount
236+
const result = strategy.calculateTargetCount(0);
237+
expect(result).toBe(baseOptions.minConsumerCount);
238+
});
159239
});
160240

161241
describe("Integration scenarios", () => {
162242
it("should handle gradual load increase with smooth strategy", () => {
163243
const metricsProcessor = createMetricsProcessor(2);
164-
const strategy = new SmoothScalingStrategy({ metricsProcessor });
165-
let context = { ...baseContext, currentConsumerCount: 2 };
244+
const strategy = new SmoothScalingStrategy({ ...baseOptions, metricsProcessor });
245+
let currentCount = 2;
166246

167247
// Gradual increase: 2 → 6 → 10 → 15
168248
const loads = [2, 6, 10, 15];
@@ -172,9 +252,9 @@ describe("Scaling Strategies", () => {
172252
// Update the processor with the new load
173253
metricsProcessor.addSample(load);
174254
metricsProcessor.processBatch();
175-
const target = strategy.calculateTargetCount(context);
255+
const target = strategy.calculateTargetCount(currentCount);
176256
results.push(target);
177-
context = { ...context, currentConsumerCount: target };
257+
currentCount = target;
178258
}
179259

180260
// Should show gradual increase due to damping
@@ -187,20 +267,26 @@ describe("Scaling Strategies", () => {
187267
});
188268

189269
it("should handle load spike with aggressive strategy", () => {
190-
let context = { ...baseContext, currentConsumerCount: 3 };
270+
let currentCount = 3;
191271

192272
// Sudden spike from normal to critical
193273
const normalLoad = 3; // queuePerConsumer = 1.0 (optimal)
194274
const spikeLoad = 15; // queuePerConsumer = 5.0 (critical)
195275

196276
const normalProcessor = createMetricsProcessor(normalLoad);
197-
const normalStrategy = new AggressiveScalingStrategy({ metricsProcessor: normalProcessor });
198-
const normalTarget = normalStrategy.calculateTargetCount(context);
277+
const normalStrategy = new AggressiveScalingStrategy({
278+
...baseOptions,
279+
metricsProcessor: normalProcessor,
280+
});
281+
const normalTarget = normalStrategy.calculateTargetCount(currentCount);
199282
expect(normalTarget).toBe(3); // Should maintain
200283

201284
const spikeProcessor = createMetricsProcessor(spikeLoad);
202-
const spikeStrategy = new AggressiveScalingStrategy({ metricsProcessor: spikeProcessor });
203-
const spikeTarget = spikeStrategy.calculateTargetCount(context);
285+
const spikeStrategy = new AggressiveScalingStrategy({
286+
...baseOptions,
287+
metricsProcessor: spikeProcessor,
288+
});
289+
const spikeTarget = spikeStrategy.calculateTargetCount(currentCount);
204290
expect(spikeTarget).toBeGreaterThan(3); // Should scale up aggressively
205291
});
206292
});

0 commit comments

Comments
 (0)