diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index c066df9150..6ceac2ac6b 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -1,23 +1,24 @@ -import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis"; +import { createRedisClient, type Redis } from "@internal/redis"; import { startSpan, type Counter, type Histogram, type Meter, + type ObservableGauge, type Span, type Tracer, } from "@internal/tracing"; +import { Logger } from "@trigger.dev/core/logger"; import { - FairQueue, - DRRScheduler, - CallbackFairQueueKeyProducer, - WorkerQueueManager, BatchedSpanManager, + CallbackFairQueueKeyProducer, + DRRScheduler, + FairQueue, isAbortError, + WorkerQueueManager, type FairQueueOptions, - type StoredMessage, } from "@trigger.dev/redis-worker"; -import { Logger } from "@trigger.dev/core/logger"; +import { BatchCompletionTracker } from "./completionTracker.js"; import type { BatchCompletionCallback, BatchItem, @@ -29,10 +30,9 @@ import type { ProcessBatchItemCallback, } from "./types.js"; import { BatchItemPayload as BatchItemPayloadSchema } from "./types.js"; -import { BatchCompletionTracker } from "./completionTracker.js"; -export type { BatchQueueOptions, InitializeBatchOptions, CompleteBatchResult } from "./types.js"; export { BatchCompletionTracker } from "./completionTracker.js"; +export type { BatchQueueOptions, CompleteBatchResult, InitializeBatchOptions } from "./types.js"; /** * BatchQueue manages batch trigger processing with fair scheduling using @@ -84,6 +84,7 @@ export class BatchQueue { private batchCompletedCounter?: Counter; private batchProcessingDurationHistogram?: Histogram; private itemQueueTimeHistogram?: Histogram; + private workerQueueLengthGauge?: ObservableGauge; constructor(private options: BatchQueueOptions) { this.logger = options.logger ?? new Logger("BatchQueue", options.logLevel ?? "info"); @@ -590,6 +591,16 @@ export class BatchQueue { description: "Time from item enqueue to processing start", unit: "ms", }); + + this.workerQueueLengthGauge = meter.createObservableGauge("batch_queue.worker_queue.length", { + description: "Number of items waiting in the batch worker queue", + unit: "items", + }); + + this.workerQueueLengthGauge.addCallback(async (observableResult) => { + const length = await this.workerQueueManager.getLength(BATCH_WORKER_QUEUE_ID); + observableResult.observe(length); + }); } // ============================================================================