Skip to content

Commit e7b6c2f

Browse files
authored
Merge pull request #2183 from triggerdotdev/process-keep-alive
v4: Experimental process keep alive
2 parents d619e79 + 0deb2c6 commit e7b6c2f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1732
-514
lines changed

.changeset/polite-badgers-suffer.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"trigger.dev": patch
3+
---
4+
5+
experimental processKeepAlive

apps/supervisor/src/util.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
import { isMacOS, isWindows } from "std-env";
22

3+
export function normalizeDockerHostUrl(url: string) {
4+
const $url = new URL(url);
5+
6+
if ($url.hostname === "localhost") {
7+
$url.hostname = getDockerHostDomain();
8+
}
9+
10+
return $url.toString();
11+
}
12+
313
export function getDockerHostDomain() {
414
return isMacOS || isWindows ? "host.docker.internal" : "localhost";
515
}

apps/supervisor/src/workloadManager/docker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
type WorkloadManagerOptions,
66
} from "./types.js";
77
import { env } from "../env.js";
8-
import { getDockerHostDomain, getRunnerId } from "../util.js";
8+
import { getDockerHostDomain, getRunnerId, normalizeDockerHostUrl } from "../util.js";
99
import Docker from "dockerode";
1010
import { tryCatch } from "@trigger.dev/core";
1111

@@ -78,7 +78,7 @@ export class DockerWorkloadManager implements WorkloadManager {
7878
];
7979

