Skip to content
99 changes: 63 additions & 36 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import * as k8s from "@kubernetes/client-node";
import {
EnvironmentType,
MachinePreset,
PostStartCauses,
PreStopCauses,
} from "@trigger.dev/core/v3";
import {
ProviderShell,
SimpleLogger,
TaskOperations,
TaskOperationsCreateOptions,
TaskOperationsIndexOptions,
TaskOperationsPrePullDeploymentOptions,
TaskOperationsRestoreOptions,
} from "@trigger.dev/core/v3/apps";
import { SimpleLogger } from "@trigger.dev/core/v3/apps";
import {
MachinePreset,
PostStartCauses,
PreStopCauses,
EnvironmentType,
} from "@trigger.dev/core/v3";
import { TaskMonitor } from "./taskMonitor";
import { PodCleaner } from "./podCleaner";
import { TaskMonitor } from "./taskMonitor";
import { UptimeHeartbeat } from "./uptimeHeartbeat";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
const NODE_NAME = process.env.NODE_NAME || "local";
const OTEL_EXPORTER_OTLP_ENDPOINT =
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318";
const COORDINATOR_HOST = process.env.COORDINATOR_HOST ?? undefined;
const COORDINATOR_PORT = process.env.COORDINATOR_PORT ?? undefined;
const KUBERNETES_NAMESPACE = process.env.KUBERNETES_NAMESPACE ?? "default";

const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300");

Expand All @@ -45,19 +48,22 @@ type ResourceQuantities = {
};

class KubernetesTaskOperations implements TaskOperations {
#namespace: Namespace;
#namespace: Namespace = {
metadata: {
name: "default",
},
};

#k8sApi: {
core: k8s.CoreV1Api;
batch: k8s.BatchV1Api;
apps: k8s.AppsV1Api;
};

constructor(namespace = "default") {
this.#namespace = {
metadata: {
name: namespace,
},
};
constructor(opts: { namespace?: string } = {}) {
if (opts.namespace) {
this.#namespace.metadata.name = opts.namespace;
}

this.#k8sApi = this.#createK8sApi();
}
Expand Down Expand Up @@ -229,16 +235,7 @@ class KubernetesTaskOperations implements TaskOperations {
imagePullPolicy: "IfNotPresent",
command: ["/bin/sh", "-c"],
args: ["printenv COORDINATOR_HOST | tee /etc/taskinfo/coordinator-host"],
env: [
{
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
},
],
env: this.#coordinatorEnvVars,
volumeMounts: [
{
name: "taskinfo",
Expand Down Expand Up @@ -409,6 +406,41 @@ class KubernetesTaskOperations implements TaskOperations {
};
}

get #coordinatorHostEnvVar(): k8s.V1EnvVar {
return COORDINATOR_HOST
? {
name: "COORDINATOR_HOST",
value: COORDINATOR_HOST,
}
: {
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
};
}

get #coordinatorPortEnvVar(): k8s.V1EnvVar | undefined {
if (COORDINATOR_PORT) {
return {
name: "COORDINATOR_PORT",
value: COORDINATOR_PORT,
};
}
}

get #coordinatorEnvVars(): k8s.V1EnvVar[] {
const envVars = [this.#coordinatorHostEnvVar];

if (this.#coordinatorPortEnvVar) {
envVars.push(this.#coordinatorPortEnvVar);
}

return envVars;
}

#getSharedEnv(envId: string): k8s.V1EnvVar[] {
return [
{
Expand All @@ -435,14 +467,6 @@ class KubernetesTaskOperations implements TaskOperations {
},
},
},
{
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
},
{
name: "MACHINE_NAME",
valueFrom: {
Expand All @@ -451,6 +475,7 @@ class KubernetesTaskOperations implements TaskOperations {
},
},
},
...this.#coordinatorEnvVars,
];
}

Expand Down Expand Up @@ -623,7 +648,9 @@ class KubernetesTaskOperations implements TaskOperations {
}

const provider = new ProviderShell({
tasks: new KubernetesTaskOperations(),
tasks: new KubernetesTaskOperations({
namespace: KUBERNETES_NAMESPACE,
}),
type: "kubernetes",
});

Expand Down Expand Up @@ -663,7 +690,7 @@ taskMonitor.start();

const podCleaner = new PodCleaner({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
namespace: KUBERNETES_NAMESPACE,
intervalInSeconds: POD_CLEANER_INTERVAL_SECONDS,
});

Expand All @@ -672,7 +699,7 @@ podCleaner.start();
if (UPTIME_HEARTBEAT_URL) {
const uptimeHeartbeat = new UptimeHeartbeat({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
namespace: KUBERNETES_NAMESPACE,
intervalInSeconds: UPTIME_INTERVAL_SECONDS,
pingUrl: UPTIME_HEARTBEAT_URL,
maxPendingRuns: UPTIME_MAX_PENDING_RUNS,
Expand Down