Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 10 additions & 44 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type WorkloadManagerOptions,
} from "./types.js";
import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3";
import { PlacementTagProcessor } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
import { getRunnerId } from "../util.js";
Expand All @@ -13,18 +14,18 @@ type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
};

interface PlacementConfig {
enabled: boolean;
prefix: string;
}

export class KubernetesWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider");
private k8s: K8sApi;
private namespace = env.KUBERNETES_NAMESPACE;
private placementTagProcessor: PlacementTagProcessor;

constructor(private opts: WorkloadManagerOptions) {
this.k8s = createK8sApi();
this.placementTagProcessor = new PlacementTagProcessor({
enabled: env.PLACEMENT_TAGS_ENABLED,
prefix: env.PLACEMENT_TAGS_PREFIX,
});

Comment on lines +21 to 29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Make the processor field readonly; ensure env prefix is set when enabled

  • Marking the field readonly communicates intent and prevents accidental reassignment.
  • If PLACEMENT_TAGS_ENABLED is true but the prefix is missing or has a trailing slash, label keys could be malformed unless normalized in the processor (recommended). Validate or normalize centrally.
-  private placementTagProcessor: PlacementTagProcessor;
+  private readonly placementTagProcessor: PlacementTagProcessor;

To verify env wiring and defaults:


🏁 Script executed:

#!/bin/bash
# Check env.ts/js for PLACEMENT_TAGS_ENABLED/PLACEMENT_TAGS_PREFIX definitions and defaults
fd -t f -g '*env.*' apps | xargs -I{} rg -n -C2 'PLACEMENT_TAGS_(ENABLED|PREFIX)' {}

Length of output: 272


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "---- Search: placementTagProcessor usages ----"
rg -n --hidden -S 'placementTagProcessor' || true

echo
echo "---- Search: PlacementTagProcessor symbols ----"
rg -n --hidden -S 'PlacementTagProcessor' || true

echo
echo "---- Search: PLACEMENT_TAGS_* env defs ----"
rg -n --hidden -S 'PLACEMENT_TAGS_(ENABLED|PREFIX)' || true

echo
echo "---- Show files that define PLACEMENT_TAGS_PREFIX (first 200 lines) ----"
files=$(rg -l --hidden -S 'PLACEMENT_TAGS_PREFIX' || true)
if [ -n "$files" ]; then
  for f in $files; do
    echo "---- File: $f ----"
    sed -n '1,200p' "$f"
    echo
  done
else
  echo "No files found defining PLACEMENT_TAGS_PREFIX"
fi

echo
echo "---- Show files that define PlacementTagProcessor (first 200 lines) ----"
files2=$(rg -l --hidden -S 'class PlacementTagProcessor|function PlacementTagProcessor|new PlacementTagProcessor' || true)
if [ -n "$files2" ]; then
  for f in $files2; do
    echo "---- File: $f ----"
    sed -n '1,200p' "$f"
    echo
  done
else
  echo "No files found with obvious PlacementTagProcessor definition; showing files that reference it instead"
  rg -n --hidden -S 'PlacementTagProcessor' || true
fi

Length of output: 24031


Make placementTagProcessor readonly and normalize/validate PLACEMENT_TAGS_PREFIX

Quick summary: I verified that apps/supervisor/src/env.ts provides a default PLACEMENT_TAGS_PREFIX ("node.cluster.x-k8s.io") but PlacementTagProcessor uses the prefix directly and does not normalize or validate it. Also the field in KubernetesWorkloadManager is not readonly. Recommend making the field readonly and normalizing/validating the prefix centrally.

Files to change:

  • apps/supervisor/src/workloadManager/kubernetes.ts — mark the processor field readonly.
  • packages/core/src/v3/serverOnly/placementTags.ts — normalize (trim trailing slashes) and validate prefix when enabled; use the normalized prefix when building label keys.

Suggested diffs:

Make field readonly in KubernetesWorkloadManager:

-  private placementTagProcessor: PlacementTagProcessor;
+  private readonly placementTagProcessor: PlacementTagProcessor;

Normalize/validate prefix in PlacementTagProcessor (outline):

-export class PlacementTagProcessor {
-  private readonly logger = new SimpleStructuredLogger("placement-tag-processor");
-
-  constructor(private readonly config: PlacementConfig) {}
+export class PlacementTagProcessor {
+  private readonly logger = new SimpleStructuredLogger("placement-tag-processor");
+  private readonly config: PlacementConfig;
+  private readonly prefix: string;
+
+  constructor(config: PlacementConfig) {
+    this.config = config;
+    // normalize prefix: remove trailing slashes
+    this.prefix = (config.prefix ?? "").replace(/\/+$/, "");
+    if (config.enabled && this.prefix.length === 0) {
+      throw new Error(
+        "PlacementTagProcessor: PLACEMENT_TAGS_PREFIX must be a non-empty string when PLACEMENT_TAGS_ENABLED is true"
+      );
+    }
+  }

And replace label construction in convertToNodeSelector:

-      const labelKey = `${this.config.prefix}/${tag.key}`;
+      const labelKey = `${this.prefix}/${tag.key}`;

Note: apps/supervisor/src/env.ts currently defines:

  • PLACEMENT_TAGS_ENABLED: BoolEnv.default(false)
  • PLACEMENT_TAGS_PREFIX: z.string().default("node.cluster.x-k8s.io")

Even with that default, normalizing/validating is recommended because the prefix can be overridden via environment and a trailing slash or empty value would produce malformed label keys.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private placementTagProcessor: PlacementTagProcessor;
constructor(private opts: WorkloadManagerOptions) {
this.k8s = createK8sApi();
this.placementTagProcessor = new PlacementTagProcessor({
enabled: env.PLACEMENT_TAGS_ENABLED,
prefix: env.PLACEMENT_TAGS_PREFIX,
});
private readonly placementTagProcessor: PlacementTagProcessor;
constructor(private opts: WorkloadManagerOptions) {
this.k8s = createK8sApi();
this.placementTagProcessor = new PlacementTagProcessor({
enabled: env.PLACEMENT_TAGS_ENABLED,
prefix: env.PLACEMENT_TAGS_PREFIX,
});

if (opts.workloadApiDomain) {
this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", {
Expand All @@ -33,56 +34,21 @@ export class KubernetesWorkloadManager implements WorkloadManager {
}
}

private get placementConfig(): PlacementConfig {
return {
enabled: env.PLACEMENT_TAGS_ENABLED,
prefix: env.PLACEMENT_TAGS_PREFIX,
};
}

private addPlacementTags(
podSpec: Omit<k8s.V1PodSpec, "containers">,
placementTags?: PlacementTag[]
): Omit<k8s.V1PodSpec, "containers"> {
if (!this.placementConfig.enabled || !placementTags || placementTags.length === 0) {
return podSpec;
}

const nodeSelector: Record<string, string> = { ...podSpec.nodeSelector };

// Convert placement tags to nodeSelector labels
for (const tag of placementTags) {
const labelKey = `${this.placementConfig.prefix}/${tag.key}`;

// Print warnings (if any)
this.printTagWarnings(tag);

// For now we only support single values via nodeSelector
nodeSelector[labelKey] = tag.values?.[0] ?? "";
}
const nodeSelector = this.placementTagProcessor.convertToNodeSelector(
placementTags,
podSpec.nodeSelector
);

return {
...podSpec,
nodeSelector,
};
}

private printTagWarnings(tag: PlacementTag) {
if (!tag.values || tag.values.length === 0) {
// No values provided
this.logger.warn(
"[KubernetesWorkloadManager] Placement tag has no values, using empty string",
tag
);
} else if (tag.values.length > 1) {
// Multiple values provided
this.logger.warn(
"[KubernetesWorkloadManager] Placement tag has multiple values, only using first one",
tag
);
}
}

async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export {

export * from "./utils/imageRef.js";
export * from "./utils/interval.js";
export * from "./utils/placementTags.js";

export * from "./config.js";
export {
Expand Down
5 changes: 0 additions & 5 deletions packages/core/src/v3/schemas/runEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,6 @@ export const PlacementTag = z.object({
});
export type PlacementTag = z.infer<typeof PlacementTag>;

/** Helper functions for placement tags. In the future this will be able to support multiple values and operators. For now it's just a single value. */
export function placementTag(key: string, value: string): PlacementTag {
return { key, values: [value] };
}

/** This is sent to a Worker when a run is dequeued (a new run or continuing run) */
export const DequeuedMessage = z.object({
version: z.literal("1"),
Expand Down
64 changes: 64 additions & 0 deletions packages/core/src/v3/utils/placementTags.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { type PlacementTag } from "../schemas/index.js";
import { SimpleStructuredLogger } from "./structuredLogger.js";

export interface PlacementConfig {
enabled: boolean;
prefix: string;
}

export class PlacementTagProcessor {
private readonly logger = new SimpleStructuredLogger("placement-tag-processor");

constructor(private readonly config: PlacementConfig) {}

/**
* Converts placement tags to Kubernetes nodeSelector labels
*/
convertToNodeSelector(
placementTags?: PlacementTag[],
existingNodeSelector?: Record<string, string>
): Record<string, string> {
if (!this.config.enabled || !placementTags || placementTags.length === 0) {
return existingNodeSelector ?? {};
}

const nodeSelector: Record<string, string> = { ...existingNodeSelector };

// Convert placement tags to nodeSelector labels
for (const tag of placementTags) {
const labelKey = `${this.config.prefix}/${tag.key}`;

// Print warnings (if any)
this.printTagWarnings(tag);

// For now we only support single values via nodeSelector
nodeSelector[labelKey] = tag.values?.[0] ?? "";
}

return nodeSelector;
}

private printTagWarnings(tag: PlacementTag) {
if (!tag.values || tag.values.length === 0) {
// No values provided
this.logger.warn(
"Placement tag has no values, using empty string",
tag
);
} else if (tag.values.length > 1) {
// Multiple values provided
this.logger.warn(
"Placement tag has multiple values, only using first one",
tag
);
}
}
}

/**
* Helper function to create a placement tag. In the future this will be able to support multiple values and operators.
* For now it's just a single value.
*/
export function placementTag(key: string, value: string): PlacementTag {
return { key, values: [value] };
}
Loading