Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,10 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(1000),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS: z.coerce.number().int().default(10_000),
RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT: z.coerce.number().int().default(10),
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE: z.string().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE: z.string().optional(),
RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_JITTER_IN_MS: z.coerce.number().int().optional(),
Expand Down
7 changes: 5 additions & 2 deletions apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ export const action = createActionWorkerApiRoute(
{
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
},
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
return json(await authenticatedWorker.dequeue());
async ({
authenticatedWorker,
runnerId,
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
return json(await authenticatedWorker.dequeue({ runnerId }));
}
);
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export const action = createActionWorkerApiRoute(
authenticatedWorker,
body,
params,
runnerId,
}): Promise<TypedResponse<WorkerApiRunAttemptCompleteResponseBody>> => {
const { completion } = body;
const { runFriendlyId, snapshotFriendlyId } = params;
Expand All @@ -26,6 +27,7 @@ export const action = createActionWorkerApiRoute(
runFriendlyId,
snapshotFriendlyId,
completion,
runnerId,
});

return json({ result: completeResult });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ export const action = createActionWorkerApiRoute(
authenticatedWorker,
body,
params,
runnerId,
}): Promise<TypedResponse<WorkerApiRunAttemptStartResponseBody>> => {
const { runFriendlyId, snapshotFriendlyId } = params;

const runExecutionData = await authenticatedWorker.startRunAttempt({
runFriendlyId,
snapshotFriendlyId,
isWarmStart: body.isWarmStart,
runnerId,
});

return json(runExecutionData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export const loader = createLoaderWorkerApiRoute(
async ({
authenticatedWorker,
params,
runnerId,
}): Promise<TypedResponse<WorkerApiContinueRunExecutionRequestBody>> => {
const { runFriendlyId, snapshotFriendlyId } = params;

Expand All @@ -23,6 +24,7 @@ export const loader = createLoaderWorkerApiRoute(
const continuationResult = await authenticatedWorker.continueRunExecution({
runFriendlyId,
snapshotFriendlyId,
runnerId,
});

return json(continuationResult);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ export const action = createActionWorkerApiRoute(
async ({
authenticatedWorker,
params,
runnerId,
}): Promise<TypedResponse<WorkloadHeartbeatResponseBody>> => {
const { runFriendlyId, snapshotFriendlyId } = params;

await authenticatedWorker.heartbeatRun({
runFriendlyId,
snapshotFriendlyId,
runnerId,
});

return json({ ok: true });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const action = createActionWorkerApiRoute(
authenticatedWorker,
params,
body,
runnerId,
}): Promise<TypedResponse<WorkerApiSuspendRunResponseBody>> => {
const { runFriendlyId, snapshotFriendlyId } = params;

Expand All @@ -39,6 +40,7 @@ export const action = createActionWorkerApiRoute(
runFriendlyId,
snapshotFriendlyId,
checkpoint: body.checkpoint,
runnerId,
});

return json({ ok: true });
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
WorkerGroupTokenService,
} from "~/v3/services/worker/workerGroupTokenService.server";
import { API_VERSIONS, getApiVersion } from "~/api/versions";
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";

Comment on lines +25 to 26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Confirm subpath export for WORKER_HEADERS.

Ensure "@trigger.dev/core/v3/runEngineWorker" re-exports WORKER_HEADERS. If not, import from the consts file.

Run:


🏁 Script executed:

#!/bin/bash
rg -nP 'export\s+const\s+WORKER_HEADERS' packages/core/src -n -C2
rg -n '@trigger.dev/core/v3/runEngineWorker' packages/core -n || true

Length of output: 417


Replace import to use the consts subpath for WORKER_HEADERS

The WORKER_HEADERS constant lives in packages/core/src/v3/runEngineWorker/consts.ts and is not re-exported at the module root. Please update the import to pull directly from the consts file:

• File: apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Lines: 25–26

- import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";
+ import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker/consts";

This ensures you’re importing the actual export location and avoids a broken module resolution.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker";
import { WORKER_HEADERS } from "@trigger.dev/core/v3/runEngineWorker/consts";
🤖 Prompt for AI Agents
In apps/webapp/app/services/routeBuilders/apiBuilder.server.ts around lines
25–26, the import for WORKER_HEADERS currently points to
"@trigger.dev/core/v3/runEngineWorker" which does not re-export that constant;
update the import to reference the actual file path
"packages/core/src/v3/runEngineWorker/consts" (or the package-relative subpath
that maps to runEngineWorker/consts) so WORKER_HEADERS is imported directly from
its defining module; replace the existing import statement with one that imports
WORKER_HEADERS from the consts subpath and ensure any import path uses the
package's published subpath form (e.g.
"@trigger.dev/core/v3/runEngineWorker/consts") to avoid broken module
resolution.

type AnyZodSchema = z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;

Expand Down Expand Up @@ -795,6 +796,7 @@ type WorkerLoaderHandlerFunction<
headers: THeadersSchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>
? z.infer<THeadersSchema>
: undefined;
runnerId?: string;
}) => Promise<Response>;

export function createLoaderWorkerApiRoute<
Expand Down Expand Up @@ -858,12 +860,15 @@ export function createLoaderWorkerApiRoute<
parsedHeaders = headers.data;
}

const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;

const result = await handler({
params: parsedParams,
searchParams: parsedSearchParams,
authenticatedWorker: authenticationResult,
request,
headers: parsedHeaders,
runnerId,
});
return result;
} catch (error) {
Expand Down Expand Up @@ -924,6 +929,7 @@ type WorkerActionHandlerFunction<
body: TBodySchema extends z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>
? z.infer<TBodySchema>
: undefined;
runnerId?: string;
}) => Promise<Response>;

