diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 0c38d1bf44..ca8628c952 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -129,15 +129,22 @@ export class RunEngine { const keys = new RunQueueFullKeyProducer(); + const queueSelectionStrategyOptions = { + keys, + redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` }, + defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10, + ...options.queue?.queueSelectionStrategyOptions, + }; + + this.logger.log("RunEngine FairQueueSelectionStrategy queueSelectionStrategyOptions", { + options: queueSelectionStrategyOptions, + }); + this.runQueue = new RunQueue({ name: "rq", tracer: trace.getTracer("rq"), keys, - queueSelectionStrategy: new FairQueueSelectionStrategy({ - keys, - redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` }, - defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10, - }), + queueSelectionStrategy: new FairQueueSelectionStrategy(queueSelectionStrategyOptions), defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10, defaultEnvConcurrencyBurstFactor: options.queue?.defaultEnvConcurrencyBurstFactor, logger: new Logger("RunQueue", options.queue?.logLevel ?? "info"), @@ -1730,10 +1737,13 @@ export class RunEngine { }); if (!taskRun) { - this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", { - runId, - snapshotId, - }); + this.logger.error( + "RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", + { + runId, + snapshotId, + } + ); return; } diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 7a4066e706..5127ec3c75 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -1706,6 +1706,14 @@ export class RunQueue { const message = await this.#dequeueMessageFromKey(messageKey); if (!message) { + this.logger.error("Failed to dequeue message from worker queue", { + messageKey, + workerQueue, + workerQueueKey, + workerQueueLength, + service: this.name, + }); + return; }