Skip to content

Commit 1b4dabb

Browse files
authored
v4: current concurrency sweeper (#2206)
* v4: current concurrency sweeper * Fix webapp tests * Ensure only a single instance performs concurrency sweeping by using redis-worker cron jobs * Improved the mark phase * Ensure cron jobs get rescheduled even if the handler throws an error * Better property names
1 parent aa24a9a commit 1b4dabb

File tree

15 files changed

+986
-61
lines changed

15 files changed

+986
-61
lines changed

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,10 @@ const EnvironmentSchema = z.object({
428428
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
429429
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
430430
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
431+
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
432+
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
433+
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
434+
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS: z.coerce.number().int().optional(),
431435

432436
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
433437
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
@@ -593,6 +597,7 @@ const EnvironmentSchema = z.object({
593597

594598
RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
595599
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
600+
RUN_ENGINE_RUN_QUEUE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
596601

597602
/** How long should the presence ttl last */
598603
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { RunEngine } from "@internal/run-engine";
2-
import { defaultMachine } from "~/services/platform.v3.server";
3-
import { prisma } from "~/db.server";
2+
import { $replica, prisma } from "~/db.server";
43
import { env } from "~/env.server";
4+
import { defaultMachine } from "~/services/platform.v3.server";
55
import { singleton } from "~/utils/singleton";
66
import { allMachines } from "./machinePresets.server";
7-
import { tracer, meter } from "./tracer.server";
7+
import { meter, tracer } from "./tracer.server";
88

99
export const engine = singleton("RunEngine", createRunEngine);
1010

@@ -13,6 +13,7 @@ export type { RunEngine };
1313
function createRunEngine() {
1414
const engine = new RunEngine({
1515
prisma,
16+
readOnlyPrisma: $replica,
1617
logLevel: env.RUN_ENGINE_WORKER_LOG_LEVEL,
1718
worker: {
1819
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
@@ -39,6 +40,7 @@ function createRunEngine() {
3940
},
4041
queue: {
4142
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
43+
logLevel: env.RUN_ENGINE_RUN_QUEUE_LOG_LEVEL,
4244
redis: {
4345
keyPrefix: "engine:",
4446
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
@@ -64,6 +66,12 @@ function createRunEngine() {
6466
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
6567
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6668
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
69+
concurrencySweeper: {
70+
scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
71+
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,
72+
scanJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS,
73+
processMarkedJitterInMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_JITTER_IN_MS,
74+
},
6775
},
6876
runLock: {
6977
redis: {

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { describe, expect, vi } from "vitest";
33
// Mock the db prisma client
44
vi.mock("~/db.server", () => ({
55
prisma: {},
6+
$replica: {},
67
}));
78

89
vi.mock("~/services/platform.v3.server", async (importOriginal) => {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
Prisma,
1717
PrismaClient,
1818
PrismaClientOrTransaction,
19+
PrismaReplicaClient,
1920
TaskRun,
2021
TaskRunExecutionSnapshot,
2122
Waitpoint,
@@ -50,6 +51,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5051
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5152
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5253
import { workerCatalog } from "./workerCatalog.js";
54+
import { getFinalRunStatuses, isFinalRunStatus } from "./statuses.js";
5355

5456
export class RunEngine {
5557
private runLockRedis: Redis;
@@ -61,6 +63,7 @@ export class RunEngine {
6163
private heartbeatTimeouts: HeartbeatTimeouts;
6264

6365
prisma: PrismaClient;
66+
readOnlyPrisma: PrismaReplicaClient;
6467
runQueue: RunQueue;
6568
eventBus: EventBus = new EventEmitter<EventBusEvents>();
6669
executionSnapshotSystem: ExecutionSnapshotSystem;
@@ -79,6 +82,7 @@ export class RunEngine {
7982
constructor(private readonly options: RunEngineOptions) {
8083
this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info");
8184
this.prisma = options.prisma;
85+
this.readOnlyPrisma = options.readOnlyPrisma ?? this.prisma;
8286
this.runLockRedis = createRedisClient(
8387
{
8488
...options.runLock.redis,
@@ -123,7 +127,7 @@ export class RunEngine {
123127
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
124128
}),
125129
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
126-
logger: new Logger("RunQueue", this.options.logLevel ?? "info"),
130+
logger: new Logger("RunQueue", options.queue?.logLevel ?? "info"),
127131
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
128132
retryOptions: options.queue?.retryOptions,
129133
workerOptions: {
@@ -133,6 +137,13 @@ export class RunEngine {
133137
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
134138
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
135139
},
140+
concurrencySweeper: {
141+
scanSchedule: options.queue?.concurrencySweeper?.scanSchedule,
142+
processMarkedSchedule: options.queue?.concurrencySweeper?.processMarkedSchedule,
143+
scanJitterInMs: options.queue?.concurrencySweeper?.scanJitterInMs,
144+
processMarkedJitterInMs: options.queue?.concurrencySweeper?.processMarkedJitterInMs,
145+
callback: this.#concurrencySweeperCallback.bind(this),
146+
},
136147
shardCount: options.queue?.shardCount,
137148
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
138149
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
@@ -1329,4 +1340,44 @@ export class RunEngine {
13291340
}
13301341
});
13311342
}
1343+
1344+
async #concurrencySweeperCallback(
1345+
runIds: string[]
1346+
): Promise<Array<{ id: string; orgId: string }>> {
1347+
const runs = await this.readOnlyPrisma.taskRun.findMany({
1348+
where: {
1349+
id: { in: runIds },
1350+
completedAt: {
1351+
lte: new Date(Date.now() - 1000 * 60 * 10), // This only finds runs that were completed more than 10 minutes ago
1352+
},
1353+
organizationId: {
1354+
not: null,
1355+
},
1356+
status: {
1357+
in: getFinalRunStatuses(),
1358+
},
1359+
},
1360+
select: {
1361+
id: true,
1362+
status: true,
1363+
organizationId: true,
1364+
},
1365+
});
1366+
1367+
// Log the finished runs
1368+
for (const run of runs) {
1369+
this.logger.info("Concurrency sweeper callback found finished run", {
1370+
runId: run.id,
1371+
orgId: run.organizationId,
1372+
status: run.status,
1373+
});
1374+
}
1375+
1376+
return runs
1377+
.filter((run) => !!run.organizationId)
1378+
.map((run) => ({
1379+
id: run.id,
1380+
orgId: run.organizationId!,
1381+
}));
1382+
}
13321383
}

internal-packages/run-engine/src/engine/statuses.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,25 @@ export function isInitialState(status: TaskRunExecutionStatus): boolean {
4141
return startedStatuses.includes(status);
4242
}
4343

44-
export function isFinalRunStatus(status: TaskRunStatus): boolean {
45-
const finalStatuses: TaskRunStatus[] = [
46-
"CANCELED",
47-
"INTERRUPTED",
48-
"COMPLETED_SUCCESSFULLY",
49-
"COMPLETED_WITH_ERRORS",
50-
"SYSTEM_FAILURE",
51-
"CRASHED",
52-
"EXPIRED",
53-
"TIMED_OUT",
54-
];
44+
const finalStatuses: TaskRunStatus[] = [
45+
"CANCELED",
46+
"INTERRUPTED",
47+
"COMPLETED_SUCCESSFULLY",
48+
"COMPLETED_WITH_ERRORS",
49+
"SYSTEM_FAILURE",
50+
"CRASHED",
51+
"EXPIRED",
52+
"TIMED_OUT",
53+
];
5554

55+
export function isFinalRunStatus(status: TaskRunStatus): boolean {
5656
return finalStatuses.includes(status);
5757
}
5858

59+
export function getFinalRunStatuses(): TaskRunStatus[] {
60+
return finalStatuses;
61+
}
62+
5963
export function canReleaseConcurrency(status: TaskRunExecutionStatus): boolean {
6064
const releaseableStatuses: TaskRunExecutionStatus[] = ["SUSPENDED", "EXECUTING_WITH_WAITPOINTS"];
6165
return releaseableStatuses.includes(status);

internal-packages/run-engine/src/engine/types.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
RetryOptions,
99
RunChainState,
1010
} from "@trigger.dev/core/v3";
11-
import { PrismaClient } from "@trigger.dev/database";
11+
import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database";
1212
import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js";
1313
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
1414
import { workerCatalog } from "./workerCatalog.js";
@@ -17,6 +17,7 @@ import { LockRetryConfig } from "./locking.js";
1717

1818
export type RunEngineOptions = {
1919
prisma: PrismaClient;
20+
readOnlyPrisma?: PrismaReplicaClient;
2021
worker: {
2122
disabled?: boolean;
2223
redis: RedisOptions;
@@ -38,11 +39,18 @@ export type RunEngineOptions = {
3839
workerOptions?: WorkerConcurrencyOptions;
3940
retryOptions?: RetryOptions;
4041
defaultEnvConcurrency?: number;
42+
logLevel?: LogLevel;
4143
queueSelectionStrategyOptions?: Pick<
4244
FairQueueSelectionStrategyOptions,
4345
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
4446
>;
4547
dequeueBlockingTimeoutSeconds?: number;
48+
concurrencySweeper?: {
49+
scanSchedule?: string;
50+
processMarkedSchedule?: string;
51+
scanJitterInMs?: number;
52+
processMarkedJitterInMs?: number;
53+
};
4654
};
4755
runLock: {
4856
redis: RedisOptions;

0 commit comments

Comments
 (0)