Skip to content

Commit 59c17e0

Browse files
authored
feat(run-engine): worker queue resolver (#2476)
1 parent ed23615 commit 59c17e0

File tree

3 files changed

+590
-13
lines changed

3 files changed

+590
-13
lines changed

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import {
4242
RunQueueKeyProducer,
4343
RunQueueSelectionStrategy,
4444
} from "./types.js";
45+
import { WorkerQueueResolver } from "./workerQueueResolver.js";
4546

4647
const SemanticAttributes = {
4748
QUEUE: "runqueue.queue",
@@ -169,6 +170,7 @@ export class RunQueue {
169170
private shardCount: number;
170171
private abortController: AbortController;
171172
private worker: Worker<typeof workerCatalog>;
173+
private workerQueueResolver: WorkerQueueResolver;
172174
private _observableWorkerQueues: Set<string> = new Set();
173175
private _meter: Meter;
174176
private _queueCooloffStates: Map<string, QueueCooloffState> = new Map();
@@ -185,6 +187,8 @@ export class RunQueue {
185187
},
186188
});
187189
this.logger = options.logger ?? new Logger("RunQueue", options.logLevel ?? "info");
190+
191+
this.workerQueueResolver = new WorkerQueueResolver({ logger: this.logger });
188192
this._meter = options.meter ?? getMeter("run-queue");
189193

190194
const workerQueueObservableGauge = this._meter.createObservableGauge(
@@ -1845,19 +1849,8 @@ export class RunQueue {
18451849
);
18461850
}
18471851

1848-
#getWorkerQueueFromMessage(message: OutputPayload) {
1849-
if (message.version === "2") {
1850-
return message.workerQueue;
1851-
}
1852-
1853-
// In v2, if the environment is development, the worker queue is the environment id.
1854-
if (message.environmentType === "DEVELOPMENT") {
1855-
return message.environmentId;
1856-
}
1857-
1858-
// In v1, the master queue is something like us-nyc-3,
1859-
// which in v2 is the worker queue.
1860-
return message.masterQueues[0];
1852+
#getWorkerQueueFromMessage(message: OutputPayload): string {
1853+
return this.workerQueueResolver.getWorkerQueueFromMessage(message);
18611854
}
18621855

18631856
#createBlockingDequeueClient() {

0 commit comments

Comments
 (0)