From 3ba5674ca837e49ee07eca256741354b8b400e6a Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Fri, 12 Jun 2026 13:02:36 +0200 Subject: [PATCH 1/3] feat: verify warm-start delivery and cold-start silently lost dispatches Firestarter's didWarmStart: true means the response was written to a socket, not that the runner received it. A silently dead poller (no FIN, e.g. a VM torn down mid-poll) leaves the dispatched run stuck in PENDING_EXECUTING until the run engine's heartbeat redrive minutes later, burning a queue redelivery toward TASK_RUN_DEQUEUED_MAX_RETRIES each time. After a warm-start hit the supervisor now retains the DequeuedMessage, waits TRIGGER_WARM_START_VERIFY_DELAY_MS (default 10s), then asks the platform for the run's latest snapshot. If it is still the exact snapshot that was dequeued, no runner ever started the attempt - the run falls through to the regular cold-create path. Double-starts are prevented by the engine: startRunAttempt runs under a per-run lock and rejects stale snapshot ids, so a reviving runner and the fallback workload can't both execute. On probe errors nothing happens - during platform brownouts healthy runners legitimately act late, and falling back on uncertainty would stampede duplicates; the heartbeat redrive stays as the backstop. Off by default; enable with TRIGGER_WARM_START_VERIFY_ENABLED. When disabled the code path is a no-op. Emits warmstart.verify wide events (outcome: delivered / fallback / probe_error). Resolves TRI-10659. 
--- apps/supervisor/src/env.ts | 10 + apps/supervisor/src/index.ts | 136 +++++++++----- .../warmStartVerificationService.test.ts | 132 +++++++++++++ .../services/warmStartVerificationService.ts | 173 ++++++++++++++++++ 4 files changed, 404 insertions(+), 47 deletions(-) create mode 100644 apps/supervisor/src/services/warmStartVerificationService.test.ts create mode 100644 apps/supervisor/src/services/warmStartVerificationService.ts diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 3919e73a7e..99d440820a 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -79,6 +79,16 @@ const Env = z TRIGGER_CHECKPOINT_URL: z.string().optional(), TRIGGER_METADATA_URL: z.string().optional(), + // Warm-start delivery verification: after a warm-start hit, probe the + // platform and cold-start the run if no runner acted on the dispatch + TRIGGER_WARM_START_VERIFY_ENABLED: BoolEnv.default(false), + TRIGGER_WARM_START_VERIFY_DELAY_MS: z.coerce + .number() + .int() + .min(1_000) + .max(60_000) + .default(10_000), + // Used by the resource monitor RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index e97c4c7bb9..be9b61b36b 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -27,6 +27,10 @@ import { PodCleaner } from "./services/podCleaner.js"; import { FailedPodHandler } from "./services/failedPodHandler.js"; import { getWorkerToken } from "./workerToken.js"; import { OtlpTraceService } from "./services/otlpTraceService.js"; +import { + WarmStartVerificationService, + type WarmStartTimings, +} from "./services/warmStartVerificationService.js"; import { extractTraceparent, getRestoreRunnerId } from "./util.js"; import { Redis } from "ioredis"; import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js"; @@ -54,6 +58,7 @@ class ManagedSupervisor { private 
readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; + private readonly warmStartVerifier?: WarmStartVerificationService; private readonly podCleaner?: PodCleaner; private readonly failedPodHandler?: FailedPodHandler; @@ -299,6 +304,19 @@ class ManagedSupervisor { }); } + if (env.TRIGGER_WARM_START_VERIFY_ENABLED && this.warmStartUrl) { + this.logger.log("Warm-start delivery verification enabled", { + delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS, + }); + + this.warmStartVerifier = new WarmStartVerificationService({ + workerClient: this.workerSession.httpClient, + delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS, + createWorkload: (message, timings) => this.createWorkload(message, timings), + wideEventOpts: this.wideEventOpts, + }); + } + this.workerSession.on("runNotification", async ({ time, run }) => { this.logger.verbose("runNotification", { time, run }); @@ -455,58 +473,24 @@ class ManagedSupervisor { if (didWarmStart) { setExtra(fromContext(), "path_taken", "warm_start"); this.logger.debug("Warm start successful", { runId: message.run.id }); - return; - } - - setExtra(fromContext(), "path_taken", "cold_create"); - - const createStart = performance.now(); - try { - if (!message.deployment.friendlyId) { - // mostly a type guard, deployments always exists for deployed environments - // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. - throw new Error("Deployment is missing"); - } - - await this.workloadManager.create({ - dequeuedAt: message.dequeuedAt, + // A hit only means the response was written to the long-poll + // socket, not that the runner received it. Schedule a delivery + // verification that cold-starts the run if nobody acts on it. 
+ this.warmStartVerifier?.schedule(message, { dequeueResponseMs, pollingIntervalMs, warmStartCheckMs, - envId: message.environment.id, - envType: message.environment.type, - image: message.image, - machine: message.run.machine, - orgId: message.organization.id, - projectId: message.project.id, - deploymentFriendlyId: message.deployment.friendlyId, - deploymentVersion: message.backgroundWorker.version, - runId: message.run.id, - runFriendlyId: message.run.friendlyId, - version: message.version, - nextAttemptNumber: message.run.attemptNumber, - snapshotId: message.snapshot.id, - snapshotFriendlyId: message.snapshot.friendlyId, - placementTags: message.placementTags, - traceContext: message.run.traceContext, - annotations: message.run.annotations, - hasPrivateLink: message.organization.hasPrivateLink, }); - recordPhaseSince("workload_create", createStart, undefined); - - // Disabled for now - // this.resourceMonitor.blockResources({ - // cpu: message.run.machine.cpu, - // memory: message.run.machine.memory, - // }); - } catch (error) { - recordPhaseSince( - "workload_create", - createStart, - error instanceof Error ? error : new Error(String(error)) - ); - this.logger.error("Failed to create workload", { error }); + return; } + + setExtra(fromContext(), "path_taken", "cold_create"); + + await this.createWorkload(message, { + dequeueResponseMs, + pollingIntervalMs, + warmStartCheckMs, + }); } ); } @@ -541,6 +525,8 @@ class ManagedSupervisor { async onRunConnected({ run }: { run: { friendlyId: string } }) { this.logger.debug("Run connected", { run }); + // The dispatched run reached a runner on this node - no fallback needed. 
+ this.warmStartVerifier?.cancel(run.friendlyId); this.workerSession.subscribeToRunNotifications([run.friendlyId]); } @@ -549,6 +535,61 @@ class ManagedSupervisor { this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } + private async createWorkload(message: DequeuedMessage, timings: WarmStartTimings) { + const createStart = performance.now(); + try { + if (!message.deployment.friendlyId) { + // mostly a type guard, deployments always exists for deployed environments + // a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments. + throw new Error("Deployment is missing"); + } + + if (!message.image) { + // same type-guard situation as deployment above + throw new Error("Image is missing"); + } + + await this.workloadManager.create({ + dequeuedAt: message.dequeuedAt, + dequeueResponseMs: timings.dequeueResponseMs, + pollingIntervalMs: timings.pollingIntervalMs, + warmStartCheckMs: timings.warmStartCheckMs, + envId: message.environment.id, + envType: message.environment.type, + image: message.image, + machine: message.run.machine, + orgId: message.organization.id, + projectId: message.project.id, + deploymentFriendlyId: message.deployment.friendlyId, + deploymentVersion: message.backgroundWorker.version, + runId: message.run.id, + runFriendlyId: message.run.friendlyId, + version: message.version, + nextAttemptNumber: message.run.attemptNumber, + snapshotId: message.snapshot.id, + snapshotFriendlyId: message.snapshot.friendlyId, + placementTags: message.placementTags, + traceContext: message.run.traceContext, + annotations: message.run.annotations, + hasPrivateLink: message.organization.hasPrivateLink, + }); + recordPhaseSince("workload_create", createStart, undefined); + + // Disabled for now + // this.resourceMonitor.blockResources({ + // cpu: message.run.machine.cpu, + // memory: message.run.machine.memory, + // }); + } catch (error) { + recordPhaseSince( + 
"workload_create", + createStart, + error instanceof Error ? error : new Error(String(error)) + ); + this.logger.error("Failed to create workload", { error }); + } + } + private async tryWarmStart( dequeuedMessage: DequeuedMessage, traceparent: string | undefined @@ -632,6 +673,7 @@ class ManagedSupervisor { this.logger.log("Shutting down"); await this.workloadServer.stop(); await this.workerSession.stop(); + this.warmStartVerifier?.stop(); // Optional services this.backpressureMonitor?.stop(); diff --git a/apps/supervisor/src/services/warmStartVerificationService.test.ts b/apps/supervisor/src/services/warmStartVerificationService.test.ts new file mode 100644 index 0000000000..994c678b4b --- /dev/null +++ b/apps/supervisor/src/services/warmStartVerificationService.test.ts @@ -0,0 +1,132 @@ +import { describe, expect, it, vi } from "vitest"; +import { setTimeout as sleep } from "node:timers/promises"; +import { WarmStartVerificationService } from "./warmStartVerificationService.js"; +import type { DequeuedMessage } from "@trigger.dev/core/v3"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; + +// The TimerWheel ticks every 100ms, so a 1000ms delay (the env minimum) +// fires within ~1.1s. +const DELAY_MS = 1_000; +// Long enough that a pending verification would certainly have fired. +const SETTLE_MS = 1_600; + +const DEQUEUED_SNAPSHOT_ID = "snapshot_dequeued"; + +function makeMessage(runFriendlyId = "run_1"): DequeuedMessage { + return { + run: { friendlyId: runFriendlyId }, + snapshot: { friendlyId: DEQUEUED_SNAPSHOT_ID }, + } as unknown as DequeuedMessage; +} + +function createService(opts: { + latestSnapshotId?: string; + probeError?: boolean; +}) { + const getLatestSnapshot = vi.fn(async (_runId: string) => + opts.probeError + ? 
{ success: false as const, error: "connection refused" } + : { + success: true as const, + data: { execution: { snapshot: { friendlyId: opts.latestSnapshotId } } }, + } + ); + + const createWorkload = vi.fn(async () => {}); + + const service = new WarmStartVerificationService({ + workerClient: { getLatestSnapshot } as unknown as SupervisorHttpClient, + delayMs: DELAY_MS, + createWorkload, + wideEventOpts: { service: "supervisor-test", env: {}, enabled: false }, + }); + + return { service, getLatestSnapshot, createWorkload }; +} + +describe("WarmStartVerificationService", () => { + it("falls back to a cold create when the snapshot is unchanged", async () => { + const { service, createWorkload } = createService({ + latestSnapshotId: DEQUEUED_SNAPSHOT_ID, + }); + try { + const message = makeMessage(); + const timings = { warmStartCheckMs: 12 }; + service.schedule(message, timings); + + await vi.waitFor(() => expect(createWorkload).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + expect(createWorkload).toHaveBeenCalledWith(message, timings); + } finally { + service.stop(); + } + }); + + it("does nothing when the snapshot has moved on (delivered)", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ + latestSnapshotId: "snapshot_executing", + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(100); + expect(createWorkload).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("never falls back when the probe errors", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ probeError: true }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(100); + expect(createWorkload).not.toHaveBeenCalled(); + } finally 
{ + service.stop(); + } + }); + + it("cancel before the delay prevents the probe entirely", async () => { + const { service, getLatestSnapshot, createWorkload } = createService({ + latestSnapshotId: DEQUEUED_SNAPSHOT_ID, + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 12 }); + + expect(service.cancel("run_1")).toBe(true); + + await sleep(SETTLE_MS); + expect(getLatestSnapshot).not.toHaveBeenCalled(); + expect(createWorkload).not.toHaveBeenCalled(); + } finally { + service.stop(); + } + }); + + it("re-scheduling the same run replaces the pending verification", async () => { + const { service, getLatestSnapshot } = createService({ + latestSnapshotId: "snapshot_executing", + }); + try { + service.schedule(makeMessage(), { warmStartCheckMs: 1 }); + service.schedule(makeMessage(), { warmStartCheckMs: 2 }); + + await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), { + timeout: 3_000, + }); + await sleep(SETTLE_MS); + expect(getLatestSnapshot).toHaveBeenCalledTimes(1); + } finally { + service.stop(); + } + }); +}); diff --git a/apps/supervisor/src/services/warmStartVerificationService.ts b/apps/supervisor/src/services/warmStartVerificationService.ts new file mode 100644 index 0000000000..ce0bbe4f16 --- /dev/null +++ b/apps/supervisor/src/services/warmStartVerificationService.ts @@ -0,0 +1,173 @@ +import pLimit from "p-limit"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; +import type { DequeuedMessage } from "@trigger.dev/core/v3"; +import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers"; +import { tryCatch } from "@trigger.dev/core"; +import { TimerWheel } from "./timerWheel.js"; +import { emitOneShot, type WideEventOptions } from "../wideEvents/index.js"; + +const PROBE_CONCURRENCY_LIMIT = 10; + +export type WarmStartTimings = { + dequeueResponseMs?: number; + pollingIntervalMs?: number; + warmStartCheckMs: number; +}; + +type PendingVerification = { + message: 
DequeuedMessage; + timings: WarmStartTimings; +}; + +export type WarmStartVerificationServiceOptions = { + workerClient: SupervisorHttpClient; + /** How long after a warm-start hit to verify the runner acted on it. */ + delayMs: number; + /** Cold-creates the workload for a dispatched-but-lost run. */ + createWorkload: (message: DequeuedMessage, timings: WarmStartTimings) => Promise<void>; + wideEventOpts: WideEventOptions; +}; + +/** + * Verifies that warm-start dispatches were actually acted on. + * + * Firestarter's `didWarmStart: true` means "response written to a socket", + * not "runner received it". A silently dead poller (no FIN - e.g. a VM torn + * down mid-poll) leaves the dispatched run stuck in PENDING_EXECUTING until + * the run engine's heartbeat redrive minutes later, burning a queue + * redelivery each time (TRI-10659). + * + * After a hit, the dequeued message is retained for `delayMs`, then the + * platform is asked for the run's latest snapshot. If it is still the exact + * snapshot we dequeued, no runner ever started the attempt - fall through to + * the regular cold-create path with the original message. Double-starts are + * impossible: `startRunAttempt` runs under a per-run lock and rejects stale + * snapshot ids, so if the original runner revives and races the fallback + * workload, exactly one wins and the loser exits before executing anything. + * + * On a probe ERROR we deliberately do nothing: the runner's attempt-start + * goes through nested retries, so during platform brownouts a healthy runner + * can legitimately act late - falling back on uncertainty would stampede + * duplicate workloads exactly when the platform is degraded. The heartbeat + * redrive remains the backstop for that case (and for supervisor restarts, + * which drop the in-memory timers). 
+ */ +export class WarmStartVerificationService { + private readonly logger = new SimpleStructuredLogger("warm-start-verification"); + + private readonly timerWheel: TimerWheel<PendingVerification>; + private readonly probeLimit: ReturnType<typeof pLimit>; + + private readonly workerClient: SupervisorHttpClient; + private readonly delayMs: number; + private readonly createWorkload: WarmStartVerificationServiceOptions["createWorkload"]; + private readonly wideEventOpts: WideEventOptions; + + constructor(opts: WarmStartVerificationServiceOptions) { + this.workerClient = opts.workerClient; + this.delayMs = opts.delayMs; + this.createWorkload = opts.createWorkload; + this.wideEventOpts = opts.wideEventOpts; + + this.probeLimit = pLimit(PROBE_CONCURRENCY_LIMIT); + this.timerWheel = new TimerWheel<PendingVerification>({ + delayMs: opts.delayMs, + onExpire: (item) => { + this.probeLimit(() => this.verify(item.data)).catch((error) => { + this.logger.error("Verification failed", { + runId: item.data.message.run.friendlyId, + error, + }); + }); + }, + }); + this.timerWheel.start(); + } + + /** Schedule delivery verification for a warm-start hit. */ + schedule(message: DequeuedMessage, timings: WarmStartTimings) { + this.timerWheel.submit(message.run.friendlyId, { message, timings }); + this.logger.debug("Verification scheduled", { + runId: message.run.friendlyId, + snapshotId: message.snapshot.friendlyId, + delayMs: this.delayMs, + }); + } + + /** + * Cancel a pending verification, e.g. when the runner connects to this + * supervisor. Purely an optimization: the matched runner often lives on a + * different node and connects to that node's supervisor, so most healthy + * deliveries are confirmed by the probe, not by this. + */ + cancel(runFriendlyId: string): boolean { + return this.timerWheel.cancel(runFriendlyId); + } + + /** Stop the timer wheel, dropping pending verifications. The run engine's + * heartbeat redrive covers anything dropped here. 
*/ + stop() { + const remaining = this.timerWheel.stop(); + if (remaining.length > 0) { + this.logger.info("Stopped, dropped pending verifications", { count: remaining.length }); + } + } + + private async verify({ message, timings }: PendingVerification) { + const runFriendlyId = message.run.friendlyId; + + const result = await this.workerClient.getLatestSnapshot(runFriendlyId); + + if (!result.success) { + // Never fall back on uncertainty - see class docs. + this.emitOutcome(message, "probe_error", String(result.error)); + this.logger.warn("Verification probe failed, skipping", { + runId: runFriendlyId, + error: result.error, + }); + return; + } + + const latestSnapshotId = result.data.execution.snapshot.friendlyId; + + if (latestSnapshotId !== message.snapshot.friendlyId) { + // Something acted on the run (attempt started, or it was cancelled or + // requeued) - the dispatch is no longer ours to worry about. + this.emitOutcome(message, "delivered"); + return; + } + + this.emitOutcome(message, "fallback"); + this.logger.warn("Warm start dispatch was never acted on, cold starting", { + runId: runFriendlyId, + snapshotId: message.snapshot.friendlyId, + }); + + const [error] = await tryCatch(this.createWorkload(message, timings)); + if (error) { + this.logger.error("Fallback workload create failed", { + runId: runFriendlyId, + error: error instanceof Error ? 
error.message : String(error), + }); + } + } + + private emitOutcome( + message: DequeuedMessage, + outcome: "delivered" | "fallback" | "probe_error", + error?: string + ) { + emitOneShot({ + ...this.wideEventOpts, + op: "warmstart.verify", + kind: "event", + populate: (state) => { + state.meta.run_id = message.run.friendlyId; + state.meta.snapshot_id = message.snapshot.friendlyId; + state.extras.outcome = outcome; + state.extras.delay_ms = this.delayMs; + if (error) state.extras.error = error; + }, + }); + } +} From 5ac2f2227df3eb36a00e8f4335afa66166bf405d Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Fri, 12 Jun 2026 13:02:55 +0200 Subject: [PATCH 2/3] chore: add server-changes entry --- .server-changes/warm-start-delivery-verification.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .server-changes/warm-start-delivery-verification.md diff --git a/.server-changes/warm-start-delivery-verification.md b/.server-changes/warm-start-delivery-verification.md new file mode 100644 index 0000000000..f7eef4bb1a --- /dev/null +++ b/.server-changes/warm-start-delivery-verification.md @@ -0,0 +1,6 @@ +--- +area: supervisor +type: feature +--- + +Verify warm-start dispatches were acted on and cold-start the run within seconds when a dispatch is silently lost (opt-in via TRIGGER_WARM_START_VERIFY_ENABLED). From 58cef9a29a299f82c97ab3b99f95e0613f59f6b5 Mon Sep 17 00:00:00 2001 From: Saadi Myftija Date: Fri, 12 Jun 2026 13:13:59 +0200 Subject: [PATCH 3/3] fix: attribute fallback create failures and stop verifier before servers Review follow-ups: the workload-create error log now carries the run id (fallback creates run outside the dequeue wide event, so the log was the only attribution), and the verifier stops before the workload server and session so its timer can't cold-create a workload mid-shutdown. 
--- apps/supervisor/src/index.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index be9b61b36b..1ee2fc682c 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -586,7 +586,10 @@ class ManagedSupervisor { createStart, error instanceof Error ? error : new Error(String(error)) ); - this.logger.error("Failed to create workload", { error }); + this.logger.error("Failed to create workload", { + runId: message.run.friendlyId, + error, + }); } } @@ -671,9 +674,11 @@ class ManagedSupervisor { async stop() { this.logger.log("Shutting down"); + // Stop the verifier first: its timer can otherwise fire mid-shutdown and + // cold-create a workload on a node that is going down. + this.warmStartVerifier?.stop(); await this.workloadServer.stop(); await this.workerSession.stop(); - this.warmStartVerifier?.stop(); // Optional services this.backpressureMonitor?.stop();