diff --git a/apps/supervisor/README.md b/apps/supervisor/README.md index 9f2f5b9e23..e3bad3dcb6 100644 --- a/apps/supervisor/README.md +++ b/apps/supervisor/README.md @@ -8,20 +8,15 @@ api_url=http://localhost:3030 wg_name=my-worker -# edit these +# edit this admin_pat=tr_pat_... -project_id=clsw6q8wz... curl -sS \ -X POST \ "$api_url/admin/api/v1/workers" \ -H "Authorization: Bearer $admin_pat" \ -H "Content-Type: application/json" \ - -d "{ - \"name\": \"$wg_name\", - \"makeDefault\": true, - \"projectId\": \"$project_id\" - }" + -d "{\"name\": \"$wg_name\"}" ``` 2. Create `.env` and set the worker token @@ -47,3 +42,26 @@ pnpm exec trigger deploy --self-hosted # The additional network flag is required on linux pnpm exec trigger deploy --self-hosted --network host ``` + +## Additional worker groups + +When adding more worker groups you might also want to make them the default for a specific project. This will allow you to test it without having to change the global default: + +```sh +api_url=http://localhost:3030 +wg_name=my-worker + +# edit these +admin_pat=tr_pat_... +project_id=clsw6q8wz... + +curl -sS \ + -X POST \ + "$api_url/admin/api/v1/workers" \ + -H "Authorization: Bearer $admin_pat" \ + -H "Content-Type: application/json" \ + -d "{ + \"name\": \"$wg_name\", + \"makeDefaultForProjectId\": \"$project_id\" + }" +``` diff --git a/apps/supervisor/src/env.ts b/apps/supervisor/src/env.ts index 5fe5abe21b..1b2eb52ddc 100644 --- a/apps/supervisor/src/env.ts +++ b/apps/supervisor/src/env.ts @@ -26,6 +26,7 @@ const Env = z.object({ .transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase())) .default("http"), TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default + TRIGGER_WORKLOAD_API_HOST_INTERNAL: z.string().default("0.0.0.0"), TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller @@ -41,6 +42,7 @@ const Env = z.object({ DOCKER_NETWORK: z.string().default("host"), OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url(), ENFORCE_MACHINE_PRESETS: z.coerce.boolean().default(false), + KUBERNETES_IMAGE_PULL_SECRETS: z.string().optional(), // csv // Used by the resource monitor OVERRIDE_CPU_TOTAL: z.coerce.number().optional(), @@ -53,7 +55,10 @@ const Env = z.object({ EPHEMERAL_STORAGE_SIZE_REQUEST: z.string().default("2Gi"), // Metrics + METRICS_ENABLED: BoolEnv.default(true), METRICS_COLLECT_DEFAULTS: BoolEnv.default(true), + METRICS_HOST: z.string().default("127.0.0.1"), + METRICS_PORT: z.coerce.number().int().default(9090), // Pod cleaner POD_CLEANER_ENABLED: BoolEnv.default(true), @@ -63,6 +68,9 @@ const Env = z.object({ // Failed pod handler FAILED_POD_HANDLER_ENABLED: BoolEnv.default(true), FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS: z.coerce.number().int().default(1000), + + // Debug + DEBUG: BoolEnv.default(false), }); export const env = Env.parse(stdEnv); diff --git a/apps/supervisor/src/index.ts b/apps/supervisor/src/index.ts index b5d7ea36d1..27b9998427 100644 --- a/apps/supervisor/src/index.ts +++ b/apps/supervisor/src/index.ts @@ -30,7 +30,7 @@ if (env.METRICS_COLLECT_DEFAULTS) { class ManagedSupervisor { private readonly workerSession: SupervisorSession; - private readonly httpServer: HttpServer; + private readonly metricsServer?: HttpServer; private readonly workloadServer: WorkloadServer; private readonly workloadManager: WorkloadManager; private readonly logger = new SimpleStructuredLogger("managed-worker"); @@ -44,24 +44,15 @@ class ManagedSupervisor { private readonly warmStartUrl = env.TRIGGER_WARM_START_URL; constructor() { - const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL; - const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN; - const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL; + const { TRIGGER_WORKER_TOKEN, MANAGED_WORKER_SECRET, ...envWithoutSecrets } = env; - if (env.POD_CLEANER_ENABLED) { - this.podCleaner = new PodCleaner({ - namespace: env.KUBERNETES_NAMESPACE, - batchSize: env.POD_CLEANER_BATCH_SIZE, - intervalMs: env.POD_CLEANER_INTERVAL_MS, - }); + if (env.DEBUG) { + console.debug("[ManagedSupervisor] Starting up", { envWithoutSecrets }); } - if (env.FAILED_POD_HANDLER_ENABLED) { - this.failedPodHandler = new FailedPodHandler({ - namespace: env.KUBERNETES_NAMESPACE, - reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, - }); - } + const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL; + const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN; + const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL; if (this.warmStartUrl) { this.logger.log("[ManagedWorker] ๐Ÿ”ฅ Warm starts enabled", { @@ -70,12 +61,43 @@ class ManagedSupervisor { } if (this.isKubernetes) { + if (env.POD_CLEANER_ENABLED) { + this.logger.log("[ManagedWorker] ๐Ÿงน Pod cleaner enabled", { + namespace: env.KUBERNETES_NAMESPACE, + batchSize: env.POD_CLEANER_BATCH_SIZE, + intervalMs: env.POD_CLEANER_INTERVAL_MS, + }); + this.podCleaner = new PodCleaner({ + register, + namespace: env.KUBERNETES_NAMESPACE, + batchSize: env.POD_CLEANER_BATCH_SIZE, + intervalMs: env.POD_CLEANER_INTERVAL_MS, + }); + } else { + this.logger.warn("[ManagedWorker] Pod cleaner disabled"); + } + + if (env.FAILED_POD_HANDLER_ENABLED) { + this.logger.log("[ManagedWorker] ๐Ÿ” Failed pod handler enabled", { + namespace: env.KUBERNETES_NAMESPACE, + reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, + }); + this.failedPodHandler = new FailedPodHandler({ + register, + namespace: env.KUBERNETES_NAMESPACE, + reconnectIntervalMs: env.FAILED_POD_HANDLER_RECONNECT_INTERVAL_MS, + }); + } else { + this.logger.warn("[ManagedWorker] Failed pod handler disabled"); + } + this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), ""); this.workloadManager = new KubernetesWorkloadManager({ workloadApiProtocol, workloadApiDomain, workloadApiPort: workloadApiPortExternal, warmStartUrl: this.warmStartUrl, + imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","), }); } else { this.resourceMonitor = new DockerResourceMonitor(new Docker()); @@ -224,16 +246,21 @@ class ManagedSupervisor { } }); - // Used for health checks and metrics - this.httpServer = new HttpServer({ port: 8080, host: "0.0.0.0" }).route("/health", "GET", { - handler: async ({ reply }) => { - reply.text("OK"); - }, - }); + if (env.METRICS_ENABLED) { + this.metricsServer = new HttpServer({ + port: env.METRICS_PORT, + host: env.METRICS_HOST, + metrics: { + register, + expose: true, + }, + }); + } // Responds to workload requests only this.workloadServer = new WorkloadServer({ port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL, + host: env.TRIGGER_WORKLOAD_API_HOST_INTERNAL, workerClient: this.workerSession.httpClient, checkpointClient: this.checkpointClient, }); @@ -299,13 +326,10 @@ class ManagedSupervisor { async start() { this.logger.log("[ManagedWorker] Starting up"); - if (this.podCleaner) { - await this.podCleaner.start(); - } - - if (this.failedPodHandler) { - await this.failedPodHandler.start(); - } + // Optional services + await this.podCleaner?.start(); + await this.failedPodHandler?.start(); + await this.metricsServer?.start(); if (env.TRIGGER_WORKLOAD_API_ENABLED) { this.logger.log("[ManagedWorker] Workload API enabled", { @@ -319,21 +343,16 @@ class ManagedSupervisor { } await this.workerSession.start(); - - await this.httpServer.start(); } async stop() { this.logger.log("[ManagedWorker] Shutting down"); - await this.httpServer.stop(); + await this.workerSession.stop(); - if (this.podCleaner) { - await this.podCleaner.stop(); - } - - if (this.failedPodHandler) { - await this.failedPodHandler.stop(); - } + // Optional services + await this.podCleaner?.stop(); + await this.failedPodHandler?.stop(); + await this.metricsServer?.stop(); } } diff --git a/apps/supervisor/src/workloadManager/kubernetes.ts b/apps/supervisor/src/workloadManager/kubernetes.ts index 90977ed21b..50e9b81f47 100644 --- a/apps/supervisor/src/workloadManager/kubernetes.ts +++ b/apps/supervisor/src/workloadManager/kubernetes.ts @@ -182,18 +182,15 @@ export class KubernetesWorkloadManager implements WorkloadManager { } } + private getImagePullSecrets(): k8s.V1LocalObjectReference[] | undefined { + return this.opts.imagePullSecrets?.map((name) => ({ name })); + } + get #defaultPodSpec(): Omit { return { restartPolicy: "Never", automountServiceAccountToken: false, - imagePullSecrets: [ - { - name: "registry-trigger", - }, - { - name: "registry-trigger-failover", - }, - ], + imagePullSecrets: this.getImagePullSecrets(), nodeSelector: { nodetype: "worker-re2", }, diff --git a/apps/supervisor/src/workloadManager/types.ts b/apps/supervisor/src/workloadManager/types.ts index ea2046b631..ed06abc8c8 100644 --- a/apps/supervisor/src/workloadManager/types.ts +++ b/apps/supervisor/src/workloadManager/types.ts @@ -5,6 +5,7 @@ export interface WorkloadManagerOptions { workloadApiDomain?: string; // If unset, will use orchestrator-specific default workloadApiPort: number; warmStartUrl?: string; + imagePullSecrets?: string[]; } export interface WorkloadManager { diff --git a/apps/supervisor/src/workloadServer/index.ts b/apps/supervisor/src/workloadServer/index.ts index 1421d8f98d..ed90c450c3 100644 --- a/apps/supervisor/src/workloadServer/index.ts +++ b/apps/supervisor/src/workloadServer/index.ts @@ -22,6 +22,7 @@ import { } from "@trigger.dev/core/v3/workers"; import { HttpServer, type CheckpointClient } from "@trigger.dev/core/v3/serverOnly"; import { type IncomingMessage } from "node:http"; +import { register } from "../metrics.js"; // Use the official export when upgrading to socket.io@4.8.0 interface DefaultEventsMap { @@ -121,7 +122,19 @@ export class WorkloadServer extends EventEmitter { } private createHttpServer({ host, port }: { host: string; port: number }) { - return new HttpServer({ port, host }) + return new HttpServer({ + port, + host, + metrics: { + register, + expose: false, + }, + }) + .route("/health", "GET", { + handler: async ({ reply }) => { + reply.text("OK"); + }, + }) .route( "/api/v1/workload-actions/runs/:runFriendlyId/snapshots/:snapshotFriendlyId/attempts/start", "POST", diff --git a/apps/webapp/app/routes/admin.api.v1.workers.ts b/apps/webapp/app/routes/admin.api.v1.workers.ts index 185c9cc4d0..9299c0e2c0 100644 --- a/apps/webapp/app/routes/admin.api.v1.workers.ts +++ b/apps/webapp/app/routes/admin.api.v1.workers.ts @@ -7,8 +7,7 @@ import { WorkerGroupService } from "~/v3/services/worker/workerGroupService.serv const RequestBodySchema = z.object({ name: z.string().optional(), description: z.string().optional(), - projectId: z.string().optional(), - makeDefault: z.boolean().optional(), + makeDefaultForProjectId: z.string().optional(), }); export async function action({ request }: ActionFunctionArgs) { @@ -35,7 +34,7 @@ export async function action({ request }: ActionFunctionArgs) { try { const rawBody = await request.json(); - const { name, description, projectId, makeDefault } = RequestBodySchema.parse(rawBody ?? {}); + const { name, description, makeDefaultForProjectId } = RequestBodySchema.parse(rawBody ?? {}); const service = new WorkerGroupService(); const { workerGroup, token } = await service.createWorkerGroup({ @@ -43,14 +42,13 @@ export async function action({ request }: ActionFunctionArgs) { description, }); - if (makeDefault && projectId) { + if (makeDefaultForProjectId) { await prisma.project.update({ where: { - id: projectId, + id: makeDefaultForProjectId, }, data: { defaultWorkerGroupId: workerGroup.id, - engine: "V2", }, }); } diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.restore.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.restore.ts deleted file mode 100644 index 8f892288e6..0000000000 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.restore.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { json, TypedResponse } from "@remix-run/server-runtime"; -import { - WorkerApiSuspendRunRequestBody, - WorkerApiSuspendRunResponseBody, -} from "@trigger.dev/core/v3/workers"; -import { z } from "zod"; -import { logger } from "~/services/logger.server"; -import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server"; - -export const action = createActionWorkerApiRoute( - { - params: z.object({ - runFriendlyId: z.string(), - snapshotFriendlyId: z.string(), - }), - body: WorkerApiSuspendRunRequestBody, - }, - async ({ - authenticatedWorker, - params, - body, - }): Promise> => { - const { runFriendlyId, snapshotFriendlyId } = params; - - logger.debug("Restoring run", { runFriendlyId, snapshotFriendlyId, body }); - - if (!body.success) { - // TODO: we could create a debug span here - logger.error("Failed to restore run", { - runFriendlyId, - snapshotFriendlyId, - error: body.error, - }); - - return json({ ok: true }); - } - - try { - await authenticatedWorker.createCheckpoint({ - runFriendlyId, - snapshotFriendlyId, - checkpoint: body.checkpoint, - }); - - return json({ ok: true }); - } catch (error) { - logger.error("Failed to restore run", { runFriendlyId, snapshotFriendlyId, error }); - throw error; - } - } -); diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts index 323c98405f..2d745ed94a 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts @@ -25,7 +25,6 @@ export const action = createActionWorkerApiRoute( logger.debug("Suspending run", { runFriendlyId, snapshotFriendlyId, body }); if (!body.success) { - // TODO: we could create a debug span here logger.error("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, diff --git a/apps/webapp/app/v3/services/initializeDeployment.server.ts b/apps/webapp/app/v3/services/initializeDeployment.server.ts index c5a375ba90..e99473ca9c 100644 --- a/apps/webapp/app/v3/services/initializeDeployment.server.ts +++ b/apps/webapp/app/v3/services/initializeDeployment.server.ts @@ -18,8 +18,20 @@ export class InitializeDeploymentService extends BaseService { payload: InitializeDeploymentRequestBody ) { return this.traceWithEnv("call", environment, async (span) => { - if (payload.type !== "V1" && environment.project.engine !== "V2") { - throw new ServiceValidationError("Only V1 deployments are supported for this project"); + if (payload.type === "UNMANAGED") { + throw new ServiceValidationError("UNMANAGED deployments are not supported"); + } + + // Upgrade the project to engine "V2" if it's not already. This should cover cases where people deploy to V2 without running dev first. + if (payload.type === "MANAGED" && environment.project.engine === "V1") { + await this._prisma.project.update({ + where: { + id: environment.project.id, + }, + data: { + engine: "V2", + }, + }); } const latestDeployment = await this._prisma.workerDeployment.findFirst({ diff --git a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts index 24d457a882..c654dd3bf5 100644 --- a/apps/webapp/app/v3/services/worker/workerGroupService.server.ts +++ b/apps/webapp/app/v3/services/worker/workerGroupService.server.ts @@ -47,7 +47,13 @@ export class WorkerGroupService extends WithRunEngine { }, }); - if (managedCount === 1) { + const getFlag = makeFlags(this._prisma); + const defaultWorkerInstanceGroupId = await getFlag({ + key: "defaultWorkerInstanceGroupId", + }); + + // If there's no global default yet we should set it to the new worker group + if (!defaultWorkerInstanceGroupId) { const setFlag = makeSetFlags(this._prisma); await setFlag({ key: "defaultWorkerInstanceGroupId", diff --git a/packages/cli-v3/src/entryPoints/managed-run-controller.ts b/packages/cli-v3/src/entryPoints/managed-run-controller.ts index 08cf64ab03..495e85b009 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-controller.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-controller.ts @@ -485,6 +485,7 @@ class ManagedRunController { console.log("Run is finished, nothing to do"); return; } + case "QUEUED_EXECUTING": case "EXECUTING_WITH_WAITPOINTS": { console.log("Run is executing with waitpoints", { snapshot }); @@ -629,7 +630,6 @@ class ManagedRunController { return; } case "RUN_CREATED": - case "QUEUED_EXECUTING": case "QUEUED": { console.log("Status change not handled", { status: snapshot.executionStatus }); return; diff --git a/packages/core/src/v3/apps/http.ts b/packages/core/src/v3/apps/http.ts index 10c02f074f..7e41f35f4a 100644 --- a/packages/core/src/v3/apps/http.ts +++ b/packages/core/src/v3/apps/http.ts @@ -45,7 +45,7 @@ export class HttpReply { constructor(private response: Parameters[1]) {} empty(status?: number) { - if (this.alreadyReplied) { + if (this.hasReplied) { return; } @@ -53,7 +53,7 @@ export class HttpReply { } text(text: string, status?: number, contentType?: string) { - if (this.alreadyReplied) { + if (this.hasReplied) { return; } @@ -63,7 +63,7 @@ export class HttpReply { } json(value: any, pretty?: boolean, status?: number) { - if (this.alreadyReplied) { + if (this.hasReplied) { return; } @@ -74,7 +74,7 @@ export class HttpReply { ); } - private get alreadyReplied() { + get hasReplied() { return this.response.headersSent; } } diff --git a/packages/core/src/v3/serverOnly/httpServer.ts b/packages/core/src/v3/serverOnly/httpServer.ts index 722fa777b3..3aa327a8bd 100644 --- a/packages/core/src/v3/serverOnly/httpServer.ts +++ b/packages/core/src/v3/serverOnly/httpServer.ts @@ -28,6 +28,7 @@ interface RouteDefinition< paramsSchema?: TParams; querySchema?: TQuery; bodySchema?: TBody; + keepConnectionAlive?: boolean; handler: RouteHandler; } @@ -156,7 +157,8 @@ export class HttpServer { return reply.empty(405); } - const { handler, paramsSchema, querySchema, bodySchema } = routeDefinition; + const { handler, paramsSchema, querySchema, bodySchema, keepConnectionAlive } = + routeDefinition; const params = this.parseRouteParams(route, url); const parsedParams = this.optionalSchema(paramsSchema, params); @@ -202,6 +204,11 @@ export class HttpServer { logger.error("Route handler error", { error }); return reply.empty(500); } + + if (keepConnectionAlive) { + // Return early to keep the connection alive + return; + } } catch (error) { logger.error("Failed to handle request", { error }); return reply.empty(500); diff --git a/packages/core/src/v3/serverOnly/k8s.ts b/packages/core/src/v3/serverOnly/k8s.ts index 333166a038..4234e15c49 100644 --- a/packages/core/src/v3/serverOnly/k8s.ts +++ b/packages/core/src/v3/serverOnly/k8s.ts @@ -12,5 +12,7 @@ export function isKubernetesEnvironment(override?: boolean): boolean { env.KUBERNETES_SERVICE_PORT, ]; + console.debug("k8sIndicators", { k8sIndicators }); + return k8sIndicators.some(Boolean); }