Skip to content

Commit 1a1e70a

Browse files
committed
WIP deployed processKeepAlive
1 parent 19bdf3b commit 1a1e70a

File tree

6 files changed

+252
-35
lines changed

6 files changed

+252
-35
lines changed

packages/cli-v3/src/entryPoints/managed-index-worker.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,12 @@ await sendMessageInCatalog(
159159
loaderEntryPoint: buildManifest.loaderEntryPoint,
160160
customConditions: buildManifest.customConditions,
161161
initEntryPoint: buildManifest.initEntryPoint,
162+
processKeepAlive:
163+
typeof config.experimental_processKeepAlive === "object"
164+
? config.experimental_processKeepAlive
165+
: typeof config.experimental_processKeepAlive === "boolean"
166+
? { enabled: config.experimental_processKeepAlive }
167+
: undefined,
162168
timings,
163169
},
164170
importErrors,

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { RunnerEnv } from "./env.js";
1111
import { ManagedRunLogger, RunLogger, SendDebugLogOptions } from "./logger.js";
1212
import { EnvObject } from "std-env";
1313
import { RunExecution } from "./execution.js";
14+
import { TaskRunProcessProvider } from "./taskRunProcessProvider.js";
1415
import { tryCatch } from "@trigger.dev/core/utils";
1516

1617
type ManagedRunControllerOptions = {
@@ -27,6 +28,7 @@ export class ManagedRunController {
2728
private readonly warmStartClient: WarmStartClient | undefined;
2829
private socket: SupervisorSocket;
2930
private readonly logger: RunLogger;
31+
private readonly taskRunProcessProvider: TaskRunProcessProvider;
3032

3133
private warmStartCount = 0;
3234
private restoreCount = 0;
@@ -36,11 +38,17 @@ export class ManagedRunController {
3638

3739
private currentExecution: RunExecution | null = null;
3840

41+
private processKeepAliveEnabled: boolean;
42+
private processKeepAliveMaxExecutionCount: number;
43+
3944
constructor(opts: ManagedRunControllerOptions) {
4045
const env = new RunnerEnv(opts.env);
4146
this.env = env;
4247

4348
this.workerManifest = opts.workerManifest;
49+
this.processKeepAliveEnabled = opts.workerManifest.processKeepAlive?.enabled ?? false;
50+
this.processKeepAliveMaxExecutionCount =
51+
opts.workerManifest.processKeepAlive?.maxExecutionsPerProcess ?? 100;
4452

4553
this.httpClient = new WorkloadHttpClient({
4654
workerApiUrl: this.workerApiUrl,
@@ -55,6 +63,15 @@ export class ManagedRunController {
5563
env,
5664
});
5765

66+
// Create the TaskRunProcessProvider
67+
this.taskRunProcessProvider = new TaskRunProcessProvider({
68+
workerManifest: this.workerManifest,
69+
env: this.env,
70+
logger: this.logger,
71+
processKeepAliveEnabled: this.processKeepAliveEnabled,
72+
processKeepAliveMaxExecutionCount: this.processKeepAliveMaxExecutionCount,
73+
});
74+
5875
const properties = {
5976
...env.raw,
6077
TRIGGER_POD_SCHEDULED_AT_MS: env.TRIGGER_POD_SCHEDULED_AT_MS.toISOString(),
@@ -96,6 +113,7 @@ export class ManagedRunController {
96113
restoreCount: this.restoreCount,
97114
notificationCount: this.notificationCount,
98115
lastNotificationAt: this.lastNotificationAt,
116+
...this.taskRunProcessProvider.metrics,
99117
};
100118
}
101119

@@ -203,6 +221,7 @@ export class ManagedRunController {
203221
httpClient: this.httpClient,
204222
logger: this.logger,
205223
supervisorSocket: this.socket,
224+
taskRunProcessProvider: this.taskRunProcessProvider,
206225
});
207226
}
208227

@@ -298,6 +317,7 @@ export class ManagedRunController {
298317
httpClient: this.httpClient,
299318
logger: this.logger,
300319
supervisorSocket: this.socket,
320+
taskRunProcessProvider: this.taskRunProcessProvider,
301321
}).prepareForExecution({
302322
taskRunEnv: previousTaskRunEnv,
303323
});
@@ -395,6 +415,7 @@ export class ManagedRunController {
395415
});
396416

397417
this.currentExecution?.kill().catch(() => {});
418+
this.taskRunProcessProvider.cleanup();
398419

399420
process.exit(code);
400421
}
@@ -534,6 +555,9 @@ export class ManagedRunController {
534555
});
535556
}
536557

558+
// Cleanup the task run process provider
559+
this.taskRunProcessProvider.cleanup();
560+
537561
// Close the socket
538562
this.socket.close();
539563
}

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 20 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { randomBytes } from "node:crypto";
2222
import { SnapshotManager, SnapshotState } from "./snapshot.js";
2323
import type { SupervisorSocket } from "./controller.js";
2424
import { RunNotifier } from "./notifier.js";
25+
import { TaskRunProcessProvider } from "./taskRunProcessProvider.js";
2526

