diff --git a/apps/kubernetes-provider/src/index.ts b/apps/kubernetes-provider/src/index.ts index a5a6cf55f8..915b368d92 100644 --- a/apps/kubernetes-provider/src/index.ts +++ b/apps/kubernetes-provider/src/index.ts @@ -17,6 +17,8 @@ import { import { PodCleaner } from "./podCleaner"; import { TaskMonitor } from "./taskMonitor"; import { UptimeHeartbeat } from "./uptimeHeartbeat"; +import { assertExhaustive } from "@trigger.dev/core"; +import { CustomLabelHelper } from "./labelHelper"; const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local"; const NODE_NAME = process.env.NODE_NAME || "local"; @@ -37,7 +39,14 @@ const UPTIME_MAX_PENDING_ERRORS = Number(process.env.UPTIME_MAX_PENDING_ERRORS | const POD_EPHEMERAL_STORAGE_SIZE_LIMIT = process.env.POD_EPHEMERAL_STORAGE_SIZE_LIMIT || "10Gi"; const POD_EPHEMERAL_STORAGE_SIZE_REQUEST = process.env.POD_EPHEMERAL_STORAGE_SIZE_REQUEST || "2Gi"; +// Image config const PRE_PULL_DISABLED = process.env.PRE_PULL_DISABLED === "true"; +const ADDITIONAL_PULL_SECRETS = process.env.ADDITIONAL_PULL_SECRETS; +const PAUSE_IMAGE = process.env.PAUSE_IMAGE || "registry.k8s.io/pause:3.9"; +const BUSYBOX_IMAGE = process.env.BUSYBOX_IMAGE || "registry.digitalocean.com/trigger/busybox"; +const DEPLOYMENT_IMAGE_PREFIX = process.env.DEPLOYMENT_IMAGE_PREFIX; +const RESTORE_IMAGE_PREFIX = process.env.RESTORE_IMAGE_PREFIX; +const UTILITY_IMAGE_PREFIX = process.env.UTILITY_IMAGE_PREFIX; const logger = new SimpleLogger(`[${NODE_NAME}]`); logger.log(`running in ${RUNTIME_ENV} mode`); @@ -65,6 +74,8 @@ class KubernetesTaskOperations implements TaskOperations { apps: k8s.AppsV1Api; }; + #labelHelper = new CustomLabelHelper(); + constructor(opts: { namespace?: string } = {}) { if (opts.namespace) { this.#namespace.metadata.name = opts.namespace; @@ -103,7 +114,7 @@ class KubernetesTaskOperations implements TaskOperations { containers: [ { name: this.#getIndexContainerName(opts.shortCode), - image: opts.imageRef, + image: getImageRef("deployment", opts.imageRef), ports: [ { containerPort: 8000, @@ -157,6 +168,7 @@ class KubernetesTaskOperations implements TaskOperations { name: containerName, namespace: this.#namespace.metadata.name, labels: { + ...this.#labelHelper.getAdditionalLabels("create"), ...this.#getSharedLabels(opts), app: "task-run", "app.kubernetes.io/part-of": "trigger-worker", @@ -170,7 +182,7 @@ class KubernetesTaskOperations implements TaskOperations { containers: [ { name: containerName, - image: opts.image, + image: getImageRef("deployment", opts.image), ports: [ { containerPort: 8000, @@ -218,6 +230,7 @@ class KubernetesTaskOperations implements TaskOperations { name: `${this.#getRunContainerName(opts.runId)}-${opts.checkpointId.slice(-8)}`, namespace: this.#namespace.metadata.name, labels: { + ...this.#labelHelper.getAdditionalLabels("restore"), ...this.#getSharedLabels(opts), app: "task-run", "app.kubernetes.io/part-of": "trigger-worker", @@ -231,12 +244,12 @@ class KubernetesTaskOperations implements TaskOperations { initContainers: [ { name: "pull-base-image", - image: opts.imageRef, + image: getImageRef("deployment", opts.imageRef), command: ["sleep", "0"], }, { name: "populate-taskinfo", - image: "registry.digitalocean.com/trigger/busybox", + image: getImageRef("utility", BUSYBOX_IMAGE), imagePullPolicy: "IfNotPresent", command: ["/bin/sh", "-c"], args: ["printenv COORDINATOR_HOST | tee /etc/taskinfo/coordinator-host"], @@ -252,7 +265,7 @@ class KubernetesTaskOperations implements TaskOperations { containers: [ { name: this.#getRunContainerName(opts.runId), - image: opts.checkpointRef, + image: getImageRef("restore", opts.checkpointRef), ports: [ { containerPort: 8000, @@ -358,7 +371,7 @@ class KubernetesTaskOperations implements TaskOperations { initContainers: [ { name: "prepull", - image: opts.imageRef, + image: getImageRef("deployment", opts.imageRef), command: ["/usr/bin/true"], resources: { limits: { @@ -372,7 +385,7 @@ class KubernetesTaskOperations implements TaskOperations { containers: [ { name: "pause", - image: "registry.k8s.io/pause:3.9", + image: getImageRef("utility", PAUSE_IMAGE), resources: { limits: { cpu: "1m", @@ -403,17 +416,20 @@ class KubernetesTaskOperations implements TaskOperations { } get #defaultPodSpec(): Omit { + const pullSecrets = ["registry-trigger", "registry-trigger-failover"]; + + if (ADDITIONAL_PULL_SECRETS) { + pullSecrets.push(...ADDITIONAL_PULL_SECRETS.split(",")); + } + + const imagePullSecrets = pullSecrets.map( + (name) => ({ name }) satisfies k8s.V1LocalObjectReference + ); + return { restartPolicy: "Never", automountServiceAccountToken: false, - imagePullSecrets: [ - { - name: "registry-trigger", - }, - { - name: "registry-trigger-failover", - }, - ], + imagePullSecrets, nodeSelector: { nodetype: "worker", }, @@ -673,6 +689,26 @@ class KubernetesTaskOperations implements TaskOperations { } } +type ImageType = "deployment" | "restore" | "utility"; + +function getImagePrefix(type: ImageType) { + switch (type) { + case "deployment": + return DEPLOYMENT_IMAGE_PREFIX; + case "restore": + return RESTORE_IMAGE_PREFIX; + case "utility": + return UTILITY_IMAGE_PREFIX; + default: + assertExhaustive(type); + } +} + +function getImageRef(type: ImageType, ref: string) { + const prefix = getImagePrefix(type); + return prefix ? `${prefix}/${ref}` : ref; +} + const provider = new ProviderShell({ tasks: new KubernetesTaskOperations({ namespace: KUBERNETES_NAMESPACE, diff --git a/apps/kubernetes-provider/src/labelHelper.ts b/apps/kubernetes-provider/src/labelHelper.ts new file mode 100644 index 0000000000..98cd3d68be --- /dev/null +++ b/apps/kubernetes-provider/src/labelHelper.ts @@ -0,0 +1,153 @@ +import { assertExhaustive } from "@trigger.dev/core"; + +const CREATE_LABEL_ENV_VAR_PREFIX = "DEPLOYMENT_LABEL_"; +const RESTORE_LABEL_ENV_VAR_PREFIX = "RESTORE_LABEL_"; +const LABEL_SAMPLE_RATE_POSTFIX = "_SAMPLE_RATE"; + +type OperationType = "create" | "restore"; + +type CustomLabel = { + key: string; + value: string; + sampleRate: number; +}; + +export class CustomLabelHelper { + // Labels and sample rates are defined in environment variables so only need to be computed once + private createLabels?: CustomLabel[]; + private restoreLabels?: CustomLabel[]; + + private getLabelPrefix(type: OperationType) { + const prefix = type === "create" ? CREATE_LABEL_ENV_VAR_PREFIX : RESTORE_LABEL_ENV_VAR_PREFIX; + return prefix.toLowerCase(); + } + + private getLabelSampleRatePostfix() { + return LABEL_SAMPLE_RATE_POSTFIX.toLowerCase(); + } + + // Can only range from 0 to 1 + private fractionFromPercent(percent: number) { + return Math.min(1, Math.max(0, percent / 100)); + } + + private isLabelSampleRateEnvVar(key: string) { + return key.toLowerCase().endsWith(this.getLabelSampleRatePostfix()); + } + + private isLabelEnvVar(type: OperationType, key: string) { + const prefix = this.getLabelPrefix(type); + return key.toLowerCase().startsWith(prefix) && !this.isLabelSampleRateEnvVar(key); + } + + private getSampleRateEnvVarKey(type: OperationType, envKey: string) { + return `${envKey.toLowerCase()}${this.getLabelSampleRatePostfix()}`; + } + + private getLabelNameFromEnvVarKey(type: OperationType, key: string) { + return key + .slice(this.getLabelPrefix(type).length) + .toLowerCase() + .replace(/___/g, ".") + .replace(/__/g, "/") + .replace(/_/g, "-"); + } + + private getCaseInsensitiveEnvValue(key: string) { + for (const [envKey, value] of Object.entries(process.env)) { + if (envKey.toLowerCase() === key.toLowerCase()) { + return value; + } + } + } + + /** Returns the sample rate for a given label as fraction of 100 */ + private getSampleRateFromEnvVarKey(type: OperationType, envKey: string) { + // Apply default: always sample + const DEFAULT_SAMPLE_RATE_PERCENT = 100; + const defaultSampleRateFraction = this.fractionFromPercent(DEFAULT_SAMPLE_RATE_PERCENT); + + const value = this.getCaseInsensitiveEnvValue(this.getSampleRateEnvVarKey(type, envKey)); + + if (!value) { + return defaultSampleRateFraction; + } + + const sampleRatePercent = parseFloat(value || String(DEFAULT_SAMPLE_RATE_PERCENT)); + + if (isNaN(sampleRatePercent)) { + return defaultSampleRateFraction; + } + + const fractionalSampleRate = this.fractionFromPercent(sampleRatePercent); + + return fractionalSampleRate; + } + + private getCustomLabels(type: OperationType): CustomLabel[] { + switch (type) { + case "create": + if (this.createLabels) { + return this.createLabels; + } + break; + case "restore": + if (this.restoreLabels) { + return this.restoreLabels; + } + break; + default: + assertExhaustive(type); + } + + const customLabels: CustomLabel[] = []; + + for (const [envKey, value] of Object.entries(process.env)) { + const key = envKey.toLowerCase(); + + // Only process env vars that start with the expected prefix + if (!this.isLabelEnvVar(type, key)) { + continue; + } + + // Skip sample rates - deal with them separately + if (this.isLabelSampleRateEnvVar(key)) { + continue; + } + + const labelName = this.getLabelNameFromEnvVarKey(type, key); + const sampleRate = this.getSampleRateFromEnvVarKey(type, key); + + const label = { + key: labelName, + value: value || "", + sampleRate, + } satisfies CustomLabel; + + customLabels.push(label); + } + + return customLabels; + } + + getAdditionalLabels(type: OperationType): Record { + const labels = this.getCustomLabels(type); + + const additionalLabels: Record = {}; + + for (const { key, value, sampleRate } of labels) { + // Always apply label if sample rate is 1 + if (sampleRate === 1) { + additionalLabels[key] = value; + continue; + } + + if (Math.random() <= sampleRate) { + additionalLabels[key] = value; + continue; + } + } + + return additionalLabels; + } +} diff --git a/apps/kubernetes-provider/tsconfig.json b/apps/kubernetes-provider/tsconfig.json index 661823ef74..057952409b 100644 --- a/apps/kubernetes-provider/tsconfig.json +++ b/apps/kubernetes-provider/tsconfig.json @@ -8,6 +8,8 @@ "strict": true, "skipLibCheck": true, "paths": { + "@trigger.dev/core": ["../../packages/core/src"], + "@trigger.dev/core/*": ["../../packages/core/src/*"], "@trigger.dev/core/v3": ["../../packages/core/src/v3"], "@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"] }