From 971ddc3043ccafec4da437c66bcc71e7200e8c51 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 15:40:48 +0100 Subject: [PATCH 01/10] don't require docker binary --- apps/supervisor/package.json | 2 - apps/supervisor/src/env.ts | 1 + apps/supervisor/src/workloadManager/docker.ts | 96 +++++++++++-------- pnpm-lock.yaml | 6 -- 4 files changed, 57 insertions(+), 48 deletions(-) diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index 8b29055f9a..59f0399125 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -16,11 +16,9 @@ "@kubernetes/client-node": "^1.0.0", "@trigger.dev/core": "workspace:*", "dockerode": "^4.0.3", - "nanoid": "^5.0.9", "prom-client": "^15.1.0", "socket.io": "4.7.4", "std-env": "^3.8.0", - "tinyexec": "^0.3.1", "zod": "3.23.8" }, "devDependencies": { diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index cd9bf5bead..ce449a1ad7 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -44,6 +44,7 @@ const Env = z.object({ // Used by the workload manager, e.g docker/k8s DOCKER_NETWORK: z.string().default("host"), + DOCKER_SOCKET_PATH: z.string().default("/var/run/docker.sock"), OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false), KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 171e2c0971..9dd4cecea8 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -4,88 +4,104 @@ import { type WorkloadManagerCreateOptions, type WorkloadManagerOptions, } from "./types.js"; -import { x } from "tinyexec"; import { env } from "../env.js"; import { getDockerHostDomain, getRunnerId } from "../util.js"; +import Docker from "dockerode"; export class DockerWorkloadManager implements 
WorkloadManager { - private readonly logger = new SimpleStructuredLogger("docker-workload-provider"); + private readonly logger = new SimpleStructuredLogger("docker-workload-manager"); + private readonly docker: Docker; constructor(private opts: WorkloadManagerOptions) { + this.docker = new Docker({ + socketPath: env.DOCKER_SOCKET_PATH, + }); + if (opts.workloadApiDomain) { - this.logger.warn("[DockerWorkloadProvider] ⚠️ Custom workload API domain", { + this.logger.warn("⚠️ Custom workload API domain", { domain: opts.workloadApiDomain, }); } } async create(opts: WorkloadManagerCreateOptions) { - this.logger.log("[DockerWorkloadProvider] Creating container", { opts }); + this.logger.log("create()", { opts }); const runnerId = getRunnerId(opts.runFriendlyId, opts.nextAttemptNumber); - const runArgs = [ - "run", - "--detach", - `--network=${env.DOCKER_NETWORK}`, - `--env=TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`, - `--env=TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`, - `--env=TRIGGER_ENV_ID=${opts.envId}`, - `--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`, - `--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, - `--env=TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`, - `--env=TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`, - `--env=TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? 
getDockerHostDomain()}`, - `--env=TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`, - `--env=OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`, - `--env=TRIGGER_RUNNER_ID=${runnerId}`, - `--hostname=${runnerId}`, - `--name=${runnerId}`, + // Build environment variables + const envVars: string[] = [ + `TRIGGER_DEQUEUED_AT_MS=${opts.dequeuedAt.getTime()}`, + `TRIGGER_POD_SCHEDULED_AT_MS=${Date.now()}`, + `TRIGGER_ENV_ID=${opts.envId}`, + `TRIGGER_RUN_ID=${opts.runFriendlyId}`, + `TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`, + `TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`, + `TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`, + `TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`, + `TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`, + `OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`, + `TRIGGER_RUNNER_ID=${runnerId}`, ]; - if (this.opts.dockerAutoremove) { - runArgs.push("--rm"); - } - if (this.opts.warmStartUrl) { - runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); + envVars.push(`TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`); } if (this.opts.metadataUrl) { - runArgs.push(`--env=TRIGGER_METADATA_URL=${this.opts.metadataUrl}`); + envVars.push(`TRIGGER_METADATA_URL=${this.opts.metadataUrl}`); } if (this.opts.heartbeatIntervalSeconds) { - runArgs.push( - `--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}` - ); + envVars.push(`TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`); } if (this.opts.snapshotPollIntervalSeconds) { - runArgs.push( - `--env=TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}` + envVars.push( + `TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS=${this.opts.snapshotPollIntervalSeconds}` ); } if (this.opts.additionalEnvVars) { Object.entries(this.opts.additionalEnvVars).forEach(([key, value]) => { - 
runArgs.push(`--env=${key}=${value}`); + envVars.push(`${key}=${value}`); }); } + const hostConfig: Docker.HostConfig = { + NetworkMode: env.DOCKER_NETWORK, + AutoRemove: !!this.opts.dockerAutoremove, + }; + if (env.ENFORCE_MACHINE_PRESETS) { - runArgs.push(`--cpus=${opts.machine.cpu}`, `--memory=${opts.machine.memory}G`); - runArgs.push(`--env=TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); - runArgs.push(`--env=TRIGGER_MACHINE_MEMORY=${opts.machine.memory}`); + envVars.push(`TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); + envVars.push(`TRIGGER_MACHINE_MEMORY=${opts.machine.memory}`); + + hostConfig.NanoCpus = opts.machine.cpu * 1e9; + hostConfig.Memory = opts.machine.memory * 1024 * 1024 * 1024; } - runArgs.push(`${opts.image}`); + const containerCreateOpts: Docker.ContainerCreateOptions = { + Env: envVars, + name: runnerId, + Hostname: runnerId, + HostConfig: hostConfig, + Image: opts.image, + AttachStdout: false, + AttachStderr: false, + AttachStdin: false, + }; try { - const { stdout, stderr } = await x("docker", runArgs); - this.logger.debug("[DockerWorkloadProvider] Create succeeded", { stdout, stderr }); + // Create container + const container = await this.docker.createContainer(containerCreateOpts); + + // Start container + const startResult = await container.start(); + + this.logger.debug("create succeeded", { opts, startResult, container, containerCreateOpts }); } catch (error) { - this.logger.error("[DockerWorkloadProvider] Create failed:", { opts, error }); + this.logger.error("create failed:", { opts, error, containerCreateOpts }); } } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 56c69c4e99..508abb1d17 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -161,9 +161,6 @@ importers: dockerode: specifier: ^4.0.3 version: 4.0.4 - nanoid: - specifier: ^5.0.9 - version: 5.1.2 prom-client: specifier: ^15.1.0 version: 15.1.0 @@ -173,9 +170,6 @@ importers: std-env: specifier: ^3.8.0 version: 3.8.1 - tinyexec: - specifier: ^0.3.1 - version: 0.3.2 zod: 
specifier: 3.23.8 version: 3.23.8 From 00bf3808b7a4cdd263f41ad3fcf4968b51cb8683 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 15:41:23 +0100 Subject: [PATCH 02/10] more structured logs --- apps/supervisor/src/clients/kubernetes.ts | 5 +- apps/supervisor/src/envUtil.ts | 5 +- apps/supervisor/src/index.ts | 63 +++++++++--------- apps/supervisor/src/workloadServer/index.ts | 66 +++++++++---------- .../src/v3/runEngineWorker/supervisor/http.ts | 7 +- .../supervisor/queueConsumer.ts | 21 +++--- .../v3/runEngineWorker/supervisor/session.ts | 56 ++++++++-------- packages/core/src/v3/schemas/common.ts | 2 + packages/core/src/v3/serverOnly/httpServer.ts | 4 +- .../core/src/v3/utils/structuredLogger.ts | 8 ++- 10 files changed, 126 insertions(+), 111 deletions(-) diff --git a/apps/supervisor/src/clients/kubernetes.ts b/apps/supervisor/src/clients/kubernetes.ts index 77fd4144d6..f66e57e435 100644 --- a/apps/supervisor/src/clients/kubernetes.ts +++ b/apps/supervisor/src/clients/kubernetes.ts @@ -3,9 +3,12 @@ import { Informer } from "@kubernetes/client-node"; import { ListPromise } from "@kubernetes/client-node"; import { KubernetesObject } from "@kubernetes/client-node"; import { assertExhaustive } from "@trigger.dev/core/utils"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; export const RUNTIME_ENV = process.env.KUBERNETES_PORT ? 
"kubernetes" : "local"; +const logger = new SimpleStructuredLogger("kubernetes-client"); + export function createK8sApi() { const kubeConfig = getKubeConfig(); @@ -31,7 +34,7 @@ export function createK8sApi() { export type K8sApi = ReturnType; function getKubeConfig() { - console.log("getKubeConfig()", { RUNTIME_ENV }); + logger.debug("getKubeConfig()", { RUNTIME_ENV }); const kubeConfig = new k8s.KubeConfig(); diff --git a/apps/supervisor/src/envUtil.ts b/apps/supervisor/src/envUtil.ts index 41dd5ca22a..95d44d6c45 100644 --- a/apps/supervisor/src/envUtil.ts +++ b/apps/supervisor/src/envUtil.ts @@ -1,4 +1,7 @@ import { z } from "zod"; +import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; + +const logger = new SimpleStructuredLogger("env-util"); export const BoolEnv = z.preprocess((val) => { if (typeof val !== "string") { @@ -33,7 +36,7 @@ export const AdditionalEnvVars = z.preprocess((val) => { // Return undefined if no valid key-value pairs were found return Object.keys(result).length === 0 ? 
undefined : result; } catch (error) { - console.warn("Failed to parse additional env vars", { error, val }); + logger.warn("Failed to parse additional env vars", { error, val }); return undefined; } }, z.record(z.string(), z.string()).optional()); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index b32e4b00ef..7cda6fd54f 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -33,7 +33,7 @@ class ManagedSupervisor { private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; - private readonly logger = new SimpleStructuredLogger("managed-worker"); + private readonly logger = new SimpleStructuredLogger("managed-supervisor"); private readonly resourceMonitor: ResourceMonitor; private readonly checkpointClient?: CheckpointClient; @@ -47,11 +47,11 @@ class ManagedSupervisor { const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env; if (env.DEBUG) { - console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets }); + this.logger.debug("Starting up", { envWithoutSecrets }); } if (this.warmStartUrl) { - this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", { + this.logger.log("🔥 Warm starts enabled", { warmStartUrl: this.warmStartUrl, }); } @@ -71,7 +71,7 @@ class ManagedSupervisor { if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { - this.logger.log("[ManagedWorker] 🧹 Pod cleaner enabled", { + this.logger.log("🧹 Pod cleaner enabled", { namespace: env.KUBERNETES_NAMESPACE, batchSize: env.POD_CLEANER_BATCH_SIZE, intervalMs: env.POD_CLEANER_INTERVAL_MS, @@ -83,11 +83,11 @@ class ManagedSupervisor { intervalMs: env.POD_CLEANER_INTERVAL_MS, }); } else { - this.logger.warn("[ManagedWorker] Pod cleaner disabled"); + this.logger.warn("Pod cleaner disabled"); } if (env.FAILED_POD_HANDLER_ENABLED) { - this.logger.log("[ManagedWorker] 🔁 Failed pod handler enabled", { + this.logger.log("🔁 Failed pod handler 
enabled", { namespace: env.KUBERNETES_NAMESPACE, reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, }); @@ -97,7 +97,7 @@ class ManagedSupervisor { reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, }); } else { - this.logger.warn("[ManagedWorker] Failed pod handler disabled"); + this.logger.warn("Failed pod handler disabled"); } this.resourceMonitor = new KubernetesResourceMonitor( @@ -144,7 +144,7 @@ class ManagedSupervisor { }); if (env.TRIGGER_CHECKPOINT_URL) { - this.logger.log("[ManagedWorker] 🥶 Checkpoints enabled", { + this.logger.log("🥶 Checkpoints enabled", { checkpointUrl: env.TRIGGER_CHECKPOINT_URL, }); @@ -157,23 +157,20 @@ class ManagedSupervisor { // setInterval(async () => { // const resources = await this.resourceMonitor.getNodeResources(true); - // this.logger.debug("[ManagedWorker] Current resources", { resources }); + // this.logger.debug("Current resources", { resources }); // }, 1000); this.workerSession.on("runNotification", async ({ time, run }) => { - this.logger.log("[ManagedWorker] runNotification", { time, run }); + this.logger.log("runNotification", { time, run }); this.workloadServer.notifyRun({ run }); }); this.workerSession.on("runQueueMessage", async ({ time, message }) => { - this.logger.log( - `[ManagedWorker] Received message with timestamp ${time.toLocaleString()}`, - message - ); + this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message); if (message.completedWaitpoints.length > 0) { - this.logger.debug("[ManagedWorker] Run has completed waitpoints", { + this.logger.debug("Run has completed waitpoints", { runId: message.run.id, completedWaitpoints: message.completedWaitpoints.length, }); @@ -181,17 +178,17 @@ class ManagedSupervisor { } if (!message.image) { - this.logger.error("[ManagedWorker] Run has no image", { runId: message.run.id }); + this.logger.error("Run has no image", { runId: message.run.id }); return; } const { checkpoint, ...rest } = message; if 
(checkpoint) { - this.logger.log("[ManagedWorker] Restoring run", { runId: message.run.id }); + this.logger.log("Restoring run", { runId: message.run.id }); if (!this.checkpointClient) { - this.logger.error("[ManagedWorker] No checkpoint client", { runId: message.run.id }); + this.logger.error("No checkpoint client", { runId: message.run.id }); return; } @@ -206,23 +203,23 @@ class ManagedSupervisor { }); if (didRestore) { - this.logger.log("[ManagedWorker] Restore successful", { runId: message.run.id }); + this.logger.log("Restore successful", { runId: message.run.id }); } else { - this.logger.error("[ManagedWorker] Restore failed", { runId: message.run.id }); + this.logger.error("Restore failed", { runId: message.run.id }); } } catch (error) { - this.logger.error("[ManagedWorker] Failed to restore run", { error }); + this.logger.error("Failed to restore run", { error }); } return; } - this.logger.log("[ManagedWorker] Scheduling run", { runId: message.run.id }); + this.logger.log("Scheduling run", { runId: message.run.id }); const didWarmStart = await this.tryWarmStart(message); if (didWarmStart) { - this.logger.log("[ManagedWorker] Warm start successful", { runId: message.run.id }); + this.logger.log("Warm start successful", { runId: message.run.id }); return; } @@ -249,7 +246,7 @@ class ManagedSupervisor { // memory: message.run.machine.memory, // }); } catch (error) { - this.logger.error("[ManagedWorker] Failed to create workload", { error }); + this.logger.error("Failed to create workload", { error }); } }); @@ -277,12 +274,12 @@ class ManagedSupervisor { } async onRunConnected({ run }: { run: { friendlyId: string } }) { - this.logger.debug("[ManagedWorker] Run connected", { run }); + this.logger.debug("Run connected", { run }); this.workerSession.subscribeToRunNotifications([run.friendlyId]); } async onRunDisconnected({ run }: { run: { friendlyId: string } }) { - this.logger.debug("[ManagedWorker] Run disconnected", { run }); + this.logger.debug("Run 
disconnected", { run }); this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]); } @@ -303,7 +300,7 @@ class ManagedSupervisor { }); if (!res.ok) { - this.logger.error("[ManagedWorker] Warm start failed", { + this.logger.error("Warm start failed", { runId: dequeuedMessage.run.id, }); return false; @@ -313,7 +310,7 @@ class ManagedSupervisor { const parsedData = z.object({ didWarmStart: z.boolean() }).safeParse(data); if (!parsedData.success) { - this.logger.error("[ManagedWorker] Warm start response invalid", { + this.logger.error("Warm start response invalid", { runId: dequeuedMessage.run.id, data, }); @@ -322,7 +319,7 @@ class ManagedSupervisor { return parsedData.data.didWarmStart; } catch (error) { - this.logger.error("[ManagedWorker] Warm start error", { + this.logger.error("Warm start error", { runId: dequeuedMessage.run.id, error, }); @@ -331,7 +328,7 @@ class ManagedSupervisor { } async start() { - this.logger.log("[ManagedWorker] Starting up"); + this.logger.log("Starting up"); // Optional services await this.podCleaner?.start(); @@ -339,21 +336,21 @@ class ManagedSupervisor { await this.metricsServer?.start(); if (env.TRIGGER_WORKLOAD_API_ENABLED) { - this.logger.log("[ManagedWorker] Workload API enabled", { + this.logger.log("Workload API enabled", { protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL, domain: env.TRIGGER_WORKLOAD_API_DOMAIN, port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL, }); await this.workloadServer.start(); } else { - this.logger.warn("[ManagedWorker] Workload API disabled"); + this.logger.warn("Workload API disabled"); } await this.workerSession.start(); } async stop() { - this.logger.log("[ManagedWorker] Shutting down"); + this.logger.log("Shutting down"); await this.workerSession.stop(); // Optional services diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index cc41e2bfbf..fb7c12c17a 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ 
b/apps/supervisor/src/workloadServer/index.ts @@ -63,6 +63,8 @@ type WorkloadServerOptions = { export class WorkloadServer extends EventEmitter { private checkpointClient?: CheckpointClient; + private readonly logger = new SimpleStructuredLogger("workload-server"); + private readonly httpServer: HttpServer; private readonly websocketServer: Namespace< WorkloadClientToServerEvents, @@ -151,7 +153,7 @@ export class WorkloadServer extends EventEmitter { ); if (!startResponse.success) { - console.error("Failed to start run", { + this.logger.error("Failed to start run", { params, error: startResponse.error, }); @@ -171,7 +173,6 @@ export class WorkloadServer extends EventEmitter { paramsSchema: WorkloadActionParams, bodySchema: WorkloadRunAttemptCompleteRequestBody, handler: async ({ req, reply, params, body }) => { - console.log("headers", req.headers); const completeResponse = await this.workerClient.completeRunAttempt( params.runFriendlyId, params.snapshotFriendlyId, @@ -180,7 +181,7 @@ export class WorkloadServer extends EventEmitter { ); if (!completeResponse.success) { - console.error("Failed to complete run", { + this.logger.error("Failed to complete run", { params, error: completeResponse.error, }); @@ -208,7 +209,7 @@ export class WorkloadServer extends EventEmitter { ); if (!heartbeatResponse.success) { - console.error("Failed to heartbeat run", { + this.logger.error("Failed to heartbeat run", { params, error: heartbeatResponse.error, }); @@ -228,7 +229,7 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, handler: async ({ reply, params, req }) => { - console.debug("Suspend request", { params, headers: req.headers }); + this.logger.debug("Suspend request", { params, headers: req.headers }); if (!this.checkpointClient) { reply.json( @@ -247,7 +248,7 @@ export class WorkloadServer extends EventEmitter { const projectRef = this.projectRefFromRequest(req); if (!runnerId || !deploymentVersion || !projectRef) { - 
console.error("Invalid headers for suspend request", { + this.logger.error("Invalid headers for suspend request", { ...params, headers: req.headers, }); @@ -283,7 +284,7 @@ export class WorkloadServer extends EventEmitter { }); if (!suspendResult) { - console.error("Failed to suspend run", { params }); + this.logger.error("Failed to suspend run", { params }); return; } }, @@ -295,7 +296,7 @@ export class WorkloadServer extends EventEmitter { { paramsSchema: WorkloadActionParams, handler: async ({ req, reply, params }) => { - console.debug("Run continuation request", { params }); + this.logger.debug("Run continuation request", { params }); const continuationResult = await this.workerClient.continueRunExecution( params.runFriendlyId, @@ -304,7 +305,7 @@ export class WorkloadServer extends EventEmitter { ); if (!continuationResult.success) { - console.error("Failed to continue run execution", { params }); + this.logger.error("Failed to continue run execution", { params }); reply.json( { ok: false, @@ -329,7 +330,7 @@ export class WorkloadServer extends EventEmitter { ); if (!latestSnapshotResponse.success) { - console.error("Failed to get latest snapshot", { + this.logger.error("Failed to get latest snapshot", { runId: params.runFriendlyId, error: latestSnapshotResponse.error, }); @@ -355,7 +356,7 @@ export class WorkloadServer extends EventEmitter { ); if (!sinceSnapshotResponse.success) { - console.error("Failed to get snapshots since", { + this.logger.error("Failed to get snapshots since", { runId: params.runFriendlyId, error: sinceSnapshotResponse.error, }); @@ -393,7 +394,7 @@ export class WorkloadServer extends EventEmitter { ); if (!dequeueResponse.success) { - console.error("Failed to get latest snapshot", { + this.logger.error("Failed to get latest snapshot", { deploymentId: params.deploymentId, error: dequeueResponse.error, }); @@ -417,14 +418,14 @@ export class WorkloadServer extends EventEmitter { > = io.of("/workload"); websocketServer.on("disconnect", 
(socket) => { - console.log("[WorkloadSocket] disconnect", socket.id); + this.logger.log("[WS] disconnect", socket.id); }); websocketServer.use(async (socket, next) => { - function setSocketDataFromHeader( + const setSocketDataFromHeader = ( dataKey: keyof typeof socket.data, headerName: string, required: boolean = true - ) { + ) => { const value = socket.handshake.headers[headerName]; if (value) { @@ -440,27 +441,26 @@ export class WorkloadServer extends EventEmitter { } if (required) { - console.error("[WorkloadSocket] missing required header", { headerName }); + this.logger.error("[WS] missing required header", { headerName }); throw new Error("missing header"); } - } + }; try { setSocketDataFromHeader("deploymentId", WORKLOAD_HEADERS.DEPLOYMENT_ID); setSocketDataFromHeader("runnerId", WORKLOAD_HEADERS.RUNNER_ID); } catch (error) { - console.error("[WorkloadSocket] setSocketDataFromHeader error", { error }); + this.logger.error("[WS] setSocketDataFromHeader error", { error }); socket.disconnect(true); return; } - console.debug("[WorkloadSocket] auth success", socket.data); + this.logger.debug("[WS] auth success", socket.data); next(); }); websocketServer.on("connection", (socket) => { - const logger = new SimpleStructuredLogger("workload-namespace", undefined, { - namespace: "workload", + const socketLogger = this.logger.child({ socketId: socket.id, socketData: socket.data, }); @@ -475,11 +475,11 @@ export class WorkloadServer extends EventEmitter { }; const runConnected = (friendlyId: string) => { - logger.debug("runConnected", { ...getSocketMetadata() }); + socketLogger.debug("runConnected", { ...getSocketMetadata() }); // If there's already a run ID set, we should "disconnect" it from this socket if (socket.data.runFriendlyId && socket.data.runFriendlyId !== friendlyId) { - logger.debug("runConnected: disconnecting existing run", { + socketLogger.debug("runConnected: disconnecting existing run", { ...getSocketMetadata(), newRunId: friendlyId, oldRunId: 
socket.data.runFriendlyId, @@ -493,14 +493,14 @@ export class WorkloadServer extends EventEmitter { }; const runDisconnected = (friendlyId: string) => { - logger.debug("runDisconnected", { ...getSocketMetadata() }); + socketLogger.debug("runDisconnected", { ...getSocketMetadata() }); this.runSockets.delete(friendlyId); this.emit("runDisconnected", { run: { friendlyId } }); socket.data.runFriendlyId = undefined; }; - logger.log("wsServer socket connected", { ...getSocketMetadata() }); + socketLogger.log("wsServer socket connected", { ...getSocketMetadata() }); // FIXME: where does this get set? if (socket.data.runFriendlyId) { @@ -508,7 +508,7 @@ export class WorkloadServer extends EventEmitter { } socket.on("disconnecting", (reason, description) => { - logger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description }); + socketLogger.log("Socket disconnecting", { ...getSocketMetadata(), reason, description }); if (socket.data.runFriendlyId) { runDisconnected(socket.data.runFriendlyId); @@ -516,11 +516,11 @@ export class WorkloadServer extends EventEmitter { }); socket.on("disconnect", (reason, description) => { - logger.log("Socket disconnected", { ...getSocketMetadata(), reason, description }); + socketLogger.log("Socket disconnected", { ...getSocketMetadata(), reason, description }); }); socket.on("error", (error) => { - logger.error("Socket error", { + socketLogger.error("Socket error", { ...getSocketMetadata(), error: { name: error.name, @@ -531,7 +531,7 @@ export class WorkloadServer extends EventEmitter { }); socket.on("run:start", async (message) => { - const log = logger.child({ + const log = socketLogger.child({ eventName: "run:start", ...getSocketMetadata(), ...message, @@ -547,7 +547,7 @@ export class WorkloadServer extends EventEmitter { }); socket.on("run:stop", async (message) => { - const log = logger.child({ + const log = socketLogger.child({ eventName: "run:stop", ...getSocketMetadata(), ...message, @@ -571,7 +571,7 @@ export class 
WorkloadServer extends EventEmitter { const runSocket = this.runSockets.get(run.friendlyId); if (!runSocket) { - console.debug("[WorkloadServer] notifyRun: Run socket not found", { run }); + this.logger.debug("notifyRun: Run socket not found", { run }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), @@ -582,14 +582,14 @@ export class WorkloadServer extends EventEmitter { } runSocket.emit("run:notify", { version: "1", run }); - console.debug("[WorkloadServer] run:notify sent", { run }); + this.logger.debug("run:notify sent", { run }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), message: "run:notify supervisor -> runner", }); } catch (error) { - console.error("[WorkloadServer] Error in notifyRun", { run, error }); + this.logger.error("Error in notifyRun", { run, error }); this.workerClient.sendDebugLog(run.friendlyId, { time: new Date(), diff --git a/packages/core/src/v3/runEngineWorker/supervisor/http.ts b/packages/core/src/v3/runEngineWorker/supervisor/http.ts index 43305b456a..407b9deb5f 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/http.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/http.ts @@ -24,6 +24,7 @@ import { getDefaultWorkerHeaders } from "./util.js"; import { wrapZodFetch } from "../../zodfetch.js"; import { createHeaders } from "../util.js"; import { WORKER_HEADERS } from "../consts.js"; +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; type SupervisorHttpClientOptions = SupervisorClientCommonOptions; @@ -33,6 +34,8 @@ export class SupervisorHttpClient { private readonly instanceName: string; private readonly defaultHeaders: Record; + private readonly logger = new SimpleStructuredLogger("supervisor-http-client"); + constructor(opts: SupervisorHttpClientOptions) { this.apiUrl = opts.apiUrl.replace(/\/$/, ""); this.workerToken = opts.workerToken; @@ -217,10 +220,10 @@ export class SupervisorHttpClient { ); if (!res.success) { - console.error("Failed to send debug 
log", res); + this.logger.error("Failed to send debug log", { error: res.error }); } } catch (error) { - console.error("Failed to send debug log", { error }); + this.logger.error("Failed to send debug log (caught error)", { error }); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 273c7bbe0a..66df549678 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -1,3 +1,4 @@ +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; import { SupervisorHttpClient } from "./http.js"; import { WorkerApiDequeueResponseBody } from "./schemas.js"; import { PreDequeueFn, PreSkipFn } from "./types.js"; @@ -19,6 +20,8 @@ export class RunQueueConsumer { private readonly maxRunCount?: number; private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise; + private readonly logger = new SimpleStructuredLogger("queue-consumer"); + private intervalMs: number; private idleIntervalMs: number; private isEnabled: boolean; @@ -53,7 +56,7 @@ export class RunQueueConsumer { private async dequeue() { // Incredibly verbose logging for debugging purposes - // console.debug("[RunQueueConsumer] dequeue()", { enabled: this.isEnabled }); + // this.logger.debug("dequeue()", { enabled: this.isEnabled }); if (!this.isEnabled) { return; @@ -61,16 +64,16 @@ export class RunQueueConsumer { let preDequeueResult: Awaited> | undefined; if (this.preDequeue) { - // console.debug("[RunQueueConsumer] preDequeue()"); + // this.logger.debug("preDequeue()"); try { preDequeueResult = await this.preDequeue(); } catch (preDequeueError) { - console.error("[RunQueueConsumer] preDequeue error", { error: preDequeueError }); + this.logger.error("preDequeue error", { error: preDequeueError }); } } - // console.debug("[RunQueueConsumer] preDequeueResult", { preDequeueResult }); + // 
this.logger.debug("preDequeueResult", { preDequeueResult }); if ( preDequeueResult?.skipDequeue || @@ -78,12 +81,12 @@ export class RunQueueConsumer { preDequeueResult?.maxResources?.memory === 0 ) { if (this.preSkip) { - console.debug("[RunQueueConsumer] preSkip()"); + this.logger.debug("preSkip()"); try { await this.preSkip(); } catch (preSkipError) { - console.error("[RunQueueConsumer] preSkip error", { error: preSkipError }); + this.logger.error("preSkip error", { error: preSkipError }); } } @@ -99,7 +102,7 @@ export class RunQueueConsumer { }); if (!response.success) { - console.error("[RunQueueConsumer] Failed to dequeue", { error: response.error }); + this.logger.error("Failed to dequeue", { error: response.error }); } else { try { await this.onDequeue(response.data); @@ -108,11 +111,11 @@ export class RunQueueConsumer { nextIntervalMs = this.intervalMs; } } catch (handlerError) { - console.error("[RunQueueConsumer] onDequeue error", { error: handlerError }); + this.logger.error("onDequeue error", { error: handlerError }); } } } catch (clientError) { - console.error("[RunQueueConsumer] client.dequeue error", { error: clientError }); + this.logger.error("client.dequeue error", { error: clientError }); } this.scheduleNextDequeue(nextIntervalMs); diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 3f19bb97de..8767008fda 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -9,6 +9,7 @@ import { io, Socket } from "socket.io-client"; import { WorkerClientToServerEvents, WorkerServerToClientEvents } from "../types.js"; import { getDefaultWorkerHeaders } from "./util.js"; import { IntervalService } from "../../utils/interval.js"; +import { SimpleStructuredLogger } from "../../utils/structuredLogger.js"; type SupervisorSessionOptions = SupervisorClientCommonOptions & { queueConsumerEnabled?: 
boolean; @@ -25,6 +26,8 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & { export class SupervisorSession extends EventEmitter { public readonly httpClient: SupervisorHttpClient; + private readonly logger = new SimpleStructuredLogger("supervisor-session"); + private readonly runNotificationsEnabled: boolean; private runNotificationsSocket?: Socket; @@ -54,30 +57,29 @@ export class SupervisorSession extends EventEmitter { this.heartbeat = new IntervalService({ onInterval: async () => { - console.debug("[SupervisorSession] Sending heartbeat"); + this.logger.debug("Sending heartbeat"); const body = this.getHeartbeatBody(); const response = await this.httpClient.heartbeatWorker(body); if (!response.success) { - console.error("[SupervisorSession] Heartbeat failed", { error: response.error }); + this.logger.error("Heartbeat failed", { error: response.error }); } }, intervalMs: opts.heartbeatIntervalSeconds * 1000, leadingEdge: false, onError: async (error) => { - console.error("[SupervisorSession] Failed to send heartbeat", { error }); + this.logger.error("Failed to send heartbeat", { error }); }, }); } private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { // Incredibly verbose logging for debugging purposes - // console.log("[SupervisorSession] Dequeued messages", { count: messages.length }); - // console.debug("[SupervisorSession] Dequeued messages with contents", messages); + // this.logger.debug("Dequeued messages with contents", { count: messages.length, messages }); for (const message of messages) { - console.log("[SupervisorSession] Emitting message", { message }); + this.logger.debug("Emitting message", { message }); this.emit("runQueueMessage", { time: new Date(), message, @@ -86,10 +88,10 @@ export class SupervisorSession extends EventEmitter { } subscribeToRunNotifications(runFriendlyIds: string[]) { - console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds }); + this.logger.debug("Subscribing 
to run notifications", { runFriendlyIds }); if (!this.runNotificationsSocket) { - console.error("[SupervisorSession] Socket not connected"); + this.logger.error("Socket not connected"); return; } @@ -106,10 +108,10 @@ export class SupervisorSession extends EventEmitter { } unsubscribeFromRunNotifications(runFriendlyIds: string[]) { - console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds }); + this.logger.debug("Unsubscribing from run notifications", { runFriendlyIds }); if (!this.runNotificationsSocket) { - console.error("[SupervisorSession] Socket not connected"); + this.logger.error("Socket not connected"); return; } @@ -137,26 +139,22 @@ export class SupervisorSession extends EventEmitter { extraHeaders: getDefaultWorkerHeaders(this.opts), }); socket.on("run:notify", ({ version, run }) => { - console.log("[SupervisorSession][WS] Received run notification", { version, run }); + this.logger.debug("[WS] Received run notification", { version, run }); this.emit("runNotification", { time: new Date(), run }); - this.httpClient - .sendDebugLog(run.friendlyId, { - time: new Date(), - message: "run:notify received by supervisor", - }) - .catch((error) => { - console.error("[SupervisorSession] Failed to send debug log", { error }); - }); + this.httpClient.sendDebugLog(run.friendlyId, { + time: new Date(), + message: "run:notify received by supervisor", + }); }); socket.on("connect", () => { - console.log("[SupervisorSession][WS] Connected to platform"); + this.logger.log("[WS] Connected to platform"); }); socket.on("connect_error", (error) => { - console.error("[SupervisorSession][WS] Connection error", { error }); + this.logger.error("[WS] Connection error", { error }); }); socket.on("disconnect", (reason, description) => { - console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description }); + this.logger.log("[WS] Disconnected from platform", { reason, description }); }); return socket; @@ -170,30 +168,30 @@ 
export class SupervisorSession extends EventEmitter { }); if (!connect.success) { - console.error("[SupervisorSession][HTTP] Failed to connect", { error: connect.error }); - throw new Error("[SupervisorSession][HTTP] Failed to connect"); + this.logger.error("Failed to connect", { error: connect.error }); + throw new Error("[SupervisorSession]Failed to connect"); } const { workerGroup } = connect.data; - console.log("[SupervisorSession][HTTP] Connected to platform", { + this.logger.log("Connected to platform", { type: workerGroup.type, name: workerGroup.name, }); if (this.queueConsumerEnabled) { - console.log("[SupervisorSession] Queue consumer enabled"); + this.logger.log("Queue consumer enabled"); await Promise.allSettled(this.queueConsumers.map(async (q) => q.start())); this.heartbeat.start(); } else { - console.warn("[SupervisorSession] Queue consumer disabled"); + this.logger.warn("Queue consumer disabled"); } if (this.runNotificationsEnabled) { - console.log("[SupervisorSession] Run notifications enabled"); + this.logger.log("Run notifications enabled"); this.runNotificationsSocket = this.createRunNotificationsSocket(); } else { - console.warn("[SupervisorSession] Run notifications disabled"); + this.logger.warn("Run notifications disabled"); } } diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index a4d37409a2..259b902687 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -116,7 +116,9 @@ export type MachineConfig = z.infer; export const MachinePreset = z.object({ name: MachinePresetName, + /** unit: vCPU */ cpu: z.number(), + /** unit: GB */ memory: z.number(), centsPerMs: z.number(), }); diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index bdbfcb7589..4c4fa4eeb1 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -5,7 +5,7 @@ import { HttpReply, 
getJsonBody } from "../apps/http.js"; import { Registry, Histogram, Counter } from "prom-client"; import { tryCatch } from "../../utils.js"; -const logger = new SimpleStructuredLogger("worker-http"); +const logger = new SimpleStructuredLogger("http-server"); type RouteHandler< TParams extends z.ZodFirstPartySchemaTypes = z.ZodUnknown, @@ -124,7 +124,7 @@ export class HttpServer { try { const { url, method } = req; - logger.log(`${method} ${url?.split("?")[0]}`, { url }); + logger.debug(`${method} ${url?.split("?")[0]}`, { url }); if (!url) { logger.error("Request URL is empty", { method }); diff --git a/packages/core/src/v3/utils/structuredLogger.ts b/packages/core/src/v3/utils/structuredLogger.ts index 72c675aecd..96a8957ef7 100644 --- a/packages/core/src/v3/utils/structuredLogger.ts +++ b/packages/core/src/v3/utils/structuredLogger.ts @@ -18,6 +18,8 @@ export enum LogLevel { } export class SimpleStructuredLogger implements StructuredLogger { + private prettyPrint = ["1", "true"].includes(process.env.PRETTY_LOGS ?? ""); + constructor( private name: string, private level: LogLevel = ["1", "true"].includes(process.env.DEBUG ?? "") @@ -82,6 +84,10 @@ export class SimpleStructuredLogger implements StructuredLogger { ...(args.length === 1 ? 
args[0] : args), }; - loggerFunction(JSON.stringify(structuredLog)); + if (this.prettyPrint) { + loggerFunction(JSON.stringify(structuredLog, null, 2)); + } else { + loggerFunction(JSON.stringify(structuredLog)); + } } } From 43290892827adcf4bb093a55a32b7e0ee779aed3 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 15:43:33 +0100 Subject: [PATCH 03/10] remove docker type dep --- apps/supervisor/package.json | 1 - apps/supervisor/src/resourceMonitor.ts | 8 ++++++-- pnpm-lock.yaml | 7 ------- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/apps/supervisor/package.json b/apps/supervisor/package.json index 59f0399125..7d0c148cb3 100644 --- a/apps/supervisor/package.json +++ b/apps/supervisor/package.json @@ -23,7 +23,6 @@ }, "devDependencies": { "@types/dockerode": "^3.3.33", - "docker-api-ts": "^0.2.2", "vitest": "^1.4.0" } } diff --git a/apps/supervisor/src/resourceMonitor.ts b/apps/supervisor/src/resourceMonitor.ts index 2aa844b9c7..d25d82ddcd 100644 --- a/apps/supervisor/src/resourceMonitor.ts +++ b/apps/supervisor/src/resourceMonitor.ts @@ -1,5 +1,4 @@ import type Docker from "dockerode"; -import type * as TDocker from "docker-api-ts"; import type { MachineResources } from "@trigger.dev/core/v3"; import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger"; import { env } from "./env.js"; @@ -100,6 +99,11 @@ export abstract class ResourceMonitor { } } +type SystemInfo = { + NCPU: number | undefined; + MemTotal: number | undefined; +}; + export class DockerResourceMonitor extends ResourceMonitor { private docker: Docker; @@ -114,7 +118,7 @@ export class DockerResourceMonitor extends ResourceMonitor { return this.cachedResources; } - const info: TDocker.SystemInfo = await this.docker.info(); + const info: SystemInfo = await this.docker.info(); const stats = await this.docker.listContainers({ all: true }); // Get system-wide resources diff --git a/pnpm-lock.yaml 
b/pnpm-lock.yaml index 508abb1d17..8ef216fb69 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -177,9 +177,6 @@ importers: '@types/dockerode': specifier: ^3.3.33 version: 3.3.35 - docker-api-ts: - specifier: ^0.2.2 - version: 0.2.2 vitest: specifier: ^1.4.0 version: 1.6.0(@types/node@20.14.14) @@ -22600,10 +22597,6 @@ packages: /dlv@1.1.3: resolution: {integrity: sha512-+HlytyjlPKnIG8XuRG8WvmBP8xs8P71y+SKKS6ZXWoEgLuePxtDoUEiH7WkdePWrQ5JBpE6aoVqfZfJUQkjXwA==} - /docker-api-ts@0.2.2: - resolution: {integrity: sha512-ayoc0OuS6lY7b64GeUtKcPzbKMkK70Vh3BYLKKG13cXX+/gGS9LyTNVvvJyvZ19Y6kbE4Kbv+2gwRUD17UVTRA==} - dev: true - /docker-compose@0.24.8: resolution: {integrity: sha512-plizRs/Vf15H+GCVxq2EUvyPK7ei9b/cVesHvjnX4xaXjM9spHe2Ytq0BitndFgvTJ3E3NljPNUEl7BAN43iZw==} engines: {node: '>= 6.0.0'} From 427d0fe4ed2df58f7de212d3ab49f103615e6375 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 16:23:32 +0100 Subject: [PATCH 04/10] add core changeset --- .changeset/lazy-panthers-shop.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/lazy-panthers-shop.md diff --git a/.changeset/lazy-panthers-shop.md b/.changeset/lazy-panthers-shop.md new file mode 100644 index 0000000000..fa622e087e --- /dev/null +++ b/.changeset/lazy-panthers-shop.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Improve structured logs From 2234fb876f902223a57ba935d7ae3df9a9bfc32b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 16:24:20 +0100 Subject: [PATCH 05/10] use implicit DOCKER_HOST instead --- apps/supervisor/.env.example | 2 +- apps/supervisor/src/env.ts | 1 - apps/supervisor/src/workloadManager/docker.ts | 4 +--- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/apps/supervisor/.env.example b/apps/supervisor/.env.example index da91ebb6aa..652fc03942 100644 --- a/apps/supervisor/.env.example +++ b/apps/supervisor/.env.example @@ -7,7 +7,7 @@ 
MANAGED_WORKER_SECRET=managed-secret # Point this at the webapp in prod TRIGGER_API_URL=http://localhost:3030 -# Point this at the OTel collector in prod +# Point this at the webapp or an OTel collector in prod OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:3030/otel # Use this on macOS # OTEL_EXPORTER_OTLP_ENDPOINT=http://host.docker.internal:3030/otel diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index ce449a1ad7..cd9bf5bead 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -44,7 +44,6 @@ const Env = z.object({ // Used by the workload manager, e.g docker/k8s DOCKER_NETWORK: z.string().default("host"), - DOCKER_SOCKET_PATH: z.string().default("/var/run/docker.sock"), OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false), KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index 9dd4cecea8..e9dfa2f034 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -13,9 +13,7 @@ export class DockerWorkloadManager implements WorkloadManager { private readonly docker: Docker; constructor(private opts: WorkloadManagerOptions) { - this.docker = new Docker({ - socketPath: env.DOCKER_SOCKET_PATH, - }); + this.docker = new Docker(); if (opts.workloadApiDomain) { this.logger.warn("⚠️ Custom workload API domain", { From 338a4422606c699c0f75c384a87a84949b749706 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Thu, 15 May 2025 16:57:00 +0100 Subject: [PATCH 06/10] disable resource monitor for now --- apps/supervisor/src/env.ts | 5 ++-- apps/supervisor/src/index.ts | 25 ++++++++++------ apps/supervisor/src/resourceMonitor.ts | 40 ++++++++++++++++++++++---- 3 files changed, 53 insertions(+), 17 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts 
index cd9bf5bead..9d53b61300 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -49,8 +49,9 @@ const Env = z.object({ KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv // Used by the resource monitor - OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), - OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), + RESOURCE_MONITOR_ENABLED: BoolEnv.default(false), + RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), + RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB: z.coerce.number().optional(), // Kubernetes specific settings KUBERNETES_FORCE_ENABLED: BoolEnv.default(false), diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 7cda6fd54f..7dfaaf2cc1 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -9,6 +9,7 @@ import { type DequeuedMessage } from "@trigger.dev/core/v3"; import { DockerResourceMonitor, KubernetesResourceMonitor, + NoopResourceMonitor, type ResourceMonitor, } from "./resourceMonitor.js"; import { KubernetesWorkloadManager } from "./workloadManager/kubernetes.js"; @@ -69,6 +70,16 @@ class ManagedSupervisor { dockerAutoremove: env.RUNNER_DOCKER_AUTOREMOVE, } satisfies WorkloadManagerOptions; + this.resourceMonitor = env.RESOURCE_MONITOR_ENABLED + ? this.isKubernetes + ? new KubernetesResourceMonitor(createK8sApi(), env.TRIGGER_WORKER_INSTANCE_NAME) + : new DockerResourceMonitor(new Docker()) + : new NoopResourceMonitor(); + + this.workloadManager = this.isKubernetes + ? 
new KubernetesWorkloadManager(workloadManagerOptions) + : new DockerWorkloadManager(workloadManagerOptions); + if (this.isKubernetes) { if (env.POD_CLEANER_ENABLED) { this.logger.log("🧹 Pod cleaner enabled", { @@ -99,15 +110,6 @@ class ManagedSupervisor { } else { this.logger.warn("Failed pod handler disabled"); } - - this.resourceMonitor = new KubernetesResourceMonitor( - createK8sApi(), - env.TRIGGER_WORKER_INSTANCE_NAME - ); - this.workloadManager = new KubernetesWorkloadManager(workloadManagerOptions); - } else { - this.resourceMonitor = new DockerResourceMonitor(new Docker()); - this.workloadManager = new DockerWorkloadManager(workloadManagerOptions); } this.workerSession = new SupervisorSession({ @@ -123,12 +125,17 @@ class ManagedSupervisor { runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED, heartbeatIntervalSeconds: env.TRIGGER_WORKER_HEARTBEAT_INTERVAL_SECONDS, preDequeue: async () => { + if (!env.RESOURCE_MONITOR_ENABLED) { + return {}; + } + if (this.isKubernetes) { // Not used in k8s for now return {}; } const resources = await this.resourceMonitor.getNodeResources(); + return { maxResources: { cpu: resources.cpuAvailable, diff --git a/apps/supervisor/src/resourceMonitor.ts b/apps/supervisor/src/resourceMonitor.ts index d25d82ddcd..507a52bbf6 100644 --- a/apps/supervisor/src/resourceMonitor.ts +++ b/apps/supervisor/src/resourceMonitor.ts @@ -70,18 +70,21 @@ export abstract class ResourceMonitor { } protected applyOverrides(resources: NodeResources): NodeResources { - if (!env.OVERRIDE_CPU_TOTAL && !env.OVERRIDE_MEMORY_TOTAL_GB) { + if ( + !env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL && + !env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB + ) { return resources; } logger.debug("[ResourceMonitor] 🛡️ Applying resource overrides", { - cpuTotal: env.OVERRIDE_CPU_TOTAL, - memoryTotalGb: env.OVERRIDE_MEMORY_TOTAL_GB, + cpuTotal: env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL, + memoryTotalGb: env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB, }); - const cpuTotal = 
env.OVERRIDE_CPU_TOTAL ?? resources.cpuTotal; - const memoryTotal = env.OVERRIDE_MEMORY_TOTAL_GB - ? this.gbToBytes(env.OVERRIDE_MEMORY_TOTAL_GB) + const cpuTotal = env.RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL ?? resources.cpuTotal; + const memoryTotal = env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB + ? this.gbToBytes(env.RESOURCE_MONITOR_OVERRIDE_MEMORY_TOTAL_GB) : resources.memoryTotal; const cpuDiff = cpuTotal - resources.cpuTotal; @@ -212,6 +215,21 @@ export class KubernetesResourceMonitor extends ResourceMonitor { } } +export class NoopResourceMonitor extends ResourceMonitor { + constructor() { + super(NoopResourceParser); + } + + async getNodeResources(): Promise { + return { + cpuTotal: 0, + cpuAvailable: Infinity, + memoryTotal: 0, + memoryAvailable: Infinity, + }; + } +} + abstract class ResourceParser { abstract cpu(cpu: number | string): number; abstract memory(memory: number | string): number; @@ -248,3 +266,13 @@ class KubernetesResourceParser extends ResourceParser { return parseInt(memory); } } + +class NoopResourceParser extends ResourceParser { + cpu(cpu: number): number { + return cpu; + } + + memory(memory: number): number { + return memory; + } +} From f103f025368ec043aadb093cbada7ba4f01b7d77 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 16 May 2025 02:49:15 +0100 Subject: [PATCH 07/10] support attaching docker runners to multiple networks --- apps/supervisor/src/env.ts | 12 ++- apps/supervisor/src/workloadManager/docker.ts | 74 ++++++++++++++++++- 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 9d53b61300..a4f6596011 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -29,6 +29,17 @@ const Env = z.object({ RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().optional(), RUNNER_ADDITIONAL_ENV_VARS: AdditionalEnvVars, // optional (csv) RUNNER_DOCKER_AUTOREMOVE: BoolEnv.default(true), + /** + * Network 
mode to use for all runners. Supported standard values are: `bridge`, `host`, `none`, and `container:<name|id>`. * Any other value is taken as a custom network's name to which all runners should connect. * * Accepts a list of comma-separated values to attach to multiple networks. Additional networks are interpreted as network names and will be attached after container creation. * * **WARNING**: Specifying multiple networks will slightly increase startup times. * * @default "host" */ + RUNNER_DOCKER_NETWORKS: z.string().default("host"), // Dequeue settings (provider mode) TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"), @@ -43,7 +54,6 @@ const Env = z.object({ TRIGGER_METADATA_URL: z.string().optional(), // Used by the workload manager, e.g docker/k8s - DOCKER_NETWORK: z.string().default("host"), OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false), KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv diff --git a/apps/supervisor/src/workloadManager/docker.ts b/apps/supervisor/src/workloadManager/docker.ts index e9dfa2f034..3669e492a9 100644 --- a/apps/supervisor/src/workloadManager/docker.ts +++ b/apps/supervisor/src/workloadManager/docker.ts @@ -7,11 +7,14 @@ import { import { env } from "../env.js"; import { getDockerHostDomain, getRunnerId } from "../util.js"; import Docker from "dockerode"; +import { tryCatch } from "@trigger.dev/core"; export class DockerWorkloadManager implements WorkloadManager { private readonly logger = new SimpleStructuredLogger("docker-workload-manager"); private readonly docker: Docker; + private readonly runnerNetworks: string[]; + constructor(private opts: WorkloadManagerOptions) { this.docker = new Docker(); @@ -20,6 +23,8 @@ export class DockerWorkloadManager implements WorkloadManager { domain: opts.workloadApiDomain, }); } + + this.runnerNetworks = env.RUNNER_DOCKER_NETWORKS.split(","); } async create(opts: WorkloadManagerCreateOptions) { @@ -67,10 +72,16 @@
export class DockerWorkloadManager implements WorkloadManager { } const hostConfig: Docker.HostConfig = { - NetworkMode: env.DOCKER_NETWORK, AutoRemove: !!this.opts.dockerAutoremove, }; + const [firstNetwork, ...remainingNetworks] = this.runnerNetworks; + + // Always attach the first network at container creation time. This has the following benefits: + // - If there is only a single network to attach, this will prevent having to make a separate request. + // - If there are multiple networks to attach, this will ensure the runner won't also be connected to the bridge network + hostConfig.NetworkMode = firstNetwork; + if (env.ENFORCE_MACHINE_PRESETS) { envVars.push(`TRIGGER_MACHINE_CPU=${opts.machine.cpu}`); envVars.push(`TRIGGER_MACHINE_MEMORY=${opts.machine.memory}`); @@ -94,12 +105,71 @@ export class DockerWorkloadManager implements WorkloadManager { // Create container const container = await this.docker.createContainer(containerCreateOpts); + // If there are multiple networks to attach to we need to attach the remaining ones after creation + if (remainingNetworks.length > 0) { + await this.attachContainerToNetworks({ + containerId: container.id, + networkNames: remainingNetworks, + }); + } + // Start container const startResult = await container.start(); - this.logger.debug("create succeeded", { opts, startResult, container, containerCreateOpts }); + this.logger.debug("create succeeded", { + opts, + startResult, + containerId: container.id, + containerCreateOpts, + }); } catch (error) { this.logger.error("create failed:", { opts, error, containerCreateOpts }); } } + + private async attachContainerToNetworks({ + containerId, + networkNames, + }: { + containerId: string; + networkNames: string[]; + }) { + this.logger.debug("Attaching container to networks", { containerId, networkNames }); + + const [error, networkResults] = await tryCatch( + this.docker.listNetworks({ + filters: { + // Full name matches only to prevent unexpected results + name: 
networkNames.map((name) => `^${name}$`), + }, + }) + ); + + if (error) { + this.logger.error("Failed to list networks", { networkNames }); + return; + } + + const results = await Promise.allSettled( + networkResults.map((networkInfo) => { + const network = this.docker.getNetwork(networkInfo.Id); + return network.connect({ Container: containerId }); + }) + ); + + if (results.some((r) => r.status === "rejected")) { + this.logger.error("Failed to attach container to some networks", { + containerId, + networkNames, + results, + }); + return; + } + + this.logger.debug("Attached container to networks", { + containerId, + networkNames, + results, + }); + } } From f9c3f8242bf2b63049b88d2a1c86a9e7cb8108f2 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 16 May 2025 02:50:04 +0100 Subject: [PATCH 08/10] warn if dequeue interval > idle dequeue interval --- apps/supervisor/src/index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index 7dfaaf2cc1..c43a4e7c50 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -112,6 +112,12 @@ class ManagedSupervisor { } } + if (env.TRIGGER_DEQUEUE_INTERVAL_MS > env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS) { + this.logger.warn( + `⚠️ TRIGGER_DEQUEUE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_INTERVAL_MS}) is greater than TRIGGER_DEQUEUE_IDLE_INTERVAL_MS (${env.TRIGGER_DEQUEUE_IDLE_INTERVAL_MS}) - did you mix them up?` + ); + } + this.workerSession = new SupervisorSession({ workerToken: env.TRIGGER_WORKER_TOKEN, apiUrl: env.TRIGGER_API_URL, From 8633d0b514d449a4f4dfd06f9a3c50acda2fd7c4 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 16 May 2025 02:51:56 +0100 Subject: [PATCH 09/10] verbose logs --- apps/supervisor/src/index.ts | 6 ----- .../supervisor/queueConsumer.ts | 24 +++++++++++++++---- .../v3/runEngineWorker/supervisor/session.ts | 4 +--- 
.../core/src/v3/utils/structuredLogger.ts | 11 ++++++++- 4 files changed, 30 insertions(+), 15 deletions(-) diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index c43a4e7c50..eb5ca4679f 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -168,11 +168,6 @@ class ManagedSupervisor { }); } - // setInterval(async () => { - // const resources = await this.resourceMonitor.getNodeResources(true); - // this.logger.debug("Current resources", { resources }); - // }, 1000); - this.workerSession.on("runNotification", async ({ time, run }) => { this.logger.log("runNotification", { time, run }); @@ -187,7 +182,6 @@ class ManagedSupervisor { runId: message.run.id, completedWaitpoints: message.completedWaitpoints.length, }); - // TODO: Do something with them or if we don't need the data here, maybe we shouldn't even send it } if (!message.image) { diff --git a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts index 66df549678..6eb5572bf3 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts @@ -55,16 +55,23 @@ export class RunQueueConsumer { } private async dequeue() { - // Incredibly verbose logging for debugging purposes - // this.logger.debug("dequeue()", { enabled: this.isEnabled }); + this.logger.verbose("dequeue()", { + enabled: this.isEnabled, + intervalMs: this.intervalMs, + idleIntervalMs: this.idleIntervalMs, + maxRunCount: this.maxRunCount, + preDequeue: !!this.preDequeue, + preSkip: !!this.preSkip, + }); if (!this.isEnabled) { + this.logger.warn("dequeue() - not enabled"); return; } let preDequeueResult: Awaited> | undefined; if (this.preDequeue) { - // this.logger.debug("preDequeue()"); + this.logger.verbose("preDequeue()"); try { preDequeueResult = await this.preDequeue(); @@ -73,13 +80,15 @@ export class RunQueueConsumer { } } - // 
this.logger.debug("preDequeueResult", { preDequeueResult }); + this.logger.verbose("preDequeueResult", { preDequeueResult }); if ( preDequeueResult?.skipDequeue || preDequeueResult?.maxResources?.cpu === 0 || preDequeueResult?.maxResources?.memory === 0 ) { + this.logger.debug("skipping dequeue", { preDequeueResult }); + if (this.preSkip) { this.logger.debug("preSkip()"); @@ -90,7 +99,8 @@ export class RunQueueConsumer { } } - return this.scheduleNextDequeue(this.idleIntervalMs); + this.scheduleNextDequeue(this.idleIntervalMs); + return; } let nextIntervalMs = this.idleIntervalMs; @@ -122,6 +132,10 @@ export class RunQueueConsumer { } scheduleNextDequeue(delayMs: number) { + if (delayMs === this.idleIntervalMs && this.idleIntervalMs !== this.intervalMs) { + this.logger.verbose("scheduled dequeue with idle interval", { delayMs }); + } + setTimeout(this.dequeue.bind(this), delayMs); } } diff --git a/packages/core/src/v3/runEngineWorker/supervisor/session.ts b/packages/core/src/v3/runEngineWorker/supervisor/session.ts index 8767008fda..b97a147216 100644 --- a/packages/core/src/v3/runEngineWorker/supervisor/session.ts +++ b/packages/core/src/v3/runEngineWorker/supervisor/session.ts @@ -75,11 +75,9 @@ export class SupervisorSession extends EventEmitter { } private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise { - // Incredibly verbose logging for debugging purposes - // this.logger.debug("Dequeued messages with contents", { count: messages.length, messages }); + this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages }); for (const message of messages) { - this.logger.debug("Emitting message", { message }); this.emit("runQueueMessage", { time: new Date(), message, diff --git a/packages/core/src/v3/utils/structuredLogger.ts b/packages/core/src/v3/utils/structuredLogger.ts index 96a8957ef7..1aae399bbf 100644 --- a/packages/core/src/v3/utils/structuredLogger.ts +++ b/packages/core/src/v3/utils/structuredLogger.ts @@ 
-15,6 +15,7 @@ export enum LogLevel { "warn", "info", "debug", + "verbose", } export class SimpleStructuredLogger implements StructuredLogger { @@ -22,7 +23,9 @@ export class SimpleStructuredLogger implements StructuredLogger { constructor( private name: string, - private level: LogLevel = ["1", "true"].includes(process.env.DEBUG ?? "") + private level: LogLevel = ["1", "true"].includes(process.env.VERBOSE ?? "") + ? LogLevel.verbose + : ["1", "true"].includes(process.env.DEBUG ?? "") ? LogLevel.debug : LogLevel.info, private fields?: Record @@ -62,6 +65,12 @@ export class SimpleStructuredLogger implements StructuredLogger { this.#structuredLog(console.debug, message, "debug", ...args); } + verbose(message: string, ...args: StructuredArgs) { + if (this.level < LogLevel.verbose) return; + + this.#structuredLog(console.debug, message, "verbose", ...args); + } + addFields(fields: Record) { this.fields = { ...this.fields, From 14bf47d0f1a886556417e619ba50bb1b5da51dca Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 16 May 2025 03:23:59 +0100 Subject: [PATCH 10/10] add changeset --- .changeset/rare-beds-accept.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/rare-beds-accept.md diff --git a/.changeset/rare-beds-accept.md b/.changeset/rare-beds-accept.md new file mode 100644 index 0000000000..dccd97a96a --- /dev/null +++ b/.changeset/rare-beds-accept.md @@ -0,0 +1,5 @@ +--- +"@trigger.dev/core": patch +--- + +Add verbose structured log level