Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/lazy-panthers-shop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Improve structured logs
2 changes: 1 addition & 1 deletion apps/supervisor/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ MANAGED_WORKER_SECRET=managed-secret
# Point this at the webapp in prod
TRIGGER_API_URL=http://localhost:3030

# Point this at the OTel collector in prod
# Point this at the webapp or an OTel collector in prod
OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:3030/otel
# Use this on macOS
# OTEL_EXPORTER_OTLP_ENDPOINT=http://host.docker.internal:3030/otel
Expand Down
3 changes: 0 additions & 3 deletions apps/supervisor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,13 @@
"@kubernetes/client-node": "^1.0.0",
"@trigger.dev/core": "workspace:*",
"dockerode": "^4.0.3",
"nanoid": "^5.0.9",
"prom-client": "^15.1.0",
"socket.io": "4.7.4",
"std-env": "^3.8.0",
"tinyexec": "^0.3.1",
"zod": "3.23.8"
},
"devDependencies": {
"@types/dockerode": "^3.3.33",
"docker-api-ts": "^0.2.2",
"vitest": "^1.4.0"
}
}
5 changes: 4 additions & 1 deletion apps/supervisor/src/clients/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ import { Informer } from "@kubernetes/client-node";
import { ListPromise } from "@kubernetes/client-node";
import { KubernetesObject } from "@kubernetes/client-node";
import { assertExhaustive } from "@trigger.dev/core/utils";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";

const logger = new SimpleStructuredLogger("kubernetes-client");