8080
if (this.opts.warmStartUrl) {
81-
envVars.push(`TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
81+
envVars.push(`TRIGGER_WARM_START_URL=${normalizeDockerHostUrl(this.opts.warmStartUrl)}`);
8282
}
8383

8484
if (this.opts.metadataUrl) {

apps/webapp/app/utils/timelineSpanEvents.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,6 @@ function getFriendlyNameForEvent(event: string, properties?: Record<string, any>
142142
return "Attempt created";
143143
}
144144
case "import": {
145-
if (properties && typeof properties.file === "string") {
146-
return `Importing ${properties.file}`;
147-
}
148145
return "Importing task file";
149146
}
150147
case "lazy_payload": {

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 180 additions & 78 deletions
Large diffs are not rendered by default.

apps/webapp/test/timelineSpanEvents.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
7676
expect(result.some((event) => event.name === "Dequeued")).toBe(true);
7777
expect(result.some((event) => event.name === "Launched")).toBe(true);
7878
expect(result.some((event) => event.name === "Attempt created")).toBe(true);
79-
expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true);
79+
expect(result.some((event) => event.name === "Importing task file")).toBe(true);
8080
});
8181

8282
test("should sort events by timestamp", () => {
@@ -86,7 +86,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
8686
expect(result[0].name).toBe("Dequeued");
8787
expect(result[1].name).toBe("Attempt created");
8888
expect(result[2].name).toBe("Launched");
89-
expect(result[3].name).toBe("Importing src/trigger/chat.ts");
89+
expect(result[3].name).toBe("Importing task file");
9090
});
9191

9292
test("should calculate offsets correctly from the first event", () => {
@@ -176,7 +176,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
176176
expect(result.find((e) => e.name === "Attempt created")?.helpText).toBe(
177177
"An attempt was created for the run"
178178
);
179-
expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.helpText).toBe(
179+
expect(result.find((e) => e.name === "Importing task file")?.helpText).toBe(
180180
"A task file was imported"
181181
);
182182
});
@@ -187,7 +187,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
187187
expect(result.find((e) => e.name === "Dequeued")?.duration).toBe(0);
188188
expect(result.find((e) => e.name === "Launched")?.duration).toBe(127);
189189
expect(result.find((e) => e.name === "Attempt created")?.duration).toBe(56);
190-
expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.duration).toBe(67);
190+
expect(result.find((e) => e.name === "Importing task file")?.duration).toBe(67);
191191
});
192192

193193
test("should use fallback name for import event without file property", () => {
@@ -214,7 +214,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
214214
// Without fork event, import should also be visible for non-admins
215215
expect(result.length).toBe(2);
216216
expect(result.some((event) => event.name === "Dequeued")).toBe(true);
217-
expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true);
217+
expect(result.some((event) => event.name === "Importing task file")).toBe(true);
218218

219219
// create_attempt should still be admin-only
220220
expect(result.some((event) => event.name === "Attempt created")).toBe(false);

internal-packages/run-engine/vitest.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export default defineConfig({
1111
singleThread: true,
1212
},
1313
},
14-
testTimeout: 60_000,
14+
testTimeout: 120_000,
1515
coverage: {
1616
provider: "v8",
1717
},

packages/cli-v3/src/dev/devSupervisor.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import {
2525
import pLimit from "p-limit";
2626
import { resolveLocalEnvVars } from "../utilities/localEnvVars.js";
2727
import type { Metafile } from "esbuild";
28+
import { TaskRunProcessPool } from "./taskRunProcessPool.js";
29+
import { tryCatch } from "@trigger.dev/core/utils";
2830

2931
export type WorkerRuntimeOptions = {
3032
name: string | undefined;
@@ -67,6 +69,7 @@ class DevSupervisor implements WorkerRuntime {
6769
private socketConnections = new Set<string>();
6870

6971
private runLimiter?: ReturnType<typeof pLimit>;
72+
private taskRunProcessPool?: TaskRunProcessPool;
7073

7174
constructor(public readonly options: WorkerRuntimeOptions) {}
7275

@@ -95,6 +98,42 @@ class DevSupervisor implements WorkerRuntime {
9598

9699
this.runLimiter = pLimit(maxConcurrentRuns);
97100

101+
// Initialize the task run process pool
102+
const env = await this.#getEnvVars();
103+
104+
const enableProcessReuse =
105+
typeof this.options.config.experimental_processKeepAlive === "boolean"
106+
? this.options.config.experimental_processKeepAlive
107+
: typeof this.options.config.experimental_processKeepAlive === "object"
108+
? this.options.config.experimental_processKeepAlive.enabled
109+
: false;
110+
111+
const maxPoolSize =
112+
typeof this.options.config.experimental_processKeepAlive === "object"
113+
? this.options.config.experimental_processKeepAlive.devMaxPoolSize ?? 25
114+
: 25;
115+
116+
const maxExecutionsPerProcess =
117+
typeof this.options.config.experimental_processKeepAlive === "object"
118+
? this.options.config.experimental_processKeepAlive.maxExecutionsPerProcess ?? 50
119+
: 50;
120+
121+
if (enableProcessReuse) {
122+
logger.debug("[DevSupervisor] Enabling process reuse", {
123+
enableProcessReuse,
124+
maxPoolSize,
125+
maxExecutionsPerProcess,
126+
});
127+
}
128+
129+
this.taskRunProcessPool = new TaskRunProcessPool({
130+
env,
131+
cwd: this.options.config.workingDir,
132+
enableProcessReuse,
133+
maxPoolSize,
134+
maxExecutionsPerProcess,
135+
});
136+
98137
this.socket = this.#createSocket();
99138

100139
//start an SSE connection for presence
@@ -111,6 +150,17 @@ class DevSupervisor implements WorkerRuntime {
111150
} catch (error) {
112151
logger.debug("[DevSupervisor] shutdown, socket failed to close", { error });
113152
}
153+
154+
// Shutdown the task run process pool
155+
if (this.taskRunProcessPool) {
156+
const [shutdownError] = await tryCatch(this.taskRunProcessPool.shutdown());
157+
158+
if (shutdownError) {
159+
logger.debug("[DevSupervisor] shutdown, task run process pool failed to shutdown", {
160+
error: shutdownError,
161+
});
162+
}
163+
}
114164
}
115165

116166
async initializeWorker(
@@ -293,12 +343,21 @@ class DevSupervisor implements WorkerRuntime {
293343
continue;
294344
}
295345

346+
if (!this.taskRunProcessPool) {
347+
logger.debug(`[DevSupervisor] dequeueRuns. No task run process pool`, {
348+
run: message.run.friendlyId,
349+
worker,
350+
});
351+
continue;
352+
}
353+
296354
//new run
297355
runController = new DevRunController({
298356
runFriendlyId: message.run.friendlyId,
299357
worker: worker,
300358
httpClient: this.options.client,
301359
logLevel: this.options.args.logLevel,
360+
taskRunProcessPool: this.taskRunProcessPool,
302361
onFinished: () => {
303362
logger.debug("[DevSupervisor] Run finished", { runId: message.run.friendlyId });
304363

@@ -574,6 +633,10 @@ class DevSupervisor implements WorkerRuntime {
574633
return;
575634
}
576635

636+
if (worker.serverWorker?.version) {
637+
this.taskRunProcessPool?.deprecateVersion(worker.serverWorker?.version);
638+
}
639+
577640
if (this.#workerHasInProgressRuns(friendlyId)) {
578641
return;
579642
}

0 commit comments

Comments
 (0)