Skip to content

Commit 389e2d1

Browse files
committed
feat(supervisor): dynamic queue consumer pool
1 parent 847ea86 commit 389e2d1

File tree

11 files changed

+1980
-15
lines changed

11 files changed

+1980
-15
lines changed

apps/supervisor/src/env.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,15 @@ const Env = z.object({
3535
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default(true),
3636
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(250),
3737
TRIGGER_DEQUEUE_IDLE_INTERVAL_MS: z.coerce.number().int().default(1000),
38-
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
39-
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(1),
38+
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(1),
39+
TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT: z.coerce.number().int().default(1),
40+
TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT: z.coerce.number().int().default(10),
41+
TRIGGER_DEQUEUE_SCALING_STRATEGY: z.enum(["none", "smooth", "aggressive"]).default("none"),
42+
TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS: z.coerce.number().int().default(10000), // 10 seconds
43+
TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS: z.coerce.number().int().default(60000), // 60 seconds
44+
TRIGGER_DEQUEUE_SCALING_TARGET_RATIO: z.coerce.number().default(1.0), // Target ratio of queue items to consumers (1.0 = 1 item per consumer)
45+
TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA: z.coerce.number().min(0).max(1).default(0.3), // EWMA smoothing factor (0-1)
46+
TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS: z.coerce.number().int().positive().default(1000), // Batch window for metrics processing (ms)
4047

4148
// Optional services
4249
TRIGGER_WARM_START_URL: z.string().optional(),

apps/supervisor/src/index.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,16 @@ class ManagedSupervisor {
128128
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
129129
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
130130
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
131-
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
131+
scaling: {
132+
strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
133+
minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT,
134+
maxConsumerCount: env.TRIGGER_DEQUEUE_MAX_CONSUMER_COUNT,
135+
scaleUpCooldownMs: env.TRIGGER_DEQUEUE_SCALING_UP_COOLDOWN_MS,
136+
scaleDownCooldownMs: env.TRIGGER_DEQUEUE_SCALING_DOWN_COOLDOWN_MS,
137+
targetRatio: env.TRIGGER_DEQUEUE_SCALING_TARGET_RATIO,
138+
ewmaAlpha: env.TRIGGER_DEQUEUE_SCALING_EWMA_ALPHA,
139+
batchWindowMs: env.TRIGGER_DEQUEUE_SCALING_BATCH_WINDOW_MS,
140+
},
132141
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
133142
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
134143
sendRunDebugLogs: env.SEND_RUN_DEBUG_LOGS,

packages/core/src/v3/runEngineWorker/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ export * from "./consts.js";
22
export * from "./supervisor/http.js";
33
export * from "./supervisor/schemas.js";
44
export * from "./supervisor/session.js";
5+
export * from "./supervisor/consumerPool.js";
56
export * from "./workload/http.js";
67
export * from "./workload/schemas.js";
78
export * from "./types.js";

0 commit comments

Comments
 (0)