Skip to content

Commit c365884

Browse files
committed
v4: current concurrency sweeper
1 parent 8a86e7b commit c365884

File tree

10 files changed

+775
-57
lines changed

10 files changed

+775
-57
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,11 @@ const EnvironmentSchema = z.object({
428428
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
429429
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
430430
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
431+
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_INTERVAL_MS: z.coerce.number().int().default(60_000),
432+
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_INTERVAL_MS: z.coerce.number().int().default(5_000),
433+
RUN_ENGINE_CONCURRENCY_SWEEPER_LOG_LEVEL: z
434+
.enum(["log", "error", "warn", "info", "debug"])
435+
.default("info"),
431436

432437
RUN_ENGINE_RUN_LOCK_DURATION: z.coerce.number().int().default(5000),
433438
RUN_ENGINE_RUN_LOCK_AUTOMATIC_EXTENSION_THRESHOLD: z.coerce.number().int().default(1000),
@@ -593,6 +598,7 @@ const EnvironmentSchema = z.object({
593598

594599
RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
595600
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
601+
RUN_ENGINE_RUN_QUEUE_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
596602

597603
/** How long should the presence ttl last */
598604
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { RunEngine } from "@internal/run-engine";
2-
import { defaultMachine } from "~/services/platform.v3.server";
3-
import { prisma } from "~/db.server";
2+
import { $replica, prisma } from "~/db.server";
43
import { env } from "~/env.server";
4+
import { defaultMachine } from "~/services/platform.v3.server";
55
import { singleton } from "~/utils/singleton";
66
import { allMachines } from "./machinePresets.server";
7-
import { tracer, meter } from "./tracer.server";
7+
import { meter, tracer } from "./tracer.server";
88

99
export const engine = singleton("RunEngine", createRunEngine);
1010

@@ -13,6 +13,7 @@ export type { RunEngine };
1313
function createRunEngine() {
1414
const engine = new RunEngine({
1515
prisma,
16+
readOnlyPrisma: $replica,
1617
logLevel: env.RUN_ENGINE_WORKER_LOG_LEVEL,
1718
worker: {
1819
disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
@@ -39,6 +40,7 @@ function createRunEngine() {
3940
},
4041
queue: {
4142
defaultEnvConcurrency: env.DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT,
43+
logLevel: env.RUN_ENGINE_RUN_QUEUE_LOG_LEVEL,
4244
redis: {
4345
keyPrefix: "engine:",
4446
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
@@ -64,6 +66,11 @@ function createRunEngine() {
6466
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
6567
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6668
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
69+
concurrencySweeper: {
70+
scanIntervalMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_INTERVAL_MS,
71+
processMarkedIntervalMs: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_INTERVAL_MS,
72+
logLevel: env.RUN_ENGINE_CONCURRENCY_SWEEPER_LOG_LEVEL,
73+
},
6774
},
6875
runLock: {
6976
redis: {

internal-packages/run-engine/src/engine/index.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
Prisma,
1717
PrismaClient,
1818
PrismaClientOrTransaction,
19+
PrismaReplicaClient,
1920
TaskRun,
2021
TaskRunExecutionSnapshot,
2122
Waitpoint,
@@ -50,6 +51,7 @@ import { TtlSystem } from "./systems/ttlSystem.js";
5051
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5152
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5253
import { workerCatalog } from "./workerCatalog.js";
54+
import { getFinalRunStatuses, isFinalRunStatus } from "./statuses.js";
5355

5456
export class RunEngine {
5557
private runLockRedis: Redis;
@@ -61,6 +63,7 @@ export class RunEngine {
6163
private heartbeatTimeouts: HeartbeatTimeouts;
6264

6365
prisma: PrismaClient;
66+
readOnlyPrisma: PrismaReplicaClient;
6467
runQueue: RunQueue;
6568
eventBus: EventBus = new EventEmitter<EventBusEvents>();
6669
executionSnapshotSystem: ExecutionSnapshotSystem;
@@ -79,6 +82,7 @@ export class RunEngine {
7982
constructor(private readonly options: RunEngineOptions) {
8083
this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info");
8184
this.prisma = options.prisma;
85+
this.readOnlyPrisma = options.readOnlyPrisma ?? this.prisma;
8286
this.runLockRedis = createRedisClient(
8387
{
8488
...options.runLock.redis,
@@ -123,7 +127,7 @@ export class RunEngine {
123127
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
124128
}),
125129
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
126-
logger: new Logger("RunQueue", this.options.logLevel ?? "info"),
130+
logger: new Logger("RunQueue", options.queue?.logLevel ?? "info"),
127131
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
128132
retryOptions: options.queue?.retryOptions,
129133
workerOptions: {
@@ -133,6 +137,14 @@ export class RunEngine {
133137
immediatePollIntervalMs: options.worker.immediatePollIntervalMs,
134138
shutdownTimeoutMs: options.worker.shutdownTimeoutMs,
135139
},
140+
concurrencySweeper: {
141+
enabled: !options.worker.disabled,
142+
scanIntervalMs: options.queue?.concurrencySweeper?.scanIntervalMs ?? 60_000,
143+
processMarkedIntervalMs:
144+
options.queue?.concurrencySweeper?.processMarkedIntervalMs ?? 5_000,
145+
logLevel: options.queue?.concurrencySweeper?.logLevel ?? options.queue?.logLevel,
146+
callback: this.#concurrencySweeperCallback.bind(this),
147+
},
136148
shardCount: options.queue?.shardCount,
137149
masterQueueConsumersDisabled: options.queue?.masterQueueConsumersDisabled,
138150
masterQueueConsumersIntervalMs: options.queue?.masterQueueConsumersIntervalMs,
@@ -1329,4 +1341,44 @@ export class RunEngine {
13291341
}
13301342
});
13311343
}
1344+
1345+
async #concurrencySweeperCallback(
1346+
runIds: string[]
1347+
): Promise<Array<{ id: string; orgId: string }>> {
1348+
const runs = await this.readOnlyPrisma.taskRun.findMany({
1349+
where: {
1350+
id: { in: runIds },
1351+
completedAt: {
1352+
lte: new Date(Date.now() - 1000 * 60 * 10), // This only finds runs that were completed more than 10 minutes ago
1353+
},
1354+
organizationId: {
1355+
not: null,
1356+
},
1357+
status: {
1358+
in: getFinalRunStatuses(),
1359+
},
1360+
},
1361+
select: {
1362+
id: true,
1363+
status: true,
1364+
organizationId: true,
1365+
},
1366+
});
1367+
1368+
// Log the finished runs
1369+
for (const run of runs) {
1370+
this.logger.info("Concurrency sweeper callback found finished run", {
1371+
runId: run.id,
1372+
orgId: run.organizationId,
1373+
status: run.status,
1374+
});
1375+
}
1376+
1377+
return runs
1378+
.filter((run) => !!run.organizationId)
1379+
.map((run) => ({
1380+
id: run.id,
1381+
orgId: run.organizationId!,
1382+
}));
1383+
}
13321384
}

internal-packages/run-engine/src/engine/statuses.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,25 @@ export function isInitialState(status: TaskRunExecutionStatus): boolean {
4141
return startedStatuses.includes(status);
4242
}
4343

44-
export function isFinalRunStatus(status: TaskRunStatus): boolean {
45-
const finalStatuses: TaskRunStatus[] = [
46-
"CANCELED",
47-
"INTERRUPTED",
48-
"COMPLETED_SUCCESSFULLY",
49-
"COMPLETED_WITH_ERRORS",
50-
"SYSTEM_FAILURE",
51-
"CRASHED",
52-
"EXPIRED",
53-
"TIMED_OUT",
54-
];
44+
const finalStatuses: TaskRunStatus[] = [
45+
"CANCELED",
46+
"INTERRUPTED",
47+
"COMPLETED_SUCCESSFULLY",
48+
"COMPLETED_WITH_ERRORS",
49+
"SYSTEM_FAILURE",
50+
"CRASHED",
51+
"EXPIRED",
52+
"TIMED_OUT",
53+
];
5554

55+
export function isFinalRunStatus(status: TaskRunStatus): boolean {
5656
return finalStatuses.includes(status);
5757
}
5858

59+
export function getFinalRunStatuses(): TaskRunStatus[] {
60+
return finalStatuses;
61+
}
62+
5963
export function canReleaseConcurrency(status: TaskRunExecutionStatus): boolean {
6064
const releaseableStatuses: TaskRunExecutionStatus[] = ["SUSPENDED", "EXECUTING_WITH_WAITPOINTS"];
6165
return releaseableStatuses.includes(status);

internal-packages/run-engine/src/engine/types.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
RetryOptions,
99
RunChainState,
1010
} from "@trigger.dev/core/v3";
11-
import { PrismaClient } from "@trigger.dev/database";
11+
import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database";
1212
import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js";
1313
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
1414
import { workerCatalog } from "./workerCatalog.js";
@@ -17,6 +17,7 @@ import { LockRetryConfig } from "./locking.js";
1717

1818
export type RunEngineOptions = {
1919
prisma: PrismaClient;
20+
readOnlyPrisma?: PrismaReplicaClient;
2021
worker: {
2122
disabled?: boolean;
2223
redis: RedisOptions;
@@ -38,11 +39,17 @@ export type RunEngineOptions = {
3839
workerOptions?: WorkerConcurrencyOptions;
3940
retryOptions?: RetryOptions;
4041
defaultEnvConcurrency?: number;
42+
logLevel?: LogLevel;
4143
queueSelectionStrategyOptions?: Pick<
4244
FairQueueSelectionStrategyOptions,
4345
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
4446
>;
4547
dequeueBlockingTimeoutSeconds?: number;
48+
concurrencySweeper?: {
49+
scanIntervalMs?: number;
50+
processMarkedIntervalMs?: number;
51+
logLevel?: LogLevel;
52+
};
4653
};
4754
runLock: {
4855
redis: RedisOptions;

0 commit comments

Comments
 (0)