Skip to content

Commit 712e7bd

Browse files
committed
feat(supervisor): consumer pool metrics
1 parent 4c2659e commit 712e7bd

File tree

5 files changed

+209
-4
lines changed

5 files changed

+209
-4
lines changed

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ class ManagedSupervisor {
128128
dequeueIdleIntervalMs: env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS,
129129
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
130130
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
131+
metricsRegistry: register,
131132
scaling: {
132133
strategy: env.TRIGGER_DEQUEUE_SCALING_STRATEGY,
133134
minConsumerCount: env.TRIGGER_DEQUEUE_MIN_CONSUMER_COUNT,

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import {
77
ScalingStrategyKind,
88
ScalingStrategyOptions,
99
} from "./scalingStrategies.js";
10+
import { ConsumerPoolMetrics } from "./consumerPoolMetrics.js";
11+
import type { Registry } from "prom-client";
1012

1113
export type QueueConsumerFactory = (opts: RunQueueConsumerOptions) => QueueConsumer;
1214

@@ -27,6 +29,7 @@ export type ConsumerPoolOptions = {
2729
consumer: RunQueueConsumerOptions;
2830
scaling: ScalingOptions;
2931
consumerFactory?: QueueConsumerFactory;
32+
metricsRegistry?: Registry;
3033
};
3134

3235
type ScalingMetrics = {
@@ -41,6 +44,7 @@ export class RunQueueConsumerPool {
4144
private readonly consumerOptions: RunQueueConsumerOptions;
4245

4346
private readonly logger = new SimpleStructuredLogger("consumer-pool");
47+
private readonly promMetrics?: ConsumerPoolMetrics;
4448

4549
private readonly minConsumerCount: number;
4650
private readonly maxConsumerCount: number;
@@ -69,6 +73,13 @@ export class RunQueueConsumerPool {
6973
constructor(opts: ConsumerPoolOptions) {
7074
this.consumerOptions = opts.consumer;
7175

76+
// Initialize Prometheus metrics if registry provided
77+
if (opts.metricsRegistry) {
78+
this.promMetrics = new ConsumerPoolMetrics({
79+
register: opts.metricsRegistry,
80+
});
81+
}
82+
7283
this.minConsumerCount = Math.max(1, opts.scaling.minConsumerCount ?? 1);
7384
this.maxConsumerCount = Math.max(this.minConsumerCount, opts.scaling.maxConsumerCount ?? 10);
7485
this.scaleUpCooldownMs = opts.scaling.scaleUpCooldownMs ?? 10000; // 10 seconds default
@@ -142,6 +153,15 @@ export class RunQueueConsumerPool {
142153
this.logger.log("Started dynamic consumer pool", {
143154
initialConsumerCount: this.consumers.size,
144155
});
156+
157+
// Initialize Prometheus metrics with initial state
158+
this.promMetrics?.updateState({
159+
consumerCount: this.consumers.size,
160+
queueLength: this.metrics.queueLength,
161+
smoothedQueueLength: this.metrics.smoothedQueueLength,
162+
targetConsumerCount: initialCount,
163+
strategy: this.scalingStrategy.name,
164+
});
145165
}
146166

147167
async stop() {
@@ -168,6 +188,9 @@ export class RunQueueConsumerPool {
168188
return;
169189
}
170190

191+
// Track queue length update in metrics
192+
this.promMetrics?.recordQueueLengthUpdate();
193+
171194
// Skip metrics tracking for static mode
172195
if (this.scalingStrategy.name === "none") {
173196
return;
@@ -250,6 +273,7 @@ export class RunQueueConsumerPool {
250273
jitterMs,
251274
remainingMs: effectiveCooldown - timeSinceLastScale,
252275
});
276+
this.promMetrics?.recordCooldownApplied("up");
253277
return;
254278
}
255279
} else if (targetCount < this.consumers.size) {
@@ -262,6 +286,7 @@ export class RunQueueConsumerPool {
262286
jitterMs,
263287
remainingMs: effectiveCooldown - timeSinceLastScale,
264288
});
289+
this.promMetrics?.recordCooldownApplied("down");
265290
return;
266291
}
267292
}
@@ -310,13 +335,26 @@ export class RunQueueConsumerPool {
310335

311336
if (targetCount > actualCurrentCount) {
312337
// Scale up
313-
this.addConsumers(targetCount - actualCurrentCount);
338+
const count = targetCount - actualCurrentCount;
339+
this.addConsumers(count);
340+
this.promMetrics?.recordScalingOperation("up", this.scalingStrategy.name, count);
314341
} else if (targetCount < actualCurrentCount) {
315342
// Scale down
316-
this.removeConsumers(actualCurrentCount - targetCount);
343+
const count = actualCurrentCount - targetCount;
344+
this.removeConsumers(count);
345+
this.promMetrics?.recordScalingOperation("down", this.scalingStrategy.name, count);
317346
}
318347

319348
this.metrics.lastScaleTime = new Date();
349+
350+
// Update Prometheus state metrics
351+
this.promMetrics?.updateState({
352+
consumerCount: this.consumers.size,
353+
queueLength: this.metrics.queueLength,
354+
smoothedQueueLength: this.metrics.smoothedQueueLength,
355+
targetConsumerCount: targetCount,
356+
strategy: this.scalingStrategy.name,
357+
});
320358
}
321359

322360
private addConsumers(count: number) {
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
import { Counter, Gauge, Histogram, Registry } from "prom-client";
2+
3+
export interface ConsumerPoolMetricsOptions {
4+
register?: Registry;
5+
prefix?: string;
6+
}
7+
8+
export class ConsumerPoolMetrics {
9+
private readonly register: Registry;
10+
private readonly prefix: string;
11+
12+
// Current state metrics
13+
public readonly consumerCount: Gauge;
14+
public readonly queueLength: Gauge;
15+
public readonly smoothedQueueLength: Gauge;
16+
public readonly targetConsumerCount: Gauge;
17+
public readonly scalingStrategy: Gauge;
18+
19+
// Scaling operation metrics
20+
public readonly scalingOperationsTotal: Counter;
21+
public readonly consumersAddedTotal: Counter;
22+
public readonly consumersRemovedTotal: Counter;
23+
public readonly scalingCooldownsApplied: Counter;
24+
25+
// Performance metrics
26+
public readonly queueLengthUpdatesTotal: Counter;
27+
public readonly batchesProcessedTotal: Counter;
28+
29+
constructor(opts: ConsumerPoolMetricsOptions = {}) {
30+
this.register = opts.register ?? new Registry();
31+
this.prefix = opts.prefix ?? "queue_consumer_pool";
32+
33+
// Current state metrics
34+
this.consumerCount = new Gauge({
35+
name: `${this.prefix}_consumer_count`,
36+
help: "Current number of active queue consumers",
37+
labelNames: ["strategy"],
38+
registers: [this.register],
39+
});
40+
41+
this.queueLength = new Gauge({
42+
name: `${this.prefix}_queue_length`,
43+
help: "Current queue length (median of recent samples)",
44+
registers: [this.register],
45+
});
46+
47+
this.smoothedQueueLength = new Gauge({
48+
name: `${this.prefix}_smoothed_queue_length`,
49+
help: "EWMA smoothed queue length",
50+
registers: [this.register],
51+
});
52+
53+
this.targetConsumerCount = new Gauge({
54+
name: `${this.prefix}_target_consumer_count`,
55+
help: "Target number of consumers calculated by scaling strategy",
56+
labelNames: ["strategy"],
57+
registers: [this.register],
58+
});
59+
60+
this.scalingStrategy = new Gauge({
61+
name: `${this.prefix}_scaling_strategy_info`,
62+
help: "Information about the active scaling strategy (1 = active, 0 = inactive)",
63+
labelNames: ["strategy"],
64+
registers: [this.register],
65+
});
66+
67+
// Scaling operation metrics
68+
this.scalingOperationsTotal = new Counter({
69+
name: `${this.prefix}_scaling_operations_total`,
70+
help: "Total number of scaling operations performed",
71+
labelNames: ["direction", "strategy"],
72+
registers: [this.register],
73+
});
74+
75+
this.consumersAddedTotal = new Counter({
76+
name: `${this.prefix}_consumers_added_total`,
77+
help: "Total number of consumers added",
78+
registers: [this.register],
79+
});
80+
81+
this.consumersRemovedTotal = new Counter({
82+
name: `${this.prefix}_consumers_removed_total`,
83+
help: "Total number of consumers removed",
84+
registers: [this.register],
85+
});
86+
87+
this.scalingCooldownsApplied = new Counter({
88+
name: `${this.prefix}_scaling_cooldowns_applied_total`,
89+
help: "Number of times scaling was prevented due to cooldown",
90+
labelNames: ["direction"],
91+
registers: [this.register],
92+
});
93+
94+
this.queueLengthUpdatesTotal = new Counter({
95+
name: `${this.prefix}_queue_length_updates_total`,
96+
help: "Total number of queue length updates received",
97+
registers: [this.register],
98+
});
99+
100+
this.batchesProcessedTotal = new Counter({
101+
name: `${this.prefix}_batches_processed_total`,
102+
help: "Total number of metric batches processed",
103+
registers: [this.register],
104+
});
105+
}
106+
107+
/**
108+
* Update all gauge metrics with current state
109+
*/
110+
updateState(state: {
111+
consumerCount: number;
112+
queueLength?: number;
113+
smoothedQueueLength: number;
114+
targetConsumerCount: number;
115+
strategy: string;
116+
}) {
117+
this.consumerCount.set({ strategy: state.strategy }, state.consumerCount);
118+
119+
if (state.queueLength !== undefined) {
120+
this.queueLength.set(state.queueLength);
121+
}
122+
123+
this.smoothedQueueLength.set(state.smoothedQueueLength);
124+
this.targetConsumerCount.set({ strategy: state.strategy }, state.targetConsumerCount);
125+
126+
// Set strategy info (1 for active strategy, 0 for others)
127+
["none", "smooth", "aggressive"].forEach((s) => {
128+
this.scalingStrategy.set({ strategy: s }, s === state.strategy ? 1 : 0);
129+
});
130+
}
131+
132+
/**
133+
* Record a scaling operation
134+
*/
135+
recordScalingOperation(direction: "up" | "down" | "none", strategy: string, count: number) {
136+
if (direction !== "none") {
137+
this.scalingOperationsTotal.inc({ direction, strategy });
138+
139+
if (direction === "up") {
140+
this.consumersAddedTotal.inc(count);
141+
} else {
142+
this.consumersRemovedTotal.inc(count);
143+
}
144+
}
145+
}
146+
147+
/**
148+
* Record that scaling was prevented by cooldown
149+
*/
150+
recordCooldownApplied(direction: "up" | "down") {
151+
this.scalingCooldownsApplied.inc({ direction });
152+
}
153+
154+
/**
155+
* Record a queue length update
156+
*/
157+
recordQueueLengthUpdate() {
158+
this.queueLengthUpdatesTotal.inc();
159+
}
160+
}

packages/core/src/v3/runEngineWorker/supervisor/queueMetricsProcessor.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
2+
13
export interface QueueMetricsProcessorOptions {
24
/**
35
* EWMA smoothing factor (0-1)
@@ -37,6 +39,7 @@ export interface BatchProcessingResult {
3739
export class QueueMetricsProcessor {
3840
private readonly ewmaAlpha: number;
3941
private readonly batchWindowMs: number;
42+
private readonly logger = new SimpleStructuredLogger("queue-metrics-processor");
4043

4144
private samples: number[] = [];
4245
private smoothedValue: number = 0;
@@ -91,7 +94,7 @@ export class QueueMetricsProcessor {
9194
const median = sortedSamples[mid];
9295

9396
if (median === undefined) {
94-
console.error("Invalid median calculated from odd samples", {
97+
this.logger.error("Invalid median calculated from odd samples", {
9598
sortedSamples,
9699
mid,
97100
median,
@@ -106,7 +109,7 @@ export class QueueMetricsProcessor {
106109
const highMid = sortedSamples[mid];
107110

108111
if (lowMid === undefined || highMid === undefined) {
109-
console.error("Invalid median calculated from even samples", {
112+
this.logger.error("Invalid median calculated from even samples", {
110113
sortedSamples,
111114
mid,
112115
lowMid,

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { WorkerClientToServerEvents, WorkerServerToClientEvents } from "../types
1010
import { getDefaultWorkerHeaders } from "./util.js";
1111
import { IntervalService } from "../../utils/interval.js";
1212
import { SimpleStructuredLogger } from "../../utils/structuredLogger.js";
13+
import type { Registry } from "prom-client";
1314

1415
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1516
queueConsumerEnabled?: boolean;
@@ -22,6 +23,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
2223
maxRunCount?: number;
2324
sendRunDebugLogs?: boolean;
2425
scaling: ScalingOptions;
26+
metricsRegistry?: Registry;
2527
};
2628

2729
export class SupervisorSession extends EventEmitter<WorkerEvents> {
@@ -56,6 +58,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
5658
maxRunCount: opts.maxRunCount,
5759
},
5860
scaling: opts.scaling,
61+
metricsRegistry: opts.metricsRegistry,
5962
});
6063

6164
this.heartbeat = new IntervalService({

0 commit comments

Comments
 (0)