diff --git a/.changeset/polite-badgers-suffer.md b/.changeset/polite-badgers-suffer.md new file mode 100644 index 0000000000..bba234e21d --- /dev/null +++ b/.changeset/polite-badgers-suffer.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +experimental processKeepAlive diff --git a/apps/supervisor/src/util.ts b/apps/supervisor/src/util.ts index 7cb554cd03..4fcda27b2a 100644 --- a/apps/supervisor/src/util.ts +++ b/apps/supervisor/src/util.ts @@ -1,5 +1,15 @@ import { isMacOS, isWindows } from "std-env"; +export function normalizeDockerHostUrl(url: string) { + const $url = new URL(url); + + if ($url.hostname === "localhost") { + $url.hostname = getDockerHostDomain(); + } + + return $url.toString(); +} + export function getDockerHostDomain() { return isMacOS || isWindows ? "host.docker.internal" : "localhost"; } diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index e3b39bfed6..6aa74a7ecc 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -5,7 +5,7 @@ import { type WorkloadManagerOptions, } from "./types.js"; import { env } from "../env.js"; -import { getDockerHostDomain, getRunnerId } from "../util.js"; +import { getDockerHostDomain, getRunnerId, normalizeDockerHostUrl } from "../util.js"; import Docker from "dockerode"; import { tryCatch } from "@trigger.dev/core"; @@ -78,7 +78,7 @@ export class DockerWorkloadManager implements WorkloadManager { ]; if (this.opts.warmStartUrl) { - envVars.push(`TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); + envVars.push(`TRIGGER_WARM_START_URL=${normalizeDockerHostUrl(this.opts.warmStartUrl)}`); } if (this.opts.metadataUrl) { diff --git a/apps/webapp/app/utils/timelineSpanEvents.ts b/apps/webapp/app/utils/timelineSpanEvents.ts index 0dcbd2a401..9e09ba0a79 100644 --- a/apps/webapp/app/utils/timelineSpanEvents.ts +++ b/apps/webapp/app/utils/timelineSpanEvents.ts @@ -142,9 +142,6 @@ function getFriendlyNameForEvent(event: string, properties?: Record return "Attempt created"; } case "import": { - if (properties && typeof properties.file === "string") { - return `Importing ${properties.file}`; - } return "Importing task file"; } case "lazy_payload": { diff --git a/apps/webapp/app/v3/otlpExporter.server.ts b/apps/webapp/app/v3/otlpExporter.server.ts index bcfefdabb4..b87718f86e 100644 --- a/apps/webapp/app/v3/otlpExporter.server.ts +++ b/apps/webapp/app/v3/otlpExporter.server.ts @@ -183,7 +183,7 @@ class OTLPExporter { function convertLogsToCreateableEvents(resourceLog: ResourceLogs): Array { const resourceAttributes = resourceLog.resource?.attributes ?? []; - const resourceProperties = extractResourceProperties(resourceAttributes); + const resourceProperties = extractEventProperties(resourceAttributes); return resourceLog.scopeLogs.flatMap((scopeLog) => { return scopeLog.logRecords @@ -194,6 +194,11 @@ function convertLogsToCreateableEvents(resourceLog: ResourceLogs): Array { const resourceAttributes = resourceSpan.resource?.attributes ?? []; - const resourceProperties = extractResourceProperties(resourceAttributes); + const resourceProperties = extractEventProperties(resourceAttributes); return resourceSpan.scopeSpans.flatMap((scopeSpan) => { return scopeSpan.spans @@ -269,6 +297,11 @@ function convertSpansToCreateableEvents(resourceSpan: ResourceSpans): Array +): string | undefined; +function extractStringAttribute( + attributes: KeyValue[], + name: string | Array, + fallback: string +): string; +function extractStringAttribute( + attributes: KeyValue[], + name: string | Array, fallback?: string ): string | undefined { - const attribute = attributes.find((attribute) => attribute.key === name); + const key = Array.isArray(name) ? name.filter(Boolean).join(".") : name; + + const attribute = attributes.find((attribute) => attribute.key === key); if (!attribute) return fallback; return isStringValue(attribute?.value) ? attribute.value.stringValue : fallback; } -function extractNumberAttribute(attributes: KeyValue[], name: string): number | undefined; -function extractNumberAttribute(attributes: KeyValue[], name: string, fallback: number): number; function extractNumberAttribute( attributes: KeyValue[], - name: string, + name: string | Array +): number | undefined; +function extractNumberAttribute( + attributes: KeyValue[], + name: string | Array, + fallback: number +): number; +function extractNumberAttribute( + attributes: KeyValue[], + name: string | Array, fallback?: number ): number | undefined { - const attribute = attributes.find((attribute) => attribute.key === name); + const key = Array.isArray(name) ? name.filter(Boolean).join(".") : name; + + const attribute = attributes.find((attribute) => attribute.key === key); if (!attribute) return fallback; return isIntValue(attribute?.value) ? Number(attribute.value.intValue) : fallback; } -function extractDoubleAttribute(attributes: KeyValue[], name: string): number | undefined; -function extractDoubleAttribute(attributes: KeyValue[], name: string, fallback: number): number; function extractDoubleAttribute( attributes: KeyValue[], - name: string, + name: string | Array +): number | undefined; +function extractDoubleAttribute( + attributes: KeyValue[], + name: string | Array, + fallback: number +): number; +function extractDoubleAttribute( + attributes: KeyValue[], + name: string | Array, fallback?: number ): number | undefined { - const attribute = attributes.find((attribute) => attribute.key === name); + const key = Array.isArray(name) ? name.filter(Boolean).join(".") : name; + + const attribute = attributes.find((attribute) => attribute.key === key); if (!attribute) return fallback; return isDoubleValue(attribute?.value) ? Number(attribute.value.doubleValue) : fallback; } -function extractBooleanAttribute(attributes: KeyValue[], name: string): boolean | undefined; -function extractBooleanAttribute(attributes: KeyValue[], name: string, fallback: boolean): boolean; function extractBooleanAttribute( attributes: KeyValue[], - name: string, + name: string | Array +): boolean | undefined; +function extractBooleanAttribute( + attributes: KeyValue[], + name: string | Array, + fallback: boolean +): boolean; +function extractBooleanAttribute( + attributes: KeyValue[], + name: string | Array, fallback?: boolean ): boolean | undefined { - const attribute = attributes.find((attribute) => attribute.key === name); + const key = Array.isArray(name) ? name.filter(Boolean).join(".") : name; + + const attribute = attributes.find((attribute) => attribute.key === key); if (!attribute) return fallback; diff --git a/apps/webapp/test/timelineSpanEvents.test.ts b/apps/webapp/test/timelineSpanEvents.test.ts index adc02b6f57..1c34bb8d13 100644 --- a/apps/webapp/test/timelineSpanEvents.test.ts +++ b/apps/webapp/test/timelineSpanEvents.test.ts @@ -76,7 +76,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { expect(result.some((event) => event.name === "Dequeued")).toBe(true); expect(result.some((event) => event.name === "Launched")).toBe(true); expect(result.some((event) => event.name === "Attempt created")).toBe(true); - expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true); + expect(result.some((event) => event.name === "Importing task file")).toBe(true); }); test("should sort events by timestamp", () => { @@ -86,7 +86,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { expect(result[0].name).toBe("Dequeued"); expect(result[1].name).toBe("Attempt created"); expect(result[2].name).toBe("Launched"); - expect(result[3].name).toBe("Importing src/trigger/chat.ts"); + expect(result[3].name).toBe("Importing task file"); }); test("should calculate offsets correctly from the first event", () => { @@ -176,7 +176,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { expect(result.find((e) => e.name === "Attempt created")?.helpText).toBe( "An attempt was created for the run" ); - expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.helpText).toBe( + expect(result.find((e) => e.name === "Importing task file")?.helpText).toBe( "A task file was imported" ); }); @@ -187,7 +187,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { expect(result.find((e) => e.name === "Dequeued")?.duration).toBe(0); expect(result.find((e) => e.name === "Launched")?.duration).toBe(127); expect(result.find((e) => e.name === "Attempt created")?.duration).toBe(56); - expect(result.find((e) => e.name === "Importing src/trigger/chat.ts")?.duration).toBe(67); + expect(result.find((e) => e.name === "Importing task file")?.duration).toBe(67); }); test("should use fallback name for import event without file property", () => { @@ -214,7 +214,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => { // Without fork event, import should also be visible for non-admins expect(result.length).toBe(2); expect(result.some((event) => event.name === "Dequeued")).toBe(true); - expect(result.some((event) => event.name === "Importing src/trigger/chat.ts")).toBe(true); + expect(result.some((event) => event.name === "Importing task file")).toBe(true); // create_attempt should still be admin-only expect(result.some((event) => event.name === "Attempt created")).toBe(false); diff --git a/internal-packages/run-engine/vitest.config.ts b/internal-packages/run-engine/vitest.config.ts index 1d779c0957..c364293a43 100644 --- a/internal-packages/run-engine/vitest.config.ts +++ b/internal-packages/run-engine/vitest.config.ts @@ -11,7 +11,7 @@ export default defineConfig({ singleThread: true, }, }, - testTimeout: 60_000, + testTimeout: 120_000, coverage: { provider: "v8", }, diff --git a/packages/cli-v3/src/dev/devSupervisor.ts b/packages/cli-v3/src/dev/devSupervisor.ts index 677999dcae..ce3835714d 100644 --- a/packages/cli-v3/src/dev/devSupervisor.ts +++ b/packages/cli-v3/src/dev/devSupervisor.ts @@ -25,6 +25,8 @@ import { import pLimit from "p-limit"; import { resolveLocalEnvVars } from "../utilities/localEnvVars.js"; import type { Metafile } from "esbuild"; +import { TaskRunProcessPool } from "./taskRunProcessPool.js"; +import { tryCatch } from "@trigger.dev/core/utils"; export type WorkerRuntimeOptions = { name: string | undefined; @@ -67,6 +69,7 @@ class DevSupervisor implements WorkerRuntime { private socketConnections = new Set(); private runLimiter?: ReturnType; + private taskRunProcessPool?: TaskRunProcessPool; constructor(public readonly options: WorkerRuntimeOptions) {} @@ -95,6 +98,42 @@ class DevSupervisor implements WorkerRuntime { this.runLimiter = pLimit(maxConcurrentRuns); + // Initialize the task run process pool + const env = await this.#getEnvVars(); + + const enableProcessReuse = + typeof this.options.config.experimental_processKeepAlive === "boolean" + ? this.options.config.experimental_processKeepAlive + : typeof this.options.config.experimental_processKeepAlive === "object" + ? this.options.config.experimental_processKeepAlive.enabled + : false; + + const maxPoolSize = + typeof this.options.config.experimental_processKeepAlive === "object" + ? this.options.config.experimental_processKeepAlive.devMaxPoolSize ?? 25 + : 25; + + const maxExecutionsPerProcess = + typeof this.options.config.experimental_processKeepAlive === "object" + ? this.options.config.experimental_processKeepAlive.maxExecutionsPerProcess ?? 50 + : 50; + + if (enableProcessReuse) { + logger.debug("[DevSupervisor] Enabling process reuse", { + enableProcessReuse, + maxPoolSize, + maxExecutionsPerProcess, + }); + } + + this.taskRunProcessPool = new TaskRunProcessPool({ + env, + cwd: this.options.config.workingDir, + enableProcessReuse, + maxPoolSize, + maxExecutionsPerProcess, + }); + this.socket = this.#createSocket(); //start an SSE connection for presence @@ -111,6 +150,17 @@ class DevSupervisor implements WorkerRuntime { } catch (error) { logger.debug("[DevSupervisor] shutdown, socket failed to close", { error }); } + + // Shutdown the task run process pool + if (this.taskRunProcessPool) { + const [shutdownError] = await tryCatch(this.taskRunProcessPool.shutdown()); + + if (shutdownError) { + logger.debug("[DevSupervisor] shutdown, task run process pool failed to shutdown", { + error: shutdownError, + }); + } + } } async initializeWorker( @@ -293,12 +343,21 @@ class DevSupervisor implements WorkerRuntime { continue; } + if (!this.taskRunProcessPool) { + logger.debug(`[DevSupervisor] dequeueRuns. No task run process pool`, { + run: message.run.friendlyId, + worker, + }); + continue; + } + //new run runController = new DevRunController({ runFriendlyId: message.run.friendlyId, worker: worker, httpClient: this.options.client, logLevel: this.options.args.logLevel, + taskRunProcessPool: this.taskRunProcessPool, onFinished: () => { logger.debug("[DevSupervisor] Run finished", { runId: message.run.friendlyId }); @@ -574,6 +633,10 @@ class DevSupervisor implements WorkerRuntime { return; } + if (worker.serverWorker?.version) { + this.taskRunProcessPool?.deprecateVersion(worker.serverWorker?.version); + } + if (this.#workerHasInProgressRuns(friendlyId)) { return; } diff --git a/packages/cli-v3/src/dev/taskRunProcessPool.ts b/packages/cli-v3/src/dev/taskRunProcessPool.ts new file mode 100644 index 0000000000..3b53a6f3f3 --- /dev/null +++ b/packages/cli-v3/src/dev/taskRunProcessPool.ts @@ -0,0 +1,284 @@ +import { + MachinePresetResources, + ServerBackgroundWorker, + WorkerManifest, +} from "@trigger.dev/core/v3"; +import { TaskRunProcess } from "../executions/taskRunProcess.js"; +import { logger } from "../utilities/logger.js"; + +export type TaskRunProcessPoolOptions = { + env: Record; + cwd: string; + enableProcessReuse: boolean; + maxPoolSize?: number; + maxExecutionsPerProcess?: number; +}; + +export class TaskRunProcessPool { + // Group processes by worker version + private availableProcessesByVersion: Map = new Map(); + private busyProcessesByVersion: Map> = new Map(); + private readonly options: TaskRunProcessPoolOptions; + private readonly maxPoolSize: number; + private readonly maxExecutionsPerProcess: number; + private readonly executionCountsPerProcess: Map = new Map(); + private readonly deprecatedVersions: Set = new Set(); + + constructor(options: TaskRunProcessPoolOptions) { + this.options = options; + this.maxPoolSize = options.maxPoolSize ?? 3; + this.maxExecutionsPerProcess = options.maxExecutionsPerProcess ?? 50; + } + + deprecateVersion(version: string) { + this.deprecatedVersions.add(version); + + logger.debug("[TaskRunProcessPool] Deprecating version", { version }); + + const versionProcesses = this.availableProcessesByVersion.get(version) || []; + + const processesToKill = versionProcesses.filter((process) => !process.isExecuting()); + Promise.all(processesToKill.map((process) => this.killProcess(process))).then(() => { + this.availableProcessesByVersion.delete(version); + }); + } + + async getProcess( + workerManifest: WorkerManifest, + serverWorker: ServerBackgroundWorker, + machineResources: MachinePresetResources, + env?: Record + ): Promise<{ taskRunProcess: TaskRunProcess; isReused: boolean }> { + const version = serverWorker.version || "unknown"; + + // Try to reuse an existing process if enabled + if (this.options.enableProcessReuse) { + const reusableProcess = this.findReusableProcess(version); + if (reusableProcess) { + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + + logger.debug("[TaskRunProcessPool] Reusing existing process", { + version, + availableCount, + busyCount, + }); + + // Remove from available and add to busy for this version + const availableProcesses = this.availableProcessesByVersion.get(version) || []; + this.availableProcessesByVersion.set( + version, + availableProcesses.filter((p) => p !== reusableProcess) + ); + + if (!this.busyProcessesByVersion.has(version)) { + this.busyProcessesByVersion.set(version, new Set()); + } + this.busyProcessesByVersion.get(version)!.add(reusableProcess); + + return { taskRunProcess: reusableProcess, isReused: true }; + } else { + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + + logger.debug("[TaskRunProcessPool] No reusable process found", { + version, + availableCount, + busyCount, + }); + } + } + + // Create new process + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + + logger.debug("[TaskRunProcessPool] Creating new process", { + version, + availableCount, + busyCount, + }); + + const newProcess = new TaskRunProcess({ + workerManifest, + env: { + ...this.options.env, + ...env, + }, + serverWorker, + machineResources, + cwd: this.options.cwd, + }).initialize(); + + // Add to busy processes for this version + if (!this.busyProcessesByVersion.has(version)) { + this.busyProcessesByVersion.set(version, new Set()); + } + this.busyProcessesByVersion.get(version)!.add(newProcess); + + return { taskRunProcess: newProcess, isReused: false }; + } + + async returnProcess(process: TaskRunProcess, version: string): Promise { + // Remove from busy processes for this version + const busyProcesses = this.busyProcessesByVersion.get(version); + if (busyProcesses) { + busyProcesses.delete(process); + } + + if (process.pid) { + this.executionCountsPerProcess.set( + process.pid, + (this.executionCountsPerProcess.get(process.pid) ?? 0) + 1 + ); + } + + if (this.shouldReuseProcess(process, version)) { + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + + logger.debug("[TaskRunProcessPool] Returning process to pool", { + version, + availableCount, + busyCount, + }); + + // Clean up but don't kill the process + try { + await process.cleanup(false); + + // Add to available processes for this version + if (!this.availableProcessesByVersion.has(version)) { + this.availableProcessesByVersion.set(version, []); + } + this.availableProcessesByVersion.get(version)!.push(process); + } catch (error) { + logger.debug("[TaskRunProcessPool] Failed to cleanup process for reuse, killing it", { + error, + }); + await this.killProcess(process); + } + } else { + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + + logger.debug("[TaskRunProcessPool] Killing process", { + version, + availableCount, + busyCount, + }); + await this.killProcess(process); + } + } + + private findReusableProcess(version: string): TaskRunProcess | undefined { + const availableProcesses = this.availableProcessesByVersion.get(version) || []; + return availableProcesses.find((process) => this.isProcessHealthy(process)); + } + + private shouldReuseProcess(process: TaskRunProcess, version: string): boolean { + const isHealthy = this.isProcessHealthy(process); + const isBeingKilled = process.isBeingKilled; + const pid = process.pid; + const executionCount = this.executionCountsPerProcess.get(pid ?? 0) ?? 0; + const availableCount = this.availableProcessesByVersion.get(version)?.length || 0; + const busyCount = this.busyProcessesByVersion.get(version)?.size || 0; + const isDeprecated = this.deprecatedVersions.has(version); + + logger.debug("[TaskRunProcessPool] Checking if process should be reused", { + version, + isHealthy, + isBeingKilled, + pid, + availableCount, + busyCount, + maxPoolSize: this.maxPoolSize, + executionCount, + isDeprecated, + }); + + return ( + this.options.enableProcessReuse && + this.isProcessHealthy(process) && + availableCount < this.maxPoolSize && + executionCount < this.maxExecutionsPerProcess && + !isDeprecated + ); + } + + private isProcessHealthy(process: TaskRunProcess): boolean { + // Basic health checks - we can expand this later + return !process.isBeingKilled && process.pid !== undefined; + } + + private async killProcess(process: TaskRunProcess): Promise { + try { + await process.cleanup(true); + } catch (error) { + logger.debug("[TaskRunProcessPool] Error killing process", { error }); + } + } + + async shutdown(): Promise { + const totalAvailable = Array.from(this.availableProcessesByVersion.values()).reduce( + (sum, processes) => sum + processes.length, + 0 + ); + const totalBusy = Array.from(this.busyProcessesByVersion.values()).reduce( + (sum, processes) => sum + processes.size, + 0 + ); + + logger.debug("[TaskRunProcessPool] Shutting down pool", { + availableCount: totalAvailable, + busyCount: totalBusy, + versions: Array.from(this.availableProcessesByVersion.keys()), + }); + + // Kill all available processes across all versions + const allAvailableProcesses = Array.from(this.availableProcessesByVersion.values()).flat(); + await Promise.all(allAvailableProcesses.map((process) => this.killProcess(process))); + this.availableProcessesByVersion.clear(); + + // Kill all busy processes across all versions + const allBusyProcesses = Array.from(this.busyProcessesByVersion.values()) + .map((processSet) => Array.from(processSet)) + .flat(); + await Promise.all(allBusyProcesses.map((process) => this.killProcess(process))); + this.busyProcessesByVersion.clear(); + } + + getStats() { + const totalAvailable = Array.from(this.availableProcessesByVersion.values()).reduce( + (sum, processes) => sum + processes.length, + 0 + ); + const totalBusy = Array.from(this.busyProcessesByVersion.values()).reduce( + (sum, processes) => sum + processes.size, + 0 + ); + + const statsByVersion: Record = {}; + for (const [version, processes] of this.availableProcessesByVersion.entries()) { + statsByVersion[version] = { + available: processes.length, + busy: this.busyProcessesByVersion.get(version)?.size || 0, + }; + } + for (const [version, processes] of this.busyProcessesByVersion.entries()) { + if (!statsByVersion[version]) { + statsByVersion[version] = { + available: 0, + busy: processes.size, + }; + } + } + + return { + availableCount: totalAvailable, + busyCount: totalBusy, + totalCount: totalAvailable + totalBusy, + byVersion: statsByVersion, + }; + } +} diff --git a/packages/cli-v3/src/entryPoints/dev-run-controller.ts b/packages/cli-v3/src/entryPoints/dev-run-controller.ts index 2f7ebc83fe..c52192783e 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-controller.ts @@ -19,6 +19,7 @@ import { sanitizeEnvVars } from "../utilities/sanitizeEnvVars.js"; import { join } from "node:path"; import { BackgroundWorker } from "../dev/backgroundWorker.js"; import { eventBus } from "../utilities/eventBus.js"; +import { TaskRunProcessPool } from "../dev/taskRunProcessPool.js"; type DevRunControllerOptions = { runFriendlyId: string; @@ -26,6 +27,7 @@ type DevRunControllerOptions = { httpClient: CliApiClient; logLevel: LogLevel; heartbeatIntervalSeconds?: number; + taskRunProcessPool: TaskRunProcessPool; onSubscribeToRunNotifications: (run: Run, snapshot: Snapshot) => void; onUnsubscribeFromRunNotifications: (run: Run, snapshot: Snapshot) => void; onFinished: () => void; @@ -605,45 +607,58 @@ export class DevRunController { this.snapshotPoller.start(); - this.taskRunProcess = new TaskRunProcess({ - workerManifest: this.opts.worker.manifest, - env: { - ...sanitizeEnvVars(envVars ?? {}), - ...sanitizeEnvVars(this.opts.worker.params.env), - TRIGGER_WORKER_MANIFEST_PATH: join(this.opts.worker.build.outputPath, "index.json"), - RUN_WORKER_SHOW_LOGS: this.opts.logLevel === "debug" ? "true" : "false", - TRIGGER_PROJECT_REF: execution.project.ref, - }, - serverWorker: { + // Get process from pool instead of creating new one + const { taskRunProcess, isReused } = await this.opts.taskRunProcessPool.getProcess( + this.opts.worker.manifest, + { id: "unmanaged", contentHash: this.opts.worker.build.contentHash, version: this.opts.worker.serverWorker?.version, engine: "V2", }, - machineResources: execution.machine, - }).initialize(); + execution.machine, + { + TRIGGER_WORKER_MANIFEST_PATH: join(this.opts.worker.build.outputPath, "index.json"), + RUN_WORKER_SHOW_LOGS: this.opts.logLevel === "debug" ? "true" : "false", + TRIGGER_WORKER_VERSION: this.opts.worker.serverWorker?.version, + } + ); + + this.taskRunProcess = taskRunProcess; - logger.debug("executing task run process", { + // Update the process environment for this specific run + // Note: We may need to enhance TaskRunProcess to support updating env vars + logger.debug("executing task run process from pool", { attemptNumber: execution.attempt.number, runId: execution.run.id, }); - const completion = await this.taskRunProcess.execute({ - payload: { - execution, - traceContext: execution.run.traceContext ?? {}, - metrics, + const completion = await this.taskRunProcess.execute( + { + payload: { + execution, + traceContext: execution.run.traceContext ?? {}, + metrics, + }, + messageId: run.friendlyId, + env: { + ...sanitizeEnvVars(envVars ?? {}), + ...sanitizeEnvVars(this.opts.worker.params.env), + TRIGGER_PROJECT_REF: execution.project.ref, + }, }, - messageId: run.friendlyId, - }); + isReused + ); logger.debug("Completed run", completion); + // Return process to pool instead of killing it try { - await this.taskRunProcess.cleanup(true); + const version = this.opts.worker.serverWorker?.version || "unknown"; + await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version); this.taskRunProcess = undefined; } catch (error) { - logger.debug("Failed to cleanup task run process, submitting completion anyway", { + logger.debug("Failed to return task run process to pool, submitting completion anyway", { error, }); } @@ -758,11 +773,15 @@ export class DevRunController { } private async runFinished() { - // Kill the run process - try { - await this.taskRunProcess?.kill("SIGKILL"); - } catch (error) { - logger.debug("Failed to kill task run process", { error }); + // Return the process to the pool instead of killing it directly + if (this.taskRunProcess) { + try { + const version = this.opts.worker.serverWorker?.version || "unknown"; + await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version); + this.taskRunProcess = undefined; + } catch (error) { + logger.debug("Failed to return task run process to pool during runFinished", { error }); + } } this.runHeartbeat.stop(); @@ -794,9 +813,11 @@ export class DevRunController { if (this.taskRunProcess && !this.taskRunProcess.isBeingKilled) { try { - await this.taskRunProcess.cleanup(true); + const version = this.opts.worker.serverWorker?.version || "unknown"; + await this.opts.taskRunProcessPool.returnProcess(this.taskRunProcess, version); + this.taskRunProcess = undefined; } catch (error) { - logger.debug("Failed to cleanup task run process", { error }); + logger.debug("Failed to return task run process to pool during stop", { error }); } } diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index f3994cd3c7..224a782221 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -19,6 +19,7 @@ import { runMetadata, runtime, runTimelineMetrics, + taskContext, TaskRunErrorCodes, TaskRunExecution, timeout, @@ -58,6 +59,7 @@ import sourceMapSupport from "source-map-support"; import { env } from "std-env"; import { normalizeImportPath } from "../utilities/normalizeImportPath.js"; import { VERSION } from "../version.js"; +import { promiseWithResolvers } from "@trigger.dev/core/utils"; sourceMapSupport.install({ handleUncaughtExceptions: false, @@ -99,6 +101,10 @@ process.on("uncaughtException", function (error, origin) { } }); +process.title = `trigger-dev-run-worker (${ + getEnvVar("TRIGGER_WORKER_VERSION") ?? "unknown version" +})`; + const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS"); const standardLocalsManager = new StandardLocalsManager(); @@ -112,8 +118,12 @@ runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager); const devUsageManager = new DevUsageManager(); usage.setGlobalUsageManager(devUsageManager); -timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); -resourceCatalog.setGlobalResourceCatalog(new StandardResourceCatalog()); + +const usageTimeoutManager = new UsageTimeoutManager(devUsageManager); +timeout.setGlobalManager(usageTimeoutManager); + +const standardResourceCatalog = new StandardResourceCatalog(); +resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog); const durableClock = new DurableClock(); clock.setGlobalClock(durableClock); @@ -153,84 +163,110 @@ async function loadWorkerManifest() { return WorkerManifest.parse(raw); } -async function bootstrap() { - const workerManifest = await loadWorkerManifest(); - - resourceCatalog.registerWorkerManifest(workerManifest); +async function doBootstrap() { + return await runTimelineMetrics.measureMetric("trigger.dev/start", "bootstrap", {}, async () => { + log("Bootstrapping worker"); - const { config, handleError } = await importConfig(workerManifest.configPath); + const workerManifest = await loadWorkerManifest(); - const tracingSDK = new TracingSDK({ - url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", - instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [], - exporters: config.telemetry?.exporters ?? [], - logExporters: config.telemetry?.logExporters ?? [], - diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", - forceFlushTimeoutMillis: 30_000, - }); + resourceCatalog.registerWorkerManifest(workerManifest); - const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); - const otelLogger: Logger = tracingSDK.getLogger("trigger-dev-worker", VERSION); + const { config, handleError } = await importConfig(workerManifest.configPath); - const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger }); - const consoleInterceptor = new ConsoleInterceptor( - otelLogger, - typeof config.enableConsoleLogging === "boolean" ? config.enableConsoleLogging : true, - typeof config.disableConsoleInterceptor === "boolean" ? config.disableConsoleInterceptor : false - ); + const tracingSDK = new TracingSDK({ + url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + instrumentations: config.telemetry?.instrumentations ?? config.instrumentations ?? [], + exporters: config.telemetry?.exporters ?? [], + logExporters: config.telemetry?.logExporters ?? [], + diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", + forceFlushTimeoutMillis: 30_000, + }); - const configLogLevel = triggerLogLevel ?? config.logLevel ?? "info"; + const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); + const otelLogger: Logger = tracingSDK.getLogger("trigger-dev-worker", VERSION); - const otelTaskLogger = new OtelTaskLogger({ - logger: otelLogger, - tracer: tracer, - level: logLevels.includes(configLogLevel as any) ? (configLogLevel as LogLevel) : "info", - }); + const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger }); + const consoleInterceptor = new ConsoleInterceptor( + otelLogger, + typeof config.enableConsoleLogging === "boolean" ? config.enableConsoleLogging : true, + typeof config.disableConsoleInterceptor === "boolean" + ? config.disableConsoleInterceptor + : false + ); - logger.setGlobalTaskLogger(otelTaskLogger); + const configLogLevel = triggerLogLevel ?? config.logLevel ?? "info"; - if (config.init) { - lifecycleHooks.registerGlobalInitHook({ - id: "config", - fn: config.init as AnyOnInitHookFunction, + const otelTaskLogger = new OtelTaskLogger({ + logger: otelLogger, + tracer: tracer, + level: logLevels.includes(configLogLevel as any) ? (configLogLevel as LogLevel) : "info", }); - } - if (config.onStart) { - lifecycleHooks.registerGlobalStartHook({ - id: "config", - fn: config.onStart as AnyOnStartHookFunction, - }); - } + logger.setGlobalTaskLogger(otelTaskLogger); - if (config.onSuccess) { - lifecycleHooks.registerGlobalSuccessHook({ - id: "config", - fn: config.onSuccess as AnyOnSuccessHookFunction, - }); - } + if (config.init) { + lifecycleHooks.registerGlobalInitHook({ + id: "config", + fn: config.init as AnyOnInitHookFunction, + }); + } - if (config.onFailure) { - lifecycleHooks.registerGlobalFailureHook({ - id: "config", - fn: config.onFailure as AnyOnFailureHookFunction, - }); - } + if (config.onStart) { + lifecycleHooks.registerGlobalStartHook({ + id: "config", + fn: config.onStart as AnyOnStartHookFunction, + }); + } - if (handleError) { - lifecycleHooks.registerGlobalCatchErrorHook({ - id: "config", - fn: handleError as AnyOnCatchErrorHookFunction, - }); + if (config.onSuccess) { + lifecycleHooks.registerGlobalSuccessHook({ + id: "config", + fn: config.onSuccess as AnyOnSuccessHookFunction, + }); + } + + if (config.onFailure) { + lifecycleHooks.registerGlobalFailureHook({ + id: "config", + fn: config.onFailure as AnyOnFailureHookFunction, + }); + } + + if (handleError) { + lifecycleHooks.registerGlobalCatchErrorHook({ + id: "config", + fn: handleError as AnyOnCatchErrorHookFunction, + }); + } + + log("Bootstrapped worker"); + + return { + tracer, + tracingSDK, + consoleInterceptor, + config, + workerManifest, + }; + }); +} + +let bootstrapCache: + | { + tracer: TriggerTracer; + tracingSDK: TracingSDK; + consoleInterceptor: ConsoleInterceptor; + config: TriggerConfig; + workerManifest: WorkerManifest; + } + | undefined; + +async function bootstrap() { + if (!bootstrapCache) { + bootstrapCache = await doBootstrap(); } - return { - tracer, - tracingSDK, - consoleInterceptor, - config, - workerManifest, - }; + return bootstrapCache; } let _execution: TaskRunExecution | undefined; @@ -238,23 +274,67 @@ let _isRunning = false; let _isCancelled = false; let _tracingSDK: TracingSDK | undefined; let _executionMeasurement: UsageMeasurement | undefined; -const cancelController = new AbortController(); +let _cancelController = new AbortController(); +let _lastFlushPromise: Promise | undefined; +let _sharedWorkerRuntime: SharedRuntimeManager | undefined; + +let _lastEnv: Record | undefined; +let _executionCount = 0; + +function resetExecutionEnvironment() { + _execution = undefined; + _isRunning = false; + _isCancelled = false; + _executionMeasurement = undefined; + _cancelController = new AbortController(); + + standardLocalsManager.reset(); + standardLifecycleHooksManager.reset(); + standardRunTimelineMetricsManager.reset(); + devUsageManager.reset(); + usageTimeoutManager.reset(); + runMetadataManager.reset(); + waitUntilManager.reset(); + _sharedWorkerRuntime?.reset(); + durableClock.reset(); + taskContext.disable(); + + log(`[${new Date().toISOString()}] Reset execution environment`); +} const zodIpc = new ZodIpcConnection({ listenSchema: WorkerToExecutorMessageCatalog, emitSchema: ExecutorToWorkerMessageCatalog, process, handlers: { - EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics, env }, sender) => { + EXECUTE_TASK_RUN: async ( + { execution, traceContext, metadata, metrics, env, isWarmStart }, + sender + ) => { if (env) { populateEnv(env, { override: true, + previousEnv: _lastEnv, }); + + _lastEnv = env; } log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); - standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics); + if (_lastFlushPromise) { + const now = performance.now(); + + await _lastFlushPromise; + + const duration = performance.now() - now; + + log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`); + } + + resetExecutionEnvironment(); + + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); if (_isRunning) { logError("Worker is already running a task"); @@ -309,66 +389,74 @@ const zodIpc = new ZodIpcConnection({ return; } - try { - await runTimelineMetrics.measureMetric( - "trigger.dev/start", - "import", - { - entryPoint: taskManifest.entryPoint, - file: taskManifest.filePath, - }, - async () => { - const beforeImport = performance.now(); - resourceCatalog.setCurrentFileContext(taskManifest.entryPoint, taskManifest.filePath); - - // Load init file if it exists - if (workerManifest.initEntryPoint) { - try { - await import(normalizeImportPath(workerManifest.initEntryPoint)); - log(`Loaded init file from ${workerManifest.initEntryPoint}`); - } catch (err) { - logError(`Failed to load init file`, err); - throw err; + // First attempt to get the task from the resource catalog + let task = resourceCatalog.getTask(execution.task.id); + + if (!task) { + log(`Could not find task ${execution.task.id} in resource catalog, importing...`); + + try { + await runTimelineMetrics.measureMetric( + "trigger.dev/start", + "import", + { + entryPoint: taskManifest.entryPoint, + file: taskManifest.filePath, + }, + async () => { + const beforeImport = performance.now(); + resourceCatalog.setCurrentFileContext( + taskManifest.entryPoint, + taskManifest.filePath + ); + + // Load init file if it exists + if (workerManifest.initEntryPoint) { + try { + await import(normalizeImportPath(workerManifest.initEntryPoint)); + log(`Loaded init file from ${workerManifest.initEntryPoint}`); + } catch (err) { + logError(`Failed to load init file`, err); + throw err; + } } - } - await import(normalizeImportPath(taskManifest.entryPoint)); - resourceCatalog.clearCurrentFileContext(); - const durationMs = performance.now() - beforeImport; + await import(normalizeImportPath(taskManifest.entryPoint)); + resourceCatalog.clearCurrentFileContext(); + const durationMs = performance.now() - beforeImport; - log( - `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` - ); - } - ); - } catch (err) { - logError(`Failed to import task ${execution.task.id}`, err); + log( + `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` + ); + } + ); + } catch (err) { + logError(`Failed to import task ${execution.task.id}`, err); - await sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ok: false, - id: execution.run.id, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, - message: err instanceof Error ? err.message : String(err), - stackTrace: err instanceof Error ? err.stack : undefined, - }, - usage: { - durationMs: 0, + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, + message: err instanceof Error ? err.message : String(err), + stackTrace: err instanceof Error ? err.stack : undefined, + }, + usage: { + durationMs: 0, + }, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, - metadata: runMetadataManager.stopAndReturnLastFlush(), - }, - }); - - return; - } + }); - process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`; + return; + } - // Import the task module - const task = resourceCatalog.getTask(execution.task.id); + // Now try and get the task again + task = resourceCatalog.getTask(execution.task.id); + } if (!task) { logError(`Could not find task ${execution.task.id}`); @@ -393,12 +481,15 @@ const zodIpc = new ZodIpcConnection({ } runMetadataManager.runId = execution.run.id; + _executionCount++; const executor = new TaskExecutor(task, { tracer, tracingSDK, consoleInterceptor, retries: config.retries, + isWarmStart, + executionCount: _executionCount, }); try { @@ -413,7 +504,7 @@ const zodIpc = new ZodIpcConnection({ const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration); - const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]); + const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]); const { result } = await executor.execute(execution, metadata, traceContext, signal); @@ -434,6 +525,7 @@ const zodIpc = new ZodIpcConnection({ } finally { _execution = undefined; _isRunning = false; + log(`[${new Date().toISOString()}] Task run completed`); } } catch (err) { logError("Failed to execute task", err); @@ -459,7 +551,7 @@ const zodIpc = new ZodIpcConnection({ }, CANCEL: async ({ timeoutInMs }) => { _isCancelled = true; - cancelController.abort("run cancelled"); + _cancelController.abort("run cancelled"); await callCancelHooks(timeoutInMs); if (_executionMeasurement) { usage.stop(_executionMeasurement); @@ -470,7 +562,7 @@ const zodIpc = new ZodIpcConnection({ await flushAll(timeoutInMs); }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { - sharedWorkerRuntime.resolveWaitpoints([waitpoint]); + _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); }, }, }); @@ -490,6 +582,10 @@ async function callCancelHooks(timeoutInMs: number = 10_000) { async function flushAll(timeoutInMs: number = 10_000) { const now = performance.now(); + const { promise, resolve } = promiseWithResolvers(); + + _lastFlushPromise = promise; + const results = await Promise.allSettled([ flushTracingSDK(timeoutInMs), flushMetadata(timeoutInMs), @@ -522,6 +618,9 @@ async function flushAll(timeoutInMs: number = 10_000) { const duration = performance.now() - now; log(`Flushed all in ${duration}ms`); + + // Resolve the last flush promise + resolve(); } async function flushTracingSDK(timeoutInMs: number = 10_000) { @@ -554,10 +653,8 @@ async function flushMetadata(timeoutInMs: number = 10_000) { }; } -const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs); -runtime.setGlobalRuntimeManager(sharedWorkerRuntime); - -process.title = "trigger-managed-worker"; +_sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs); +runtime.setGlobalRuntimeManager(_sharedWorkerRuntime); const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10); diff --git a/packages/cli-v3/src/entryPoints/managed-index-worker.ts b/packages/cli-v3/src/entryPoints/managed-index-worker.ts index 6ae67dd49d..426f31bf27 100644 --- a/packages/cli-v3/src/entryPoints/managed-index-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-index-worker.ts @@ -159,6 +159,12 @@ await sendMessageInCatalog( loaderEntryPoint: buildManifest.loaderEntryPoint, customConditions: buildManifest.customConditions, initEntryPoint: buildManifest.initEntryPoint, + processKeepAlive: + typeof config.experimental_processKeepAlive === "object" + ? config.experimental_processKeepAlive + : typeof config.experimental_processKeepAlive === "boolean" + ? { enabled: config.experimental_processKeepAlive } + : undefined, timings, }, importErrors, diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index b0ff027ec4..20ea1582fd 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -18,6 +18,7 @@ import { runMetadata, runtime, runTimelineMetrics, + taskContext, TaskRunErrorCodes, TaskRunExecution, timeout, @@ -58,6 +59,7 @@ import sourceMapSupport from "source-map-support"; import { env } from "std-env"; import { normalizeImportPath } from "../utilities/normalizeImportPath.js"; import { VERSION } from "../version.js"; +import { promiseWithResolvers } from "@trigger.dev/core/utils"; sourceMapSupport.install({ handleUncaughtExceptions: false, @@ -110,15 +112,18 @@ lifecycleHooks.setGlobalLifecycleHooksManager(standardLifecycleHooksManager); const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager(); runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager); -resourceCatalog.setGlobalResourceCatalog(new StandardResourceCatalog()); +const standardResourceCatalog = new StandardResourceCatalog(); +resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog); const durableClock = new DurableClock(); clock.setGlobalClock(durableClock); + const runMetadataManager = new StandardMetadataManager( apiClientManager.clientOrThrow(), getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev" ); runMetadata.setGlobalManager(runMetadataManager); + const waitUntilManager = new StandardWaitUntilManager(); waitUntil.setGlobalManager(waitUntilManager); // Wait for all streams to finish before completing the run @@ -149,86 +154,108 @@ async function loadWorkerManifest() { return WorkerManifest.parse(raw); } -async function bootstrap() { - const workerManifest = await loadWorkerManifest(); +async function doBootstrap() { + return await runTimelineMetrics.measureMetric("trigger.dev/start", "bootstrap", {}, async () => { + const workerManifest = await loadWorkerManifest(); - resourceCatalog.registerWorkerManifest(workerManifest); + resourceCatalog.registerWorkerManifest(workerManifest); - const { config, handleError } = await importConfig( - normalizeImportPath(workerManifest.configPath) - ); + const { config, handleError } = await importConfig( + normalizeImportPath(workerManifest.configPath) + ); - const tracingSDK = new TracingSDK({ - url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", - instrumentations: config.instrumentations ?? [], - diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", - forceFlushTimeoutMillis: 30_000, - exporters: config.telemetry?.exporters ?? [], - logExporters: config.telemetry?.logExporters ?? [], - }); + const tracingSDK = new TracingSDK({ + url: env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318", + instrumentations: config.instrumentations ?? [], + diagLogLevel: (env.OTEL_LOG_LEVEL as TracingDiagnosticLogLevel) ?? "none", + forceFlushTimeoutMillis: 30_000, + exporters: config.telemetry?.exporters ?? [], + logExporters: config.telemetry?.logExporters ?? [], + }); - const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); - const otelLogger: Logger = tracingSDK.getLogger("trigger-dev-worker", VERSION); + const otelTracer: Tracer = tracingSDK.getTracer("trigger-dev-worker", VERSION); + const otelLogger: Logger = tracingSDK.getLogger("trigger-dev-worker", VERSION); - const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger }); - const consoleInterceptor = new ConsoleInterceptor( - otelLogger, - typeof config.enableConsoleLogging === "boolean" ? config.enableConsoleLogging : true, - typeof config.disableConsoleInterceptor === "boolean" ? config.disableConsoleInterceptor : false - ); + const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger }); + const consoleInterceptor = new ConsoleInterceptor( + otelLogger, + typeof config.enableConsoleLogging === "boolean" ? config.enableConsoleLogging : true, + typeof config.disableConsoleInterceptor === "boolean" + ? config.disableConsoleInterceptor + : false + ); - const configLogLevel = triggerLogLevel ?? config.logLevel ?? "info"; + const configLogLevel = triggerLogLevel ?? config.logLevel ?? "info"; - const otelTaskLogger = new OtelTaskLogger({ - logger: otelLogger, - tracer: tracer, - level: logLevels.includes(configLogLevel as any) ? (configLogLevel as LogLevel) : "info", - }); + const otelTaskLogger = new OtelTaskLogger({ + logger: otelLogger, + tracer: tracer, + level: logLevels.includes(configLogLevel as any) ? (configLogLevel as LogLevel) : "info", + }); - logger.setGlobalTaskLogger(otelTaskLogger); + logger.setGlobalTaskLogger(otelTaskLogger); - if (config.init) { - lifecycleHooks.registerGlobalInitHook({ - id: "config", - fn: config.init as AnyOnInitHookFunction, - }); - } + if (config.init) { + lifecycleHooks.registerGlobalInitHook({ + id: "config", + fn: config.init as AnyOnInitHookFunction, + }); + } - if (config.onStart) { - lifecycleHooks.registerGlobalStartHook({ - id: "config", - fn: config.onStart as AnyOnStartHookFunction, - }); - } + if (config.onStart) { + lifecycleHooks.registerGlobalStartHook({ + id: "config", + fn: config.onStart as AnyOnStartHookFunction, + }); + } - if (config.onSuccess) { - lifecycleHooks.registerGlobalSuccessHook({ - id: "config", - fn: config.onSuccess as AnyOnSuccessHookFunction, - }); - } + if (config.onSuccess) { + lifecycleHooks.registerGlobalSuccessHook({ + id: "config", + fn: config.onSuccess as AnyOnSuccessHookFunction, + }); + } - if (config.onFailure) { - lifecycleHooks.registerGlobalFailureHook({ - id: "config", - fn: config.onFailure as AnyOnFailureHookFunction, - }); - } + if (config.onFailure) { + lifecycleHooks.registerGlobalFailureHook({ + id: "config", + fn: config.onFailure as AnyOnFailureHookFunction, + }); + } - if (handleError) { - lifecycleHooks.registerGlobalCatchErrorHook({ - id: "config", - fn: handleError as AnyOnCatchErrorHookFunction, - }); + if (handleError) { + lifecycleHooks.registerGlobalCatchErrorHook({ + id: "config", + fn: handleError as AnyOnCatchErrorHookFunction, + }); + } + + return { + tracer, + tracingSDK, + consoleInterceptor, + config, + workerManifest, + }; + }); +} + +let bootstrapCache: + | { + tracer: TriggerTracer; + tracingSDK: TracingSDK; + consoleInterceptor: ConsoleInterceptor; + config: TriggerConfig; + workerManifest: WorkerManifest; + } + | undefined; + +async function bootstrap() { + if (!bootstrapCache) { + bootstrapCache = await doBootstrap(); } - return { - tracer, - tracingSDK, - consoleInterceptor, - config, - workerManifest, - }; + return bootstrapCache; } let _execution: TaskRunExecution | undefined; @@ -236,7 +263,33 @@ let _isRunning = false; let _isCancelled = false; let _tracingSDK: TracingSDK | undefined; let _executionMeasurement: UsageMeasurement | undefined; -const cancelController = new AbortController(); +let _cancelController = new AbortController(); +let _lastFlushPromise: Promise | undefined; +let _sharedWorkerRuntime: SharedRuntimeManager | undefined; + +function resetExecutionEnvironment() { + _execution = undefined; + _isRunning = false; + _isCancelled = false; + _executionMeasurement = undefined; + _cancelController = new AbortController(); + + standardLocalsManager.reset(); + standardLifecycleHooksManager.reset(); + standardRunTimelineMetricsManager.reset(); + usage.reset(); + timeout.reset(); + runMetadataManager.reset(); + waitUntilManager.reset(); + _sharedWorkerRuntime?.reset(); + durableClock.reset(); + taskContext.disable(); + + console.log(`[${new Date().toISOString()}] Reset execution environment`); +} + +let _lastEnv: Record | undefined; +let _executionCount = 0; const zodIpc = new ZodIpcConnection({ listenSchema: WorkerToExecutorMessageCatalog, @@ -250,16 +303,35 @@ const zodIpc = new ZodIpcConnection({ if (env) { populateEnv(env, { override: true, + previousEnv: _lastEnv, }); + + _lastEnv = env; } + console.log( + `[${new Date().toISOString()}] Received EXECUTE_TASK_RUN isWarmStart ${String(isWarmStart)}` + ); + + if (_lastFlushPromise) { + const now = performance.now(); + + await _lastFlushPromise; + + const duration = performance.now() - now; + + console.log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`); + } + + resetExecutionEnvironment(); + initializeUsageManager({ usageIntervalMs: getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS"), usageEventUrl: getEnvVar("USAGE_EVENT_URL"), triggerJWT: getEnvVar("TRIGGER_JWT"), }); - standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics); + standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution); @@ -316,66 +388,74 @@ const zodIpc = new ZodIpcConnection({ return; } - try { - await runTimelineMetrics.measureMetric( - "trigger.dev/start", - "import", - { - entryPoint: taskManifest.entryPoint, - file: taskManifest.filePath, - }, - async () => { - const beforeImport = performance.now(); - resourceCatalog.setCurrentFileContext(taskManifest.entryPoint, taskManifest.filePath); - - // Load init file if it exists - if (workerManifest.initEntryPoint) { - try { - await import(normalizeImportPath(workerManifest.initEntryPoint)); - console.log(`Loaded init file from ${workerManifest.initEntryPoint}`); - } catch (err) { - console.error(`Failed to load init file`, err); - throw err; + // Import the task module + let task = resourceCatalog.getTask(execution.task.id); + + if (!task) { + try { + await runTimelineMetrics.measureMetric( + "trigger.dev/start", + "import", + { + entryPoint: taskManifest.entryPoint, + file: taskManifest.filePath, + }, + async () => { + const beforeImport = performance.now(); + resourceCatalog.setCurrentFileContext( + taskManifest.entryPoint, + taskManifest.filePath + ); + + // Load init file if it exists + if (workerManifest.initEntryPoint) { + try { + await import(normalizeImportPath(workerManifest.initEntryPoint)); + console.log(`Loaded init file from ${workerManifest.initEntryPoint}`); + } catch (err) { + console.error(`Failed to load init file`, err); + throw err; + } } - } - await import(normalizeImportPath(taskManifest.entryPoint)); - resourceCatalog.clearCurrentFileContext(); - const durationMs = performance.now() - beforeImport; + await import(normalizeImportPath(taskManifest.entryPoint)); + resourceCatalog.clearCurrentFileContext(); + const durationMs = performance.now() - beforeImport; - console.log( - `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` - ); - } - ); - } catch (err) { - console.error(`Failed to import task ${execution.task.id}`, err); + console.log( + `Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms` + ); + } + ); + } catch (err) { + console.error(`Failed to import task ${execution.task.id}`, err); - await sender.send("TASK_RUN_COMPLETED", { - execution, - result: { - ok: false, - id: execution.run.id, - error: { - type: "INTERNAL_ERROR", - code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, - message: err instanceof Error ? err.message : String(err), - stackTrace: err instanceof Error ? err.stack : undefined, - }, - usage: { - durationMs: 0, + await sender.send("TASK_RUN_COMPLETED", { + execution, + result: { + ok: false, + id: execution.run.id, + error: { + type: "INTERNAL_ERROR", + code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK, + message: err instanceof Error ? err.message : String(err), + stackTrace: err instanceof Error ? err.stack : undefined, + }, + usage: { + durationMs: 0, + }, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, - metadata: runMetadataManager.stopAndReturnLastFlush(), - }, - }); + }); - return; - } + return; + } - process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`; + process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`; - // Import the task module - const task = resourceCatalog.getTask(execution.task.id); + // Now try and get the task again + task = resourceCatalog.getTask(execution.task.id); + } if (!task) { console.error(`Could not find task ${execution.task.id}`); @@ -400,6 +480,7 @@ const zodIpc = new ZodIpcConnection({ } runMetadataManager.runId = execution.run.id; + _executionCount++; const executor = new TaskExecutor(task, { tracer, @@ -407,6 +488,7 @@ const zodIpc = new ZodIpcConnection({ consoleInterceptor, retries: config.retries, isWarmStart, + executionCount: _executionCount, }); try { @@ -421,7 +503,7 @@ const zodIpc = new ZodIpcConnection({ const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration); - const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]); + const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]); const { result } = await executor.execute(execution, metadata, traceContext, signal); @@ -442,6 +524,8 @@ const zodIpc = new ZodIpcConnection({ } finally { _execution = undefined; _isRunning = false; + + console.log(`[${new Date().toISOString()}] Task run completed`); } } catch (err) { console.error("Failed to execute task", err); @@ -467,7 +551,7 @@ const zodIpc = new ZodIpcConnection({ }, CANCEL: async ({ timeoutInMs }) => { _isCancelled = true; - cancelController.abort("run cancelled"); + _cancelController.abort("run cancelled"); await callCancelHooks(timeoutInMs); if (_executionMeasurement) { usage.stop(_executionMeasurement); @@ -478,7 +562,7 @@ const zodIpc = new ZodIpcConnection({ await flushAll(timeoutInMs); }, RESOLVE_WAITPOINT: async ({ waitpoint }) => { - sharedWorkerRuntime.resolveWaitpoints([waitpoint]); + _sharedWorkerRuntime?.resolveWaitpoints([waitpoint]); }, }, }); @@ -498,6 +582,10 @@ async function callCancelHooks(timeoutInMs: number = 10_000) { async function flushAll(timeoutInMs: number = 10_000) { const now = performance.now(); + const { promise, resolve } = promiseWithResolvers(); + + _lastFlushPromise = promise; + const results = await Promise.allSettled([ flushUsage(timeoutInMs), flushTracingSDK(timeoutInMs), @@ -530,6 +618,9 @@ async function flushAll(timeoutInMs: number = 10_000) { const duration = performance.now() - now; console.log(`Flushed all in ${duration}ms`); + + // Resolve the last flush promise + resolve(); } async function flushUsage(timeoutInMs: number = 10_000) { @@ -597,9 +688,8 @@ function initializeUsageManager({ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager)); } -const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true); - -runtime.setGlobalRuntimeManager(sharedWorkerRuntime); +_sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true); +runtime.setGlobalRuntimeManager(_sharedWorkerRuntime); process.title = "trigger-managed-worker"; diff --git a/packages/cli-v3/src/entryPoints/managed/controller.ts b/packages/cli-v3/src/entryPoints/managed/controller.ts index a79b670354..2f9542092b 100644 --- a/packages/cli-v3/src/entryPoints/managed/controller.ts +++ b/packages/cli-v3/src/entryPoints/managed/controller.ts @@ -11,6 +11,7 @@ import { RunnerEnv } from "./env.js"; import { ManagedRunLogger, RunLogger, SendDebugLogOptions } from "./logger.js"; import { EnvObject } from "std-env"; import { RunExecution } from "./execution.js"; +import { TaskRunProcessProvider } from "./taskRunProcessProvider.js"; import { tryCatch } from "@trigger.dev/core/utils"; type ManagedRunControllerOptions = { @@ -27,6 +28,7 @@ export class ManagedRunController { private readonly warmStartClient: WarmStartClient | undefined; private socket: SupervisorSocket; private readonly logger: RunLogger; + private readonly taskRunProcessProvider: TaskRunProcessProvider; private warmStartCount = 0; private restoreCount = 0; @@ -36,11 +38,17 @@ export class ManagedRunController { private currentExecution: RunExecution | null = null; + private processKeepAliveEnabled: boolean; + private processKeepAliveMaxExecutionCount: number; + constructor(opts: ManagedRunControllerOptions) { const env = new RunnerEnv(opts.env); this.env = env; this.workerManifest = opts.workerManifest; + this.processKeepAliveEnabled = opts.workerManifest.processKeepAlive?.enabled ?? false; + this.processKeepAliveMaxExecutionCount = + opts.workerManifest.processKeepAlive?.maxExecutionsPerProcess ?? 100; this.httpClient = new WorkloadHttpClient({ workerApiUrl: this.workerApiUrl, @@ -55,6 +63,15 @@ export class ManagedRunController { env, }); + // Create the TaskRunProcessProvider + this.taskRunProcessProvider = new TaskRunProcessProvider({ + workerManifest: this.workerManifest, + env: this.env, + logger: this.logger, + processKeepAliveEnabled: this.processKeepAliveEnabled, + processKeepAliveMaxExecutionCount: this.processKeepAliveMaxExecutionCount, + }); + const properties = { ...env.raw, TRIGGER_POD_SCHEDULED_AT_MS: env.TRIGGER_POD_SCHEDULED_AT_MS.toISOString(), @@ -96,6 +113,7 @@ export class ManagedRunController { restoreCount: this.restoreCount, notificationCount: this.notificationCount, lastNotificationAt: this.lastNotificationAt, + ...this.taskRunProcessProvider.metrics, }; } @@ -189,7 +207,15 @@ export class ManagedRunController { runId: runFriendlyId, message: "killing existing execution before starting new run", }); - await this.currentExecution.kill().catch(() => {}); + + await this.currentExecution.shutdown().catch((error) => { + this.sendDebugLog({ + runId: runFriendlyId, + message: "Error during execution shutdown", + properties: { error: error instanceof Error ? error.message : String(error) }, + }); + }); + this.currentExecution = null; } @@ -203,6 +229,7 @@ export class ManagedRunController { httpClient: this.httpClient, logger: this.logger, supervisorSocket: this.socket, + taskRunProcessProvider: this.taskRunProcessProvider, }); } @@ -298,7 +325,10 @@ export class ManagedRunController { httpClient: this.httpClient, logger: this.logger, supervisorSocket: this.socket, - }).prepareForExecution({ + taskRunProcessProvider: this.taskRunProcessProvider, + }); + + await this.currentExecution.prepareForExecution({ taskRunEnv: previousTaskRunEnv, }); } @@ -395,6 +425,7 @@ export class ManagedRunController { }); this.currentExecution?.kill().catch(() => {}); + this.taskRunProcessProvider.cleanup().catch(() => {}); process.exit(code); } @@ -534,6 +565,17 @@ export class ManagedRunController { }); } + // Cleanup the task run process provider + const [cleanupError] = await tryCatch(this.taskRunProcessProvider.cleanup()); + + if (cleanupError) { + this.sendDebugLog({ + runId: this.runFriendlyId, + message: "Error during task run process provider cleanup", + properties: { error: String(cleanupError) }, + }); + } + // Close the socket this.socket.close(); } diff --git a/packages/cli-v3/src/entryPoints/managed/execution.ts b/packages/cli-v3/src/entryPoints/managed/execution.ts index 1a29190d75..17d945499c 100644 --- a/packages/cli-v3/src/entryPoints/managed/execution.ts +++ b/packages/cli-v3/src/entryPoints/managed/execution.ts @@ -22,6 +22,7 @@ import { randomBytes } from "node:crypto"; import { SnapshotManager, SnapshotState } from "./snapshot.js"; import type { SupervisorSocket } from "./controller.js"; import { RunNotifier } from "./notifier.js"; +import { TaskRunProcessProvider } from "./taskRunProcessProvider.js"; class ExecutionAbortError extends Error { constructor(message: string) { @@ -36,6 +37,7 @@ type RunExecutionOptions = { httpClient: WorkloadHttpClient; logger: RunLogger; supervisorSocket: SupervisorSocket; + taskRunProcessProvider: TaskRunProcessProvider; }; type RunExecutionPrepareOptions = { @@ -77,6 +79,7 @@ export class RunExecution { private supervisorSocket: SupervisorSocket; private notifier?: RunNotifier; private metadataClient?: MetadataClient; + private taskRunProcessProvider: TaskRunProcessProvider; constructor(opts: RunExecutionOptions) { this.id = randomBytes(4).toString("hex"); @@ -85,6 +88,7 @@ export class RunExecution { this.httpClient = opts.httpClient; this.logger = opts.logger; this.supervisorSocket = opts.supervisorSocket; + this.taskRunProcessProvider = opts.taskRunProcessProvider; this.restoreCount = 0; this.executionAbortController = new AbortController(); @@ -111,18 +115,28 @@ export class RunExecution { * Kills the current execution. */ public async kill({ exitExecution = true }: { exitExecution?: boolean } = {}) { - await this.taskRunProcess?.kill("SIGKILL"); + if (this.taskRunProcess) { + await this.taskRunProcessProvider.handleProcessAbort(this.taskRunProcess); + } if (exitExecution) { - this.shutdown("kill"); + this.shutdownExecution("kill"); + } + } + + public async shutdown() { + if (this.taskRunProcess) { + await this.taskRunProcessProvider.handleProcessAbort(this.taskRunProcess); } + + this.shutdownExecution("shutdown"); } /** * Prepares the execution with task run environment variables. * This should be called before executing, typically after a successful run to prepare for the next one. */ - public prepareForExecution(opts: RunExecutionPrepareOptions): this { + public async prepareForExecution(opts: RunExecutionPrepareOptions) { if (this.isShuttingDown) { throw new Error("prepareForExecution called after execution shut down"); } @@ -131,40 +145,14 @@ export class RunExecution { throw new Error("prepareForExecution called after process was already created"); } - this.taskRunProcess = this.createTaskRunProcess({ - envVars: opts.taskRunEnv, + this.taskRunProcess = await this.taskRunProcessProvider.getProcess({ + taskRunEnv: opts.taskRunEnv, isWarmStart: true, }); - - return this; } - private createTaskRunProcess({ - envVars, - isWarmStart, - }: { - envVars: Record; - isWarmStart?: boolean; - }) { - const taskRunProcess = new TaskRunProcess({ - workerManifest: this.workerManifest, - env: { - ...envVars, - ...this.env.gatherProcessEnv(), - HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000), - }, - serverWorker: { - id: "managed", - contentHash: this.env.TRIGGER_CONTENT_HASH, - version: this.env.TRIGGER_DEPLOYMENT_VERSION, - engine: "V2", - }, - machineResources: { - cpu: Number(this.env.TRIGGER_MACHINE_CPU), - memory: Number(this.env.TRIGGER_MACHINE_MEMORY), - }, - isWarmStart, - }).initialize(); + private attachTaskRunProcessHandlers(taskRunProcess: TaskRunProcess): void { + taskRunProcess.unsafeDetachEvtHandlers(); taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => { if (!this.runFriendlyId) { @@ -194,20 +182,23 @@ export class RunExecution { taskRunProcess.onSetSuspendable.attach(async ({ suspendable }) => { this.suspendable = suspendable; }); - - return taskRunProcess; } /** - * Returns true if no run has been started yet and the process is prepared for the next run. + * Returns true if no run has been started yet and we're prepared for the next run. */ get canExecute(): boolean { + if (this.taskRunProcessProvider.hasPersistentProcess) { + return true; + } + // If we've ever had a run ID, this execution can't be reused if (this._runFriendlyId) { return false; } - return !!this.taskRunProcess?.isPreparedForNextRun; + // We can execute if we have the task run environment ready + return !!this.currentTaskRunEnv; } /** @@ -249,7 +240,10 @@ export class RunExecution { if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) { this.sendDebugLog("error: attempt number mismatch", snapshotMetadata); // This is a rogue execution, a new one will already have been created elsewhere - await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); + await this.exitTaskRunProcessWithoutFailingRun({ + flush: false, + reason: "attempt number mismatch", + }); return; } @@ -263,7 +257,10 @@ export class RunExecution { if (deprecated) { this.sendDebugLog("run execution is deprecated", { incomingSnapshot: snapshot }); - await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); + await this.exitTaskRunProcessWithoutFailingRun({ + flush: false, + reason: "deprecated execution", + }); return; } @@ -286,13 +283,13 @@ export class RunExecution { case "QUEUED": { this.sendDebugLog("run was re-queued", snapshotMetadata); - await this.exitTaskRunProcessWithoutFailingRun({ flush: true }); + await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "re-queued" }); return; } case "FINISHED": { this.sendDebugLog("run is finished", snapshotMetadata); - await this.exitTaskRunProcessWithoutFailingRun({ flush: true }); + // This can sometimes be called before the handleCompletionResult, so we don't need to do anything here return; } case "QUEUED_EXECUTING": @@ -307,7 +304,7 @@ export class RunExecution { // This will kill the process and fail the execution with a SuspendedProcessError // We don't flush because we already did before suspending - await this.exitTaskRunProcessWithoutFailingRun({ flush: false }); + await this.exitTaskRunProcessWithoutFailingRun({ flush: false, reason: "suspended" }); return; } case "PENDING_EXECUTING": { @@ -377,7 +374,7 @@ export class RunExecution { throw new Error("Cannot start attempt: missing run or snapshot manager"); } - this.sendDebugLog("starting attempt"); + this.sendDebugLog("starting attempt", { isWarmStart: String(isWarmStart) }); const attemptStartedAt = Date.now(); @@ -422,7 +419,7 @@ export class RunExecution { podScheduledAt: this.podScheduledAt?.getTime(), }); - this.sendDebugLog("started attempt"); + this.sendDebugLog("started attempt", { start: start.data }); return { ...start.data, metrics }; } @@ -478,21 +475,23 @@ export class RunExecution { if (startError) { this.sendDebugLog("failed to start attempt", { error: startError.message }); - this.shutdown("failed to start attempt"); + this.shutdownExecution("failed to start attempt"); return; } - const [executeError] = await tryCatch(this.executeRunWrapper(start)); + const [executeError] = await tryCatch( + this.executeRunWrapper({ ...start, isWarmStart: runOpts.isWarmStart }) + ); if (executeError) { this.sendDebugLog("failed to execute run", { error: executeError.message }); - this.shutdown("failed to execute run"); + this.shutdownExecution("failed to execute run"); return; } // This is here for safety, but it - this.shutdown("execute call finished"); + this.shutdownExecution("execute call finished"); } private async executeRunWrapper({ @@ -502,9 +501,11 @@ export class RunExecution { execution, metrics, isWarmStart, + isImmediateRetry, }: WorkloadRunAttemptStartResponseBody & { metrics: TaskRunExecutionMetrics; isWarmStart?: boolean; + isImmediateRetry?: boolean; }) { this.currentTaskRunEnv = envVars; @@ -516,6 +517,7 @@ export class RunExecution { execution, metrics, isWarmStart, + isImmediateRetry, }) ); @@ -570,38 +572,39 @@ export class RunExecution { execution, metrics, isWarmStart, + isImmediateRetry, }: WorkloadRunAttemptStartResponseBody & { metrics: TaskRunExecutionMetrics; isWarmStart?: boolean; + isImmediateRetry?: boolean; }) { - // For immediate retries, we need to ensure the task run process is prepared for the next attempt - if ( - this.runFriendlyId && - this.taskRunProcess && - !this.taskRunProcess.isPreparedForNextAttempt - ) { - this.sendDebugLog("killing existing task run process before executing next attempt"); - await this.kill({ exitExecution: false }).catch(() => {}); + if (isImmediateRetry) { + await this.taskRunProcessProvider.handleImmediateRetry(); } - // To skip this step and eagerly create the task run process, run prepareForExecution first - if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) { - this.taskRunProcess = this.createTaskRunProcess({ - envVars: { ...envVars, TRIGGER_PROJECT_REF: execution.project.ref }, - isWarmStart, - }); - } + const taskRunEnv = this.currentTaskRunEnv ?? envVars; + + this.taskRunProcess = await this.taskRunProcessProvider.getProcess({ + taskRunEnv: { ...taskRunEnv, TRIGGER_PROJECT_REF: execution.project.ref }, + isWarmStart, + }); + + this.attachTaskRunProcessHandlers(this.taskRunProcess); this.sendDebugLog("executing task run process", { runId: execution.run.id }); - // Set up an abort handler that will cleanup the task run process - this.executionAbortController.signal.addEventListener("abort", async () => { + const abortHandler = async () => { this.sendDebugLog("execution aborted during task run, cleaning up process", { runId: execution.run.id, }); - await this.taskRunProcess?.cleanup(true); - }); + if (this.taskRunProcess) { + await this.taskRunProcessProvider.handleProcessAbort(this.taskRunProcess); + } + }; + + // Set up an abort handler that will cleanup the task run process + this.executionAbortController.signal.addEventListener("abort", abortHandler); const completion = await this.taskRunProcess.execute( { @@ -616,15 +619,19 @@ export class RunExecution { isWarmStart ); + this.executionAbortController.signal.removeEventListener("abort", abortHandler); + // If we get here, the task completed normally this.sendDebugLog("completed run attempt", { attemptSuccess: completion.ok }); - // The execution has finished, so we can cleanup the task run process. Killing it should be safe. - const [error] = await tryCatch(this.taskRunProcess.cleanup(true)); + // Return the process to the provider - this handles all cleanup logic + const [returnError] = await tryCatch( + this.taskRunProcessProvider.returnProcess(this.taskRunProcess) + ); - if (error) { - this.sendDebugLog("failed to cleanup task run process, submitting completion anyway", { - error: error.message, + if (returnError) { + this.sendDebugLog("failed to return task run process, submitting completion anyway", { + error: returnError.message, }); } @@ -785,16 +792,18 @@ export class RunExecution { if (startError) { this.sendDebugLog("failed to start attempt for retry", { error: startError.message }); - this.shutdown("retryImmediately: failed to start attempt"); + this.shutdownExecution("retryImmediately: failed to start attempt"); return; } - const [executeError] = await tryCatch(this.executeRunWrapper({ ...start, isWarmStart: true })); + const [executeError] = await tryCatch( + this.executeRunWrapper({ ...start, isWarmStart: true, isImmediateRetry: true }) + ); if (executeError) { this.sendDebugLog("failed to execute run for retry", { error: executeError.message }); - this.shutdown("retryImmediately: failed to execute run"); + this.shutdownExecution("retryImmediately: failed to execute run"); return; } } @@ -828,11 +837,17 @@ export class RunExecution { this.restoreCount++; } - private async exitTaskRunProcessWithoutFailingRun({ flush }: { flush: boolean }) { - await this.taskRunProcess?.suspend({ flush }); + private async exitTaskRunProcessWithoutFailingRun({ + flush, + reason, + }: { + flush: boolean; + reason: string; + }) { + await this.taskRunProcessProvider.suspendProcess(flush, this.taskRunProcess); // No services should be left running after this line - let's make sure of it - this.shutdown("exitTaskRunProcessWithoutFailingRun"); + this.shutdownExecution(`exitTaskRunProcessWithoutFailingRun: ${reason}`); } /** @@ -1003,10 +1018,10 @@ export class RunExecution { } this.executionAbortController.abort(); - this.shutdown("abortExecution"); + this.shutdownExecution("abortExecution"); } - private shutdown(reason: string) { + private shutdownExecution(reason: string) { if (this.isShuttingDown) { this.sendDebugLog(`[shutdown] ${reason} (already shutting down)`, { firstShutdownReason: this.shutdownReason, diff --git a/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts new file mode 100644 index 0000000000..f039ba90d8 --- /dev/null +++ b/packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts @@ -0,0 +1,317 @@ +import { WorkerManifest } from "@trigger.dev/core/v3"; +import { TaskRunProcess } from "../../executions/taskRunProcess.js"; +import { RunnerEnv } from "./env.js"; +import { RunLogger, SendDebugLogOptions } from "./logger.js"; + +export interface TaskRunProcessProviderOptions { + workerManifest: WorkerManifest; + env: RunnerEnv; + logger: RunLogger; + processKeepAliveEnabled: boolean; + processKeepAliveMaxExecutionCount: number; +} + +export interface GetProcessOptions { + taskRunEnv: Record; + isWarmStart?: boolean; +} + +export class TaskRunProcessProvider { + private readonly workerManifest: WorkerManifest; + private readonly env: RunnerEnv; + private readonly logger: RunLogger; + private readonly processKeepAliveEnabled: boolean; + private readonly processKeepAliveMaxExecutionCount: number; + + // Process keep-alive state + private persistentProcess: TaskRunProcess | null = null; + private executionCount = 0; + + constructor(opts: TaskRunProcessProviderOptions) { + this.workerManifest = opts.workerManifest; + this.env = opts.env; + this.logger = opts.logger; + this.processKeepAliveEnabled = opts.processKeepAliveEnabled; + this.processKeepAliveMaxExecutionCount = opts.processKeepAliveMaxExecutionCount; + } + + get hasPersistentProcess(): boolean { + return !!this.persistentProcess; + } + + async handleImmediateRetry(): Promise { + if (!this.processKeepAliveEnabled) { + // For immediate retries, we need to ensure we have a clean process + if (this.persistentProcess) { + // If the process is not prepared for the next attempt, we need to get a fresh one + if (!this.persistentProcess.isPreparedForNextAttempt) { + this.sendDebugLog( + "existing task run process not prepared for retry, will get fresh process" + ); + await this.persistentProcess.kill("SIGKILL"); + this.persistentProcess = null; + } + } + } + } + + /** + * Gets a TaskRunProcess, either by reusing an existing one or creating a new one + */ + async getProcess(opts: GetProcessOptions): Promise { + this.sendDebugLog("Getting TaskRunProcess", { + processKeepAliveEnabled: this.processKeepAliveEnabled, + hasPersistentProcess: !!this.persistentProcess, + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + isWarmStart: opts.isWarmStart, + }); + + // If process keep-alive is disabled, always create a new process + if (!this.processKeepAliveEnabled) { + this.sendDebugLog("Creating new TaskRunProcess (keep-alive disabled)"); + return this.createTaskRunProcess(opts); + } + + // If process keep-alive is enabled and we have a healthy persistent process, reuse it + if (this.shouldReusePersistentProcess()) { + this.sendDebugLog("Reusing persistent TaskRunProcess", { + executionCount: this.executionCount, + }); + + return this.persistentProcess!; + } + + // Create new process (keep-alive enabled but no reusable process available) + this.sendDebugLog("Creating new TaskRunProcess", { + hadPersistentProcess: !!this.persistentProcess, + reason: this.processKeepAliveEnabled + ? "execution limit reached or unhealthy" + : "keep-alive disabled", + }); + + const existingPersistentProcess = this.persistentProcess; + + // Clean up old persistent process if it exists + if (existingPersistentProcess) { + await this.cleanupProcess(existingPersistentProcess); + } + + this.persistentProcess = this.createTaskRunProcess(opts); + this.executionCount = 0; + + return this.persistentProcess; + } + + /** + * Returns a process after execution, handling keep-alive logic and cleanup + */ + async returnProcess(process: TaskRunProcess): Promise { + this.sendDebugLog("Returning TaskRunProcess", { + processKeepAliveEnabled: this.processKeepAliveEnabled, + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + }); + + if (!this.processKeepAliveEnabled) { + // Keep-alive disabled - immediately cleanup the process + this.sendDebugLog("Keep-alive disabled, cleaning up process immediately"); + await process.cleanup(true); + return; + } + + // Keep-alive enabled - check if we should keep the process alive + if (this.shouldKeepProcessAlive(process)) { + this.sendDebugLog("Keeping TaskRunProcess alive for next run", { + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + }); + + // Call cleanup(false) to prepare for next run but keep process alive + await process.cleanup(false); + this.persistentProcess = process; + this.executionCount++; + } else { + this.sendDebugLog("Not keeping TaskRunProcess alive, cleaning up", { + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + isHealthy: this.isProcessHealthy(process), + }); + + // Cleanup the process completely + await process.cleanup(true); + } + } + + async suspendProcess(flush: boolean, process?: TaskRunProcess): Promise { + if (this.persistentProcess) { + if (process) { + if (this.persistentProcess.pid === process.pid) { + this.sendDebugLog("Suspending matching persistent TaskRunProcess (process provided)", { + pid: process.pid, + flush, + }); + + this.persistentProcess = null; + this.executionCount = 0; + + await process.suspend({ flush }); + } else { + this.sendDebugLog("Suspending TaskRunProcess (does not match persistent process)", { + pid: process.pid, + flush, + }); + + await process.suspend({ flush }); + } + } else { + this.sendDebugLog("Suspending persistent TaskRunProcess (no process provided)", { + pid: this.persistentProcess.pid, + flush, + }); + + this.persistentProcess = null; + this.executionCount = 0; + } + } else { + if (process) { + this.sendDebugLog("Suspending non-persistent TaskRunProcess (process provided)", { + pid: process.pid, + flush, + }); + + await process.suspend({ flush }); + } else { + this.sendDebugLog("Suspending non-persistent TaskRunProcess (no process provided)", { + flush, + }); + } + } + } + + /** + * Handles process abort/kill scenarios + */ + async handleProcessAbort(process: TaskRunProcess): Promise { + this.sendDebugLog("Handling process abort"); + + // If this was our persistent process, clear it + if (this.persistentProcess?.pid === process.pid) { + this.persistentProcess = null; + this.executionCount = 0; + } + + // Kill the process + await process.cleanup(true); + } + + async killProcess(process: TaskRunProcess): Promise { + this.sendDebugLog("Killing process"); + + // If this was our persistent process, clear it + if (this.persistentProcess?.pid === process.pid) { + this.persistentProcess = null; + this.executionCount = 0; + } + + // Kill the process + await this.cleanupProcess(process); + } + + /** + * Forces cleanup of any persistent process + */ + async cleanup() { + if (this.persistentProcess) { + this.sendDebugLog("cleanup() called"); + + await this.cleanupProcess(this.persistentProcess); + } + } + + /** + * Gets metrics about the provider state + */ + get metrics() { + return { + processKeepAlive: { + enabled: this.processKeepAliveEnabled, + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + hasPersistentProcess: !!this.persistentProcess, + }, + }; + } + + private createTaskRunProcess({ taskRunEnv, isWarmStart }: GetProcessOptions): TaskRunProcess { + const processEnv = this.buildProcessEnvironment(taskRunEnv); + + const taskRunProcess = new TaskRunProcess({ + workerManifest: this.workerManifest, + env: processEnv, + serverWorker: { + id: "managed", + contentHash: this.env.TRIGGER_CONTENT_HASH, + version: this.env.TRIGGER_DEPLOYMENT_VERSION, + engine: "V2", + }, + machineResources: { + cpu: Number(this.env.TRIGGER_MACHINE_CPU), + memory: Number(this.env.TRIGGER_MACHINE_MEMORY), + }, + isWarmStart, + }).initialize(); + + return taskRunProcess; + } + + private buildProcessEnvironment(taskRunEnv: Record): Record { + return { + ...taskRunEnv, + ...this.env.gatherProcessEnv(), + HEARTBEAT_INTERVAL_MS: String(this.env.TRIGGER_HEARTBEAT_INTERVAL_SECONDS * 1000), + }; + } + + private shouldReusePersistentProcess(): boolean { + this.sendDebugLog("Checking if persistent process should be reused", { + executionCount: this.executionCount, + maxExecutionCount: this.processKeepAliveMaxExecutionCount, + pid: this.persistentProcess?.pid ?? "unknown", + isBeingKilled: this.persistentProcess?.isBeingKilled ?? "unknown", + }); + + return ( + !!this.persistentProcess && + this.executionCount < this.processKeepAliveMaxExecutionCount && + this.isProcessHealthy(this.persistentProcess) + ); + } + + private shouldKeepProcessAlive(process: TaskRunProcess): boolean { + return ( + this.executionCount < this.processKeepAliveMaxExecutionCount && this.isProcessHealthy(process) + ); + } + + private isProcessHealthy(process: TaskRunProcess): boolean { + // Basic health check - TaskRunProcess will handle more detailed internal health checks + return !process.isBeingKilled && process.pid !== undefined; + } + + private async cleanupProcess(taskRunProcess: TaskRunProcess): Promise { + if (taskRunProcess && taskRunProcess.pid !== undefined) { + this.sendDebugLog("Cleaning up TaskRunProcess", { pid: taskRunProcess.pid }); + + await taskRunProcess.kill("SIGKILL").catch(() => {}); + } + } + + private sendDebugLog(message: string, properties?: SendDebugLogOptions["properties"]): void { + this.logger.sendDebugLog({ + runId: undefined, // Provider doesn't have access to current run ID + message: `[taskRunProcessProvider] ${message}`, + properties, + }); + } +} diff --git a/packages/cli-v3/src/executions/taskRunProcess.ts b/packages/cli-v3/src/executions/taskRunProcess.ts index 6821fbdfc9..16ac7beb4e 100644 --- a/packages/cli-v3/src/executions/taskRunProcess.ts +++ b/packages/cli-v3/src/executions/taskRunProcess.ts @@ -78,7 +78,6 @@ export class TaskRunProcess { public onTaskRunHeartbeat: Evt = new Evt(); public onExit: Evt<{ code: number | null; signal: NodeJS.Signals | null; pid?: number }> = new Evt(); - public onIsBeingKilled: Evt = new Evt(); public onSendDebugLog: Evt = new Evt(); public onSetSuspendable: Evt = new Evt(); @@ -100,7 +99,6 @@ export class TaskRunProcess { unsafeDetachEvtHandlers() { this.onExit.detach(); - this.onIsBeingKilled.detach(); this.onSendDebugLog.detach(); this.onSetSuspendable.detach(); this.onTaskRunHeartbeat.detach(); @@ -150,6 +148,7 @@ export class TaskRunProcess { PATH: process.env.PATH, TRIGGER_PROCESS_FORK_START_TIME: String(Date.now()), TRIGGER_WARM_START: this.options.isWarmStart ? "true" : "false", + TRIGGERDOTDEV: "1", }; logger.debug(`initializing task run process`, { @@ -169,6 +168,12 @@ export class TaskRunProcess { this._childPid = this._child?.pid; + logger.debug("initialized task run process", { + path: workerManifest.workerEntryPoint, + cwd, + pid: this._childPid, + }); + this._ipc = new ZodIpcConnection({ listenSchema: ExecutorToWorkerMessageCatalog, emitSchema: WorkerToExecutorMessageCatalog, @@ -286,6 +291,10 @@ export class TaskRunProcess { return result; } + isExecuting() { + return this._currentExecution !== undefined; + } + waitpointCompleted(waitpoint: CompletedWaitpoint) { if (!this._child?.connected || this._isBeingKilled || this._child.killed) { console.error( @@ -298,7 +307,7 @@ export class TaskRunProcess { } async #handleExit(code: number | null, signal: NodeJS.Signals | null) { - logger.debug("handling child exit", { code, signal }); + logger.debug("handling child exit", { code, signal, pid: this.pid }); // Go through all the attempts currently pending and reject them for (const [id, status] of this._attemptStatuses.entries()) { @@ -398,8 +407,6 @@ export class TaskRunProcess { const killTimeout = this.onExit.waitFor(timeoutInMs); - this.onIsBeingKilled.post(this); - try { this._child?.kill(signal); } catch (error) { diff --git a/packages/core/src/v3/config.ts b/packages/core/src/v3/config.ts index 2c828df686..061b40fc40 100644 --- a/packages/core/src/v3/config.ts +++ b/packages/core/src/v3/config.ts @@ -234,6 +234,31 @@ export type TriggerConfig = { env?: Record; }; + /** + * @default false + * @description Keep the process alive after the task has finished running so the next task doesn't have to wait for the process to start up again. + * + * Note that the process could be killed at any time, and we don't make any guarantees about the process being alive for a certain amount of time + */ + experimental_processKeepAlive?: + | boolean + | { + enabled: boolean; + /** + * The maximum number of executions per process. If the process has run more than this number of times, it will be killed. + * + * @default 50 + */ + maxExecutionsPerProcess?: number; + + /** + * The maximum number of processes to keep alive in dev. + * + * @default 25 + */ + devMaxPoolSize?: number; + }; + /** * @deprecated Use `dirs` instead */ diff --git a/packages/core/src/v3/lifecycleHooks/manager.ts b/packages/core/src/v3/lifecycleHooks/manager.ts index f6cceb8d55..282d2fe16c 100644 --- a/packages/core/src/v3/lifecycleHooks/manager.ts +++ b/packages/core/src/v3/lifecycleHooks/manager.ts @@ -68,6 +68,12 @@ export class StandardLifecycleHooksManager implements LifecycleHooksManager { private taskCancelHooks: Map> = new Map(); private onCancelHookListeners: (() => Promise)[] = []; + reset(): void { + this.onCancelHookListeners.length = 0; + this.onWaitHookListeners.length = 0; + this.onResumeHookListeners.length = 0; + } + registerOnCancelHookListener(listener: () => Promise): void { this.onCancelHookListeners.push(listener); } diff --git a/packages/core/src/v3/locals/manager.ts b/packages/core/src/v3/locals/manager.ts index befe219260..6984d9f414 100644 --- a/packages/core/src/v3/locals/manager.ts +++ b/packages/core/src/v3/locals/manager.ts @@ -33,5 +33,8 @@ export class StandardLocalsManager implements LocalsManager { setLocal(key: LocalsKey, value: T): void { this.store.set(key.__type, value); } + + reset(): void { + this.store.clear(); + } } -0; diff --git a/packages/core/src/v3/otel/tracingSDK.ts b/packages/core/src/v3/otel/tracingSDK.ts index 20a79b3b1b..c2d7b3d342 100644 --- a/packages/core/src/v3/otel/tracingSDK.ts +++ b/packages/core/src/v3/otel/tracingSDK.ts @@ -57,6 +57,10 @@ class AsyncResourceDetector implements DetectorSync { }); } + get isResolved() { + return this._resolved; + } + detect(_config?: ResourceDetectionConfig): Resource { return new Resource({}, this._promise); } @@ -123,6 +127,8 @@ export class TracingSDK { getEnvVar("OTEL_SERVICE_NAME") ?? "trigger.dev", [SemanticInternalAttributes.TRIGGER]: true, [SemanticInternalAttributes.CLI_VERSION]: VERSION, + [SemanticInternalAttributes.SDK_VERSION]: VERSION, + [SemanticInternalAttributes.SDK_LANGUAGE]: "typescript", }) ) .merge(config.resource ?? new Resource({})) diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index 974d3cd2d5..7916492cd3 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -30,6 +30,22 @@ export class StandardMetadataManager implements RunMetadataManager { private streamsVersion: "v1" | "v2" = "v1" ) {} + reset(): void { + this.queuedOperations.clear(); + this.queuedParentOperations.clear(); + this.queuedRootOperations.clear(); + this.activeStreams.clear(); + this.store = undefined; + this.runId = undefined; + + if (this.flushTimeoutId) { + clearTimeout(this.flushTimeoutId); + this.flushTimeoutId = null; + } + + this.isFlushing = false; + } + get parent(): RunMetadataUpdater { // Store a reference to 'this' to ensure proper context const self = this; diff --git a/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts b/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts index d521749657..3261e47524 100644 --- a/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts +++ b/packages/core/src/v3/runTimelineMetrics/runTimelineMetricsManager.ts @@ -13,8 +13,11 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana return this._metrics; } - registerMetricsFromExecution(metrics?: TaskRunExecutionMetrics): void { - this.#seedMetricsFromEnvironment(); + registerMetricsFromExecution( + metrics?: TaskRunExecutionMetrics, + isWarmStartOverride?: boolean + ): void { + this.#seedMetricsFromEnvironment(isWarmStartOverride); if (metrics) { metrics.forEach((metric) => { @@ -30,10 +33,16 @@ export class StandardRunTimelineMetricsManager implements RunTimelineMetricsMana } } - #seedMetricsFromEnvironment() { + reset(): void { + this._metrics = []; + } + + // TODO: handle this when processKeepAlive is enabled + #seedMetricsFromEnvironment(isWarmStartOverride?: boolean) { const forkStartTime = getEnvVar("TRIGGER_PROCESS_FORK_START_TIME"); const warmStart = getEnvVar("TRIGGER_WARM_START"); - const isWarmStart = warmStart === "true"; + const isWarmStart = + typeof isWarmStartOverride === "boolean" ? isWarmStartOverride : warmStart === "true"; if (typeof forkStartTime === "string" && !isWarmStart) { const forkStartTimeMs = parseInt(forkStartTime, 10); diff --git a/packages/core/src/v3/runtime/sharedRuntimeManager.ts b/packages/core/src/v3/runtime/sharedRuntimeManager.ts index a513e3a1d6..09c718c1f6 100644 --- a/packages/core/src/v3/runtime/sharedRuntimeManager.ts +++ b/packages/core/src/v3/runtime/sharedRuntimeManager.ts @@ -41,6 +41,11 @@ export class SharedRuntimeManager implements RuntimeManager { }, 300_000); } + reset(): void { + this.resolversById.clear(); + this.waitpointsByResolverId.clear(); + } + disable(): void { // do nothing } diff --git a/packages/core/src/v3/schemas/build.ts b/packages/core/src/v3/schemas/build.ts index b116d264d0..3dc9e83dcf 100644 --- a/packages/core/src/v3/schemas/build.ts +++ b/packages/core/src/v3/schemas/build.ts @@ -92,6 +92,13 @@ export const WorkerManifest = z.object({ runtime: BuildRuntime, customConditions: z.array(z.string()).optional(), timings: z.record(z.number()).optional(), + processKeepAlive: z + .object({ + enabled: z.boolean(), + maxExecutionsPerProcess: z.number().int().positive().optional(), + }) + .optional(), + otelImportHook: z .object({ include: z.array(z.string()).optional(), diff --git a/packages/core/src/v3/schemas/schemas.ts b/packages/core/src/v3/schemas/schemas.ts index 31667a01ad..f54ee4791c 100644 --- a/packages/core/src/v3/schemas/schemas.ts +++ b/packages/core/src/v3/schemas/schemas.ts @@ -163,7 +163,7 @@ export const QueueManifest = z.object({ /** An optional property that specifies the maximum number of concurrent run executions. * * If this property is omitted, the task can potentially use up the full concurrency of an environment */ - concurrencyLimit: z.number().int().min(0).max(1000).optional().nullable(), + concurrencyLimit: z.number().int().min(0).max(100000).optional().nullable(), /** An optional property that specifies whether to release concurrency on waitpoint. * * If this property is omitted, the task will not release concurrency on waitpoint. diff --git a/packages/core/src/v3/semanticInternalAttributes.ts b/packages/core/src/v3/semanticInternalAttributes.ts index 3ff44eb518..b5e0af63a5 100644 --- a/packages/core/src/v3/semanticInternalAttributes.ts +++ b/packages/core/src/v3/semanticInternalAttributes.ts @@ -60,4 +60,5 @@ export const SemanticInternalAttributes = { METRIC_EVENTS: "$metrics.events", EXECUTION_ENVIRONMENT: "exec_env", WARM_START: "warm_start", + ATTEMPT_EXECUTION_COUNT: "$trigger.executionCount", }; diff --git a/packages/core/src/v3/taskContext/index.ts b/packages/core/src/v3/taskContext/index.ts index 77598ff6cf..a9ef58cf6e 100644 --- a/packages/core/src/v3/taskContext/index.ts +++ b/packages/core/src/v3/taskContext/index.ts @@ -47,6 +47,27 @@ export class TaskContextAPI { return {}; } + get resourceAttributes(): Attributes { + if (this.ctx) { + return { + [SemanticInternalAttributes.ENVIRONMENT_ID]: this.ctx.environment.id, + [SemanticInternalAttributes.ENVIRONMENT_TYPE]: this.ctx.environment.type, + [SemanticInternalAttributes.ORGANIZATION_ID]: this.ctx.organization.id, + [SemanticInternalAttributes.PROJECT_ID]: this.ctx.project.id, + [SemanticInternalAttributes.PROJECT_REF]: this.ctx.project.ref, + [SemanticInternalAttributes.PROJECT_NAME]: this.ctx.project.name, + [SemanticInternalAttributes.ORGANIZATION_SLUG]: this.ctx.organization.slug, + [SemanticInternalAttributes.ORGANIZATION_NAME]: this.ctx.organization.name, + [SemanticInternalAttributes.MACHINE_PRESET_NAME]: this.ctx.machine?.name, + [SemanticInternalAttributes.MACHINE_PRESET_CPU]: this.ctx.machine?.cpu, + [SemanticInternalAttributes.MACHINE_PRESET_MEMORY]: this.ctx.machine?.memory, + [SemanticInternalAttributes.MACHINE_PRESET_CENTS_PER_MS]: this.ctx.machine?.centsPerMs, + }; + } + + return {}; + } + get workerAttributes(): Attributes { if (this.worker) { return { @@ -68,22 +89,10 @@ export class TaskContextAPI { [SemanticInternalAttributes.TASK_EXPORT_NAME]: this.ctx.task.exportName, [SemanticInternalAttributes.QUEUE_NAME]: this.ctx.queue.name, [SemanticInternalAttributes.QUEUE_ID]: this.ctx.queue.id, - [SemanticInternalAttributes.ENVIRONMENT_ID]: this.ctx.environment.id, - [SemanticInternalAttributes.ENVIRONMENT_TYPE]: this.ctx.environment.type, - [SemanticInternalAttributes.ORGANIZATION_ID]: this.ctx.organization.id, - [SemanticInternalAttributes.PROJECT_ID]: this.ctx.project.id, - [SemanticInternalAttributes.PROJECT_REF]: this.ctx.project.ref, - [SemanticInternalAttributes.PROJECT_NAME]: this.ctx.project.name, [SemanticInternalAttributes.RUN_ID]: this.ctx.run.id, [SemanticInternalAttributes.RUN_IS_TEST]: this.ctx.run.isTest, - [SemanticInternalAttributes.ORGANIZATION_SLUG]: this.ctx.organization.slug, - [SemanticInternalAttributes.ORGANIZATION_NAME]: this.ctx.organization.name, [SemanticInternalAttributes.BATCH_ID]: this.ctx.batch?.id, [SemanticInternalAttributes.IDEMPOTENCY_KEY]: this.ctx.run.idempotencyKey, - [SemanticInternalAttributes.MACHINE_PRESET_NAME]: this.ctx.machine?.name, - [SemanticInternalAttributes.MACHINE_PRESET_CPU]: this.ctx.machine?.cpu, - [SemanticInternalAttributes.MACHINE_PRESET_MEMORY]: this.ctx.machine?.memory, - [SemanticInternalAttributes.MACHINE_PRESET_CENTS_PER_MS]: this.ctx.machine?.centsPerMs, }; } diff --git a/packages/core/src/v3/taskContext/otelProcessors.ts b/packages/core/src/v3/taskContext/otelProcessors.ts index ff96b34cc8..db30624ff6 100644 --- a/packages/core/src/v3/taskContext/otelProcessors.ts +++ b/packages/core/src/v3/taskContext/otelProcessors.ts @@ -19,13 +19,7 @@ export class TaskContextSpanProcessor implements SpanProcessor { onStart(span: Span, parentContext: Context): void { if (taskContext.ctx) { span.setAttributes( - flattenAttributes( - { - [SemanticInternalAttributes.ATTEMPT_ID]: taskContext.ctx.attempt.id, - [SemanticInternalAttributes.ATTEMPT_NUMBER]: taskContext.ctx.attempt.number, - }, - SemanticInternalAttributes.METADATA - ) + flattenAttributes(taskContext.attributes, SemanticInternalAttributes.METADATA) ); } @@ -75,13 +69,7 @@ function createPartialSpan(tracer: Tracer, span: Span, parentContext: Context) { if (taskContext.ctx) { partialSpan.setAttributes( - flattenAttributes( - { - [SemanticInternalAttributes.ATTEMPT_ID]: taskContext.ctx.attempt.id, - [SemanticInternalAttributes.ATTEMPT_NUMBER]: taskContext.ctx.attempt.number, - }, - SemanticInternalAttributes.METADATA - ) + flattenAttributes(taskContext.attributes, SemanticInternalAttributes.METADATA) ); } @@ -107,13 +95,7 @@ export class TaskContextLogProcessor implements LogRecordProcessor { // Adds in the context attributes to the log record if (taskContext.ctx) { logRecord.setAttributes( - flattenAttributes( - { - [SemanticInternalAttributes.ATTEMPT_ID]: taskContext.ctx.attempt.id, - [SemanticInternalAttributes.ATTEMPT_NUMBER]: taskContext.ctx.attempt.number, - }, - SemanticInternalAttributes.METADATA - ) + flattenAttributes(taskContext.attributes, SemanticInternalAttributes.METADATA) ); } diff --git a/packages/core/src/v3/timeout/api.ts b/packages/core/src/v3/timeout/api.ts index ed6bd506ec..63ddb48db3 100644 --- a/packages/core/src/v3/timeout/api.ts +++ b/packages/core/src/v3/timeout/api.ts @@ -7,6 +7,8 @@ class NoopTimeoutManager implements TimeoutManager { abortAfterTimeout(timeoutInSeconds?: number): AbortController { return new AbortController(); } + + reset() {} } const NOOP_TIMEOUT_MANAGER = new NoopTimeoutManager(); @@ -40,6 +42,11 @@ export class TimeoutAPI implements TimeoutManager { unregisterGlobal(API_NAME); } + public reset() { + this.#getManager().reset(); + this.disable(); + } + #getManager(): TimeoutManager { return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER; } diff --git a/packages/core/src/v3/timeout/types.ts b/packages/core/src/v3/timeout/types.ts index 7f263bb4e2..0ea0b8fe34 100644 --- a/packages/core/src/v3/timeout/types.ts +++ b/packages/core/src/v3/timeout/types.ts @@ -1,6 +1,7 @@ export interface TimeoutManager { abortAfterTimeout: (timeoutInSeconds?: number) => AbortController; signal?: AbortSignal; + reset: () => void; } export class TaskRunExceededMaxDuration extends Error { diff --git a/packages/core/src/v3/timeout/usageTimeoutManager.ts b/packages/core/src/v3/timeout/usageTimeoutManager.ts index b90546832b..ec14ffe6cc 100644 --- a/packages/core/src/v3/timeout/usageTimeoutManager.ts +++ b/packages/core/src/v3/timeout/usageTimeoutManager.ts @@ -14,6 +14,16 @@ export class UsageTimeoutManager implements TimeoutManager { return this._abortSignal; } + reset(): void { + this._abortController = new AbortController(); + this._abortSignal = undefined; + + if (this._intervalId) { + clearInterval(this._intervalId); + this._intervalId = undefined; + } + } + abortAfterTimeout(timeoutInSeconds?: number): AbortController { this._abortSignal = this._abortController.signal; diff --git a/packages/core/src/v3/usage/api.ts b/packages/core/src/v3/usage/api.ts index 338cc8cf80..e08c76929d 100644 --- a/packages/core/src/v3/usage/api.ts +++ b/packages/core/src/v3/usage/api.ts @@ -48,6 +48,11 @@ export class UsageAPI implements UsageManager { return this.#getUsageManager().flush(); } + public reset() { + this.#getUsageManager().reset(); + this.disable(); + } + #getUsageManager(): UsageManager { return getGlobal(API_NAME) ?? NOOP_USAGE_MANAGER; } diff --git a/packages/core/src/v3/usage/devUsageManager.ts b/packages/core/src/v3/usage/devUsageManager.ts index d8baa935f7..80e1fdde21 100644 --- a/packages/core/src/v3/usage/devUsageManager.ts +++ b/packages/core/src/v3/usage/devUsageManager.ts @@ -50,6 +50,12 @@ export class DevUsageManager implements UsageManager { async flush(): Promise {} + reset(): void { + this._firstMeasurement = undefined; + this._currentMeasurements.clear(); + this._pauses.clear(); + } + sample(): UsageSample | undefined { return this._firstMeasurement?.sample(); } diff --git a/packages/core/src/v3/usage/noopUsageManager.ts b/packages/core/src/v3/usage/noopUsageManager.ts index 4369fd3ca7..9e52144466 100644 --- a/packages/core/src/v3/usage/noopUsageManager.ts +++ b/packages/core/src/v3/usage/noopUsageManager.ts @@ -26,4 +26,8 @@ export class NoopUsageManager implements UsageManager { sample(): UsageSample | undefined { return undefined; } + + reset(): void { + // Noop + } } diff --git a/packages/core/src/v3/usage/prodUsageManager.ts b/packages/core/src/v3/usage/prodUsageManager.ts index 5fe324c884..5d3d49c3d0 100644 --- a/packages/core/src/v3/usage/prodUsageManager.ts +++ b/packages/core/src/v3/usage/prodUsageManager.ts @@ -27,6 +27,15 @@ export class ProdUsageManager implements UsageManager { return typeof this._usageClient !== "undefined"; } + reset(): void { + this.delegageUsageManager.reset(); + this._abortController?.abort(); + this._abortController = new AbortController(); + this._usageClient = undefined; + this._measurement = undefined; + this._lastSample = undefined; + } + disable(): void { this.delegageUsageManager.disable(); this._abortController?.abort(); @@ -67,12 +76,18 @@ export class ProdUsageManager implements UsageManager { this._abortController = new AbortController(); - for await (const _ of setInterval(this.options.heartbeatIntervalMs)) { - if (this._abortController.signal.aborted) { - break; + try { + for await (const _ of setInterval(this.options.heartbeatIntervalMs, undefined, { + signal: this._abortController.signal, + })) { + await this.#reportUsage(); + } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + return; } - await this.#reportUsage(); + throw error; } } diff --git a/packages/core/src/v3/usage/types.ts b/packages/core/src/v3/usage/types.ts index 5b763cc499..8655950df3 100644 --- a/packages/core/src/v3/usage/types.ts +++ b/packages/core/src/v3/usage/types.ts @@ -14,4 +14,5 @@ export interface UsageManager { sample(): UsageSample | undefined; pauseAsync(cb: () => Promise): Promise; flush(): Promise; + reset(): void; } diff --git a/packages/core/src/v3/waitUntil/manager.ts b/packages/core/src/v3/waitUntil/manager.ts index b58f518abb..cca6839789 100644 --- a/packages/core/src/v3/waitUntil/manager.ts +++ b/packages/core/src/v3/waitUntil/manager.ts @@ -3,6 +3,10 @@ import { MaybeDeferredPromise, WaitUntilManager } from "./types.js"; export class StandardWaitUntilManager implements WaitUntilManager { private maybeDeferredPromises: Set = new Set(); + reset(): void { + this.maybeDeferredPromises.clear(); + } + register(promise: MaybeDeferredPromise): void { this.maybeDeferredPromises.add(promise); } diff --git a/packages/core/src/v3/workers/populateEnv.ts b/packages/core/src/v3/workers/populateEnv.ts index 68fafddf12..b21673c3fa 100644 --- a/packages/core/src/v3/workers/populateEnv.ts +++ b/packages/core/src/v3/workers/populateEnv.ts @@ -13,6 +13,12 @@ interface PopulateEnvOptions { * @default false */ debug?: boolean; + + /** + * The previous environment variables + * @default undefined + */ + previousEnv?: Record; } /** @@ -25,7 +31,7 @@ export function populateEnv( envObject: Record, options: PopulateEnvOptions = {} ): void { - const { override = false, debug = false } = options; + const { override = false, debug = false, previousEnv } = options; if (!envObject || typeof envObject !== "object") { return; @@ -47,4 +53,13 @@ export function populateEnv( process.env[key] = envObject[key]; } } + + if (previousEnv) { + // if there are any keys in previousEnv that are not in envObject, remove them from process.env + for (const key of Object.keys(previousEnv)) { + if (!Object.prototype.hasOwnProperty.call(envObject, key)) { + delete process.env[key]; + } + } + } } diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index 3e8f64ab58..54b68bbb55 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -62,6 +62,7 @@ export type TaskExecutorOptions = { default?: RetryOptions; }; isWarmStart?: boolean; + executionCount?: number; }; export class TaskExecutor { @@ -75,6 +76,7 @@ export class TaskExecutor { } | undefined; private _isWarmStart: boolean | undefined; + private _executionCount: number | undefined; constructor( public task: TaskMetadataWithFunctions, @@ -85,6 +87,7 @@ export class TaskExecutor { this._consoleInterceptor = options.consoleInterceptor; this._retries = options.retries; this._isWarmStart = options.isWarmStart; + this._executionCount = options.executionCount; } async execute( @@ -112,11 +115,11 @@ export class TaskExecutor { runMetadata.enterWithMetadata(execution.run.metadata); } - this._tracingSDK.asyncResourceDetector.resolveWithAttributes({ - ...taskContext.attributes, - [SemanticInternalAttributes.SDK_VERSION]: VERSION, - [SemanticInternalAttributes.SDK_LANGUAGE]: "typescript", - }); + if (!this._tracingSDK.asyncResourceDetector.isResolved) { + this._tracingSDK.asyncResourceDetector.resolveWithAttributes({ + ...taskContext.resourceAttributes, + }); + } const result = await this._tracer.startActiveSpan( attemptMessage, @@ -351,11 +354,12 @@ export class TaskExecutor { ...(execution.attempt.number === 1 ? runTimelineMetrics.convertMetricsToSpanAttributes() : {}), - ...(execution.environment.type !== "DEVELOPMENT" + [SemanticInternalAttributes.STYLE_VARIANT]: this._isWarmStart + ? WARM_VARIANT + : COLD_VARIANT, + ...(typeof this._executionCount === "number" ? { - [SemanticInternalAttributes.STYLE_VARIANT]: this._isWarmStart - ? WARM_VARIANT - : COLD_VARIANT, + [SemanticInternalAttributes.ATTEMPT_EXECUTION_COUNT]: this._executionCount, } : {}), }, diff --git a/references/hello-world/src/trigger/example.ts b/references/hello-world/src/trigger/example.ts index 1a475d11de..771834a6dd 100644 --- a/references/hello-world/src/trigger/example.ts +++ b/references/hello-world/src/trigger/example.ts @@ -4,8 +4,20 @@ import { ResourceMonitor } from "../resourceMonitor.js"; export const helloWorldTask = task({ id: "hello-world", + retry: { + maxAttempts: 3, + minTimeoutInMs: 500, + maxTimeoutInMs: 1000, + factor: 1.5, + }, + onStart: async ({ payload, ctx, init }) => { + logger.info("Hello, world from the onStart hook", { payload, init }); + }, run: async (payload: any, { ctx }) => { logger.info("Hello, world from the init", { ctx, payload }); + logger.info("env vars", { + env: process.env, + }); logger.debug("debug: Hello, world!", { payload }); logger.info("info: Hello, world!", { payload }); @@ -17,6 +29,12 @@ export const helloWorldTask = task({ logger.debug("some log", { span }); }); + await setTimeout(payload.sleepFor ?? 180_000); + + if (payload.throwError) { + throw new Error("Forced error to cause a retry"); + } + logger.trace( "my trace", async (span) => { diff --git a/references/hello-world/src/trigger/schedule.ts b/references/hello-world/src/trigger/schedule.ts index c11c91ab7a..29c363d64c 100644 --- a/references/hello-world/src/trigger/schedule.ts +++ b/references/hello-world/src/trigger/schedule.ts @@ -2,8 +2,7 @@ import { schedules } from "@trigger.dev/sdk/v3"; export const simpleSchedule = schedules.task({ id: "simple-schedule", - // Every other minute - cron: "*/2 * * * *", + cron: "0 0 * * *", run: async (payload, { ctx }) => { return { message: "Hello, world!", diff --git a/references/hello-world/trigger.config.ts b/references/hello-world/trigger.config.ts index c3c6aea9e4..33935a4a13 100644 --- a/references/hello-world/trigger.config.ts +++ b/references/hello-world/trigger.config.ts @@ -4,6 +4,10 @@ import { syncEnvVars } from "@trigger.dev/build/extensions/core"; export default defineConfig({ compatibilityFlags: ["run_engine_v2"], project: "proj_rrkpdguyagvsoktglnod", + experimental_processKeepAlive: { + enabled: true, + maxExecutionsPerProcess: 20, + }, logLevel: "log", maxDuration: 3600, retries: {