diff --git a/apps/kubernetes-provider/src/index.ts b/apps/kubernetes-provider/src/index.ts index 3fab373f59..5e1887e92e 100644 --- a/apps/kubernetes-provider/src/index.ts +++ b/apps/kubernetes-provider/src/index.ts @@ -1,27 +1,30 @@ import * as k8s from "@kubernetes/client-node"; +import { + EnvironmentType, + MachinePreset, + PostStartCauses, + PreStopCauses, +} from "@trigger.dev/core/v3"; import { ProviderShell, + SimpleLogger, TaskOperations, TaskOperationsCreateOptions, TaskOperationsIndexOptions, TaskOperationsPrePullDeploymentOptions, TaskOperationsRestoreOptions, } from "@trigger.dev/core/v3/apps"; -import { SimpleLogger } from "@trigger.dev/core/v3/apps"; -import { - MachinePreset, - PostStartCauses, - PreStopCauses, - EnvironmentType, -} from "@trigger.dev/core/v3"; -import { TaskMonitor } from "./taskMonitor"; import { PodCleaner } from "./podCleaner"; +import { TaskMonitor } from "./taskMonitor"; import { UptimeHeartbeat } from "./uptimeHeartbeat"; const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local"; const NODE_NAME = process.env.NODE_NAME || "local"; const OTEL_EXPORTER_OTLP_ENDPOINT = process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318"; +const COORDINATOR_HOST = process.env.COORDINATOR_HOST ?? undefined; +const COORDINATOR_PORT = process.env.COORDINATOR_PORT ?? undefined; +const KUBERNETES_NAMESPACE = process.env.KUBERNETES_NAMESPACE ?? "default"; const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300"); @@ -45,19 +48,22 @@ type ResourceQuantities = { }; class KubernetesTaskOperations implements TaskOperations { - #namespace: Namespace; + #namespace: Namespace = { + metadata: { + name: "default", + }, + }; + #k8sApi: { core: k8s.CoreV1Api; batch: k8s.BatchV1Api; apps: k8s.AppsV1Api; }; - constructor(namespace = "default") { - this.#namespace = { - metadata: { - name: namespace, - }, - }; + constructor(opts: { namespace?: string } = {}) { + if (opts.namespace) { + this.#namespace.metadata.name = opts.namespace; + } this.#k8sApi = this.#createK8sApi(); } @@ -229,16 +235,7 @@ class KubernetesTaskOperations implements TaskOperations { imagePullPolicy: "IfNotPresent", command: ["/bin/sh", "-c"], args: ["printenv COORDINATOR_HOST | tee /etc/taskinfo/coordinator-host"], - env: [ - { - name: "COORDINATOR_HOST", - valueFrom: { - fieldRef: { - fieldPath: "status.hostIP", - }, - }, - }, - ], + env: this.#coordinatorEnvVars, volumeMounts: [ { name: "taskinfo", @@ -409,6 +406,41 @@ class KubernetesTaskOperations implements TaskOperations { }; } + get #coordinatorHostEnvVar(): k8s.V1EnvVar { + return COORDINATOR_HOST + ? { + name: "COORDINATOR_HOST", + value: COORDINATOR_HOST, + } + : { + name: "COORDINATOR_HOST", + valueFrom: { + fieldRef: { + fieldPath: "status.hostIP", + }, + }, + }; + } + + get #coordinatorPortEnvVar(): k8s.V1EnvVar | undefined { + if (COORDINATOR_PORT) { + return { + name: "COORDINATOR_PORT", + value: COORDINATOR_PORT, + }; + } + } + + get #coordinatorEnvVars(): k8s.V1EnvVar[] { + const envVars = [this.#coordinatorHostEnvVar]; + + if (this.#coordinatorPortEnvVar) { + envVars.push(this.#coordinatorPortEnvVar); + } + + return envVars; + } + #getSharedEnv(envId: string): k8s.V1EnvVar[] { return [ { @@ -435,14 +467,6 @@ class KubernetesTaskOperations implements TaskOperations { }, }, }, - { - name: "COORDINATOR_HOST", - valueFrom: { - fieldRef: { - fieldPath: "status.hostIP", - }, - }, - }, { name: "MACHINE_NAME", valueFrom: { @@ -451,6 +475,7 @@ class KubernetesTaskOperations implements TaskOperations { }, }, }, + ...this.#coordinatorEnvVars, ]; } @@ -623,7 +648,9 @@ class KubernetesTaskOperations implements TaskOperations { } const provider = new ProviderShell({ - tasks: new KubernetesTaskOperations(), + tasks: new KubernetesTaskOperations({ + namespace: KUBERNETES_NAMESPACE, + }), type: "kubernetes", }); @@ -663,7 +690,7 @@ taskMonitor.start(); const podCleaner = new PodCleaner({ runtimeEnv: RUNTIME_ENV, - namespace: "default", + namespace: KUBERNETES_NAMESPACE, intervalInSeconds: POD_CLEANER_INTERVAL_SECONDS, }); @@ -672,7 +699,7 @@ podCleaner.start(); if (UPTIME_HEARTBEAT_URL) { const uptimeHeartbeat = new UptimeHeartbeat({ runtimeEnv: RUNTIME_ENV, - namespace: "default", + namespace: KUBERNETES_NAMESPACE, intervalInSeconds: UPTIME_INTERVAL_SECONDS, pingUrl: UPTIME_HEARTBEAT_URL, maxPendingRuns: UPTIME_MAX_PENDING_RUNS,