Skip to content

Commit 36b0762

Browse files
authored
feat(metrics): add observable gauge for batch queue worker length (#2848)
1 parent f5b2ccb commit 36b0762

File tree

1 file changed

+20
-9
lines changed
  • internal-packages/run-engine/src/batch-queue

1 file changed

+20
-9
lines changed

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,24 @@
1-
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
1+
import { createRedisClient, type Redis } from "@internal/redis";
22
import {
33
startSpan,
44
type Counter,
55
type Histogram,
66
type Meter,
7+
type ObservableGauge,
78
type Span,
89
type Tracer,
910
} from "@internal/tracing";
11+
import { Logger } from "@trigger.dev/core/logger";
1012
import {
11-
FairQueue,
12-
DRRScheduler,
13-
CallbackFairQueueKeyProducer,
14-
WorkerQueueManager,
1513
BatchedSpanManager,
14+
CallbackFairQueueKeyProducer,
15+
DRRScheduler,
16+
FairQueue,
1617
isAbortError,
18+
WorkerQueueManager,
1719
type FairQueueOptions,
18-
type StoredMessage,
1920
} from "@trigger.dev/redis-worker";
20-
import { Logger } from "@trigger.dev/core/logger";
21+
import { BatchCompletionTracker } from "./completionTracker.js";
2122
import type {
2223
BatchCompletionCallback,
2324
BatchItem,
@@ -29,10 +30,9 @@ import type {
2930
ProcessBatchItemCallback,
3031
} from "./types.js";
3132
import { BatchItemPayload as BatchItemPayloadSchema } from "./types.js";
32-
import { BatchCompletionTracker } from "./completionTracker.js";
3333

34-
export type { BatchQueueOptions, InitializeBatchOptions, CompleteBatchResult } from "./types.js";
3534
export { BatchCompletionTracker } from "./completionTracker.js";
35+
export type { BatchQueueOptions, CompleteBatchResult, InitializeBatchOptions } from "./types.js";
3636

3737
/**
3838
* BatchQueue manages batch trigger processing with fair scheduling using
@@ -84,6 +84,7 @@ export class BatchQueue {
8484
private batchCompletedCounter?: Counter;
8585
private batchProcessingDurationHistogram?: Histogram;
8686
private itemQueueTimeHistogram?: Histogram;
87+
private workerQueueLengthGauge?: ObservableGauge;
8788

8889
constructor(private options: BatchQueueOptions) {
8990
this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info");
@@ -590,6 +591,16 @@ export class BatchQueue {
590591
description: "Time from item enqueue to processing start",
591592
unit: "ms",
592593
});
594+
595+
this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", {
596+
description: "Number of items waiting in the batch worker queue",
597+
unit: "items",
598+
});
599+
600+
this.workerQueueLengthGauge.addCallback(async (observableResult) => {
601+
const length = await this.workerQueueManager.getLength(BATCH_WORKER_QUEUE_ID);
602+
observableResult.observe(length);
603+
});
593604
}
594605

595606
// ============================================================================

0 commit comments

Comments
 (0)