Skip to content

Commit 0d1eac9

Browse files
authored
feat(supervisor): dynamic queue consumer pool (#2461)
* feat(supervisor): dynamic queue consumer pool * add changeset * fix: correctly handle zero median and even samples * feat(supervisor): consumer pool metrics * fix tests * more tests and fixes * decrease default scaling cooldowns * don't treat initial pool size as scale up * handle scale down when queue length drops to zero * remove changeset, supervisor changes only * add damping factor env var
1 parent ddbae6b commit 0d1eac9

File tree

12 files changed

+2401
-15
lines changed

12 files changed

+2401
-15
lines changed

apps/supervisor/src/env.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,16 @@ const Env = z.object({
3535
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
3636
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
3737
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
38-
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
39-
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),
38+
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1),
39+
TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1),
40+
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10),
41+
TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"),
42+
TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(5000), // 5 seconds
43+
TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(30000), // 30 seconds
44+
TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer)
45+
TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // Smooths queue length measurements (0=historical, 1=current)
46+
TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms)
47+
TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR: z.coerce.number().min(0).max(1).default(0.7), // Smooths consumer count changes after EWMA (0=no scaling, 1=immediate)
4048

4149
// Optional services
4250
TRIGGER_WARM_START_URL: z.string().optional(),

apps/supervisor/src/index.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,18 @@ class ManagedSupervisor {
128128
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
129129
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
130130
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
131-
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
131+
metricsRegistry: register,
132+
scaling: {
133+
strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
134+
minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT,
135+
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
136+
scaleUpCooldownMs: env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS,
137+
scaleDownCooldownMs: env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS,
138+
targetRatio: env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO,
139+
ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA,
140+
batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS,
141+
dampingFactor: env.TRIGGER_DEQUEUE_SCALING_DAMPING_FACTOR,
142+
},
132143
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
133144
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
134145
sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS,

packages/core/src/v3/runEngineWorker/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export * from "./consts.js";
22
export * from "./supervisor/http.js";
33
export * from "./supervisor/schemas.js";
44
export * from "./supervisor/session.js";
5+
export * from "./supervisor/consumerPool.js";
56
export * from "./workload/http.js";
67
export * from "./workload/schemas.js";
78
export * from "./types.js";

0 commit comments

Comments
 (0)