Skip to content

Commit 19bdf3b

Browse files
committed
scope the pool by background version, allow configuring pool params in dev/prod
1 parent 427c450 commit 19bdf3b

File tree

7 files changed

+205
-46
lines changed

7 files changed

+205
-46
lines changed

packages/cli-v3/src/dev/devSupervisor.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,17 @@ class DevSupervisor implements WorkerRuntime {
117117
enableProcessReuse:
118118
typeof this.options.config.experimental_processKeepAlive === "boolean"
119119
? this.options.config.experimental_processKeepAlive
120+
: typeof this.options.config.experimental_processKeepAlive === "object"
121+
? this.options.config.experimental_processKeepAlive.enabled
120122
: false,
121-
maxPoolSize: 3,
122-
maxExecutionsPerProcess: 50,
123+
maxPoolSize:
124+
typeof this.options.config.experimental_processKeepAlive === "object"
125+
? this.options.config.experimental_processKeepAlive.devMaxPoolSize ?? 25
126+
: 25,
127+
maxExecutionsPerProcess:
128+
typeof this.options.config.experimental_processKeepAlive === "object"
129+
? this.options.config.experimental_processKeepAlive.maxExecutionsPerProcess ?? 50
130+
: 50,
123131
});
124132

125133
this.socket = this.#createSocket();
@@ -615,6 +623,10 @@ class DevSupervisor implements WorkerRuntime {
615623
return;
616624
}
617625

626+
if (worker.serverWorker?.version) {
627+
this.taskRunProcessPool?.deprecateVersion(worker.serverWorker?.version);
628+
}
629+
618630
if (this.#workerHasInProgressRuns(friendlyId)) {
619631
return;
620632
}

packages/cli-v3/src/dev/taskRunProcessPool.ts

Lines changed: 156 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,48 +15,88 @@ export type TaskRunProcessPoolOptions = {
1515
};
1616