export function createK8sApi() {
const kubeConfig = getKubeConfig();

Expand All @@ -31,7 +34,7 @@ export function createK8sApi() {
export type K8sApi = ReturnType<typeof createK8sApi>;

function getKubeConfig() {
console.log("getKubeConfig()", { RUNTIME_ENV });
logger.debug("getKubeConfig()", { RUNTIME_ENV });

const kubeConfig = new k8s.KubeConfig();

Expand Down
5 changes: 3 additions & 2 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ const Env = z.object({
KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv

// Used by the resource monitor
OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),
RESOURCE_MONITOR_ENABLED: BoolEnv.default(false),
RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),
RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(),

// Kubernetes specific settings
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),
Expand Down
5 changes: 4 additions & 1 deletion apps/supervisor/src/envUtil.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { z } from "zod";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";

const logger = new SimpleStructuredLogger("env-util");

export const BoolEnv = z.preprocess((val) => {
if (typeof val !== "string") {
Expand Down Expand Up @@ -33,7 +36,7 @@ export const AdditionalEnvVars = z.preprocess((val) => {
// Return undefined if no valid key-value pairs were found
return Object.keys(result).length === 0 ? undefined : result;
} catch (error) {
console.warn("Failed to parse additional env vars", { error, val });
logger.warn("Failed to parse additional env vars", { error, val });
return undefined;
}
}, z.record(z.string(), z.string()).optional());
88 changes: 46 additions & 42 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3";
import {
DockerResourceMonitor,
KubernetesResourceMonitor,
NoopResourceMonitor,
type ResourceMonitor,
} from "./resourceMonitor.js";
import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js";
Expand All @@ -33,7 +34,7 @@ class ManagedSupervisor {
private readonly metricsServer?: HttpServer;
private readonly workloadServer: WorkloadServer;
private readonly workloadManager: WorkloadManager;
private readonly logger = new SimpleStructuredLogger("managed-worker");
private readonly logger = new SimpleStructuredLogger("managed-supervisor");
private readonly resourceMonitor: ResourceMonitor;
private readonly checkpointClient?: CheckpointClient;

Expand All @@ -47,11 +48,11 @@ class ManagedSupervisor {
const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env;

if (env.DEBUG) {
console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets });
this.logger.debug("Starting up", { envWithoutSecrets });
}

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
this.logger.log("🔥 Warm starts enabled", {
warmStartUrl: this.warmStartUrl,
});
}
Expand All @@ -69,9 +70,19 @@ class ManagedSupervisor {
dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE,
} satisfies WorkloadManagerOptions;

this.resourceMonitor = env.RESOURCE_MONITOR_ENABLED
? this.isKubernetes
? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME)
: new DockerResourceMonitor(new Docker())
: new NoopResourceMonitor();

this.workloadManager = this.isKubernetes
? new KubernetesWorkloadManager(workloadManagerOptions)
: new DockerWorkloadManager(workloadManagerOptions);

if (this.isKubernetes) {
if (env.POD_CLEANER_ENABLED) {
this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", {
this.logger.log("🧹 Pod cleaner enabled", {
namespace: env.KUBERNETES_NAMESPACE,
batchSize: env.POD_CLEANER_BATCH_SIZE,
intervalMs: env.POD_CLEANER_INTERVAL_MS,
Expand All @@ -83,11 +94,11 @@ class ManagedSupervisor {
intervalMs: env.POD_CLEANER_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Pod cleaner disabled");
this.logger.warn("Pod cleaner disabled");
}

if (env.FAILED_POD_HANDLER_ENABLED) {
this.logger.log("[ManagedWorker] 🔁 Failed pod handler enabled", {
this.logger.log("🔁 Failed pod handler enabled", {
namespace: env.KUBERNETES_NAMESPACE,
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
Expand All @@ -97,17 +108,8 @@ class ManagedSupervisor {
reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS,
});
} else {
this.logger.warn("[ManagedWorker] Failed pod handler disabled");
this.logger.warn("Failed pod handler disabled");
}

this.resourceMonitor = new KubernetesResourceMonitor(
createK8sApi(),
env.TRIGGER_WORKER_INSTANCE_NAME
);
this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions);
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
this.workloadManager = new DockerWorkloadManager(workloadManagerOptions);
}

this.workerSession = new SupervisorSession({
Expand All @@ -123,12 +125,17 @@ class ManagedSupervisor {
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS,
preDequeue: async () => {
if (!env.RESOURCE_MONITOR_ENABLED) {
return {};
}

if (this.isKubernetes) {
// Not used in k8s for now
return {};
}

const resources = await this.resourceMonitor.getNodeResources();

return {
maxResources: {
cpu: resources.cpuAvailable,
Expand All @@ -144,7 +151,7 @@ class ManagedSupervisor {
});

if (env.TRIGGER_CHECKPOINT_URL) {
this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", {
this.logger.log("🥶 Checkpoints enabled", {
checkpointUrl: env.TRIGGER_CHECKPOINT_URL,
});

Expand All @@ -157,41 +164,38 @@ class ManagedSupervisor {

// setInterval(async () => {
// const resources = await this.resourceMonitor.getNodeResources(true);
// this.logger.debug("[ManagedWorker] Current resources", { resources });
// this.logger.debug("Current resources", { resources });
// }, 1000);

this.workerSession.on("runNotification", async ({ time, run }) => {
this.logger.log("[ManagedWorker] runNotification", { time, run });
this.logger.log("runNotification", { time, run });

this.workloadServer.notifyRun({ run });
});

this.workerSession.on("runQueueMessage", async ({ time, message }) => {
this.logger.log(
`[ManagedWorker] Received message with timestamp ${time.toLocaleString()}`,
message
);
this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message);

if (message.completedWaitpoints.length > 0) {
this.logger.debug("[ManagedWorker] Run has completed waitpoints", {
this.logger.debug("Run has completed waitpoints", {
runId: message.run.id,
completedWaitpoints: message.completedWaitpoints.length,
});
// TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it
}

if (!message.image) {
this.logger.error("[ManagedWorker] Run has no image", { runId: message.run.id });
this.logger.error("Run has no image", { runId: message.run.id });
return;
}

const { checkpoint, ...rest } = message;

if (checkpoint) {
this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id });
this.logger.log("Restoring run", { runId: message.run.id });

if (!this.checkpointClient) {
this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id });
this.logger.error("No checkpoint client", { runId: message.run.id });
return;
}

Expand All @@ -206,23 +210,23 @@ class ManagedSupervisor {
});

if (didRestore) {
this.logger.log("[ManagedWorker] Restore successful", { runId: message.run.id });
this.logger.log("Restore successful", { runId: message.run.id });
} else {
this.logger.error("[ManagedWorker] Restore failed", { runId: message.run.id });
this.logger.error("Restore failed", { runId: message.run.id });
}
} catch (error) {
this.logger.error("[ManagedWorker] Failed to restore run", { error });
this.logger.error("Failed to restore run", { error });
}

return;
}

this.logger.log("[ManagedWorker] Scheduling run", { runId: message.run.id });
this.logger.log("Scheduling run", { runId: message.run.id });

const didWarmStart = await this.tryWarmStart(message);

if (didWarmStart) {
this.logger.log("[ManagedWorker] Warm start successful", { runId: message.run.id });
this.logger.log("Warm start successful", { runId: message.run.id });
return;
}

Expand All @@ -249,7 +253,7 @@ class ManagedSupervisor {
// memory: message.run.machine.memory,
// });
} catch (error) {
this.logger.error("[ManagedWorker] Failed to create workload", { error });
this.logger.error("Failed to create workload", { error });
}
});

Expand Down Expand Up @@ -277,12 +281,12 @@ class ManagedSupervisor {
}

async onRunConnected({ run }: { run: { friendlyId: string } }) {
this.logger.debug("[ManagedWorker] Run connected", { run });
this.logger.debug("Run connected", { run });
this.workerSession.subscribeToRunNotifications([run.friendlyId]);
}

async onRunDisconnected({ run }: { run: { friendlyId: string } }) {
this.logger.debug("[ManagedWorker] Run disconnected", { run });
this.logger.debug("Run disconnected", { run });
this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
}

Expand All @@ -303,7 +307,7 @@ class ManagedSupervisor {
});

if (!res.ok) {
this.logger.error("[ManagedWorker] Warm start failed", {
this.logger.error("Warm start failed", {
runId: dequeuedMessage.run.id,
});
return false;
Expand All @@ -313,7 +317,7 @@ class ManagedSupervisor {
const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data);

if (!parsedData.success) {
this.logger.error("[ManagedWorker] Warm start response invalid", {
this.logger.error("Warm start response invalid", {
runId: dequeuedMessage.run.id,
data,
});
Expand All @@ -322,7 +326,7 @@ class ManagedSupervisor {

return parsedData.data.didWarmStart;
} catch (error) {
this.logger.error("[ManagedWorker] Warm start error", {
this.logger.error("Warm start error", {
runId: dequeuedMessage.run.id,
error,
});
Expand All @@ -331,29 +335,29 @@ class ManagedSupervisor {
}

async start() {
this.logger.log("[ManagedWorker] Starting up");
this.logger.log("Starting up");

// Optional services
await this.podCleaner?.start();
await this.failedPodHandler?.start();
await this.metricsServer?.start();

if (env.TRIGGER_WORKLOAD_API_ENABLED) {
this.logger.log("[ManagedWorker] Workload API enabled", {
this.logger.log("Workload API enabled", {
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
domain: env.TRIGGER_WORKLOAD_API_DOMAIN,
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
});
await this.workloadServer.start();
} else {
this.logger.warn("[ManagedWorker] Workload API disabled");
this.logger.warn("Workload API disabled");
}

await this.workerSession.start();
}

async stop() {
this.logger.log("[ManagedWorker] Shutting down");
this.logger.log("Shutting down");
await this.workerSession.stop();

// Optional services
Expand Down
Loading