diff --git a/.server-changes/warm-start-delivery-verification.md b/.server-changes/warm-start-delivery-verification.md new file mode 100644 index 0000000000..f7eef4bb1a --- /dev/null +++ b/.server-changes/warm-start-delivery-verification.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Verify warm-start dispatches were acted on and cold-start the run within seconds when a dispatch is silently lost (opt-in via TRIGGER_WARM_START_VERIFY_ENABLED). diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 3919e73a7e..99d440820a 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -79,6 +79,16 @@ const Env = z TRIGGER_CHECKPOINT_URL: z.string().optional(), TRIGGER_METADATA_URL: z.string().optional(), + // Warm-start delivery verification: after a warm-start hit, probe the + // platform and cold-start the run if no runner acted on the dispatch + TRIGGER_WARM_START_VERIFY_ENABLED: BoolEnv.default(false), + TRIGGER_WARM_START_VERIFY_DELAY_MS: z.coerce + .number() + .int() + .min(1_000) + .max(60_000) + .default(10_000), + // Used by the resource monitor RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index e97c4c7bb9..1ee2fc682c 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -27,6 +27,10 @@ import { PodCleaner } from "./services/podCleaner.js"; import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; +import { + WarmStartVerificationService, + type WarmStartTimings, +} from "./services/warmStartVerificationService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; import { Redis } from "ioredis"; import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; @@ -54,6 +58,7 @@ class ManagedSupervisor { private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; + private readonly warmStartVerifier?: WarmStartVerificationService; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; @@ -299,6 +304,19 @@ class ManagedSupervisor { }); } + if (env.TRIGGER_WARM_START_VERIFY_ENABLED && this.warmStartUrl) { + this.logger.log("Warm-start delivery verification enabled", { + delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS, + }); + + this.warmStartVerifier = new WarmStartVerificationService({ + workerClient: this.workerSession.httpClient, + delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS, + createWorkload: (message, timings) => this.createWorkload(message, timings), + wideEventOpts: this.wideEventOpts, + }); + } + this.workerSession.on("runNotification", async ({ time, run }) => { this.logger.verbose("runNotification", { time, run }); @@ -455,58 +473,24 @@ class ManagedSupervisor { if (didWarmStart) { setExtra(fromContext(), "path_taken", "warm_start"); this.logger.debug("Warm start successful", { runId: message.run.id }); - return; - } - - setExtra(fromContext(), "path_taken", "cold_create"); - - const createStart = performance.now(); - try { - if (!message.deployment.friendlyId) { - // mostly a type guard, deployments always exists for deployed environments - // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. - throw new Error("Deployment is missing"); - } - - await this.workloadManager.create({ - dequeuedAt: message.dequeuedAt, + // A hit only means the response was written to the long-poll + // socket, not that the runner received it. Schedule a delivery + // verification that cold-starts the run if nobody acts on it. + this.warmStartVerifier?.schedule(message, { dequeueResponseMs, pollingIntervalMs, warmStartCheckMs, - envId: message.environment.id, - envType: message.environment.type, - image: message.image, - machine: message.run.machine, - orgId: message.organization.id, - projectId: message.project.id, - deploymentFriendlyId: message.deployment.friendlyId, - deploymentVersion: message.backgroundWorker.version, - runId: message.run.id, - runFriendlyId: message.run.friendlyId, - version: message.version, - nextAttemptNumber: message.run.attemptNumber, - snapshotId: message.snapshot.id, - snapshotFriendlyId: message.snapshot.friendlyId, - placementTags: message.placementTags, - traceContext: message.run.traceContext, - annotations: message.run.annotations, - hasPrivateLink: message.organization.hasPrivateLink, }); - recordPhaseSince("workload_create", createStart, undefined); - - // Disabled for now - // this.resourceMonitor.blockResources({ - // cpu: message.run.machine.cpu, - // memory: message.run.machine.memory, - // }); - } catch (error) { - recordPhaseSince( - "workload_create", - createStart, - error instanceof Error ? error : new Error(String(error)) - ); - this.logger.error("Failed to create workload", { error }); + return; } + + setExtra(fromContext(), "path_taken", "cold_create"); + + await this.createWorkload(message, { + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, + }); } ); } @@ -541,6 +525,8 @@ class ManagedSupervisor { async onRunConnected({ run }: { run: { friendlyId: string } }) { this.logger.debug("Run connected", { run }); + // The dispatched run reached a runner on this node - no fallback needed. + this.warmStartVerifier?.cancel(run.friendlyId); this.workerSession.subscribeToRunNotifications([run.friendlyId]); } @@ -549,6 +535,64 @@ class ManagedSupervisor { this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } + private async createWorkload(message: DequeuedMessage, timings: WarmStartTimings) { + const createStart = performance.now(); + try { + if (!message.deployment.friendlyId) { + // mostly a type guard, deployments always exists for deployed environments + // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. + throw new Error("Deployment is missing"); + } + + if (!message.image) { + // same type-guard situation as deployment above + throw new Error("Image is missing"); + } + + await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, + dequeueResponseMs: timings.dequeueResponseMs, + pollingIntervalMs: timings.pollingIntervalMs, + warmStartCheckMs: timings.warmStartCheckMs, + envId: message.environment.id, + envType: message.environment.type, + image: message.image, + machine: message.run.machine, + orgId: message.organization.id, + projectId: message.project.id, + deploymentFriendlyId: message.deployment.friendlyId, + deploymentVersion: message.backgroundWorker.version, + runId: message.run.id, + runFriendlyId: message.run.friendlyId, + version: message.version, + nextAttemptNumber: message.run.attemptNumber, + snapshotId: message.snapshot.id, + snapshotFriendlyId: message.snapshot.friendlyId, + placementTags: message.placementTags, + traceContext: message.run.traceContext, + annotations: message.run.annotations, + hasPrivateLink: message.organization.hasPrivateLink, + }); + recordPhaseSince("workload_create", createStart, undefined); + + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); + } catch (error) { + recordPhaseSince( + "workload_create", + createStart, + error instanceof Error ? error : new Error(String(error)) + ); + this.logger.error("Failed to create workload", { + runId: message.run.friendlyId, + error, + }); + } + } + private async tryWarmStart( dequeuedMessage: DequeuedMessage, traceparent: string | undefined @@ -630,6 +674,9 @@ class ManagedSupervisor { async stop() { this.logger.log("Shutting down"); + // Stop the verifier first: its timer can otherwise fire mid-shutdown and + // cold-create a workload on a node that is going down. + this.warmStartVerifier?.stop(); await this.workloadServer.stop(); await this.workerSession.stop(); diff --git a/apps/supervisor/src/services/warmStartVerificationService.test.ts b/apps/supervisor/src/services/warmStartVerificationService.test.ts new file mode 100644 index 0000000000..994c678b4b --- /dev/null +++ b/apps/supervisor/src/services/warmStartVerificationService.test.ts @@ -0,0 +1,132 @@ +import { describe, expect, it, vi } from "vitest"; +import { setTimeout as sleep } from "node:timers/promises"; +import { WarmStartVerificationService } from "./warmStartVerificationService.js"; +import type { DequeuedMessage } from "@trigger.dev/core/v3"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; + +// The TimerWheel ticks every 100ms, so a 1000ms delay (the env minimum) +// fires within ~1.1s. +const DELAY_MS = 1_000; +// Long enough that a pending verification would certainly have fired. +const SETTLE_MS = 1_600; + +const DEQUEUED_SNAPSHOT_ID = "snapshot_dequeued"; + +function makeMessage(runFriendlyId = "run_1"): DequeuedMessage { + return { + run: { friendlyId: runFriendlyId }, + snapshot: { friendlyId: DEQUEUED_SNAPSHOT_ID }, + } as unknown as DequeuedMessage; +} + +function createService(opts: { + latestSnapshotId?: string; + probeError?: boolean; +}) { + const getLatestSnapshot = vi.fn(async (_runId: string) => + opts.probeError + ? { success: false as const, error: "connection refused" } + : { + success: true as const, + data: { execution: { snapshot: { friendlyId: opts.latestSnapshotId } } }, + } + ); + + const createWorkload = vi.fn(async () => {}); + + const service = new WarmStartVerificationService({ + workerClient: { getLatestSnapshot } as unknown as SupervisorHttpClient, + delayMs: DELAY_MS, + createWorkload, + wideEventOpts: { service: "supervisor-test", env: {}, enabled: false }, + }); + + return { service, getLatestSnapshot, createWorkload }; +} + +describe("WarmStartVerificationService", () => { + it("falls back to a cold create when the snapshot is unchanged", async () => { + const { service, createWorkload } = createService({ + latestSnapshotId: DEQUEUED_SNAPSHOT_ID, + }); + try { + const message = makeMessage(); + const timings = { warmStartCheckMs: 12 }; + service.schedule(message, timings); + + await vi.waitFor(() => expect(createWorkload).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + expect(createWorkload).toHaveBeenCalledWith(message, timings); + } finally { + service.stop(); + } + }); + + it("does nothing when the snapshot has moved on (delivered)", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ + latestSnapshotId: "snapshot_executing", + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(100); + expect(createWorkload).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("never falls back when the probe errors", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ probeError: true }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(100); + expect(createWorkload).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("cancel before the delay prevents the probe entirely", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ + latestSnapshotId: DEQUEUED_SNAPSHOT_ID, + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + expect(service.cancel("run_1")).toBe(true); + + await sleep(SETTLE_MS); + expect(getLatestSnapshot).not.toHaveBeenCalled(); + expect(createWorkload).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("re-scheduling the same run replaces the pending verification", async () => { + const { service, getLatestSnapshot } = createService({ + latestSnapshotId: "snapshot_executing", + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 1 }); + service.schedule(makeMessage(), { warmStartCheckMs: 2 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(SETTLE_MS); + expect(getLatestSnapshot).toHaveBeenCalledTimes(1); + } finally { + service.stop(); + } + }); +}); diff --git a/apps/supervisor/src/services/warmStartVerificationService.ts b/apps/supervisor/src/services/warmStartVerificationService.ts new file mode 100644 index 0000000000..ce0bbe4f16 --- /dev/null +++ b/apps/supervisor/src/services/warmStartVerificationService.ts @@ -0,0 +1,173 @@ +import pLimit from "p-limit"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import type { DequeuedMessage } from "@trigger.dev/core/v3"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; +import { tryCatch } from "@trigger.dev/core"; +import { TimerWheel } from "./timerWheel.js"; +import { emitOneShot, type WideEventOptions } from "../wideEvents/index.js"; + +const PROBE_CONCURRENCY_LIMIT = 10; + +export type WarmStartTimings = { + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs: number; +}; + +type PendingVerification = { + message: DequeuedMessage; + timings: WarmStartTimings; +}; + +export type WarmStartVerificationServiceOptions = { + workerClient: SupervisorHttpClient; + /** How long after a warm-start hit to verify the runner acted on it. */ + delayMs: number; + /** Cold-creates the workload for a dispatched-but-lost run. */ + createWorkload: (message: DequeuedMessage, timings: WarmStartTimings) => Promise; + wideEventOpts: WideEventOptions; +}; + +/** + * Verifies that warm-start dispatches were actually acted on. + * + * Firestarter's `didWarmStart: true` means "response written to a socket", + * not "runner received it". A silently dead poller (no FIN - e.g. a VM torn + * down mid-poll) leaves the dispatched run stuck in PENDING_EXECUTING until + * the run engine's heartbeat redrive minutes later, burning a queue + * redelivery each time (TRI-10659). + * + * After a hit, the dequeued message is retained for `delayMs`, then the + * platform is asked for the run's latest snapshot. If it is still the exact + * snapshot we dequeued, no runner ever started the attempt - fall through to + * the regular cold-create path with the original message. Double-starts are + * impossible: `startRunAttempt` runs under a per-run lock and rejects stale + * snapshot ids, so if the original runner revives and races the fallback + * workload, exactly one wins and the loser exits before executing anything. + * + * On a probe ERROR we deliberately do nothing: the runner's attempt-start + * goes through nested retries, so during platform brownouts a healthy runner + * can legitimately act late - falling back on uncertainty would stampede + * duplicate workloads exactly when the platform is degraded. The heartbeat + * redrive remains the backstop for that case (and for supervisor restarts, + * which drop the in-memory timers). + */ +export class WarmStartVerificationService { + private readonly logger = new SimpleStructuredLogger("warm-start-verification"); + + private readonly timerWheel: TimerWheel; + private readonly probeLimit: ReturnType; + + private readonly workerClient: SupervisorHttpClient; + private readonly delayMs: number; + private readonly createWorkload: WarmStartVerificationServiceOptions["createWorkload"]; + private readonly wideEventOpts: WideEventOptions; + + constructor(opts: WarmStartVerificationServiceOptions) { + this.workerClient = opts.workerClient; + this.delayMs = opts.delayMs; + this.createWorkload = opts.createWorkload; + this.wideEventOpts = opts.wideEventOpts; + + this.probeLimit = pLimit(PROBE_CONCURRENCY_LIMIT); + this.timerWheel = new TimerWheel({ + delayMs: opts.delayMs, + onExpire: (item) => { + this.probeLimit(() => this.verify(item.data)).catch((error) => { + this.logger.error("Verification failed", { + runId: item.data.message.run.friendlyId, + error, + }); + }); + }, + }); + this.timerWheel.start(); + } + + /** Schedule delivery verification for a warm-start hit. */ + schedule(message: DequeuedMessage, timings: WarmStartTimings) { + this.timerWheel.submit(message.run.friendlyId, { message, timings }); + this.logger.debug("Verification scheduled", { + runId: message.run.friendlyId, + snapshotId: message.snapshot.friendlyId, + delayMs: this.delayMs, + }); + } + + /** + * Cancel a pending verification, e.g. when the runner connects to this + * supervisor. Purely an optimization: the matched runner often lives on a + * different node and connects to that node's supervisor, so most healthy + * deliveries are confirmed by the probe, not by this. + */ + cancel(runFriendlyId: string): boolean { + return this.timerWheel.cancel(runFriendlyId); + } + + /** Stop the timer wheel, dropping pending verifications. The run engine's + * heartbeat redrive covers anything dropped here. */ + stop() { + const remaining = this.timerWheel.stop(); + if (remaining.length > 0) { + this.logger.info("Stopped, dropped pending verifications", { count: remaining.length }); + } + } + + private async verify({ message, timings }: PendingVerification) { + const runFriendlyId = message.run.friendlyId; + + const result = await this.workerClient.getLatestSnapshot(runFriendlyId); + + if (!result.success) { + // Never fall back on uncertainty - see class docs. + this.emitOutcome(message, "probe_error", String(result.error)); + this.logger.warn("Verification probe failed, skipping", { + runId: runFriendlyId, + error: result.error, + }); + return; + } + + const latestSnapshotId = result.data.execution.snapshot.friendlyId; + + if (latestSnapshotId !== message.snapshot.friendlyId) { + // Something acted on the run (attempt started, or it was cancelled or + // requeued) - the dispatch is no longer ours to worry about. + this.emitOutcome(message, "delivered"); + return; + } + + this.emitOutcome(message, "fallback"); + this.logger.warn("Warm start dispatch was never acted on, cold starting", { + runId: runFriendlyId, + snapshotId: message.snapshot.friendlyId, + }); + + const [error] = await tryCatch(this.createWorkload(message, timings)); + if (error) { + this.logger.error("Fallback workload create failed", { + runId: runFriendlyId, + error: error instanceof Error ? error.message : String(error), + }); + } + } + + private emitOutcome( + message: DequeuedMessage, + outcome: "delivered" | "fallback" | "probe_error", + error?: string + ) { + emitOneShot({ + ...this.wideEventOpts, + op: "warmstart.verify", + kind: "event", + populate: (state) => { + state.meta.run_id = message.run.friendlyId; + state.meta.snapshot_id = message.snapshot.friendlyId; + state.extras.outcome = outcome; + state.extras.delay_ms = this.delayMs; + if (error) state.extras.error = error; + }, + }); + } +}