Skip to content
5 changes: 5 additions & 0 deletions .changeset/wise-mirrors-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add manual checkpoint schema
2 changes: 1 addition & 1 deletion apps/coordinator/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN find . -name "node_modules" -type d -prune -exec rm -rf '{}' +
FROM node-20 AS base

RUN apt-get update \
&& apt-get install -y buildah ca-certificates dumb-init docker.io \
&& apt-get install -y buildah ca-certificates dumb-init docker.io busybox \
&& rm -rf /var/lib/apt/lists/*

COPY --chown=node:node .gitignore .gitignore
Expand Down
15 changes: 10 additions & 5 deletions apps/coordinator/src/checkpointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ export class Checkpointer {
const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });

/**
 * Unregisters this checkpoint attempt's abort controller and detaches its
 * "abort" listener.
 */
const removeCurrentAbortController = () => {
  const registered = this.#abortControllers.get(runId);

  // The entry for this run may hold a different controller by now; only
  // delete it when it is still the one created for this attempt.
  if (registered === controller) {
    this.#abortControllers.delete(runId);
  }

  controller.signal.removeEventListener("abort", onAbort);
};

const cleanup = async () => {
const metadata = {
runId,
Expand All @@ -424,6 +432,7 @@ export class Checkpointer {

if (this.#dockerMode) {
this.#logger.debug("Skipping cleanup in docker mode", metadata);
removeCurrentAbortController();
return;
}

Expand All @@ -436,11 +445,7 @@ export class Checkpointer {
this.#logger.error("Error during cleanup", { ...metadata, error });
}

// Ensure only the current controller is removed
if (this.#abortControllers.get(runId) === controller) {
this.#abortControllers.delete(runId);
}
controller.signal.removeEventListener("abort", onAbort);
removeCurrentAbortController();
};

try {
Expand Down
250 changes: 239 additions & 11 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
import { ChaosMonkey } from "./chaosMonkey";
import { Checkpointer } from "./checkpointer";
import { boolFromEnv, numFromEnv } from "./util";
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";

import { collectDefaultMetrics, register, Gauge } from "prom-client";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
Expand Down Expand Up @@ -42,6 +42,8 @@ class CheckpointCancelError extends Error {}

class TaskCoordinator {
#httpServer: ReturnType<typeof createServer>;
#internalHttpServer: ReturnType<typeof createServer>;

#checkpointer = new Checkpointer({
dockerMode: !process.env.KUBERNETES_PORT,
forceSimulate: boolFromEnv("FORCE_CHECKPOINT_SIMULATION", false),
Expand Down Expand Up @@ -79,6 +81,8 @@ class TaskCoordinator {
private host = "0.0.0.0"
) {
this.#httpServer = this.#createHttpServer();
this.#internalHttpServer = this.#createInternalHttpServer();

this.#checkpointer.init();
this.#platformSocket = this.#createPlatformSocket();

Expand Down Expand Up @@ -653,11 +657,11 @@ class TaskCoordinator {

log.error("READY_FOR_LAZY_ATTEMPT error", { error });

await crashRun({
name: "ReadyForLazyAttemptError",
message:
error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
});
// await crashRun({
// name: "ReadyForLazyAttemptError",
// message:
// error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
// });

return;
}
Expand Down Expand Up @@ -1368,13 +1372,236 @@ class TaskCoordinator {
case "/metrics": {
return reply.text(await register.metrics(), 200, register.contentType);
}
default: {
return reply.empty(404);
}
}
});

httpServer.on("clientError", (err, socket) => {
socket.end("HTTP/1.1 400 Bad Request\r\n\r\n");
});

httpServer.on("listening", () => {
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
});

return httpServer;
}

#createInternalHttpServer() {
const httpServer = createServer(async (req, res) => {
logger.log(`[${req.method}]`, { url: req.url });

const reply = new HttpReply(res);

switch (req.url) {
case "/whoami": {
return reply.text(NODE_NAME);
}
case "/checkpoint": {
const body = await getTextBody(req);
// await this.#checkpointer.checkpointAndPush(body);
return reply.text(`sent restore request: ${body}`);
case "/checkpoint/duration": {
try {
const body = await getTextBody(req);
const json = safeJsonParse(body);

if (typeof json !== "object" || !json) {
return reply.text("Invalid body", 400);
}

if (!("runId" in json) || typeof json.runId !== "string") {
return reply.text("Missing or invalid: runId", 400);
}

if (!("now" in json) || typeof json.now !== "number") {
return reply.text("Missing or invalid: now", 400);
}

if (!("ms" in json) || typeof json.ms !== "number") {
return reply.text("Missing or invalid: ms", 400);
}

let keepRunAlive = false;
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
keepRunAlive = json.keepRunAlive;
}

let async = false;
if ("async" in json && typeof json.async === "boolean") {
async = json.async;
}

const { runId, now, ms } = json;

if (!runId) {
return reply.text("Missing runId", 400);
}

const runSocket = await this.#getRunSocket(runId);
if (!runSocket) {
return reply.text("Run socket not found", 404);
}

const { data } = runSocket;

console.log("Manual duration checkpoint", data);

if (async) {
reply.text("Creating checkpoint in the background", 202);
}

const checkpoint = await this.#checkpointer.checkpointAndPush({
runId: data.runId,
projectRef: data.projectRef,
deploymentVersion: data.deploymentVersion,
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
});

if (!checkpoint) {
return reply.text("Failed to checkpoint", 500);
}

if (!data.attemptFriendlyId) {
return reply.text("Socket data missing attemptFriendlyId", 500);
}

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
runId,
attemptFriendlyId: data.attemptFriendlyId,
docker: checkpoint.docker,
location: checkpoint.location,
reason: {
type: "WAIT_FOR_DURATION",
ms,
now,
},
});

if (ack?.keepRunAlive || keepRunAlive) {
return reply.json({
message: `keeping run ${runId} alive after checkpoint`,
checkpoint,
requestJson: json,
platformAck: ack,
});
}

runSocket.emit("REQUEST_EXIT", {
version: "v1",
});

return reply.json({
message: `checkpoint created for run ${runId}`,
checkpoint,
requestJson: json,
platformAck: ack,
});
} catch (error) {
return reply.json({
message: `error`,
error,
});
}
}
case "/checkpoint/manual": {
try {
const body = await getTextBody(req);
const json = safeJsonParse(body);

if (typeof json !== "object" || !json) {
return reply.text("Invalid body", 400);
}

if (!("runId" in json) || typeof json.runId !== "string") {
return reply.text("Missing or invalid: runId", 400);
}

let restoreAtUnixTimeMs: number | undefined;
if ("restoreAtUnixTimeMs" in json && typeof json.restoreAtUnixTimeMs === "number") {
restoreAtUnixTimeMs = json.restoreAtUnixTimeMs;
}

let keepRunAlive = false;
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
keepRunAlive = json.keepRunAlive;
}

let async = false;
if ("async" in json && typeof json.async === "boolean") {
async = json.async;
}

const { runId } = json;

if (!runId) {
return reply.text("Missing runId", 400);
}

const runSocket = await this.#getRunSocket(runId);
if (!runSocket) {
return reply.text("Run socket not found", 404);
}

const { data } = runSocket;

console.log("Manual checkpoint", data);

if (async) {
reply.text("Creating checkpoint in the background", 202);
}

const checkpoint = await this.#checkpointer.checkpointAndPush({
runId: data.runId,
projectRef: data.projectRef,
deploymentVersion: data.deploymentVersion,
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
});

