diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index a7868ee5bf..fe7411e50b 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -573,6 +573,8 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500), RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10), + RUN_ENGINE_WORKER_ENABLED: z.string().default("1"), + /** How long should the presence ttl last */ DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000), DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(5_000), diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index 15150072cb..7c9b07dcc8 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -14,6 +14,7 @@ function createRunEngine() { const engine = new RunEngine({ prisma, worker: { + disabled: env.RUN_ENGINE_WORKER_ENABLED === "0", workers: env.RUN_ENGINE_WORKER_COUNT, tasksPerWorker: env.RUN_ENGINE_TASKS_PER_WORKER, pollIntervalMs: env.RUN_ENGINE_WORKER_POLL_INTERVAL, diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 7f1f44db6d..b4f6bff232 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -1,5 +1,4 @@ import { createRedisClient, Redis } from "@internal/redis"; -import { Worker } from "@trigger.dev/redis-worker"; import { startSpan, trace, Tracer } from "@internal/tracing"; import { Logger } from "@trigger.dev/core/logger"; import { @@ -13,7 +12,7 @@ import { StartRunAttemptResult, TaskRunExecutionResult, } from "@trigger.dev/core/v3"; -import { BatchId, QueueId, RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; +import { BatchId, RunId, WaitpointId } from "@trigger.dev/core/v3/isomorphic"; import { Prisma, PrismaClient, @@ -22,6 +21,7 @@ import { TaskRunExecutionSnapshot, Waitpoint, } from "@trigger.dev/database"; +import { Worker } from "@trigger.dev/redis-worker"; import { assertNever } from "assert-never"; import { EventEmitter } from "node:events"; import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js"; @@ -44,11 +44,11 @@ import { ExecutionSnapshotSystem, getLatestExecutionSnapshot, } from "./systems/executionSnapshotSystem.js"; +import { PendingVersionSystem } from "./systems/pendingVersionSystem.js"; import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js"; import { RunAttemptSystem } from "./systems/runAttemptSystem.js"; import { SystemResources } from "./systems/systems.js"; import { TtlSystem } from "./systems/ttlSystem.js"; -import { PendingVersionSystem } from "./systems/pendingVersionSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js"; import { workerCatalog } from "./workerCatalog.js"; @@ -165,7 +165,11 @@ export class RunEngine { await this.delayedRunSystem.enqueueDelayedRun({ runId: payload.runId }); }, }, - }).start(); + }); + + if (!options.worker.disabled) { + this.worker.start(); + } this.tracer = options.tracer; diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index abf520573f..114f38b05e 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -10,6 +10,7 @@ import { workerCatalog } from "./workerCatalog.js"; export type RunEngineOptions = { prisma: PrismaClient; worker: { + disabled?: boolean; redis: RedisOptions; pollIntervalMs?: number; immediatePollIntervalMs?: number;