diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 2b5547f3a8..e738177cbc 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -5,6 +5,7 @@ import { type WorkloadManagerOptions, } from "./types.js"; import type { EnvironmentType, MachinePreset, PlacementTag } from "@trigger.dev/core/v3"; +import { PlacementTagProcessor } from "@trigger.dev/core/v3/serverOnly"; import { env } from "../env.js"; import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js"; import { getRunnerId } from "../util.js"; @@ -13,18 +14,18 @@ type ResourceQuantities = { [K in "cpu" | "memory" | "ephemeral-storage"]?: string; }; -interface PlacementConfig { - enabled: boolean; - prefix: string; -} - export class KubernetesWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("kubernetes-workload-provider"); private k8s: K8sApi; private namespace = env.KUBERNETES_NAMESPACE; + private placementTagProcessor: PlacementTagProcessor; constructor(private opts: WorkloadManagerOptions) { this.k8s = createK8sApi(); + this.placementTagProcessor = new PlacementTagProcessor({ + enabled: env.PLACEMENT_TAGS_ENABLED, + prefix: env.PLACEMENT_TAGS_PREFIX, + }); if (opts.workloadApiDomain) { this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", { @@ -33,33 +34,14 @@ export class KubernetesWorkloadManager implements WorkloadManager { } } - private get placementConfig(): PlacementConfig { - return { - enabled: env.PLACEMENT_TAGS_ENABLED, - prefix: env.PLACEMENT_TAGS_PREFIX, - }; - } - private addPlacementTags( podSpec: Omit, placementTags?: PlacementTag[] ): Omit { - if (!this.placementConfig.enabled || !placementTags || placementTags.length === 0) { - return podSpec; - } - - const nodeSelector: Record = { ...podSpec.nodeSelector }; - - // Convert placement tags to nodeSelector labels - for (const tag of placementTags) { - const labelKey = `${this.placementConfig.prefix}/${tag.key}`; - - // Print warnings (if any) - this.printTagWarnings(tag); - - // For now we only support single values via nodeSelector - nodeSelector[labelKey] = tag.values?.[0] ?? ""; - } + const nodeSelector = this.placementTagProcessor.convertToNodeSelector( + placementTags, + podSpec.nodeSelector + ); return { ...podSpec, @@ -67,22 +49,6 @@ export class KubernetesWorkloadManager implements WorkloadManager { }; } - private printTagWarnings(tag: PlacementTag) { - if (!tag.values || tag.values.length === 0) { - // No values provided - this.logger.warn( - "[KubernetesWorkloadManager] Placement tag has no values, using empty string", - tag - ); - } else if (tag.values.length > 1) { - // Multiple values provided - this.logger.warn( - "[KubernetesWorkloadManager] Placement tag has multiple values, only using first one", - tag - ); - } - } - async create(opts: WorkloadManagerCreateOptions) { this.logger.log("[KubernetesWorkloadManager] Creating container", { opts }); diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 4e41e8fc6f..85683c5a17 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -1,7 +1,8 @@ import type { BillingCache } from "../billingCache.js"; import { startSpan } from "@internal/tracing"; import { assertExhaustive } from "@trigger.dev/core"; -import { DequeuedMessage, RetryOptions, placementTag } from "@trigger.dev/core/v3"; +import { DequeuedMessage, RetryOptions } from "@trigger.dev/core/v3"; +import { placementTag } from "@trigger.dev/core/v3/serverOnly"; import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic"; import { PrismaClientOrTransaction } from "@trigger.dev/database"; import { getRunWithBackgroundWorkerTasks } from "../db/worker.js"; diff --git a/packages/core/src/v3/schemas/runEngine.ts b/packages/core/src/v3/schemas/runEngine.ts index 5bfd8fe1d7..bf84edde3b 100644 --- a/packages/core/src/v3/schemas/runEngine.ts +++ b/packages/core/src/v3/schemas/runEngine.ts @@ -230,11 +230,6 @@ export const PlacementTag = z.object({ }); export type PlacementTag = z.infer; -/** Helper functions for placement tags. In the future this will be able to support multiple values and operators. For now it's just a single value. */ -export function placementTag(key: string, value: string): PlacementTag { - return { key, values: [value] }; -} - /** This is sent to a Worker when a run is dequeued (a new run or continuing run) */ export const DequeuedMessage = z.object({ version: z.literal("1"), diff --git a/packages/core/src/v3/serverOnly/index.ts b/packages/core/src/v3/serverOnly/index.ts index 3b28ab95fb..10d915a5d3 100644 --- a/packages/core/src/v3/serverOnly/index.ts +++ b/packages/core/src/v3/serverOnly/index.ts @@ -6,3 +6,4 @@ export * from "./shutdownManager.js"; export * from "./k8s.js"; export * from "./jumpHash.js"; export * from "../apiClient/version.js"; +export * from "./placementTags.js"; diff --git a/packages/core/src/v3/serverOnly/placementTags.ts b/packages/core/src/v3/serverOnly/placementTags.ts new file mode 100644 index 0000000000..244aad5be0 --- /dev/null +++ b/packages/core/src/v3/serverOnly/placementTags.ts @@ -0,0 +1,58 @@ +import { type PlacementTag } from "../schemas/index.js"; +import { SimpleStructuredLogger } from "../utils/structuredLogger.js"; + +export interface PlacementConfig { + enabled: boolean; + prefix: string; +} + +export class PlacementTagProcessor { + private readonly logger = new SimpleStructuredLogger("placement-tag-processor"); + + constructor(private readonly config: PlacementConfig) {} + + /** + * Converts placement tags to Kubernetes nodeSelector labels + */ + convertToNodeSelector( + placementTags?: PlacementTag[], + existingNodeSelector?: Record + ): Record { + if (!this.config.enabled || !placementTags || placementTags.length === 0) { + return existingNodeSelector ?? {}; + } + + const nodeSelector: Record = { ...existingNodeSelector }; + + // Convert placement tags to nodeSelector labels + for (const tag of placementTags) { + const labelKey = `${this.config.prefix}/${tag.key}`; + + // Print warnings (if any) + this.printTagWarnings(tag); + + // For now we only support single values via nodeSelector + nodeSelector[labelKey] = tag.values?.[0] ?? ""; + } + + return nodeSelector; + } + + private printTagWarnings(tag: PlacementTag) { + if (!tag.values || tag.values.length === 0) { + // No values provided + this.logger.warn("Placement tag has no values, using empty string", tag); + } else if (tag.values.length > 1) { + // Multiple values provided + this.logger.warn("Placement tag has multiple values, only using first one", tag); + } + } +} + +/** + * Helper function to create a placement tag. In the future this will be able to support multiple values and operators. + * For now it's just a single value. + */ +export function placementTag(key: string, value: string): PlacementTag { + return { key, values: [value] }; +}