Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nice-colts-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"trigger.dev": patch
---

Improve warm start times by eagerly creating the child TaskRunProcess when a previous run as completed
18 changes: 9 additions & 9 deletions packages/cli-v3/src/entryPoints/dev-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,15 @@ export class DevRunController {
version: this.opts.worker.serverWorker?.version,
engine: "V2",
},
machine: execution.machine,
}).initialize();

logger.debug("executing task run process", {
attemptId: execution.attempt.id,
runId: execution.run.id,
});

const completion = await this.taskRunProcess.execute({
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
Expand All @@ -630,15 +639,6 @@ export class DevRunController {
messageId: run.friendlyId,
});

await this.taskRunProcess.initialize();

logger.debug("executing task run process", {
attemptId: execution.attempt.id,
runId: execution.run.id,
});

const completion = await this.taskRunProcess.execute();

logger.debug("Completed run", completion);

try {
Expand Down
9 changes: 8 additions & 1 deletion packages/cli-v3/src/entryPoints/dev-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
logLevels,
ManagedRuntimeManager,
OtelTaskLogger,
populateEnv,
StandardLifecycleHooksManager,
StandardLocalsManager,
StandardMetadataManager,
Expand Down Expand Up @@ -238,7 +239,13 @@ const zodIpc = new ZodIpcConnection({
emitSchema: ExecutorToWorkerMessageCatalog,
process,
handlers: {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics, env }, sender) => {
if (env) {
populateEnv(env, {
override: true,
});
}

log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);

standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);
Expand Down
54 changes: 36 additions & 18 deletions packages/cli-v3/src/entryPoints/managed-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,18 @@ class ManagedRunController {
this.exitProcess(this.successExitCode);
}

if (this.taskRunProcess) {
logger.debug("waitForNextRun: eagerly recreating task run process with options");
this.taskRunProcess = new TaskRunProcess({
...this.taskRunProcess.options,
isWarmStart: true,
}).initialize();
} else {
logger.debug(
"waitForNextRun: no existing task run process, so we can't eagerly recreate it"
);
}

// Check the service is up and get additional warm start config
const connect = await this.warmStartClient.connect();

Expand Down Expand Up @@ -904,6 +916,9 @@ class ManagedRunController {

private exitProcess(code?: number): never {
logger.log("Exiting process", { code });
if (this.taskRunProcess?.isPreparedForNextRun) {
this.taskRunProcess.forceExit();
}
process.exit(code);
}

Expand Down Expand Up @@ -980,30 +995,33 @@ class ManagedRunController {
}: WorkloadRunAttemptStartResponseBody) {
this.snapshotPoller.start();

this.taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: envVars,
serverWorker: {
id: "unmanaged",
contentHash: env.TRIGGER_CONTENT_HASH,
version: env.TRIGGER_DEPLOYMENT_VERSION,
engine: "V2",
},
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
},
messageId: run.friendlyId,
});

await this.taskRunProcess.initialize();
if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
this.taskRunProcess = new TaskRunProcess({
workerManifest: this.workerManifest,
env: envVars,
serverWorker: {
id: "unmanaged",
contentHash: env.TRIGGER_CONTENT_HASH,
version: env.TRIGGER_DEPLOYMENT_VERSION,
engine: "V2",
},
machine: execution.machine,
}).initialize();
}

logger.log("executing task run process", {
attemptId: execution.attempt.id,
runId: execution.run.id,
});

const completion = await this.taskRunProcess.execute();
const completion = await this.taskRunProcess.execute({
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
},
messageId: run.friendlyId,
env: envVars,
});

logger.log("Completed run", completion);

Expand Down
53 changes: 51 additions & 2 deletions packages/cli-v3/src/entryPoints/managed-run-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
logLevels,
ManagedRuntimeManager,
OtelTaskLogger,
populateEnv,
ProdUsageManager,
StandardLifecycleHooksManager,
StandardLocalsManager,
Expand Down Expand Up @@ -248,7 +249,16 @@ const zodIpc = new ZodIpcConnection({
emitSchema: ExecutorToWorkerMessageCatalog,
process,
handlers: {
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
EXECUTE_TASK_RUN: async (
{ execution, traceContext, metadata, metrics, env, isWarmStart },
sender
) => {
if (env) {
populateEnv(env, {
override: true,
});
}

standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);

console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
Expand Down Expand Up @@ -383,6 +393,7 @@ const zodIpc = new ZodIpcConnection({
tracingSDK,
consoleInterceptor,
retries: config.retries,
isWarmStart,
});

try {
Expand Down Expand Up @@ -461,12 +472,35 @@ const zodIpc = new ZodIpcConnection({
async function flushAll(timeoutInMs: number = 10_000) {
const now = performance.now();

await Promise.all([
const results = await Promise.allSettled([
flushUsage(timeoutInMs),
flushTracingSDK(timeoutInMs),
flushMetadata(timeoutInMs),
]);

const successfulFlushes = results
.filter((result) => result.status === "fulfilled")
.map((result) => result.value.flushed);
const failedFlushes = ["usage", "tracingSDK", "runMetadata"].filter(
(flushed) => !successfulFlushes.includes(flushed)
);

if (failedFlushes.length > 0) {
console.error(`Failed to flush ${failedFlushes.join(", ")}`);
}

const errorMessages = results
.filter((result) => result.status === "rejected")
.map((result) => result.reason);

if (errorMessages.length > 0) {
console.error(errorMessages.join("\n"));
}

for (const flushed of successfulFlushes) {
console.log(`Flushed ${flushed} successfully`);
}

const duration = performance.now() - now;

console.log(`Flushed all in ${duration}ms`);
Expand All @@ -480,6 +514,11 @@ async function flushUsage(timeoutInMs: number = 10_000) {
const duration = performance.now() - now;

console.log(`Flushed usage in ${duration}ms`);

return {
flushed: "usage",
durationMs: duration,
};
}

async function flushTracingSDK(timeoutInMs: number = 10_000) {
Expand All @@ -490,6 +529,11 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) {
const duration = performance.now() - now;

console.log(`Flushed tracingSDK in ${duration}ms`);

return {
flushed: "tracingSDK",
durationMs: duration,
};
}

async function flushMetadata(timeoutInMs: number = 10_000) {
Expand All @@ -500,6 +544,11 @@ async function flushMetadata(timeoutInMs: number = 10_000) {
const duration = performance.now() - now;

console.log(`Flushed runMetadata in ${duration}ms`);

return {
flushed: "runMetadata",
durationMs: duration,
};
}

const managedWorkerRuntime = new ManagedRuntimeManager(zodIpc, true);
Expand Down
Loading
Loading