Skip to content

Commit 3afe0ca

Browse files
committed
metrics now working, configure the run queue settings, additional metrics for run engine and redis-worker
1 parent aa39194 commit 3afe0ca

File tree

12 files changed

+176
-104
lines changed

12 files changed

+176
-104
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,8 +330,8 @@ const EnvironmentSchema = z.object({
330330
INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
331331
INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
332332
INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS: z.string().optional(),
333-
INTERNAL_OTEL_METRIC_EXPORTER_DISABLED: z.string().default("0"),
334-
INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL: z.coerce.number().int().default(30_000),
333+
INTERNAL_OTEL_METRIC_EXPORTER_ENABLED: z.string().default("0"),
334+
INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS: z.coerce.number().int().default(30_000),
335335

336336
ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
337337
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),
@@ -468,8 +468,10 @@ const EnvironmentSchema = z.object({
468468
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
469469
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
470470
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
471+
RUN_ENGINE_RUN_QUEUE_SHARD_COUNT: z.coerce.number().int().default(4),
471472
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
472473
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
474+
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
473475

474476
RUN_ENGINE_WORKER_REDIS_HOST: z
475477
.string()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ function createRunEngine() {
5959
maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT,
6060
tracer,
6161
},
62+
shardCount: env.RUN_ENGINE_RUN_QUEUE_SHARD_COUNT,
63+
processWorkerQueueDebounceMs: env.RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS,
6264
},
6365
runLock: {
6466
redis: {

apps/webapp/app/v3/tracer.server.ts

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import {
2626
ConsoleMetricExporter,
2727
PeriodicExportingMetricReader,
2828
} from "@opentelemetry/sdk-metrics";
29-
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
29+
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto";
3030
import { Resource } from "@opentelemetry/resources";
3131
import {
3232
BatchSpanProcessor,
@@ -289,17 +289,11 @@ function setupTelemetry() {
289289
}
290290

291291
function setupMetrics() {
292-
if (env.INTERNAL_OTEL_METRIC_EXPORTER_DISABLED === "1") {
292+
if (env.INTERNAL_OTEL_METRIC_EXPORTER_ENABLED === "0") {
293293
return metrics.getMeter("trigger.dev", "3.3.12");
294294
}
295295

296-
const exporter = env.INTERNAL_OTEL_METRIC_EXPORTER_URL
297-
? new OTLPMetricExporter({
298-
url: env.INTERNAL_OTEL_METRIC_EXPORTER_URL,
299-
timeoutMillis: 30_000,
300-
headers: parseInternalMetricsHeaders() ?? {},
301-
})
302-
: new ConsoleMetricExporter();
296+
const exporter = createMetricsExporter();
303297

304298
const meterProvider = new MeterProvider({
305299
resource: new Resource({
@@ -309,8 +303,8 @@ function setupMetrics() {
309303
readers: [
310304
new PeriodicExportingMetricReader({
311305
exporter,
312-
exportIntervalMillis: env.INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL,
313-
exportTimeoutMillis: 30_000,
306+
exportIntervalMillis: env.INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS,
307+
exportTimeoutMillis: env.INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS,
314308
}),
315309
],
316310
});
@@ -412,3 +406,23 @@ function parseInternalMetricsHeaders(): Record<string, string> | undefined {
412406
return;
413407
}
414408
}
409+
410+
function createMetricsExporter() {
411+
if (env.INTERNAL_OTEL_METRIC_EXPORTER_URL) {
412+
const headers = parseInternalMetricsHeaders() ?? {};
413+
414+
console.log(
415+
`🔦 Tracer: OTLP metric exporter enabled to ${
416+
env.INTERNAL_OTEL_METRIC_EXPORTER_URL
417+
} with headers: ${Object.keys(headers)}`
418+
);
419+
420+
return new OTLPMetricExporter({
421+
url: env.INTERNAL_OTEL_METRIC_EXPORTER_URL,
422+
timeoutMillis: 30_000,
423+
headers,
424+
});
425+
} else {
426+
return new ConsoleMetricExporter();
427+
}
428+
}

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
"@opentelemetry/core": "1.25.1",
6262
"@opentelemetry/exporter-logs-otlp-http": "0.52.1",
6363
"@opentelemetry/exporter-trace-otlp-http": "0.52.1",
64-
"@opentelemetry/exporter-metrics-otlp-http": "0.52.1",
64+
"@opentelemetry/exporter-metrics-otlp-proto": "0.52.1",
6565
"@opentelemetry/instrumentation": "0.52.1",
6666
"@opentelemetry/instrumentation-express": "^0.36.1",
6767
"@opentelemetry/instrumentation-http": "0.52.1",

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,14 @@ export class RunEngine {
122122
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
123123
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
124124
},
125-
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
125+
shardCount: options.queue?.shardCount,
126+
masterQueueConsumersDisabled: options.worker.disabled,
126127
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
127128
meter: options.meter,
128129
});
129130

130131
this.worker = new Worker({
131-
name: "worker",
132+
name: "run-engine-worker",
132133
redisOptions: {
133134
...options.worker.redis,
134135
keyPrefix: `${options.worker.redis.keyPrefix}worker:`,

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,16 @@ import { Redis } from "@internal/redis";
55
import * as redlock from "redlock";
66
import { tryCatch } from "@trigger.dev/core";
77
import { Logger } from "@trigger.dev/core/logger";
8-
import { startSpan, Tracer, Meter, getMeter, ValueType, ObservableResult, Attributes, Histogram } from "@internal/tracing";
8+
import {
9+
startSpan,
10+
Tracer,
11+
Meter,
12+
getMeter,
13+
ValueType,
14+
ObservableResult,
15+
Attributes,
16+
Histogram,
17+
} from "@internal/tracing";
918

1019
const SemanticAttributes = {
1120
LOCK_TYPE: "run_engine.lock.type",
@@ -41,23 +50,17 @@ export class RunLocker {
4150
this.tracer = options.tracer;
4251
this.meter = options.meter ?? getMeter("run-engine");
4352

44-
const activeLocksObservableGauge = this.meter.createObservableGauge(
45-
"run_engine.locks.active",
46-
{
47-
description: "The number of active locks by type",
48-
unit: "1",
49-
valueType: ValueType.INT,
50-
}
51-
);
53+
const activeLocksObservableGauge = this.meter.createObservableGauge("run_engine.locks.active", {
54+
description: "The number of active locks by type",
55+
unit: "locks",
56+
valueType: ValueType.INT,
57+
});
5258

53-
const lockDurationHistogram = this.meter.createHistogram(
54-
"run_engine.lock.duration",
55-
{
56-
description: "The duration of lock operations",
57-
unit: "ms",
58-
valueType: ValueType.DOUBLE,
59-
}
60-
);
59+
const lockDurationHistogram = this.meter.createHistogram("run_engine.lock.duration", {
60+
description: "The duration of lock operations",
61+
unit: "ms",
62+
valueType: ValueType.DOUBLE,
63+
});
6164

6265
activeLocksObservableGauge.addCallback(this.#updateActiveLocksCount.bind(this));
6366
this.lockDurationHistogram = lockDurationHistogram;
@@ -108,10 +111,10 @@ export class RunLocker {
108111

109112
const [error, result] = await tryCatch(
110113
this.redlock.using(resources, duration, async (signal) => {
111-
const newContext: LockContext = {
112-
resources: joinedResources,
114+
const newContext: LockContext = {
115+
resources: joinedResources,
113116
signal,
114-
lockType: name
117+
lockType: name,
115118
};
116119

117120
// Track active lock

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis";
2-
import { startSpan, Tracer, Meter, getMeter, ValueType, ObservableResult, Attributes } from "@internal/tracing";
2+
import {
3+
startSpan,
4+
Tracer,
5+
Meter,
6+
getMeter,
7+
ValueType,
8+
ObservableResult,
9+
Attributes,
10+
} from "@internal/tracing";
311
import { Logger } from "@trigger.dev/core/logger";
412
import { z } from "zod";
513
import { setInterval } from "node:timers/promises";
@@ -100,16 +108,16 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
100108
"release_concurrency.releasings.length",
101109
{
102110
description: "Number of items in the releasings sorted set",
103-
unit: "1",
111+
unit: "items",
104112
valueType: ValueType.INT,
105113
}
106114
);
107115

108116
const masterQueueLengthGauge = this.meter.createObservableGauge(
109117
"release_concurrency.master_queue.length",
110118
{
111-
description: "Number of queues in the master queue sorted set",
112-
unit: "1",
119+
description: "Number of items in the master queue sorted set",
120+
unit: "items",
113121
valueType: ValueType.INT,
114122
}
115123
);

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ export type RunEngineOptions = {
3030
};
3131
queue: {
3232
redis: RedisOptions;
33-
masterQueueConsumersDisabled?: boolean;
33+
shardCount?: number;
3434
processWorkerQueueDebounceMs?: number;
3535
workerOptions?: WorkerConcurrencyOptions;
3636
retryOptions?: RetryOptions;

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ export class RunQueue {
137137
"runqueue.workerQueue.length",
138138
{
139139
description: "The number of messages in the worker queue",
140-
unit: "1",
140+
unit: "messages",
141141
valueType: ValueType.INT,
142142
}
143143
);
@@ -146,7 +146,7 @@ export class RunQueue {
146146
"runqueue.masterQueue.length",
147147
{
148148
description: "The number of queues in the master queue shard",
149-
unit: "1",
149+
unit: "queues",
150150
valueType: ValueType.INT,
151151
}
152152
);
@@ -791,8 +791,11 @@ export class RunQueue {
791791
projectId: string
792792
) {
793793
// Calculate the master queue shard for this environment
794-
const masterQueue = this.keys.masterQueueKeyForEnvironment(runtimeEnvironmentId, this.shardCount);
795-
794+
const masterQueue = this.keys.masterQueueKeyForEnvironment(
795+
runtimeEnvironmentId,
796+
this.shardCount
797+
);
798+
796799
// Use scanStream to find all matching members
797800
const stream = this.redis.zscanStream(masterQueue, {
798801
match: this.keys.queueKey(organizationId, projectId, "*", "*"),

packages/redis-worker/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
"lodash.omit": "^4.5.0",
2828
"nanoid": "^5.0.7",
2929
"p-limit": "^6.2.0",
30-
"prom-client": "^15.1.0",
3130
"zod": "3.23.8"
3231
},
3332
"devDependencies": {
@@ -52,4 +51,4 @@
5251
"require": "./dist/index.cjs"
5352
}
5453
}
55-
}
54+
}

0 commit comments

Comments
 (0)