1717
export class TaskRunProcessPool {
18-
private availableProcesses: TaskRunProcess[] = [];
19-
private busyProcesses: Set<TaskRunProcess> = new Set();
18+
// Group processes by worker version
19+
private availableProcessesByVersion: Map<string, TaskRunProcess[]> = new Map();
20+
private busyProcessesByVersion: Map<string, Set<TaskRunProcess>> = new Map();
2021
private readonly options: TaskRunProcessPoolOptions;
2122
private readonly maxPoolSize: number;
2223
private readonly maxExecutionsPerProcess: number;
24+
private readonly executionCountsPerProcess: Map<number, number> = new Map();
25+
private readonly deprecatedVersions: Set<string> = new Set();
2326

2427
constructor(options: TaskRunProcessPoolOptions) {
2528
this.options = options;
2629
this.maxPoolSize = options.maxPoolSize ?? 3;
2730
this.maxExecutionsPerProcess = options.maxExecutionsPerProcess ?? 50;
2831
}
2932

33+
deprecateVersion(version: string) {
34+
this.deprecatedVersions.add(version);
35+
36+
logger.debug("[TaskRunProcessPool] Deprecating version", { version });
37+
38+
const versionProcesses = this.availableProcessesByVersion.get(version) || [];
39+
40+
const processesToKill = versionProcesses.filter((process) => !process.isExecuting());
41+
Promise.all(processesToKill.map((process) => this.killProcess(process))).then(() => {
42+
this.availableProcessesByVersion.delete(version);
43+
});
44+
}
45+
3046
async getProcess(
3147
workerManifest: WorkerManifest,
3248
serverWorker: ServerBackgroundWorker,
3349
machineResources: MachinePresetResources,
3450
env?: Record<string, string>
3551
): Promise<{ taskRunProcess: TaskRunProcess; isReused: boolean }> {
52+
const version = serverWorker.version || "unknown";
53+
3654
// Try to reuse an existing process if enabled
3755
if (this.options.enableProcessReuse) {
38-
const reusableProcess = this.findReusableProcess();
56+
const reusableProcess = this.findReusableProcess(version);
3957
if (reusableProcess) {
58+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
59+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
60+
4061
logger.debug("[TaskRunProcessPool] Reusing existing process", {
41-
availableCount: this.availableProcesses.length,
42-
busyCount: this.busyProcesses.size,
62+
version,
63+
availableCount,
64+
busyCount,
4365
});
4466

45-
this.availableProcesses = this.availableProcesses.filter((p) => p !== reusableProcess);
46-
this.busyProcesses.add(reusableProcess);
67+
// Remove from available and add to busy for this version
68+
const availableProcesses = this.availableProcessesByVersion.get(version) || [];
69+
this.availableProcessesByVersion.set(
70+
version,
71+
availableProcesses.filter((p) => p !== reusableProcess)
72+
);
73+
74+
if (!this.busyProcessesByVersion.has(version)) {
75+
this.busyProcessesByVersion.set(version, new Set());
76+
}
77+
this.busyProcessesByVersion.get(version)!.add(reusableProcess);
78+
4779
return { taskRunProcess: reusableProcess, isReused: true };
4880
} else {
81+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
82+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
83+
4984
logger.debug("[TaskRunProcessPool] No reusable process found", {
50-
availableCount: this.availableProcesses.length,
51-
busyCount: this.busyProcesses.size,
85+
version,
86+
availableCount,
87+
busyCount,
5288
});
5389
}
5490
}
5591

5692
// Create new process
93+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
94+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
95+
5796
logger.debug("[TaskRunProcessPool] Creating new process", {
58-
availableCount: this.availableProcesses.length,
59-
busyCount: this.busyProcesses.size,
97+
version,
98+
availableCount,
99+
busyCount,
60100
});
61101

62102
const newProcess = new TaskRunProcess({
@@ -70,60 +110,99 @@ export class TaskRunProcessPool {
70110
cwd: this.options.cwd,
71111
}).initialize();
72112

73-
this.busyProcesses.add(newProcess);
113+
// Add to busy processes for this version
114+
if (!this.busyProcessesByVersion.has(version)) {
115+
this.busyProcessesByVersion.set(version, new Set());
116+
}
117+
this.busyProcessesByVersion.get(version)!.add(newProcess);
118+
74119
return { taskRunProcess: newProcess, isReused: false };
75120
}
76121

77-
async returnProcess(process: TaskRunProcess): Promise<void> {
78-
this.busyProcesses.delete(process);
122+
async returnProcess(process: TaskRunProcess, version: string): Promise<void> {
123+
// Remove from busy processes for this version
124+
const busyProcesses = this.busyProcessesByVersion.get(version);
125+
if (busyProcesses) {
126+
busyProcesses.delete(process);
127+
}
128+
129+
if (process.pid) {
130+
this.executionCountsPerProcess.set(
131+
process.pid,
132+
(this.executionCountsPerProcess.get(process.pid) ?? 0) + 1
133+
);
134+
}
135+
136+
if (this.shouldReuseProcess(process, version)) {
137+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
138+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
79139

80-
if (this.shouldReuseProcess(process)) {
81140
logger.debug("[TaskRunProcessPool] Returning process to pool", {
82-
availableCount: this.availableProcesses.length,
83-
busyCount: this.busyProcesses.size,
141+
version,
142+
availableCount,
143+
busyCount,
84144
});
85145

86146
// Clean up but don't kill the process
87147
try {
88148
await process.cleanup(false);
89-
this.availableProcesses.push(process);
149+
150+
// Add to available processes for this version
151+
if (!this.availableProcessesByVersion.has(version)) {
152+
this.availableProcessesByVersion.set(version, []);
153+
}
154+
this.availableProcessesByVersion.get(version)!.push(process);
90155
} catch (error) {
91156
logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", {
92157
error,
93158
});
94159
await this.killProcess(process);
95160
}
96161
} else {
162+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
163+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
164+
97165
logger.debug("[TaskRunProcessPool] Killing process", {
98-
availableCount: this.availableProcesses.length,
99-
busyCount: this.busyProcesses.size,
166+
version,
167+
availableCount,
168+
busyCount,
100169
});
101170
await this.killProcess(process);
102171
}
103172
}
104173

105-
private findReusableProcess(): TaskRunProcess | undefined {
106-
return this.availableProcesses.find((process) => this.isProcessHealthy(process));
174+
private findReusableProcess(version: string): TaskRunProcess | undefined {
175+
const availableProcesses = this.availableProcessesByVersion.get(version) || [];
176+
return availableProcesses.find((process) => this.isProcessHealthy(process));
107177
}
108178

109-
private shouldReuseProcess(process: TaskRunProcess): boolean {
179+
private shouldReuseProcess(process: TaskRunProcess, version: string): boolean {
110180
const isHealthy = this.isProcessHealthy(process);
111181
const isBeingKilled = process.isBeingKilled;
112182
const pid = process.pid;
183+
const executionCount = this.executionCountsPerProcess.get(pid ?? 0) ?? 0;
184+
const availableCount = this.availableProcessesByVersion.get(version)?.length || 0;
185+
const busyCount = this.busyProcessesByVersion.get(version)?.size || 0;
186+
const isDeprecated = this.deprecatedVersions.has(version);
113187

114188
logger.debug("[TaskRunProcessPool] Checking if process should be reused", {
189+
version,
115190
isHealthy,
116191
isBeingKilled,
117192
pid,
118-
availableCount: this.availableProcesses.length,
119-
busyCount: this.busyProcesses.size,
193+
availableCount,
194+
busyCount,
120195
maxPoolSize: this.maxPoolSize,
196+
executionCount,
197+
isDeprecated,
121198
});
122199

123200
return (
124201
this.options.enableProcessReuse &&
125202
this.isProcessHealthy(process) &&
126-
this.availableProcesses.length < this.maxPoolSize
203+
availableCount < this.maxPoolSize &&
204+
executionCount < this.maxExecutionsPerProcess &&
205+
!isDeprecated
127206
);
128207
}
129208

@@ -141,25 +220,65 @@ export class TaskRunProcessPool {
141220
}
142221

143222
async shutdown(): Promise<void> {
223+
const totalAvailable = Array.from(this.availableProcessesByVersion.values()).reduce(
224+
(sum, processes) => sum + processes.length,
225+
0
226+
);
227+
const totalBusy = Array.from(this.busyProcessesByVersion.values()).reduce(
228+
(sum, processes) => sum + processes.size,
229+
0
230+
);
231+
144232
logger.debug("[TaskRunProcessPool] Shutting down pool", {
145-
availableCount: this.availableProcesses.length,
146-
busyCount: this.busyProcesses.size,
233+
availableCount: totalAvailable,
234+
busyCount: totalBusy,
235+
versions: Array.from(this.availableProcessesByVersion.keys()),
147236
});
148237

149-
// Kill all available processes
150-
await Promise.all(this.availableProcesses.map((process) => this.killProcess(process)));
151-
this.availableProcesses = [];
238+
// Kill all available processes across all versions
239+
const allAvailableProcesses = Array.from(this.availableProcessesByVersion.values()).flat();
240+
await Promise.all(allAvailableProcesses.map((process) => this.killProcess(process)));
241+
this.availableProcessesByVersion.clear();
152242

153-
// Kill all busy processes
154-
await Promise.all(Array.from(this.busyProcesses).map((process) => this.killProcess(process)));
155-
this.busyProcesses.clear();
243+
// Kill all busy processes across all versions
244+
const allBusyProcesses = Array.from(this.busyProcessesByVersion.values())
245+
.map((processSet) => Array.from(processSet))
246+
.flat();
247+
await Promise.all(allBusyProcesses.map((process) => this.killProcess(process)));
248+
this.busyProcessesByVersion.clear();
156249
}
157250

158251
getStats() {
252+
const totalAvailable = Array.from(this.availableProcessesByVersion.values()).reduce(
253+
(sum, processes) => sum + processes.length,
254+
0
255+
);
256+
const totalBusy = Array.from(this.busyProcessesByVersion.values()).reduce(
257+
(sum, processes) => sum + processes.size,
258+
0
259+
);
260+
261+
const statsByVersion: Record<string, { available: number; busy: number }> = {};
262+
for (const [version, processes] of this.availableProcessesByVersion.entries()) {
263+
statsByVersion[version] = {
264+
available: processes.length,
265+
busy: this.busyProcessesByVersion.get(version)?.size || 0,
266+
};
267+
}
268+
for (const [version, processes] of this.busyProcessesByVersion.entries()) {
269+
if (!statsByVersion[version]) {
270+
statsByVersion[version] = {
271+
available: 0,
272+
busy: processes.size,
273+
};
274+
}
275+
}
276+
159277
return {
160-
availableCount: this.availableProcesses.length,
161-
busyCount: this.busyProcesses.size,
162-
totalCount: this.availableProcesses.length + this.busyProcesses.size,
278+
availableCount: totalAvailable,
279+
busyCount: totalBusy,
280+
totalCount: totalAvailable + totalBusy,
281+
byVersion: statsByVersion,
163282
};
164283
}
165284
}

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,7 @@ export class DevRunController {
620620
{
621621
TRIGGER_WORKER_MANIFEST_PATH: join(this.opts.worker.build.outputPath, "index.json"),
622622
RUN_WORKER_SHOW_LOGS: this.opts.logLevel === "debug" ? "true" : "false",
623+
TRIGGER_WORKER_VERSION: this.opts.worker.serverWorker?.version,
623624
}
624625
);
625626

@@ -653,7 +654,8 @@ export class DevRunController {
653654

654655
// Return process to pool instead of killing it
655656
try {
656-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
657+
const version = this.opts.worker.serverWorker?.version || "unknown";
658+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
657659
this.taskRunProcess = undefined;
658660
} catch (error) {
659661
logger.debug("Failed to return task run process to pool, submitting completion anyway", {
@@ -774,7 +776,8 @@ export class DevRunController {
774776
// Return the process to the pool instead of killing it directly
775777
if (this.taskRunProcess) {
776778
try {
777-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
779+
const version = this.opts.worker.serverWorker?.version || "unknown";
780+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
778781
this.taskRunProcess = undefined;
779782
} catch (error) {
780783
logger.debug("Failed to return task run process to pool during runFinished", { error });
@@ -810,7 +813,8 @@ export class DevRunController {
810813

811814
if (this.taskRunProcess && !this.taskRunProcess.isBeingKilled) {
812815
try {
813-
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess);
816+
const version = this.opts.worker.serverWorker?.version || "unknown";
817+
await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version);
814818
this.taskRunProcess = undefined;
815819
} catch (error) {
816820
logger.debug("Failed to return task run process to pool during stop", { error });

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ process.on("uncaughtException", function (error, origin) {
101101
}
102102
});
103103

104+
process.title = `trigger-dev-run-worker (${
105+
getEnvVar("TRIGGER_WORKER_VERSION") ?? "unknown version"
106+
})`;
107+
104108
const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");
105109

106110
const standardLocalsManager = new StandardLocalsManager();
@@ -605,8 +609,6 @@ async function flushMetadata(timeoutInMs: number = 10_000) {
605609
const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs);
606610
runtime.setGlobalRuntimeManager(sharedWorkerRuntime);
607611

608-
process.title = "trigger-managed-worker";
609-
610612
const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
611613

612614
for await (const _ of setInterval(heartbeatInterval)) {

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ export class TaskRunProcess {
150150
PATH: process.env.PATH,
151151
TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()),
152152
TRIGGER_WARM_START: this.options.isWarmStart ? "true" : "false",
153+
TRIGGER: "1",
153154
};
154155

155156
logger.debug(`initializing task run process`, {
@@ -286,6 +287,10 @@ export class TaskRunProcess {
286287
return result;
287288
}
288289

290+
isExecuting() {
291+
return this._currentExecution !== undefined;
292+
}
293+
289294
waitpointCompleted(waitpoint: CompletedWaitpoint) {
290295
if (!this._child?.connected || this._isBeingKilled || this._child.killed) {
291296
console.error(

0 commit comments

Comments
 (0)