export function createActionWorkerApiRoute<
Expand Down Expand Up @@ -1021,13 +1027,16 @@ export function createActionWorkerApiRoute<
parsedBody = parsed.data;
}

const runnerId = request.headers.get(WORKER_HEADERS.RUNNER_ID) ?? undefined;

const result = await handler({
params: parsedParams,
searchParams: parsedSearchParams,
authenticatedWorker: authenticationResult,
request,
body: parsedBody,
headers: parsedHeaders,
runnerId,
});
return result;
} catch (error) {
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/runEngine.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ function createRunEngine() {
dequeueBlockingTimeoutSeconds: env.RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS,
masterQueueConsumersIntervalMs: env.RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS,
masterQueueConsumersDisabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
concurrencySweeper: {
Comment on lines +70 to 73
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Defensive clamping of queue controls at runtime

Add guards so a bad env can’t zero-out dequeues or set a 0ms cooloff.

-      masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
-      masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
-      masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
+      masterQueueCooloffPeriodMs: Math.max(100, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS),
+      masterQueueCooloffCountThreshold: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD),
+      masterQueueConsumerDequeueCount: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
masterQueueCooloffPeriodMs: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS,
masterQueueCooloffCountThreshold: env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD,
masterQueueConsumerDequeueCount: env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT,
concurrencySweeper: {
masterQueueCooloffPeriodMs: Math.max(100, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_PERIOD_MS),
masterQueueCooloffCountThreshold: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_COOLOFF_COUNT_THRESHOLD),
masterQueueConsumerDequeueCount: Math.max(1, env.RUN_ENGINE_MASTER_QUEUE_CONSUMER_DEQUEUE_COUNT),
concurrencySweeper: {
🤖 Prompt for AI Agents
In apps/webapp/app/v3/runEngine.server.ts around lines 70-73, the runtime
options taken directly from env can be set to zero or invalid values; parse the
relevant env vars to numbers, apply sane minimums and fallbacks, and clamp them
before assigning (e.g., masterQueueCooloffPeriodMs = Math.max(parsedValue ||
DEFAULT_MS, MIN_MS), masterQueueCooloffCountThreshold = Math.max(parsedValue ||
DEFAULT_THRESHOLD, 1), masterQueueConsumerDequeueCount = Math.max(parsedValue ||
DEFAULT_DEQUEUE_COUNT, 1)); ensure non-numeric inputs fall back to defaults and
that cooloff ms cannot be 0 (use a small positive min like 1 or a more
conservative min such as 50ms).

scanSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_SCAN_SCHEDULE,
processMarkedSchedule: env.RUN_ENGINE_CONCURRENCY_SWEEPER_PROCESS_MARKED_SCHEDULE,
Expand Down
Loading
Loading