Skip to content

Commit 82401ec

Browse files
authored
fix(run-engine): dequeue performance improvements and improved telemetry (#2446)
* More eager dequeuing, queue cooloff periods, return workerQueueLength when dequeueing
* Cache worker group authentication and remove old self-hosted worker code (only managed is currently supported)
* Add additional spans during dequeue
* Add env vars and additional spans
1 parent 24a9151 commit 82401ec

20 files changed

+1090
-886
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,10 @@ const EnvironmentSchema = z.object({
501501
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
502502
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
503503
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
504-
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
504+
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
505+
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
506+
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
507+
RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10),
505508
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
506509
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
507510
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ export const action = createActionWorkerApiRoute(
99
{
1010
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
1111
},
12-
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
13-
return json(await authenticatedWorker.dequeue());
12+
async ({
13+
authenticatedWorker,
14+
runnerId,
15+
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
16+
return json(await authenticatedWorker.dequeue({ runnerId }));
1417
}
1518
);

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.complete.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export const action = createActionWorkerApiRoute(
1818
authenticatedWorker,
1919
body,
2020
params,
21+
runnerId,
2122
}): Promise<TypedResponse<WorkerApiRunAttemptCompleteResponseBody>> => {
2223
const { completion } = body;
2324
const { runFriendlyId, snapshotFriendlyId } = params;
@@ -26,6 +27,7 @@ export const action = createActionWorkerApiRoute(
2627
runFriendlyId,
2728
snapshotFriendlyId,
2829
completion,
30+
runnerId,
2931
});
3032

3133
return json({ result: completeResult });

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.attempts.start.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ export const action = createActionWorkerApiRoute(
1818
authenticatedWorker,
1919
body,
2020
params,
21+
runnerId,
2122
}): Promise<TypedResponse<WorkerApiRunAttemptStartResponseBody>> => {
2223
const { runFriendlyId, snapshotFriendlyId } = params;
2324

2425
const runExecutionData = await authenticatedWorker.startRunAttempt({
2526
runFriendlyId,
2627
snapshotFriendlyId,
2728
isWarmStart: body.isWarmStart,
29+
runnerId,
2830
});
2931

3032
return json(runExecutionData);

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export const loader = createLoaderWorkerApiRoute(
1414
async ({
1515
authenticatedWorker,
1616
params,
17+
runnerId,
1718
}): Promise<TypedResponse<WorkerApiContinueRunExecutionRequestBody>> => {
1819
const { runFriendlyId, snapshotFriendlyId } = params;
1920

@@ -23,6 +24,7 @@ export const loader = createLoaderWorkerApiRoute(
2324
const continuationResult = await authenticatedWorker.continueRunExecution({
2425
runFriendlyId,
2526
snapshotFriendlyId,
27+
runnerId,
2628
});
2729

2830
return json(continuationResult);

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.heartbeat.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ export const action = createActionWorkerApiRoute(
1313
async ({
1414
authenticatedWorker,
1515
params,
16+
runnerId,
1617
}): Promise<TypedResponse<WorkloadHeartbeatResponseBody>> => {
1718
const { runFriendlyId, snapshotFriendlyId } = params;
1819

1920
await authenticatedWorker.heartbeatRun({
2021
runFriendlyId,
2122
snapshotFriendlyId,
23+
runnerId,
2224
});
2325

2426
return json({ ok: true });

apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.suspend.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export const action = createActionWorkerApiRoute(
1919
authenticatedWorker,
2020
params,
2121
body,
22+
runnerId,
2223
}): Promise<TypedResponse<WorkerApiSuspendRunResponseBody>> => {
2324
const { runFriendlyId, snapshotFriendlyId } = params;
2425

@@ -39,6 +40,7 @@ export const action = createActionWorkerApiRoute(
3940
runFriendlyId,
4041
snapshotFriendlyId,
4142
checkpoint: body.checkpoint,
43+
runnerId,
4244
});
4345

4446
return json({ ok: true });

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
WorkerGroupTokenService,
2323
} from "~/v3/services/worker/workerGroupTokenService.server";
2424
import { API_VERSIONS, getApiVersion } from "~/api/versions";
25+
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";
2526

2627
type AnyZodSchema = z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
2728

@@ -795,6 +796,7 @@ type WorkerLoaderHandlerFunction<
795796
headers: THeadersSchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>
796797
? z.infer<THeadersSchema>
797798
: undefined;
799+
runnerId?: string;
798800
}) => Promise<Response>;
799801

800802
export function createLoaderWorkerApiRoute<
@@ -858,12 +860,15 @@ export function createLoaderWorkerApiRoute<
858860
parsedHeaders = headers.data;
859861
}
860862

863+
const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;
864+
861865
const result = await handler({
862866
params: parsedParams,
863867
searchParams: parsedSearchParams,
864868
authenticatedWorker: authenticationResult,
865869
request,
866870
headers: parsedHeaders,
871+
runnerId,
867872
});
868873
return result;
869874
} catch (error) {
@@ -924,6 +929,7 @@ type WorkerActionHandlerFunction<
924929
body: TBodySchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>
925930
? z.infer<TBodySchema>
926931
: undefined;
932+
runnerId?: string;
927933
}) => Promise<Response>;
928934

929935
export function createActionWorkerApiRoute<
@@ -1021,13 +1027,16 @@ export function createActionWorkerApiRoute<
10211027
parsedBody = parsed.data;
10221028
}
10231029

1030+
const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;
1031+
10241032
const result = await handler({
10251033
params: parsedParams,
10261034
searchParams: parsedSearchParams,
10271035
authenticatedWorker: authenticationResult,
10281036
request,
10291037
body: parsedBody,
10301038
headers: parsedHeaders,
1039+
runnerId,
10311040
});
10321041
return result;
10331042
} catch (error) {

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ function createRunEngine() {
6767
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
6868
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
6969
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
70+
masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
71+
masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
72+
masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
7073
concurrencySweeper: {
7174
scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
7275
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,

0 commit comments

Comments
 (0)