Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class ManagedSupervisor {

try {
await this.workloadManager.create({
dequeuedAt: message.dequeuedAt,
envId: message.environment.id,
envType: message.environment.type,
image: message.image,
Expand Down
2 changes: 2 additions & 0 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export class DockerWorkloadManager implements WorkloadManager {
"run",
"--detach",
`--network=${env.DOCKER_NETWORK}`,
`--env=TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`,
`--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`,
`--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`,
Expand Down
14 changes: 13 additions & 1 deletion apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ export class KubernetesWorkloadManager implements WorkloadManager {
],
resources: this.#getResourcesForMachine(opts.machine),
env: [
{
name: "TRIGGER_DEQUEUED_AT_MS",
value: opts.dequeuedAt.getTime().toString(),
},
{
name: "TRIGGER_POD_SCHEDULED_AT_MS",
value: Date.now().toString(),
},
{
name: "TRIGGER_RUN_ID",
value: opts.runFriendlyId,
Expand Down Expand Up @@ -97,7 +105,11 @@ export class KubernetesWorkloadManager implements WorkloadManager {
},
{
name: "TRIGGER_WORKER_INSTANCE_NAME",
value: env.TRIGGER_WORKER_INSTANCE_NAME,
valueFrom: {
fieldRef: {
fieldPath: "spec.nodeName",
},
},
},
{
name: "OTEL_EXPORTER_OTLP_ENDPOINT",
Expand Down
1 change: 1 addition & 0 deletions apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface WorkloadManagerCreateOptions {
machine: MachinePreset;
version: string;
nextAttemptNumber?: number;
dequeuedAt: Date;
// identifiers
envId: string;
envType: EnvironmentType;
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/utils/timelineSpanEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ function getAdminOnlyForEvent(event: string): boolean {
return true;
}
case "import": {
return true;
return false;
}
case "lazy_payload": {
return true;
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/test/timelineSpanEvents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ describe("createTimelineSpanEventsFromSpanEvents", () => {
expect(result.some((event) => event.name === "Attempt created")).toBe(false);
});

test("should filter import events for non-admin when fork event exists", () => {
test.skip("should filter import events for non-admin when fork event exists", () => {
const result = createTimelineSpanEventsFromSpanEvents(sampleSpanEvents, false);

// With fork event, import should be hidden for non-admins
Expand Down
15 changes: 6 additions & 9 deletions packages/cli-v3/src/entryPoints/dev-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,15 @@ export class DevRunController {
try {
await this.cancelAttempt();
} catch (error) {
logger.debug("Failed to cancel attempt, shutting down", {
logger.debug("Failed to cancel attempt, killing task run process", {
error,
});

//todo kill the process?
try {
await this.taskRunProcess?.kill("SIGKILL");
} catch (error) {
logger.debug("Failed to cancel attempt, failed to kill task run process", { error });
}

return;
}
Expand Down Expand Up @@ -512,8 +516,6 @@ export class DevRunController {
snapshot: snapshot.friendlyId,
});

// TODO: We may already be executing this run, this may be a new attempt
// This is the only case where incrementing the attempt number is allowed
this.enterRunPhase(run, snapshot);

const metrics = [
Expand All @@ -539,9 +541,6 @@ export class DevRunController {
try {
return await this.executeRun({ run, snapshot, execution, envVars, metrics });
} catch (error) {
// TODO: Handle the case where we're in the warm start phase or executing a new run
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch

logger.debug("Error while executing attempt", {
error,
});
Expand Down Expand Up @@ -570,8 +569,6 @@ export class DevRunController {
error: completionResult.error,
});

// TODO: Maybe we should keep retrying for a while longer

this.runFinished();
return;
}
Expand Down
107 changes: 99 additions & 8 deletions packages/cli-v3/src/entryPoints/managed-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
type CompleteRunAttemptResult,
HeartbeatService,
type RunExecutionData,
type TaskRunExecutionMetrics,
type TaskRunExecutionResult,
type TaskRunFailedExecutionResult,
WorkerManifest,
Expand All @@ -25,6 +26,11 @@ import { assertExhaustive } from "../utilities/assertExhaustive.js";
import { setTimeout as sleep } from "timers/promises";
import { io, type Socket } from "socket.io-client";

/**
 * Schema for environment variables that carry a timestamp as a string of
 * epoch milliseconds (used below for `TRIGGER_POD_SCHEDULED_AT_MS` and
 * `TRIGGER_DEQUEUED_AT_MS`).
 *
 * The raw string is parsed as a base-10 integer and wrapped in a `Date`;
 * the result is then piped through `z.date()` so the parsed output is
 * validated as a date rather than trusted blindly.
 */
const DateEnv = z
  .string()
  .transform((rawMillis) => {
    const epochMillis = parseInt(rawMillis, 10);
    return new Date(epochMillis);
  })
  .pipe(z.date());

// All IDs are friendly IDs
const Env = z.object({
// Set at build time
Expand All @@ -50,6 +56,10 @@ const Env = z.object({
TRIGGER_RUNNER_ID: z.string(),
TRIGGER_METADATA_URL: z.string().optional(),

// Timeline metrics
TRIGGER_POD_SCHEDULED_AT_MS: DateEnv,
TRIGGER_DEQUEUED_AT_MS: DateEnv,

// May be overridden
TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]),
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
Expand Down Expand Up @@ -238,6 +248,14 @@ class ManagedRunController {

if (!response.success) {
console.error("[ManagedRunController] Heartbeat failed", { error: response.error });

this.sendDebugLog({
runId: this.runFriendlyId,
message: "heartbeat: failed",
properties: {
error: response.error,
},
});
}
},
intervalMs: this.heartbeatIntervalSeconds * 1000,
Expand Down Expand Up @@ -620,6 +638,14 @@ class ManagedRunController {
if (!continuationResult.success) {
console.error("Failed to continue execution", { error: continuationResult.error });

this.sendDebugLog({
runId: run.friendlyId,
message: "failed to continue execution",
properties: {
error: continuationResult.error,
},
});

this.waitForNextRun();
return;
}
Expand Down Expand Up @@ -734,10 +760,14 @@ class ManagedRunController {
private async startAndExecuteRunAttempt({
runFriendlyId,
snapshotFriendlyId,
dequeuedAt,
podScheduledAt,
isWarmStart = false,
}: {
runFriendlyId: string;
snapshotFriendlyId: string;
dequeuedAt?: Date;
podScheduledAt?: Date;
isWarmStart?: boolean;
}) {
if (!this.socket) {
Expand All @@ -749,39 +779,79 @@ class ManagedRunController {
snapshot: { friendlyId: snapshotFriendlyId },
});

const attemptStartedAt = Date.now();

const start = await this.httpClient.startRunAttempt(runFriendlyId, snapshotFriendlyId, {
isWarmStart,
});

if (!start.success) {
console.error("[ManagedRunController] Failed to start run", { error: start.error });

this.sendDebugLog({
runId: runFriendlyId,
message: "failed to start run attempt",
properties: {
error: start.error,
},
});

this.waitForNextRun();
return;
}

const attemptDuration = Date.now() - attemptStartedAt;

const { run, snapshot, execution, envVars } = start.data;

logger.debug("[ManagedRunController] Started run", {
runId: run.friendlyId,
snapshot: snapshot.friendlyId,
});

// TODO: We may already be executing this run, this may be a new attempt
// This is the only case where incrementing the attempt number is allowed
this.enterRunPhase(run, snapshot);

const metrics = [
{
name: "start",
event: "create_attempt",
timestamp: attemptStartedAt,
duration: attemptDuration,
},
]
.concat(
dequeuedAt
? [
{
name: "start",
event: "dequeue",
timestamp: dequeuedAt.getTime(),
duration: 0,
},
]
: []
)
.concat(
podScheduledAt
? [
{
name: "start",
event: "pod_scheduled",
timestamp: podScheduledAt.getTime(),
duration: 0,
},
]
: []
) satisfies TaskRunExecutionMetrics;

const taskRunEnv = {
...gatherProcessEnv(),
...envVars,
};

try {
return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution });
return await this.executeRun({ run, snapshot, envVars: taskRunEnv, execution, metrics });
} catch (error) {
// TODO: Handle the case where we're in the warm start phase or executing a new run
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch

console.error("Error while executing attempt", {
error,
});
Expand Down Expand Up @@ -810,7 +880,13 @@ class ManagedRunController {
error: completionResult.error,
});

// TODO: Maybe we should keep retrying for a while longer
this.sendDebugLog({
runId: run.friendlyId,
message: "completion: failed to submit after error",
properties: {
error: completionResult.error,
},
});

this.waitForNextRun();
return;
Expand Down Expand Up @@ -923,6 +999,7 @@ class ManagedRunController {
this.startAndExecuteRunAttempt({
runFriendlyId: nextRun.run.friendlyId,
snapshotFriendlyId: nextRun.snapshot.friendlyId,
dequeuedAt: nextRun.dequeuedAt,
isWarmStart: true,
}).finally(() => {});
return;
Expand Down Expand Up @@ -1032,7 +1109,10 @@ class ManagedRunController {
snapshot,
envVars,
execution,
}: WorkloadRunAttemptStartResponseBody) {
metrics,
}: WorkloadRunAttemptStartResponseBody & {
metrics?: TaskRunExecutionMetrics;
}) {
this.snapshotPoller.start();

if (!this.taskRunProcess || !this.taskRunProcess.isPreparedForNextRun) {
Expand All @@ -1058,6 +1138,7 @@ class ManagedRunController {
payload: {
execution,
traceContext: execution.run.traceContext ?? {},
metrics,
},
messageId: run.friendlyId,
env: envVars,
Expand Down Expand Up @@ -1096,6 +1177,14 @@ class ManagedRunController {
error: completionResult.error,
});

this.sendDebugLog({
runId: run.friendlyId,
message: "completion: failed to submit",
properties: {
error: completionResult.error,
},
});

this.waitForNextRun();
return;
}
Expand Down Expand Up @@ -1212,6 +1301,8 @@ class ManagedRunController {
this.startAndExecuteRunAttempt({
runFriendlyId: env.TRIGGER_RUN_ID,
snapshotFriendlyId: env.TRIGGER_SNAPSHOT_ID,
dequeuedAt: env.TRIGGER_DEQUEUED_AT_MS,
podScheduledAt: env.TRIGGER_POD_SCHEDULED_AT_MS,
}).finally(() => {});
return;
}
Expand Down
Loading