if (!checkpoint) {
return reply.text("Failed to checkpoint", 500);
}

if (!data.attemptFriendlyId) {
return reply.text("Socket data missing attemptFriendlyId", 500);
}

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
runId,
attemptFriendlyId: data.attemptFriendlyId,
docker: checkpoint.docker,
location: checkpoint.location,
reason: {
type: "MANUAL",
restoreAtUnixTimeMs,
},
});

if (ack?.keepRunAlive || keepRunAlive) {
return reply.json({
message: `keeping run ${runId} alive after checkpoint`,
checkpoint,
requestJson: json,
platformAck: ack,
});
}

runSocket.emit("REQUEST_EXIT", {
version: "v1",
});

return reply.json({
message: `checkpoint created for run ${runId}`,
checkpoint,
requestJson: json,
platformAck: ack,
});
} catch (error) {
return reply.json({
message: `error`,
error,
});
}
}
default: {
return reply.empty(404);
Expand All @@ -1387,14 +1614,15 @@ class TaskCoordinator {
});

httpServer.on("listening", () => {
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
logger.log("internal server listening on port", { port: HTTP_SERVER_PORT + 100 });
});

return httpServer;
}

/**
 * Starts both HTTP servers: the main one on the configured host and port,
 * and the internal one on the loopback address, 100 ports above the main one.
 */
listen() {
  const internalPort = this.port + 100;

  this.#httpServer.listen(this.port, this.host);
  this.#internalHttpServer.listen(internalPort, "127.0.0.1");
}
}

Expand Down
12 changes: 12 additions & 0 deletions apps/coordinator/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,15 @@ export const numFromEnv = (env: string, defaultValue: number): number => {

return parseInt(value, 10);
};

/**
 * Parses a JSON string without ever throwing.
 *
 * @param json - The text to parse; may be omitted.
 * @returns The parsed value, `undefined` when `json` is missing or empty,
 *          or `null` when `json` is not valid JSON.
 */
export function safeJsonParse(json?: string): unknown {
  let parsed: unknown = undefined;

  if (json) {
    try {
      parsed = JSON.parse(json);
    } catch {
      // Malformed input is reported as null rather than propagated
      parsed = null;
    }
  }

  return parsed;
}
8 changes: 8 additions & 0 deletions apps/webapp/app/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ export const RuntimeEnvironmentType = {
DEVELOPMENT: "DEVELOPMENT",
PREVIEW: "PREVIEW",
} as const satisfies Record<RuntimeEnvironmentTypeType, RuntimeEnvironmentTypeType>;

/**
 * Type guard reporting whether `value` is one of the values held by
 * `TaskRunAttemptStatus`.
 *
 * NOTE(review): the predicate narrows to the object's keys while the check
 * runs against its values; presumably each key maps to an identical string,
 * so confirm against the `TaskRunAttemptStatus` definition.
 */
export function isTaskRunAttemptStatus(value: string): value is keyof typeof TaskRunAttemptStatus {
  const knownStatuses = Object.values(TaskRunAttemptStatus);
  return knownStatuses.some((status) => status === value);
}

/**
 * Type guard reporting whether `value` is one of the values held by
 * `TaskRunStatus`.
 *
 * NOTE(review): the predicate narrows to the object's keys while the check
 * runs against its values; presumably each key maps to an identical string,
 * so confirm against the `TaskRunStatus` definition.
 */
export function isTaskRunStatus(value: string): value is keyof typeof TaskRunStatus {
  const knownStatuses = Object.values(TaskRunStatus);
  return knownStatuses.some((status) => status === value);
}
Loading
Loading