Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@
"type": "module",
"scripts": {
"build": "tsc",
"dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
"start": "node --experimental-sqlite dist/index.js",
"dev": "tsx --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
"start": "node dist/index.js",
"test:watch": "vitest",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.3",
"nanoid": "^5.0.9",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
"std-env": "^3.8.0",
"tinyexec": "^0.3.1",
Expand Down
13 changes: 13 additions & 0 deletions apps/supervisor/src/clients/kubernetes.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import * as k8s from "@kubernetes/client-node";
import { Informer } from "@kubernetes/client-node";
import { ListPromise } from "@kubernetes/client-node";
import { KubernetesObject } from "@kubernetes/client-node";
import { assertExhaustive } from "@trigger.dev/core/utils";

export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";

export function createK8sApi() {
const kubeConfig = getKubeConfig();

function makeInformer<T extends KubernetesObject>(
path: string,
listPromiseFn: ListPromise<T>,
labelSelector?: string,
fieldSelector?: string
): Informer<T> {
return k8s.makeInformer(kubeConfig, path, listPromiseFn, labelSelector, fieldSelector);
}

const api = {
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
makeInformer,
};

return api;
Expand Down
12 changes: 12 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ const Env = z.object({
KUBERNETES_NAMESPACE: z.string().default("default"),
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

// Metrics
METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),

// Pod cleaner
POD_CLEANER_ENABLED: BoolEnv.default(true),
POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000),
POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500),

// Failed pod handler
FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true),
FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000),
});

export const env = Env.parse(stdEnv);
44 changes: 43 additions & 1 deletion apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import {
CheckpointClient,
isKubernetesEnvironment,
} from "@trigger.dev/core/v3/serverOnly";
import { createK8sApi, RUNTIME_ENV } from "./clients/kubernetes.js";
import { createK8sApi } from "./clients/kubernetes.js";
import { collectDefaultMetrics } from "prom-client";
import { register } from "./metrics.js";
import { PodCleaner } from "./services/podCleaner.js";
import { FailedPodHandler } from "./services/failedPodHandler.js";

if (env.METRICS_COLLECT_DEFAULTS) {
collectDefaultMetrics({ register });
}

class ManagedSupervisor {
private readonly workerSession: SupervisorSession;
Expand All @@ -29,6 +37,9 @@ class ManagedSupervisor {
private readonly resourceMonitor: ResourceMonitor;
private readonly checkpointClient?: CheckpointClient;

private readonly podCleaner?: PodCleaner;
private readonly failedPodHandler?: FailedPodHandler;

private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;

Expand All @@ -37,6 +48,21 @@ class ManagedSupervisor {
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;

if (env.POD_CLEANER_ENABLED) {
this.podCleaner = new PodCleaner({
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
}

if (env.FAILED_POD_HANDLER_ENABLED) {
this.failedPodHandler = new FailedPodHandler({
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
}

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
warmStartUrl: this.warmStartUrl,
Expand Down Expand Up @@ -273,6 +299,14 @@ class ManagedSupervisor {
async start() {
this.logger.log("[ManagedWorker] Starting up");

if (this.podCleaner) {
await this.podCleaner.start();
}

if (this.failedPodHandler) {
await this.failedPodHandler.start();
}

if (env.TRIGGER_WORKLOAD_API_ENABLED) {
this.logger.log("[ManagedWorker] Workload API enabled", {
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
Expand All @@ -292,6 +326,14 @@ class ManagedSupervisor {
async stop() {
this.logger.log("[ManagedWorker] Shutting down");
await this.httpServer.stop();

if (this.podCleaner) {
await this.podCleaner.stop();
}

if (this.failedPodHandler) {
await this.failedPodHandler.stop();
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions apps/supervisor/src/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { Registry } from "prom-client";

export const register = new Registry();
Loading