Skip to content
2 changes: 2 additions & 0 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const Env = z.object({
// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),

// Optional services
TRIGGER_WARM_START_URL: z.string().optional(),
Expand All @@ -50,6 +51,7 @@ const Env = z.object({
// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
KUBERNETES_NAMESPACE: z.string().default("default"),
KUBERNETES_WORKER_NODETYPE_LABEL: z.string().default("v4-worker"),
EPHEMERAL_STORAGE_SIZE_LIMIT: z.string().default("10Gi"),
EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"),

Expand Down
17 changes: 11 additions & 6 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ class ManagedSupervisor {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
}

this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.resourceMonitor = new KubernetesResourceMonitor(
createK8sApi(),
env.TRIGGER_WORKER_INSTANCE_NAME
);
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
Expand All @@ -113,10 +116,11 @@ class ManagedSupervisor {
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
preDequeue: async () => {
if (this.isKubernetes) {
// TODO: Test k8s resource monitor and remove this
// Not used in k8s for now
return {};
}

Expand Down Expand Up @@ -234,10 +238,11 @@ class ManagedSupervisor {
snapshotFriendlyId: message.snapshot.friendlyId,
});

this.resourceMonitor.blockResources({
cpu: message.run.machine.cpu,
memory: message.run.machine.memory,
});
// Disabled for now
// this.resourceMonitor.blockResources({
// cpu: message.run.machine.cpu,
// memory: message.run.machine.memory,
// });
} catch (error) {
this.logger.error("[ManagedWorker] Failed to create workload", { error });
}
Expand Down
4 changes: 4 additions & 0 deletions apps/supervisor/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ export function getDockerHostDomain() {

return isMacOs || isWindows ? "host.docker.internal" : "localhost";
}

export function getRunnerId(runId: string) {
return `runner-${runId.replace("run_", "")}`;
}
6 changes: 3 additions & 3 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import {
type WorkloadManager,
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { x } from "tinyexec";
import { env } from "../env.js";
import { getDockerHostDomain } from "../util.js";
import { getDockerHostDomain, getRunnerId } from "../util.js";

export class DockerWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");
Expand All @@ -23,7 +22,8 @@ export class DockerWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[DockerWorkloadProvider] Creating container", { opts });

const runnerId = RunnerId.generate();
const runnerId = getRunnerId(opts.runFriendlyId);

const runArgs = [
"run",
"--detach",
Expand Down
6 changes: 3 additions & 3 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import {
type WorkloadManagerCreateOptions,
type WorkloadManagerOptions,
} from "./types.js";
import { RunnerId } from "@trigger.dev/core/v3/isomorphic";
import type { EnvironmentType, MachinePreset } from "@trigger.dev/core/v3";
import { env } from "../env.js";
import { type K8sApi, createK8sApi, type k8s } from "../clients/kubernetes.js";
import { getRunnerId } from "../util.js";

type ResourceQuantities = {
[K in "cpu" | "memory" | "ephemeral-storage"]?: string;
Expand All @@ -31,7 +31,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[KubernetesWorkloadManager] Creating container", { opts });

const runnerId = RunnerId.generate().replace(/_/g, "-");
const runnerId = getRunnerId(opts.runFriendlyId);

try {
await this.k8s.core.createNamespacedPod({
Expand Down Expand Up @@ -217,7 +217,7 @@ export class KubernetesWorkloadManager implements WorkloadManager {
automountServiceAccountToken: false,
imagePullSecrets: this.getImagePullSecrets(),
nodeSelector: {
nodetype: "worker-re2",
nodetype: env.KUBERNETES_WORKER_NODETYPE_LABEL,
},
};
}
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
import {
WorkerApiDequeueRequestBody,
WorkerApiDequeueResponseBody,
} from "@trigger.dev/core/v3/workers";
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

export const action = createActionWorkerApiRoute(
Expand All @@ -10,6 +13,7 @@ export const action = createActionWorkerApiRoute(
return json(
await authenticatedWorker.dequeue({
maxResources: body.maxResources,
maxRunCount: body.maxRunCount,
})
);
}
Expand Down
Loading