diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json
index 845165f173..c4a0c984d4 100644
--- a/apps/supervisor/package.json
+++ b/apps/supervisor/package.json
@@ -6,8 +6,9 @@
   "type": "module",
   "scripts": {
     "build": "tsc",
-    "dev": "tsx --experimental-sqlite --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
-    "start": "node --experimental-sqlite dist/index.js",
+    "dev": "tsx --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
+    "start": "node dist/index.js",
+    "test:watch": "vitest",
     "typecheck": "tsc --noEmit"
   },
   "dependencies": {
@@ -15,6 +16,7 @@
     "@trigger.dev/core": "workspace:*",
     "dockerode": "^4.0.3",
     "nanoid": "^5.0.9",
+    "prom-client": "^15.1.0",
     "socket.io": "4.7.4",
     "std-env": "^3.8.0",
     "tinyexec": "^0.3.1",
diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts
index ceab570069..77fd4144d6 100644
--- a/apps/supervisor/src/clients/kubernetes.ts
+++ b/apps/supervisor/src/clients/kubernetes.ts
@@ -1,4 +1,7 @@
 import * as k8s from "@kubernetes/client-node";
+import { Informer } from "@kubernetes/client-node";
+import { ListPromise } from "@kubernetes/client-node";
+import { KubernetesObject } from "@kubernetes/client-node";
 import { assertExhaustive } from "@trigger.dev/core/utils";
 
 export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
@@ -6,10 +9,20 @@ export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
 export function createK8sApi() {
   const kubeConfig = getKubeConfig();
 
+  function makeInformer<T extends KubernetesObject>(
+    path: string,
+    listPromiseFn: ListPromise<T>,
+    labelSelector?: string,
+    fieldSelector?: string
+  ): Informer<T> {
+    return k8s.makeInformer(kubeConfig, path, listPromiseFn, labelSelector, fieldSelector);
+  }
+
   const api = {
     core: kubeConfig.makeApiClient(k8s.CoreV1Api),
     batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
     apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
+    makeInformer,
   };
 
   return api;
diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts
index 69a8024f34..5fe5abe21b 100644
--- a/apps/supervisor/src/env.ts
+++ b/apps/supervisor/src/env.ts
@@ -51,6 +51,18 @@ const Env = z.object({
   KUBERNETES_NAMESPACE: z.string().default("default"),
   EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
   EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),
+
+  // Metrics
+  METRICS_COLLECT_DEFAULTS: BoolEnv.default(true),
+
+  // Pod cleaner
+  POD_CLEANER_ENABLED: BoolEnv.default(true),
+  POD_CLEANER_INTERVAL_MS: z.coerce.number().int().default(10000),
+  POD_CLEANER_BATCH_SIZE: z.coerce.number().int().default(500),
+
+  // Failed pod handler
+  FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true),
+  FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000),
 });
 
 export const env = Env.parse(stdEnv);
diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts
index 28c96f2ab7..b5d7ea36d1 100644
--- a/apps/supervisor/src/index.ts
+++ b/apps/supervisor/src/index.ts
@@ -18,7 +18,15 @@ import {
   CheckpointClient,
   isKubernetesEnvironment,
 } from "@trigger.dev/core/v3/serverOnly";
-import { createK8sApi, RUNTIME_ENV } from "./clients/kubernetes.js";
+import { createK8sApi } from "./clients/kubernetes.js";
+import { collectDefaultMetrics } from "prom-client";
+import { register } from "./metrics.js";
+import { PodCleaner } from "./services/podCleaner.js";
+import { FailedPodHandler } from "./services/failedPodHandler.js";
+
+if (env.METRICS_COLLECT_DEFAULTS) {
+  collectDefaultMetrics({ register });
+}
 
 class ManagedSupervisor {
   private readonly workerSession: SupervisorSession;
@@ -29,6 +37,9 @@ class ManagedSupervisor {
   private readonly resourceMonitor: ResourceMonitor;
   private readonly checkpointClient?: CheckpointClient;
 
+  private readonly podCleaner?: PodCleaner;
+  private readonly failedPodHandler?: FailedPodHandler;
+
   private readonly isKubernetes = isKubernetesEnvironment(env.KUBERNETES_FORCE_ENABLED);
 
   private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
@@ -37,6 +48,21 @@ class ManagedSupervisor {
     const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
     const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;
 
+    if (env.POD_CLEANER_ENABLED) {
+      this.podCleaner = new PodCleaner({
+        namespace: env.KUBERNETES_NAMESPACE,
+        batchSize: env.POD_CLEANER_BATCH_SIZE,
+        intervalMs: env.POD_CLEANER_INTERVAL_MS,
+      });
+    }
+
+    if (env.FAILED_POD_HANDLER_ENABLED) {
+      this.failedPodHandler = new FailedPodHandler({
+        namespace: env.KUBERNETES_NAMESPACE,
+        reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
+      });
+    }
+
     if (this.warmStartUrl) {
       this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
         warmStartUrl: this.warmStartUrl,
@@ -273,6 +299,14 @@ class ManagedSupervisor {
   async start() {
     this.logger.log("[ManagedWorker] Starting up");
 
+    if (this.podCleaner) {
+      await this.podCleaner.start();
+    }
+
+    if (this.failedPodHandler) {
+      await this.failedPodHandler.start();
+    }
+
     if (env.TRIGGER_WORKLOAD_API_ENABLED) {
       this.logger.log("[ManagedWorker] Workload API enabled", {
         protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
@@ -292,6 +326,14 @@ class ManagedSupervisor {
   async stop() {
     this.logger.log("[ManagedWorker] Shutting down");
     await this.httpServer.stop();
+
+    if (this.podCleaner) {
+      await this.podCleaner.stop();
+    }
+
+    if (this.failedPodHandler) {
+      await this.failedPodHandler.stop();
+    }
   }
 }
 
diff --git a/apps/supervisor/src/metrics.ts b/apps/supervisor/src/metrics.ts
new file mode 100644
index 0000000000..caec486153
--- /dev/null
+++ b/apps/supervisor/src/metrics.ts
@@ -0,0 +1,3 @@
+import { Registry } from "prom-client";
+
+export const register = new Registry();
diff --git a/apps/supervisor/src/services/failedPodHandler.test.ts b/apps/supervisor/src/services/failedPodHandler.test.ts
new file mode 100644
index 0000000000..28458ea8ef
--- /dev/null
+++ b/apps/supervisor/src/services/failedPodHandler.test.ts
@@ -0,0 +1,480 @@
+import { describe, it, expect, beforeAll, afterEach } from "vitest";
+import { FailedPodHandler } from "./failedPodHandler.js";
+import { K8sApi, createK8sApi } from "../clients/kubernetes.js";
+import { Registry } from "prom-client";
+import { setTimeout } from "timers/promises";
+
+describe("FailedPodHandler Integration Tests", () => {
+  const k8s = createK8sApi();
+  const namespace = "integration-test";
+  const register = new Registry();
+
+  beforeAll(async () => {
+    // Create the test namespace if it doesn't exist
+    try {
+      await k8s.core.readNamespace({ name: namespace });
+    } catch (error) {
+      await k8s.core.createNamespace({
+        body: {
+          metadata: {
+            name: namespace,
+          },
+        },
+      });
+    }
+
+    // Clear any existing pods in the namespace
+    await deleteAllPodsInNamespace({ k8sApi: k8s, namespace });
+  });
+
+  afterEach(async () => {
+    // Clear metrics to avoid conflicts
+    register.clear();
+
+    // Delete any remaining pods in the namespace
+    await deleteAllPodsInNamespace({ k8sApi: k8s, namespace });
+  });
+
+  it("should process and delete failed pods with app=task-run label", async () => {
+    const handler = new FailedPodHandler({ namespace, k8s, register });
+
+    try {
+      // Create failed pods with the correct label
+      const podNames = await createTestPods({
+        k8sApi: k8s,
+        namespace,
+        count: 2,
+        shouldFail: true,
+      });
+
+      // Wait for pods to reach Failed state
+      await waitForPodsPhase({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+        phase: "Failed",
+      });
+
+      // Start the handler
+      await handler.start();
+
+      // Wait for pods to be deleted
+      await waitForPodsDeletion({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+      });
+
+      // Verify metrics
+      const metrics = handler.getMetrics();
+
+      // Check informer events were recorded
+      const informerEvents = await metrics.informerEventsTotal.get();
+      expect(informerEvents.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "add",
+          }),
+          value: 2,
+        })
+      );
+      expect(informerEvents.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "connect",
+          }),
+          value: 1,
+        })
+      );
+      expect(informerEvents.values).not.toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "error",
+          }),
+        })
+      );
+
+      // Check pods were processed
+      const processedPods = await metrics.processedPodsTotal.get();
+      expect(processedPods.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            status: "Failed",
+          }),
+          value: 2,
+        })
+      );
+
+      // Check pods were deleted
+      const deletedPods = await metrics.deletedPodsTotal.get();
+      expect(deletedPods.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            status: "Failed",
+          }),
+          value: 2,
+        })
+      );
+
+      // Check no deletion errors were recorded
+      const deletionErrors = await metrics.deletionErrorsTotal.get();
+      expect(deletionErrors.values).toHaveLength(0);
+
+      // Check processing durations were recorded
+      const durations = await metrics.processingDurationSeconds.get();
+      const failedDurations = durations.values.filter(
+        (v) => v.labels.namespace === namespace && v.labels.status === "Failed"
+      );
+      expect(failedDurations.length).toBeGreaterThan(0);
+    } finally {
+      await handler.stop();
+    }
+  }, 30000);
+
+  it("should ignore pods without app=task-run label", async () => {
+    const handler = new FailedPodHandler({ namespace, k8s, register });
+
+    try {
+      // Create failed pods without the task-run label
+      const podNames = await createTestPods({
+        k8sApi: k8s,
+        namespace,
+        count: 1,
+        shouldFail: true,
+        labels: { app: "not-task-run" },
+      });
+
+      // Wait for pod to reach Failed state
+      await waitForPodsPhase({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+        phase: "Failed",
+      });
+
+      await handler.start();
+
+      // Wait a reasonable time to ensure pod isn't deleted
+      await setTimeout(5000);
+
+      // Verify pod still exists
+      const exists = await podExists({ k8sApi: k8s, namespace, podName: podNames[0]! });
+      expect(exists).toBe(true);
+
+      // Verify no metrics were recorded
+      const metrics = handler.getMetrics();
+      const processedPods = await metrics.processedPodsTotal.get();
+      expect(processedPods.values).toHaveLength(0);
+    } finally {
+      await handler.stop();
+    }
+  }, 30000);
+
+  it("should not process pods that are being deleted", async () => {
+    const handler = new FailedPodHandler({ namespace, k8s, register });
+
+    try {
+      // Create a long-running pod that we'll mark for deletion
+      const podNames = await createTestPods({
+        k8sApi: k8s,
+        namespace,
+        count: 1,
+        shouldFail: true,
+        command: ["/bin/sh", "-c", "sleep 30"],
+      });
+
+      // Wait for pod to reach Running state
+      await waitForPodsPhase({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+        phase: "Running",
+      });
+
+      // Delete the pod but don't wait for deletion
+      await k8s.core.deleteNamespacedPod({
+        namespace,
+        name: podNames[0]!,
+        gracePeriodSeconds: 5,
+      });
+
+      // Start the handler
+      await handler.start();
+
+      // Wait for pod to be fully deleted
+      await waitForPodsDeletion({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+      });
+
+      // Verify metrics show we skipped processing
+      const metrics = handler.getMetrics();
+      const processedPods = await metrics.processedPodsTotal.get();
+      expect(processedPods.values).toHaveLength(0);
+    } finally {
+      await handler.stop();
+    }
+  }, 30000);
+
+  it("should detect and process pods that fail after handler starts", async () => {
+    const handler = new FailedPodHandler({ namespace, k8s, register });
+
+    try {
+      // Start the handler
+      await handler.start();
+
+      // Create failed pods with the correct label
+      const podNames = await createTestPods({
+        k8sApi: k8s,
+        namespace,
+        count: 3,
+        shouldFail: true,
+      });
+
+      // Wait for pods to be deleted
+      await waitForPodsDeletion({
+        k8sApi: k8s,
+        namespace,
+        podNames,
+      });
+
+      // Verify metrics
+      const metrics = handler.getMetrics();
+
+      // Check informer events were recorded
+      const informerEvents = await metrics.informerEventsTotal.get();
+      expect(informerEvents.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "add",
+          }),
+          value: 3,
+        })
+      );
+      expect(informerEvents.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "connect",
+          }),
+          value: 1,
+        })
+      );
+      expect(informerEvents.values).not.toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            verb: "error",
+          }),
+        })
+      );
+
+      // Check pods were processed
+      const processedPods = await metrics.processedPodsTotal.get();
+      expect(processedPods.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            status: "Failed",
+          }),
+          value: 3,
+        })
+      );
+
+      // Check pods were deleted
+      const deletedPods = await metrics.deletedPodsTotal.get();
+      expect(deletedPods.values).toContainEqual(
+        expect.objectContaining({
+          labels: expect.objectContaining({
+            namespace,
+            status: "Failed",
+          }),
+          value: 3,
+        })
+      );
+
+      // Check no deletion errors were recorded
+      const deletionErrors = await metrics.deletionErrorsTotal.get();
+      expect(deletionErrors.values).toHaveLength(0);
+
+      // Check processing durations were recorded
+      const durations = await metrics.processingDurationSeconds.get();
+      const failedDurations = durations.values.filter(
+        (v) => v.labels.namespace === namespace && v.labels.status === "Failed"
+      );
+      expect(failedDurations.length).toBeGreaterThan(0);
+    } finally {
+      await handler.stop();
+    }
+  }, 60000);
+});
+
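+// Test helpers: createTestPods launches busybox pods that exit 0 (succeed) or
+// exit 1 (fail) unless a custom command is given; the waitFor* helpers poll
+// the API server until pods reach the expected phase or disappear.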
+async function createTestPods({
+  k8sApi,
+  namespace,
+  count,
+  labels = { app: "task-run" },
+  shouldFail = false,
+  namePrefix = "test-pod",
+  command = ["/bin/sh", "-c", shouldFail ? "exit 1" : "exit 0"],
+  randomizeName = true,
+}: {
+  k8sApi: K8sApi;
+  namespace: string;
+  count: number;
+  labels?: Record<string, string>;
+  shouldFail?: boolean;
+  namePrefix?: string;
+  command?: string[];
+  randomizeName?: boolean;
+}) {
+  const createdPods: string[] = [];
+
+  for (let i = 0; i < count; i++) {
+    const podName = randomizeName
+      ? `${namePrefix}-${i}-${Math.random().toString(36).substring(2, 15)}`
+      : `${namePrefix}-${i}`;
+    await k8sApi.core.createNamespacedPod({
+      namespace,
+      body: {
+        metadata: {
+          name: podName,
+          labels,
+        },
+        spec: {
+          restartPolicy: "Never",
+          containers: [
+            {
+              name: "test",
+              image: "busybox:1.37.0",
+              command,
+            },
+          ],
+        },
+      },
+    });
+    createdPods.push(podName);
+  }
+
+  return createdPods;
+}
+
+async function waitForPodsDeletion({
+  k8sApi,
+  namespace,
+  podNames,
+  timeoutMs = 10000,
+  waitMs = 1000,
+}: {
+  k8sApi: K8sApi;
+  namespace: string;
+  podNames: string[];
+  timeoutMs?: number;
+  waitMs?: number;
+}) {
+  const startTime = Date.now();
+  const pendingPods = new Set(podNames);
+
+  while (pendingPods.size > 0 && Date.now() - startTime < timeoutMs) {
+    const pods = await k8sApi.core.listNamespacedPod({ namespace });
+    const existingPods = new Set(pods.items.map((pod) => pod.metadata?.name ?? ""));
+
+    for (const podName of pendingPods) {
+      if (!existingPods.has(podName)) {
+        pendingPods.delete(podName);
+      }
+    }
+
+    if (pendingPods.size > 0) {
+      await setTimeout(waitMs);
+    }
+  }
+
+  if (pendingPods.size > 0) {
+    throw new Error(
+      `Pods [${Array.from(pendingPods).join(", ")}] were not deleted within ${timeoutMs}ms`
+    );
+  }
+}
+
+async function podExists({
+  k8sApi,
+  namespace,
+  podName,
+}: {
+  k8sApi: K8sApi;
+  namespace: string;
+  podName: string;
+}) {
+  const pods = await k8sApi.core.listNamespacedPod({ namespace });
+  return pods.items.some((p) => p.metadata?.name === podName);
+}
+
+async function waitForPodsPhase({
+  k8sApi,
+  namespace,
+  podNames,
+  phase,
+  timeoutMs = 10000,
+  waitMs = 1000,
+}: {
+  k8sApi: K8sApi;
+  namespace: string;
+  podNames: string[];
+  phase: "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown";
+  timeoutMs?: number;
+  waitMs?: number;
+}) {
+  const startTime = Date.now();
+  const pendingPods = new Set(podNames);
+
+  while (pendingPods.size > 0 && Date.now() - startTime < timeoutMs) {
+    const pods = await k8sApi.core.listNamespacedPod({ namespace });
+
+    for (const pod of pods.items) {
+      if (pendingPods.has(pod.metadata?.name ?? "") && pod.status?.phase === phase) {
+        pendingPods.delete(pod.metadata?.name ?? "");
+      }
+    }
+
+    if (pendingPods.size > 0) {
+      await setTimeout(waitMs);
+    }
+  }
+
+  if (pendingPods.size > 0) {
+    throw new Error(
+      `Pods [${Array.from(pendingPods).join(
+        ", "
+      )}] did not reach phase ${phase} within ${timeoutMs}ms`
+    );
+  }
+}
+
+async function deleteAllPodsInNamespace({
+  k8sApi,
+  namespace,
+}: {
+  k8sApi: K8sApi;
+  namespace: string;
+}) {
+  // Get all pods
+  const pods = await k8sApi.core.listNamespacedPod({ namespace });
+  const podNames = pods.items.map((p) => p.metadata?.name ?? "");
+
+  // Delete all pods
+  await k8sApi.core.deleteCollectionNamespacedPod({ namespace });
+
+  // Wait for all pods to be deleted
+  await waitForPodsDeletion({ k8sApi, namespace, podNames });
+}
diff --git a/apps/supervisor/src/services/failedPodHandler.ts b/apps/supervisor/src/services/failedPodHandler.ts
new file mode 100644
index 0000000000..677eda74fd
--- /dev/null
+++ b/apps/supervisor/src/services/failedPodHandler.ts
@@ -0,0 +1,284 @@
+import { LogLevel, SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
+import { K8sApi } from "../clients/kubernetes.js";
+import { createK8sApi } from "../clients/kubernetes.js";
+import { Informer, V1Pod } from "@kubernetes/client-node";
+import { Counter, Registry, Histogram } from "prom-client";
+import { register } from "../metrics.js";
+import { setTimeout } from "timers/promises";
+
+type PodStatus = "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown";
+
+export type FailedPodHandlerOptions = {
+  namespace: string;
+  reconnectIntervalMs?: number;
+  k8s?: K8sApi;
+  register?: Registry;
+};
+
+export class FailedPodHandler {
+  private readonly id: string;
+  private readonly logger: SimpleStructuredLogger;
+  private readonly k8s: K8sApi;
+  private readonly namespace: string;
+
+  private isRunning = false;
+
+  private readonly informer: Informer<V1Pod>;
+  private readonly reconnectIntervalMs: number;
+
+  // Metrics
+  private readonly register: Registry;
+  private readonly processedPodsTotal: Counter;
+  private readonly deletedPodsTotal: Counter;
+  private readonly deletionErrorsTotal: Counter;
+  private readonly processingDurationSeconds: Histogram;
+  private readonly informerEventsTotal: Counter;
+
+  constructor(opts: FailedPodHandlerOptions) {
+    this.id = Math.random().toString(36).substring(2, 15);
+    this.logger = new SimpleStructuredLogger("failed-pod-handler", LogLevel.debug, {
+      id: this.id,
+    });
+
+    this.k8s = opts.k8s ?? createK8sApi();
+
+    this.namespace = opts.namespace;
+    this.reconnectIntervalMs = opts.reconnectIntervalMs ?? 1000;
+
+    this.informer = this.k8s.makeInformer(
+      `/api/v1/namespaces/${this.namespace}/pods`,
+      () =>
+        this.k8s.core.listNamespacedPod({
+          namespace: this.namespace,
+          labelSelector: "app=task-run",
+          fieldSelector: "status.phase=Failed",
+        }),
+      "app=task-run",
+      "status.phase=Failed"
+    );
+
+    // Whenever a matching pod is added to the informer cache
+    this.informer.on("add", this.onPodCompleted.bind(this));
+
+    // Informer events
+    this.informer.on("connect", this.makeOnConnect("failed-pod-informer").bind(this));
+    this.informer.on("error", this.makeOnError("failed-pod-informer").bind(this));
+
+    // Initialize metrics
+    this.register = opts.register ?? register;
+
+    this.processedPodsTotal = new Counter({
+      name: "failed_pod_handler_processed_pods_total",
+      help: "Total number of failed pods processed",
+      labelNames: ["namespace", "status"],
+      registers: [this.register],
+    });
+
+    this.deletedPodsTotal = new Counter({
+      name: "failed_pod_handler_deleted_pods_total",
+      help: "Total number of pods deleted",
+      labelNames: ["namespace", "status"],
+      registers: [this.register],
+    });
+
+    this.deletionErrorsTotal = new Counter({
+      name: "failed_pod_handler_deletion_errors_total",
+      help: "Total number of errors encountered while deleting pods",
+      labelNames: ["namespace", "error_type"],
+      registers: [this.register],
+    });
+
+    this.processingDurationSeconds = new Histogram({
+      name: "failed_pod_handler_processing_duration_seconds",
+      help: "The duration of pod processing",
+      labelNames: ["namespace", "status"],
+      registers: [this.register],
+    });
+
+    this.informerEventsTotal = new Counter({
+      name: "failed_pod_handler_informer_events_total",
+      help: "Total number of informer events",
+      labelNames: ["namespace", "verb"],
+      registers: [this.register],
+    });
+  }
+
+  async start() {
+    if (this.isRunning) {
+      this.logger.warn("failed pod handler already running");
+      return;
+    }
+
+    this.isRunning = true;
+
+    this.logger.info("starting failed pod handler");
+    await this.informer.start();
+  }
+
+  async stop() {
+    if (!this.isRunning) {
+      this.logger.warn("failed pod handler not running");
+      return;
+    }
+
+    this.isRunning = false;
+
+    this.logger.info("stopping failed pod handler");
+    await this.informer.stop();
+  }
+
+  private async withHistogram<T>(
+    histogram: Histogram,
+    promise: Promise<T>,
+    labels?: Record<string, string>
+  ): Promise<T> {
+    const end = histogram.startTimer({ namespace: this.namespace, ...labels });
+    try {
+      return await promise;
+    } finally {
+      end();
+    }
+  }
+
+  /**
+   * Returns the non-nullable status of a pod
+   */
+  private podStatus(pod: V1Pod): PodStatus {
+    return (pod.status?.phase ?? "Unknown") as PodStatus;
+  }
+
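+  // Runs for every pod the informer reports. Pods already marked for deletion
+  // are skipped; succeeded pods are only counted; failed pods are processed
+  // and then deleted from the cluster.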
"Unknown") as PodStatus; + } + + private async onPodCompleted(pod: V1Pod) { + this.logger.info("pod-completed", this.podSummary(pod)); + this.informerEventsTotal.inc({ namespace: this.namespace, verb: "add" }); + + if (!pod.metadata?.name) { + this.logger.error("pod-completed: no name", this.podSummary(pod)); + return; + } + + if (!pod.status) { + this.logger.error("pod-completed: no status", this.podSummary(pod)); + return; + } + + if (pod.metadata?.deletionTimestamp) { + this.logger.info("pod-completed: pod is being deleted", this.podSummary(pod)); + return; + } + + const podStatus = this.podStatus(pod); + + switch (podStatus) { + case "Succeeded": + await this.withHistogram(this.processingDurationSeconds, this.onPodSucceeded(pod), { + status: podStatus, + }); + break; + case "Failed": + await this.withHistogram(this.processingDurationSeconds, this.onPodFailed(pod), { + status: podStatus, + }); + break; + default: + this.logger.error("pod-completed: unknown phase", this.podSummary(pod)); + } + } + + private async onPodSucceeded(pod: V1Pod) { + this.logger.info("pod-succeeded", this.podSummary(pod)); + this.processedPodsTotal.inc({ + namespace: this.namespace, + status: this.podStatus(pod), + }); + } + + private async onPodFailed(pod: V1Pod) { + this.logger.info("pod-failed", this.podSummary(pod)); + + try { + await this.processFailedPod(pod); + } catch (error) { + this.logger.error("pod-failed: error processing pod", this.podSummary(pod), { error }); + } finally { + await this.deletePod(pod); + } + } + + private async processFailedPod(pod: V1Pod) { + this.logger.info("pod-failed: processing pod", this.podSummary(pod)); + this.processedPodsTotal.inc({ + namespace: this.namespace, + status: this.podStatus(pod), + }); + } + + private async deletePod(pod: V1Pod) { + this.logger.info("pod-failed: deleting pod", this.podSummary(pod)); + try { + await this.k8s.core.deleteNamespacedPod({ + name: pod.metadata!.name!, + namespace: this.namespace, + }); + this.deletedPodsTotal.inc({ + namespace: this.namespace, + status: this.podStatus(pod), + }); + } catch (error) { + this.logger.error("pod-failed: error deleting pod", this.podSummary(pod), { error }); + this.deletionErrorsTotal.inc({ + namespace: this.namespace, + error_type: error instanceof Error ? 
error.name : "unknown", + }); + } + } + + private makeOnError(informerName: string) { + return () => this.onError(informerName); + } + + private async onError(informerName: string) { + if (!this.isRunning) { + this.logger.warn("onError: informer not running"); + return; + } + + this.logger.error("error event fired", { informerName }); + this.informerEventsTotal.inc({ namespace: this.namespace, verb: "error" }); + + // Reconnect on errors + await setTimeout(this.reconnectIntervalMs); + await this.informer.start(); + } + + private makeOnConnect(informerName: string) { + return () => this.onConnect(informerName); + } + + private async onConnect(informerName: string) { + this.logger.info(`informer connected: ${informerName}`); + this.informerEventsTotal.inc({ namespace: this.namespace, verb: "connect" }); + } + + private podSummary(pod: V1Pod) { + return { + name: pod.metadata?.name, + namespace: pod.metadata?.namespace, + status: pod.status?.phase, + deletionTimestamp: pod.metadata?.deletionTimestamp, + }; + } + + // Method to expose metrics for testing + public getMetrics() { + return { + processedPodsTotal: this.processedPodsTotal, + deletedPodsTotal: this.deletedPodsTotal, + deletionErrorsTotal: this.deletionErrorsTotal, + informerEventsTotal: this.informerEventsTotal, + processingDurationSeconds: this.processingDurationSeconds, + }; + } +} diff --git a/apps/supervisor/src/services/podCleaner.test.ts b/apps/supervisor/src/services/podCleaner.test.ts new file mode 100644 index 0000000000..3ffb920183 --- /dev/null +++ b/apps/supervisor/src/services/podCleaner.test.ts @@ -0,0 +1,472 @@ +import { PodCleaner } from "./podCleaner.js"; +import { K8sApi, createK8sApi } from "../clients/kubernetes.js"; +import { setTimeout } from "timers/promises"; +import { describe, it, expect, beforeAll, afterEach } from "vitest"; +import { Registry } from "prom-client"; + +describe("PodCleaner Integration Tests", () => { + const k8s = createK8sApi(); + const namespace = "integration-test"; + const register = new Registry(); + + beforeAll(async () => { + // Create the test namespace, only if it doesn't exist + try { + await k8s.core.readNamespace({ name: namespace }); + } catch (error) { + await k8s.core.createNamespace({ + body: { + metadata: { + name: namespace, + }, + }, + }); + } + }); + + afterEach(async () => { + // Clear metrics to avoid conflicts + register.clear(); + + // Delete all pods in the namespace + await k8s.core.deleteCollectionNamespacedPod({ namespace }); + }); + + it("should clean up succeeded pods", async () => { + const podCleaner = new PodCleaner({ namespace, k8s, register }); + + try { + // Create a test pod that's in succeeded state + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 1, + namePrefix: "test-succeeded-pod", + }); + + if (!podNames[0]) { + throw new Error("Failed to create test pod"); + } + const podName = podNames[0]; + + // Wait for pod to complete + await waitForPodPhase({ + k8sApi: k8s, + namespace, + podName, + phase: "Succeeded", + }); + + // Start the pod cleaner + await podCleaner.start(); + + // Wait for pod to be deleted + await waitForPodDeletion({ + k8sApi: k8s, + namespace, + podName, + }); + + // Verify pod was deleted + expect(await podExists({ k8sApi: k8s, namespace, podName })).toBe(false); + } finally { + await podCleaner.stop(); + } + }, 30000); + + it("should accurately track deletion metrics", async () => { + const podCleaner = new PodCleaner({ namespace, k8s, register }); + try { + // Create a test pod that's in succeeded 
state + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 1, + namePrefix: "test-succeeded-pod", + }); + + // Wait for pod to be in succeeded state + await waitForPodsPhase({ + k8sApi: k8s, + namespace, + podNames, + phase: "Succeeded", + }); + + await podCleaner.start(); + + // Wait for pod to be deleted + await waitForPodsDeletion({ + k8sApi: k8s, + namespace, + podNames, + }); + + const metrics = podCleaner.getMetrics(); + const deletionCycles = await metrics.deletionCyclesTotal.get(); + const deletionTimestamp = await metrics.lastDeletionTimestamp.get(); + + expect(deletionCycles?.values[0]?.value).toBeGreaterThan(0); + expect(deletionTimestamp?.values[0]?.value).toBeGreaterThan(0); + } finally { + await podCleaner.stop(); + } + }, 30000); + + it("should handle different batch sizes - small", async () => { + const podCleaner = new PodCleaner({ + namespace, + k8s, + register, + batchSize: 1, + }); + + try { + // Create some pods that will succeed + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 2, + }); + + await waitForPodsPhase({ + k8sApi: k8s, + namespace, + podNames, + phase: "Succeeded", + }); + + await podCleaner.start(); + + await waitForPodsDeletion({ + k8sApi: k8s, + namespace, + podNames, + }); + + const metrics = podCleaner.getMetrics(); + const cycles = await metrics.deletionCyclesTotal.get(); + + expect(cycles?.values[0]?.value).toBe(2); + } finally { + await podCleaner.stop(); + } + }, 30000); + + it("should handle different batch sizes - large", async () => { + const podCleaner = new PodCleaner({ + namespace, + k8s, + register, + batchSize: 5000, + }); + + try { + // Create some pods that will succeed + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 10, + }); + + await waitForPodsPhase({ + k8sApi: k8s, + namespace, + podNames, + phase: "Succeeded", + }); + + await podCleaner.start(); + + await waitForPodsDeletion({ + k8sApi: k8s, + namespace, + podNames, + }); + + const metrics = podCleaner.getMetrics(); + const cycles = await metrics.deletionCyclesTotal.get(); + + expect(cycles?.values[0]?.value).toBe(1); + } finally { + await podCleaner.stop(); + } + }, 30000); + + it("should not delete pods without app=task-run label", async () => { + const podCleaner = new PodCleaner({ namespace, k8s, register }); + + try { + // Create a test pod without the task-run label + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 1, + labels: { app: "different-label" }, + namePrefix: "non-task-run-pod", + }); + + if (!podNames[0]) { + throw new Error("Failed to create test pod"); + } + const podName = podNames[0]; + + // Wait for pod to complete + await waitForPodPhase({ + k8sApi: k8s, + namespace, + podName, + phase: "Succeeded", + }); + + await podCleaner.start(); + + // Wait a reasonable time to ensure pod isn't deleted + await setTimeout(5000); + + // Verify pod still exists + expect(await podExists({ k8sApi: k8s, namespace, podName })).toBe(true); + } finally { + await podCleaner.stop(); + } + }, 30000); + + it("should not delete pods that are still running", async () => { + const podCleaner = new PodCleaner({ namespace, k8s, register }); + + try { + // Create a test pod with a long-running command + const podNames = await createTestPods({ + k8sApi: k8s, + namespace, + count: 1, + namePrefix: "running-pod", + command: ["sleep", "30"], // Will keep pod running + }); + + if (!podNames[0]) { + throw new Error("Failed to create test pod"); + } + const podName = podNames[0]; + + 
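+  // Deletes Succeeded task-run pods in batches, following the list continuation
+  // token until the collection is exhausted.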
+  private async deleteCompletedPods() {
+    let continuationToken: string | undefined;
+
+    do {
+      try {
+        const result = await this.k8s.core.deleteCollectionNamespacedPod({
+          namespace: this.namespace,
+          labelSelector: "app=task-run",
+          fieldSelector: "status.phase=Succeeded",
+          limit: this.batchSize,
+          _continue: continuationToken,
+          gracePeriodSeconds: 0,
+          propagationPolicy: "Background",
+          timeoutSeconds: 30,
+        });
+
+        // Update continuation token for next batch
+        continuationToken = result.metadata?._continue;
+
+        // Increment the deletion cycles counter
+        this.deletionCyclesTotal.inc({
+          namespace: this.namespace,
+          batch_size: this.batchSize,
+          status: "succeeded",
+        });
+
+        this.logger.info("Deleted batch of pods", { continuationToken });
+      } catch (err) {
+        this.logger.error("Failed to delete batch of pods", {
+          err: err instanceof Error ? err.message : String(err),
+        });
+
+        this.deletionCyclesTotal.inc({
+          namespace: this.namespace,
+          batch_size: this.batchSize,
+          status: "failed",
+        });
+        break;
+      }
+    } while (continuationToken);
+
+    this.lastDeletionTimestamp.set({ namespace: this.namespace }, Date.now());
+  }
+
+  // Method to expose metrics for testing
+  public getMetrics() {
+    return {
+      deletionCyclesTotal: this.deletionCyclesTotal,
+      lastDeletionTimestamp: this.lastDeletionTimestamp,
+    };
+  }
+}
diff --git a/package.json b/package.json
index 45f5d91bb6..f3d3ae6c99 100644
--- a/package.json
+++ b/package.json
@@ -78,7 +78,8 @@
       "engine.io-parser@5.2.2": "patches/engine.io-parser@5.2.2.patch",
       "graphile-worker@0.16.6": "patches/graphile-worker@0.16.6.patch",
       "redlock@5.0.0-beta.2": "patches/redlock@5.0.0-beta.2.patch",
-      "supports-hyperlinks@2.3.0": "patches/supports-hyperlinks@2.3.0.patch"
+      "supports-hyperlinks@2.3.0": "patches/supports-hyperlinks@2.3.0.patch",
+      "@kubernetes/client-node@1.0.0": "patches/@kubernetes__client-node@1.0.0.patch"
     }
   }
 }
\ No newline at end of file
diff --git a/patches/@kubernetes__client-node@1.0.0.patch b/patches/@kubernetes__client-node@1.0.0.patch
new file mode 100644
index 0000000000..60f7106a17
--- /dev/null
+++ b/patches/@kubernetes__client-node@1.0.0.patch
@@ -0,0 +1,53 @@
+diff --git a/dist/cache.js b/dist/cache.js
+index d58eea314385e5b5dea1f5d9104ba9446deb6364..3554cef6ba6cd1c8186bf0fa880eab2c5a4f3289 100644
+--- a/dist/cache.js
++++ b/dist/cache.js
+@@ -1,11 +1,12 @@
+ import { ADD, CHANGE, CONNECT, DELETE, ERROR, UPDATE, } from './informer.js';
+ import { ObjectSerializer } from './serializer.js';
+ export class ListWatch {
+-    constructor(path, watch, listFn, autoStart = true, labelSelector) {
++    constructor(path, watch, listFn, autoStart = true, labelSelector, fieldSelector) {
+         this.path = path;
+         this.watch = watch;
+         this.listFn = listFn;
+         this.labelSelector = labelSelector;
++        this.fieldSelector = fieldSelector;
+         this.objects = new Map();
+         this.indexCache = {};
+         this.callbackCache = {};
+@@ -113,6 +114,9 @@ export class ListWatch {
+         if (this.labelSelector !== undefined) {
+            queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string');
+         }
++        if (this.fieldSelector !== undefined) {
++            queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string');
++        }
+         this.request = await this.watch.watch(this.path, queryParams, this.watchHandler.bind(this), this.doneHandler.bind(this));
+     }
+     addOrUpdateItems(items) {
+diff --git a/dist/informer.d.ts b/dist/informer.d.ts
+index 903de9078b4b11a5fd933802be042e1a4f966079..4177cf12b5705a373def8f22d235416cd82114c6 100644
+--- a/dist/informer.d.ts
++++ b/dist/informer.d.ts
+@@ -16,4 +16,4 @@ export interface Informer<T> {
+     start(): Promise<void>;
+     stop(): Promise<void>;
+ }
+-export declare function makeInformer<T extends KubernetesObject>(kubeconfig: KubeConfig, path: string, listPromiseFn: ListPromise<T>, labelSelector?: string): Informer<T>;
++export declare function makeInformer<T extends KubernetesObject>(kubeconfig: KubeConfig, path: string, listPromiseFn: ListPromise<T>, labelSelector?: string, fieldSelector?: string): Informer<T>;
+diff --git a/dist/informer.js b/dist/informer.js
+index 0dc471fc3e2d8e653a4c4795211b4774563d2d14..ae33a4357308b77bfb4e7588807e9ac0d7643ba3 100644
+--- a/dist/informer.js
++++ b/dist/informer.js
+@@ -9,8 +9,8 @@ export const DELETE = 'delete';
+ export const CONNECT = 'connect';
+ // This is issued when there is an error
+ export const ERROR = 'error';
+-export function makeInformer(kubeconfig, path, listPromiseFn, labelSelector) {
++export function makeInformer(kubeconfig, path, listPromiseFn, labelSelector, fieldSelector) {
+     const watch = new Watch(kubeconfig);
+-    return new ListWatch(path, watch, listPromiseFn, false, labelSelector);
++    return new ListWatch(path, watch, listPromiseFn, false, labelSelector, fieldSelector);
+ }
+ //# sourceMappingURL=informer.js.map
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 9eca9dd3e5..fee2e33f9c 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -8,6 +8,9 @@ patchedDependencies:
   '@changesets/assemble-release-plan@5.2.4':
     hash: 3wuhjtl4hjck4itk3w32z4cd5u
     path: patches/@changesets__assemble-release-plan@5.2.4.patch
+  '@kubernetes/client-node@1.0.0':
+    hash: s75bgwaoixupmywtvgoy5ruszq
+    path: patches/@kubernetes__client-node@1.0.0.patch
   engine.io-parser@5.2.2:
     hash: e6nctogrhpxoivwiwy37ersfu4
     path: patches/engine.io-parser@5.2.2.patch
@@ -154,7 +157,7 @@ importers:
     dependencies:
       '@kubernetes/client-node':
         specifier: ^1.0.0
-        version: 1.0.0
+        version: 1.0.0(patch_hash=s75bgwaoixupmywtvgoy5ruszq)
      '@trigger.dev/core':
        specifier: workspace:*
        version: link:../../packages/core
@@ -164,6 +167,9 @@ importers:
       nanoid:
         specifier: ^5.0.9
         version: 5.1.2
+      prom-client:
+        specifier: ^15.1.0
+        version: 15.1.0
       socket.io:
         specifier: 4.7.4
        version: 4.7.4
@@ -8278,7 +8284,7 @@ packages:
       - utf-8-validate
     dev: false
 
-  /@kubernetes/client-node@1.0.0:
+  /@kubernetes/client-node@1.0.0(patch_hash=s75bgwaoixupmywtvgoy5ruszq):
    resolution: {integrity: sha512-a8NSvFDSHKFZ0sR1hbPSf8IDFNJwctEU5RodSCNiq/moRXWmrdmqhb1RRQzF+l+TSBaDgHw3YsYNxxE92STBzw==}
    dependencies:
      '@types/js-yaml': 4.0.9
@@ -8304,6 +8310,7 @@ packages:
       - encoding
       - utf-8-validate
     dev: false
+    patched: true
 
  /@lezer/common@1.0.2:
    resolution: {integrity: sha512-SVgiGtMnMnW3ActR8SXgsDhw7a0w0ChHSYAyAUxxrOiJ1OqYWEKk/xJd84tTSPo1mo6DXLObAJALNnd0Hrv7Ng==}