2627
class ExecutionAbortError extends Error {
2728
constructor(message: string) {
@@ -36,6 +37,7 @@ type RunExecutionOptions = {
3637
httpClient: WorkloadHttpClient;
3738
logger: RunLogger;
3839
supervisorSocket: SupervisorSocket;
40+
taskRunProcessProvider: TaskRunProcessProvider;
3941
};
4042

4143
type RunExecutionPrepareOptions = {
@@ -77,6 +79,7 @@ export class RunExecution {
7779
private supervisorSocket: SupervisorSocket;
7880
private notifier?: RunNotifier;
7981
private metadataClient?: MetadataClient;
82+
private taskRunProcessProvider: TaskRunProcessProvider;
8083

8184
constructor(opts: RunExecutionOptions) {
8285
this.id = randomBytes(4).toString("hex");
@@ -85,6 +88,7 @@ export class RunExecution {
8588
this.httpClient = opts.httpClient;
8689
this.logger = opts.logger;
8790
this.supervisorSocket = opts.supervisorSocket;
91+
this.taskRunProcessProvider = opts.taskRunProcessProvider;
8892

8993
this.restoreCount = 0;
9094
this.executionAbortController = new AbortController();
@@ -131,40 +135,21 @@ export class RunExecution {
131135
throw new Error("prepareForExecution called after process was already created");
132136
}
133137

134-
this.taskRunProcess = this.createTaskRunProcess({
135-
envVars: opts.taskRunEnv,
138+
this.taskRunProcess = this.taskRunProcessProvider.getProcess({
139+
taskRunEnv: opts.taskRunEnv,
136140
isWarmStart: true,
137141
});
138142

143+
// Attach event handlers to the process
144+
this.attachTaskRunProcessHandlers(this.taskRunProcess);
145+
139146
return this;
140147
}
141148

142-
private createTaskRunProcess({
143-
envVars,
144-
isWarmStart,
145-
}: {
146-
envVars: Record<string, string>;
147-
isWarmStart?: boolean;
148-
}) {
149-
const taskRunProcess = new TaskRunProcess({
150-
workerManifest: this.workerManifest,
151-
env: {
152-
...envVars,
153-
...this.env.gatherProcessEnv(),
154-
HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000),
155-
},
156-
serverWorker: {
157-
id: "managed",
158-
contentHash: this.env.TRIGGER_CONTENT_HASH,
159-
version: this.env.TRIGGER_DEPLOYMENT_VERSION,
160-
engine: "V2",
161-
},
162-
machineResources: {
163-
cpu: Number(this.env.TRIGGER_MACHINE_CPU),
164-
memory: Number(this.env.TRIGGER_MACHINE_MEMORY),
165-
},
166-
isWarmStart,
167-
}).initialize();
149+
private attachTaskRunProcessHandlers(taskRunProcess: TaskRunProcess): void {
150+
taskRunProcess.onTaskRunHeartbeat.detach();
151+
taskRunProcess.onSendDebugLog.detach();
152+
taskRunProcess.onSetSuspendable.detach();
168153

169154
taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
170155
if (!this.runFriendlyId) {
@@ -194,8 +179,6 @@ export class RunExecution {
194179
taskRunProcess.onSetSuspendable.attach(async ({ suspendable }) => {
195180
this.suspendable = suspendable;
196181
});
197-
198-
return taskRunProcess;
199182
}
200183

201184
/**
@@ -586,10 +569,11 @@ export class RunExecution {
586569

587570
// To skip this step and eagerly create the task run process, run prepareForExecution first
588571
if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
589-
this.taskRunProcess = this.createTaskRunProcess({
590-
envVars: { ...envVars, TRIGGER_PROJECT_REF: execution.project.ref },
572+
this.taskRunProcess = this.taskRunProcessProvider.getProcess({
573+
taskRunEnv: { ...envVars, TRIGGER_PROJECT_REF: execution.project.ref },
591574
isWarmStart,
592575
});
576+
this.attachTaskRunProcessHandlers(this.taskRunProcess);
593577
}
594578

595579
this.sendDebugLog("executing task run process", { runId: execution.run.id });
@@ -619,7 +603,10 @@ export class RunExecution {
619603
// If we get here, the task completed normally
620604
this.sendDebugLog("completed run attempt", { attemptSuccess: completion.ok });
621605

622-
// The execution has finished, so we can cleanup the task run process. Killing it should be safe.
606+
// Return the process to the provider for potential reuse
607+
this.taskRunProcessProvider.returnProcess(this.taskRunProcess);
608+
609+
// The execution has finished, so we can cleanup the task run process if not being reused
623610
const [error] = await tryCatch(this.taskRunProcess.cleanup(true));
624611

625612
if (error) {

0 commit comments

Comments
 (0)