Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,10 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER: z.coerce.number().int().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER: z.coerce.number().int().optional(),

RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
Expand Down Expand Up @@ -593,6 +597,7 @@ const EnvironmentSchema = z.object({

RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
RUN_ENGINE_RUN_QUEUE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

/** How long should the presence ttl last */
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),
Expand Down
14 changes: 11 additions & 3 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { RunEngine } from "@internal/run-engine";
import { defaultMachine } from "~/services/platform.v3.server";
import { prisma } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { env } from "~/env.server";
import { defaultMachine } from "~/services/platform.v3.server";
import { singleton } from "~/utils/singleton";
import { allMachines } from "./machinePresets.server";
import { tracer, meter } from "./tracer.server";
import { meter, tracer } from "./tracer.server";

// Shared RunEngine export: the `createRunEngine` factory is registered with the
// `singleton` helper under the "RunEngine" key.
// NOTE(review): presumably `singleton` invokes the factory once and reuses the result — confirm in ~/utils/singleton.
export const engine = singleton("RunEngine", createRunEngine);

Expand All @@ -13,6 +13,7 @@ export type { RunEngine };
function createRunEngine() {
const engine = new RunEngine({
prisma,
readOnlyPrisma: $replica,
logLevel: env.RUN_ENGINE_WORKER_LOG_LEVEL,
worker: {
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
Expand All @@ -39,6 +40,7 @@ function createRunEngine() {
},
queue: {
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
logLevel: env.RUN_ENGINE_RUN_QUEUE_LOG_LEVEL,
redis: {
keyPrefix: "engine:",
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
Expand All @@ -64,6 +66,12 @@ function createRunEngine() {
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
concurrencySweeper: {
scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,
scanJitter: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER,
processMarkedJitter: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER,
},
},
runLock: {
redis: {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/test/engine/triggerTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { describe, expect, vi } from "vitest";
// Mock the db module: both exports that runEngine.server.ts imports from it
// (`prisma` and `$replica`) are replaced with empty object stubs.
vi.mock("~/db.server", () => ({
prisma: {},
$replica: {},
}));

vi.mock("~/services/platform.v3.server", async (importOriginal) => {
Expand Down
53 changes: 52 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
Prisma,
PrismaClient,
PrismaClientOrTransaction,
PrismaReplicaClient,
TaskRun,
TaskRunExecutionSnapshot,
Waitpoint,
Expand Down Expand Up @@ -50,6 +51,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
import { WaitpointSystem } from "./systems/waitpointSystem.js";
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
import { workerCatalog } from "./workerCatalog.js";
import { getFinalRunStatuses, isFinalRunStatus } from "./statuses.js";

export class RunEngine {
private runLockRedis: Redis;
Expand All @@ -61,6 +63,7 @@ export class RunEngine {
private heartbeatTimeouts: HeartbeatTimeouts;

prisma: PrismaClient;
readOnlyPrisma: PrismaReplicaClient;
runQueue: RunQueue;
eventBus: EventBus = new EventEmitter<EventBusEvents>();
executionSnapshotSystem: ExecutionSnapshotSystem;
Expand All @@ -79,6 +82,7 @@ export class RunEngine {
constructor(private readonly options: RunEngineOptions) {
this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info");
this.prisma = options.prisma;
this.readOnlyPrisma = options.readOnlyPrisma ?? this.prisma;
this.runLockRedis = createRedisClient(
{
...options.runLock.redis,
Expand Down Expand Up @@ -123,7 +127,7 @@ export class RunEngine {
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
}),
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
logger: new Logger("RunQueue", this.options.logLevel ?? "info"),
logger: new Logger("RunQueue", options.queue?.logLevel ?? "info"),
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
retryOptions: options.queue?.retryOptions,
workerOptions: {
Expand All @@ -133,6 +137,13 @@ export class RunEngine {
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
},
concurrencySweeper: {
scanSchedule: options.queue?.concurrencySweeper?.scanSchedule,
processMarkedSchedule: options.queue?.concurrencySweeper?.processMarkedSchedule,
scanJitter: options.queue?.concurrencySweeper?.scanJitter,
processMarkedJitter: options.queue?.concurrencySweeper?.processMarkedJitter,
callback: this.#concurrencySweeperCallback.bind(this),
},
shardCount: options.queue?.shardCount,
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
Expand Down Expand Up @@ -1329,4 +1340,44 @@ export class RunEngine {
}
});
}

/**
 * Callback passed to the run queue as `concurrencySweeper.callback`.
 *
 * Given a batch of run ids, looks up (via `readOnlyPrisma`) which of them are
 * in a final status, have an organization, and completed more than 10 minutes
 * ago, and reports those back as `{ id, orgId }` pairs.
 *
 * @param runIds - Candidate run ids supplied by the sweeper.
 * @returns One `{ id, orgId }` entry per matching run; empty when none match.
 */
async #concurrencySweeperCallback(
  runIds: string[]
): Promise<Array<{ id: string; orgId: string }>> {
  // An empty `in` list can never match a row, so skip the database round trip.
  if (runIds.length === 0) {
    return [];
  }

  // Only runs that were completed more than 10 minutes ago are considered.
  const completedBeforeMs = 1000 * 60 * 10;
  const completedCutoff = new Date(Date.now() - completedBeforeMs);

  const runs = await this.readOnlyPrisma.taskRun.findMany({
    where: {
      id: { in: runIds },
      completedAt: {
        lte: completedCutoff,
      },
      organizationId: {
        not: null,
      },
      status: {
        in: getFinalRunStatuses(),
      },
    },
    select: {
      id: true,
      status: true,
      organizationId: true,
    },
  });

  const finishedRuns: Array<{ id: string; orgId: string }> = [];

  for (const run of runs) {
    // Log every finished run the query returned
    this.logger.info("Concurrency sweeper callback found finished run", {
      runId: run.id,
      orgId: run.organizationId,
      status: run.status,
    });

    // The truthiness check narrows `organizationId` to `string`, so no
    // non-null assertion is needed when building the result.
    if (run.organizationId) {
      finishedRuns.push({ id: run.id, orgId: run.organizationId });
    }
  }

  return finishedRuns;
}
}
26 changes: 15 additions & 11 deletions internal-packages/run-engine/src/engine/statuses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ export function isInitialState(status: TaskRunExecutionStatus): boolean {
return startedStatuses.includes(status);
}

export function isFinalRunStatus(status: TaskRunStatus): boolean {
const finalStatuses: TaskRunStatus[] = [
"CANCELED",
"INTERRUPTED",
"COMPLETED_SUCCESSFULLY",
"COMPLETED_WITH_ERRORS",
"SYSTEM_FAILURE",
"CRASHED",
"EXPIRED",
"TIMED_OUT",
];
/**
 * Run statuses treated as final by `isFinalRunStatus` and
 * `getFinalRunStatuses`. Declared `readonly` so the shared list cannot be
 * mutated through this binding.
 */
const finalStatuses: readonly TaskRunStatus[] = [
  "CANCELED",
  "INTERRUPTED",
  "COMPLETED_SUCCESSFULLY",
  "COMPLETED_WITH_ERRORS",
  "SYSTEM_FAILURE",
  "CRASHED",
  "EXPIRED",
  "TIMED_OUT",
];

/**
 * Reports whether `status` is one of the final run statuses.
 *
 * @param status - The run status to check.
 * @returns `true` when `status` appears in the final-status list.
 */
export function isFinalRunStatus(status: TaskRunStatus): boolean {
  return finalStatuses.includes(status);
}

/**
 * Returns the list of final run statuses.
 *
 * A fresh array is returned on every call so that a caller mutating the
 * result (push, sort, truncating `length`, …) cannot change what
 * `isFinalRunStatus` reports or what later callers receive.
 *
 * @returns A new array containing every final run status.
 */
export function getFinalRunStatuses(): TaskRunStatus[] {
  return [...finalStatuses];
}

/**
 * Reports whether `status` is one of the two execution statuses this module
 * lists as releasable: "SUSPENDED" and "EXECUTING_WITH_WAITPOINTS".
 *
 * @param status - The execution status to check.
 * @returns `true` for the two releasable statuses, `false` for anything else.
 */
export function canReleaseConcurrency(status: TaskRunExecutionStatus): boolean {
  switch (status) {
    case "SUSPENDED":
    case "EXECUTING_WITH_WAITPOINTS":
      return true;
    default:
      return false;
  }
}
10 changes: 9 additions & 1 deletion internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
RetryOptions,
RunChainState,
} from "@trigger.dev/core/v3";
import { PrismaClient } from "@trigger.dev/database";
import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database";
import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js";
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
import { workerCatalog } from "./workerCatalog.js";
Expand All @@ -17,6 +17,7 @@ import { LockRetryConfig } from "./locking.js";

export type RunEngineOptions = {
prisma: PrismaClient;
readOnlyPrisma?: PrismaReplicaClient;
worker: {
disabled?: boolean;
redis: RedisOptions;
Expand All @@ -38,11 +39,18 @@ export type RunEngineOptions = {
workerOptions?: WorkerConcurrencyOptions;
retryOptions?: RetryOptions;
defaultEnvConcurrency?: number;
logLevel?: LogLevel;
queueSelectionStrategyOptions?: Pick<
FairQueueSelectionStrategyOptions,
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
>;
dequeueBlockingTimeoutSeconds?: number;
concurrencySweeper?: {
scanSchedule?: string;
processMarkedSchedule?: string;
scanJitter?: number;
processMarkedJitter?: number;
};
};
runLock: {
redis: RedisOptions;
Expand Down
Loading
Loading