Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d8352f9
WIP
ericallam May 28, 2025
9cec702
Run queue now works with the worker queue / master queue split
ericallam May 29, 2025
9204ba0
Acking should also cause the master queue to be processed
ericallam May 29, 2025
0ca8b0a
Convert run engine tests and run engine to use runQueue changes
ericallam May 29, 2025
9193e53
Include the util files in the test tsconfig
ericallam May 29, 2025
a38f8e5
coordinator target should be es2020 as well
ericallam May 29, 2025
6d5c8d2
providers target 2020
ericallam Jun 2, 2025
a25bae2
Fix the triggerTask tests in the webapp
ericallam Jun 2, 2025
f5d54b0
v4 now working with the new worker queues, and added the legacy maste…
ericallam Jun 2, 2025
a2aa2c8
report worker queue lengths via opentelemetry metrics
ericallam Jun 3, 2025
e2d7c13
Adding lock metrics
ericallam Jun 3, 2025
ff2768c
Release concurrency bucket metrics
ericallam Jun 3, 2025
57ee381
• Updated RunQueue.removeEnvironmentQueuesFromMasterQueue() method si…
ericallam Jun 3, 2025
2e0ead9
metrics now working, configure the run queue settings, additional met…
ericallam Jun 4, 2025
5fd0efd
Fix CodeRabbit suggestions
ericallam Jun 4, 2025
5b78b5d
return undefined from dequeueFromWorkerQueue, not null
ericallam Jun 4, 2025
01f62df
Remove message from worker queue in certain circumstances when acking
ericallam Jun 4, 2025
95a522b
Update log
ericallam Jun 4, 2025
be82a67
Ensure master queue consumers cannot stop from a processing error, an…
ericallam Jun 4, 2025
4b073b2
Change how the run queue master queue consumers are disabled internally
ericallam Jun 4, 2025
d5cfa24
Fixed tests
ericallam Jun 4, 2025
9037d3c
process the queue on nack
ericallam Jun 4, 2025
79e7a15
Fix more tests
ericallam Jun 4, 2025
7387c0e
Fix priority tests
ericallam Jun 4, 2025
54f06f5
Fixed dequeueing test
ericallam Jun 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
"type": "node-terminal",
"request": "launch",
"name": "Debug RunQueue tests",
"command": "pnpm run test ./src/run-queue/index.test.ts",
"command": "pnpm run test ./src/run-queue/index.test.ts --run",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
},
Expand Down
2 changes: 1 addition & 1 deletion apps/coordinator/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2018",
"target": "es2020",
"module": "commonjs",
"esModuleInterop": true,
"resolveJsonModule": true,
Expand Down
2 changes: 1 addition & 1 deletion apps/docker-provider/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2018",
"target": "es2020",
"module": "commonjs",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
Expand Down
2 changes: 1 addition & 1 deletion apps/kubernetes-provider/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2018",
"target": "es2020",
"module": "commonjs",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ const EnvironmentSchema = z.object({
INTERNAL_OTEL_TRACE_DISABLED: z.string().default("0"),

INTERNAL_OTEL_LOG_EXPORTER_URL: z.string().optional(),
INTERNAL_OTEL_METRIC_EXPORTER_URL: z.string().optional(),
INTERNAL_OTEL_METRIC_EXPORTER_AUTH_HEADERS: z.string().optional(),
INTERNAL_OTEL_METRIC_EXPORTER_ENABLED: z.string().default("0"),
INTERNAL_OTEL_METRIC_EXPORTER_INTERVAL_MS: z.coerce.number().int().default(30_000),

ORG_SLACK_INTEGRATION_CLIENT_ID: z.string().optional(),
ORG_SLACK_INTEGRATION_CLIENT_SECRET: z.string().optional(),
Expand Down Expand Up @@ -460,8 +464,12 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_RUN_QUEUE_SHARD_COUNT: z.coerce.number().int().default(4),
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000),
RUN_ENGINE_PROCESS_WORKER_QUEUE_DEBOUNCE_MS: z.coerce.number().int().default(200),
RUN_ENGINE_DEQUEUE_BLOCKING_TIMEOUT_SECONDS: z.coerce.number().int().default(10),
RUN_ENGINE_MASTER_QUEUE_CONSUMERS_INTERVAL_MS: z.coerce.number().int().default(500),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
Expand Down Expand Up @@ -617,6 +625,7 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),

RUN_ENGINE_WORKER_ENABLED: z.string().default("1"),
RUN_ENGINE_WORKER_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),

/** How long should the presence ttl last */
DEV_PRESENCE_SSE_TIMEOUT: z.coerce.number().int().default(30_000),
Expand Down
6 changes: 2 additions & 4 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ export class SpanPresenter extends BasePresenter {
},
},
engine: true,
masterQueue: true,
secondaryMasterQueue: true,
workerQueue: true,
error: true,
output: true,
outputType: true,
Expand Down Expand Up @@ -364,8 +363,7 @@ export class SpanPresenter extends BasePresenter {
maxDurationInSeconds: getMaxDuration(run.maxDurationInSeconds),
batch: run.batch ? { friendlyId: run.batch.friendlyId } : undefined,
engine: run.engine,
masterQueue: run.masterQueue,
secondaryMasterQueue: run.secondaryMasterQueue,
workerQueue: run.workerQueue,
spanId: run.spanId,
isCached: !!span.originalRun,
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { prisma } from "~/db.server";
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
import { engine } from "~/v3/runEngine.server";

export async function action({ request }: ActionFunctionArgs) {
// Next authenticate the request
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

const user = await prisma.user.findUnique({
where: {
id: authenticationResult.userId,
},
});

if (!user) {
return json({ error: "Invalid or Missing API key" }, { status: 401 });
}

if (!user.admin) {
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
}

try {
await engine.migrateLegacyMasterQueues();

return json({
success: true,
});
} catch (error) {
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
}
}
84 changes: 7 additions & 77 deletions apps/webapp/app/routes/engine.v1.dev.dequeue.ts
Original file line number Diff line number Diff line change
@@ -1,92 +1,22 @@
import { json } from "@remix-run/server-runtime";
import { DequeuedMessage, DevDequeueRequestBody, MachineResources } from "@trigger.dev/core/v3";
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
import { env } from "~/env.server";
import { DevDequeueRequestBody } from "@trigger.dev/core/v3";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { engine } from "~/v3/runEngine.server";

const { action } = createActionApiRoute(
{
body: DevDequeueRequestBody,
body: DevDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
maxContentLength: 1024 * 10, // 10KB
method: "POST",
},
async ({ authentication, body }) => {
//we won't return more runs than this in one API call
let maxDequeueCount = env.DEV_DEQUEUE_MAX_RUNS_PER_PULL;

//we can't use more than the max resources
const availableResources = body.maxResources ?? {
cpu: 8,
memory: 16,
};

let dequeuedMessages: DequeuedMessage[] = [];

//we need to check the current worker, because a run might have been locked to it
const workers = body.oldWorkers.concat(body.currentWorker);

//first we want to clear out old runs
for (const worker of workers) {
//dequeue
const latestResult = await engine.dequeueFromBackgroundWorkerMasterQueue({
consumerId: authentication.environment.id,
//specific version
backgroundWorkerId: BackgroundWorkerId.toId(worker),
maxRunCount: maxDequeueCount,
maxResources: availableResources,
});

//add runs to the array
dequeuedMessages.push(...latestResult);

//update availableResources
const consumedResources = latestResult.reduce(
(acc, r) => {
return {
cpu: acc.cpu + r.run.machine.cpu,
memory: acc.memory + r.run.machine.memory,
};
},
{ cpu: 0, memory: 0 }
);
updateAvailableResources(availableResources, consumedResources);

//update maxDequeueCount
maxDequeueCount -= latestResult.length;

//if we have no resources left, we exit the loop
if (!hasAvailableResources(availableResources)) break;
//we've already dequeued the max number of runs
if (maxDequeueCount <= 0) break;
}

//dequeue from the current version if we still have space
if (hasAvailableResources(availableResources) && maxDequeueCount > 0) {
const latestResult = await engine.dequeueFromEnvironmentMasterQueue({
consumerId: authentication.environment.id,
//current dev version (no specific version specified)
environmentId: authentication.environment.id,
maxRunCount: maxDequeueCount,
maxResources: availableResources,
});
dequeuedMessages.push(...latestResult);
}
async ({ authentication }) => {
const dequeuedMessages = await engine.dequeueFromEnvironmentWorkerQueue({
consumerId: authentication.environment.id,
environmentId: authentication.environment.id,
});

return json({ dequeuedMessages }, { status: 200 });
}
);

function updateAvailableResources(
availableResources: MachineResources,
resources: MachineResources
) {
availableResources.cpu -= resources.cpu;
availableResources.memory -= resources.memory;
}

function hasAvailableResources(availableResources: MachineResources) {
return availableResources.cpu > 0 && availableResources.memory > 0;
}

export { action };
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { json, TypedResponse } from "@remix-run/server-runtime";
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";
import { WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
import { z } from "zod";
import { $replica, prisma } from "~/db.server";
import { createLoaderWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";

// Keep this route for backwards compatibility
export const loader = createLoaderWorkerApiRoute(
{
params: z.object({
Expand All @@ -14,55 +13,7 @@ export const loader = createLoaderWorkerApiRoute(
maxRunCount: z.coerce.number().optional(),
}),
},
async ({
authenticatedWorker,
params,
searchParams,
}): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
const deployment = await $replica.workerDeployment.findUnique({
where: {
friendlyId: params.deploymentFriendlyId,
},
include: {
worker: true,
},
});

if (!deployment) {
throw new Error("Deployment not found");
}

if (!deployment.worker) {
throw new Error("Worker not found");
}

const dequeuedMessages = (await isCurrentDeployment(deployment.id, deployment.environmentId))
? await authenticatedWorker.dequeueFromEnvironment(
deployment.worker.id,
deployment.environmentId
)
: await authenticatedWorker.dequeueFromVersion(
deployment.worker.id,
searchParams.maxRunCount
);

return json(dequeuedMessages);
async (): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
return json([]);
}
);

async function isCurrentDeployment(deploymentId: string, environmentId: string): Promise<boolean> {
const promotion = await prisma.workerDeploymentPromotion.findUnique({
where: {
environmentId_label: {
environmentId,
label: CURRENT_DEPLOYMENT_LABEL,
},
},
});

if (!promotion) {
return false;
}

return promotion.deploymentId === deploymentId;
}
11 changes: 3 additions & 8 deletions apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,9 @@ import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.

export const action = createActionWorkerApiRoute(
{
body: WorkerApiDequeueRequestBody,
body: WorkerApiDequeueRequestBody, // Even though we don't use it, we need to keep it for backwards compatibility
},
async ({ authenticatedWorker, body }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
return json(
await authenticatedWorker.dequeue({
maxResources: body.maxResources,
maxRunCount: body.maxRunCount,
})
);
async ({ authenticatedWorker }): Promise<TypedResponse<WorkerApiDequeueResponseBody>> => {
return json(await authenticatedWorker.dequeue());
}
);
Original file line number Diff line number Diff line change
Expand Up @@ -700,12 +700,8 @@ function RunBody({
{isAdmin && (
<>
<Property.Item>
<Property.Label>Primary master queue</Property.Label>
<Property.Value>{run.masterQueue}</Property.Value>
</Property.Item>
<Property.Item>
<Property.Label>Secondary master queue</Property.Label>
<Property.Value>{run.secondaryMasterQueue ?? "–"}</Property.Value>
<Property.Label>Worker queue</Property.Label>
<Property.Value>{run.workerQueue}</Property.Value>
</Property.Item>
</>
)}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,9 @@ export class DefaultQueueManager implements QueueManager {
};
}

async getMasterQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
async getWorkerQueue(environment: AuthenticatedEnvironment): Promise<string | undefined> {
if (environment.type === "DEVELOPMENT") {
return;
return environment.id;
}

const workerGroupService = new WorkerGroupService({
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class RunEngineTriggerTaskService {
lockedQueueId,
});

const masterQueue = await this.queueConcern.getMasterQueue(environment);
const workerQueue = await this.queueConcern.getWorkerQueue(environment);

try {
return await this.traceEventConcern.traceRun(triggerRequest, async (event) => {
Expand Down Expand Up @@ -271,7 +271,7 @@ export class RunEngineTriggerTaskService {
concurrencyKey: body.options?.concurrencyKey,
queue: queueName,
lockedQueueId,
masterQueue: masterQueue,
workerQueue,
isTest: body.options?.test ?? false,
delayUntil,
queuedAt: delayUntil ? undefined : new Date(),
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/runEngine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export interface QueueManager {
): Promise<QueueProperties>;
getQueueName(request: TriggerTaskRequest): Promise<string>;
validateQueueLimits(env: AuthenticatedEnvironment): Promise<QueueValidationResult>;
getMasterQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
getWorkerQueue(env: AuthenticatedEnvironment): Promise<string | undefined>;
}

export interface PayloadProcessor {
Expand Down
10 changes: 2 additions & 8 deletions apps/webapp/app/services/deleteProject.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,9 @@ export class DeleteProjectService {
}

// Delete all queues from the RunEngine 2 prod master queues
const workerGroups = await this.#prismaClient.workerInstanceGroup.findMany({
select: {
masterQueue: true,
},
});
const engineMasterQueues = workerGroups.map((group) => group.masterQueue);
for (const masterQueue of engineMasterQueues) {
for (const environment of project.environments) {
await engine.removeEnvironmentQueuesFromMasterQueue({
masterQueue,
runtimeEnvironmentId: environment.id,
organizationId: project.organization.id,
projectId: project.id,
});
Expand Down
Loading