diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
new file mode 100644
index 0000000000..53b2463347
--- /dev/null
+++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts
@@ -0,0 +1,120 @@
+import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
+import pMap from "p-map";
+import { z } from "zod";
+import { $replica, prisma } from "~/db.server";
+import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
+import { determineEngineVersion } from "~/v3/engineVersion.server";
+import { engine } from "~/v3/runEngine.server";
+
+const ParamsSchema = z.object({
+  environmentId: z.string(),
+});
+
+// `dryRun` defaults to true, so callers must opt in explicitly before anything is modified.
+const BodySchema = z.object({
+  dryRun: z.boolean().default(true),
+  queues: z.array(z.string()).default([]),
+});
+
+/**
+ * Admin-only action that runs the engine's concurrency repair for a V2 environment and its queues.
+ *
+ * Responds 401 without a valid personal access token, 403 for non-admin users, 404 for an
+ * unknown environment, and 400 for V1 environments or an invalid request body.
+ */
+export async function action({ request, params }: ActionFunctionArgs) {
+  // Authenticate the request with a personal access token
+  const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
+
+  if (!authenticationResult) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  const user = await prisma.user.findUnique({
+    where: {
+      id: authenticationResult.userId,
+    },
+  });
+
+  if (!user) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  if (!user.admin) {
+    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
+  }
+
+  const parsedParams = ParamsSchema.parse(params);
+
+  const environment = await prisma.runtimeEnvironment.findFirst({
+    where: {
+      id: parsedParams.environmentId,
+    },
+    include: {
+      organization: true,
+      project: true,
+      orgMember: true,
+    },
+  });
+
+  if (!environment) {
+    return json({ error: "Environment not found" }, { status: 404 });
+  }
+
+  const engineVersion = await determineEngineVersion({ environment });
+
+  if (engineVersion === "V1") {
+    return json({ error: "Engine version is V1" }, { status: 400 });
+  }
+
+  // A malformed or schema-invalid body is a client error: answer 400 instead of throwing.
+  let body: unknown;
+
+  try {
+    body = await request.json();
+  } catch {
+    return json({ error: "Request body must be valid JSON" }, { status: 400 });
+  }
+
+  const parsedBody = BodySchema.safeParse(body);
+
+  if (!parsedBody.success) {
+    return json(
+      { error: "Invalid request body", issues: parsedBody.error.issues },
+      { status: 400 }
+    );
+  }
+
+  const { dryRun, queues: queueNames } = parsedBody.data;
+
+  // An empty `queues` list leaves the name filter undefined, i.e. every V2 queue is selected
+  const queues = await $replica.taskQueue.findMany({
+    where: {
+      runtimeEnvironmentId: environment.id,
+      version: "V2",
+      name: queueNames.length > 0 ? { in: queueNames } : undefined,
+    },
+    select: {
+      friendlyId: true,
+      name: true,
+      concurrencyLimit: true,
+      type: true,
+      paused: true,
+    },
+    orderBy: {
+      orderableName: "asc",
+    },
+  });
+
+  const repairEnvironmentResults = await engine.repairEnvironment(environment, dryRun);
+
+  const repairResults = await pMap(
+    queues,
+    async (queue) => {
+      return engine.repairQueue(environment, queue.name, dryRun);
+    },
+    { concurrency: 5 }
+  );
+
+  return json({ environment: repairEnvironmentResults, queues: repairResults });
+}
diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
new file mode 100644
index 0000000000..3ea9576899
--- /dev/null
+++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.report.ts
@@ -0,0 +1,106 @@
+import { json, LoaderFunctionArgs } from "@remix-run/server-runtime";
+import { z } from "zod";
+import { $replica, prisma } from "~/db.server";
+import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
+import { determineEngineVersion } from "~/v3/engineVersion.server";
+import { engine } from "~/v3/runEngine.server";
+
+const ParamsSchema = z.object({
+  environmentId: z.string(),
+});
+
+// `page` and `per_page` feed Prisma's `skip`/`take`, so only positive integers are accepted.
+const SearchParamsSchema = z.object({
+  verbose: z.string().default("0"),
+  page: z.coerce.number().int().positive().optional(),
+  per_page: z.coerce.number().int().positive().optional(),
+});
+
+/**
+ * Admin-only loader that reports the concurrency state of a V2 environment and one page of its
+ * queues (`page` defaults to 1, `per_page` to 50). `verbose=1` is forwarded to the engine report.
+ *
+ * Responds 401 without a valid personal access token, 403 for non-admin users, 404 for an
+ * unknown environment, and 400 for V1 environments or invalid search params.
+ */
+export async function loader({ request, params }: LoaderFunctionArgs) {
+  // Authenticate the request with a personal access token
+  const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
+
+  if (!authenticationResult) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  const user = await prisma.user.findUnique({
+    where: {
+      id: authenticationResult.userId,
+    },
+  });
+
+  if (!user) {
+    return json({ error: "Invalid or Missing API key" }, { status: 401 });
+  }
+
+  if (!user.admin) {
+    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
+  }
+
+  const parsedParams = ParamsSchema.parse(params);
+
+  const environment = await prisma.runtimeEnvironment.findFirst({
+    where: {
+      id: parsedParams.environmentId,
+    },
+    include: {
+      organization: true,
+      project: true,
+      orgMember: true,
+    },
+  });
+
+  if (!environment) {
+    return json({ error: "Environment not found" }, { status: 404 });
+  }
+
+  const engineVersion = await determineEngineVersion({ environment });
+
+  if (engineVersion === "V1") {
+    return json({ error: "Engine version is V1" }, { status: 400 });
+  }
+
+  const url = new URL(request.url);
+  const searchParams = SearchParamsSchema.safeParse(Object.fromEntries(url.searchParams));
+
+  // Invalid pagination is a client error: answer 400 instead of throwing.
+  if (!searchParams.success) {
+    return json(
+      { error: "Invalid search params", issues: searchParams.error.issues },
+      { status: 400 }
+    );
+  }
+
+  const { verbose, page = 1, per_page: perPage = 50 } = searchParams.data;
+
+  const queues = await $replica.taskQueue.findMany({
+    where: {
+      runtimeEnvironmentId: environment.id,
+      version: "V2",
+    },
+    select: {
+      friendlyId: true,
+      name: true,
+      concurrencyLimit: true,
+      type: true,
+      paused: true,
+    },
+    orderBy: {
+      orderableName: "asc",
+    },
+    skip: (page - 1) * perPage,
+    take: perPage,
+  });
+
+  const report = await engine.generateEnvironmentReport(environment, queues, verbose === "1");
+
+  return json(report);
+}
diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json
index a12abc73e2..680d385ca4 100644
--- a/internal-packages/run-engine/package.json
+++ b/internal-packages/run-engine/package.json
@@ -30,7 +30,8 @@
     "nanoid": "3.3.8",
     "redlock": "5.0.0-beta.2",
     "seedrandom": "^3.0.5",
-    "zod": "3.25.76"
+    "zod": "3.25.76",
+    "p-map": "^6.0.0"
   },
   "devDependencies": {
     "@internal/testcontainers": "workspace:*",
diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts
index f9b3061e91..84cce4b9d5 100644
--- a/internal-packages/run-engine/src/engine/index.ts
+++ b/internal-packages/run-engine/src/engine/index.ts
@@ -30,7 +30,7 @@ import { EventEmitter } from "node:events";
 import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js";
 import { RunQueue } from "../run-queue/index.js";
 import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
-import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
+import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js";
 import { BillingCache } from "./billingCache.js";
 import { NotImplementedError, RunDuplicateIdempotencyKeyError } from "./errors.js";
 import { EventBus, EventBusEvents } from "./eventBus.js";
@@ -53,8 +53,15 @@ import { RunAttemptSystem } from "./systems/runAttemptSystem.js";
 import { SystemResources } from "./systems/systems.js";
 import { TtlSystem } from "./systems/ttlSystem.js";
 import { WaitpointSystem } from "./systems/waitpointSystem.js";
-import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
+import {
+  EngineWorker,
+  HeartbeatTimeouts,
+  ReportableQueue,
+  RunEngineOptions,
+  TriggerParams,
+} from "./types.js";
 import { workerCatalog } from "./workerCatalog.js";
+import pMap from "p-map";
 
 export class RunEngine {
   private runLockRedis: Redis;
@@ -1162,6 +1169,209 @@ export class RunEngine {
     }
   }
 
+  /**
+   * Repairs the current-concurrency set of an environment.
+   *
+   * Run ids in the set that the concurrency sweeper callback reports as completed are
+   * acknowledged on the run queue. With `dryRun` the same ids are reported but nothing is
+   * acknowledged.
+   *
+   * @param environment - The environment whose current-concurrency set is inspected
+   * @param dryRun - When true, only report what would be acknowledged
+   * @returns Every run id in the set and the ids of the completed runs among them
+   */
+  async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) {
+    const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment);
+    const completedRunIds = await this.#acknowledgeCompletedRuns(runIds, dryRun);
+
+    return { runIds, completedRunIds, dryRun };
+  }
+
+  /**
+   * Repairs the current-concurrency set of a single queue.
+   *
+   * Run ids in the set that the concurrency sweeper callback reports as completed are
+   * acknowledged on the run queue. With `dryRun` the same ids are reported but nothing is
+   * acknowledged.
+   *
+   * @param environment - The environment that owns the queue
+   * @param queue - The name of the queue whose current-concurrency set is inspected
+   * @param dryRun - When true, only report what would be acknowledged
+   * @returns The queue name, every run id in the set and the ids of the completed runs among them
+   */
+  async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) {
+    const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);
+    const completedRunIds = await this.#acknowledgeCompletedRuns(runIds, dryRun);
+
+    return { queue, runIds, completedRunIds, dryRun };
+  }
+
+  /**
+   * Shared implementation of `repairEnvironment` and `repairQueue`.
+   *
+   * @param runIds - The run ids currently held in a current-concurrency set
+   * @param dryRun - When true, completed runs are looked up but not acknowledged
+   * @returns The ids of the runs reported as completed
+   */
+  async #acknowledgeCompletedRuns(runIds: string[], dryRun: boolean): Promise<string[]> {
+    // An empty set cannot contain completed runs, so skip the database lookup
+    if (runIds.length === 0) {
+      return [];
+    }
+
+    // Use a 5 second offset instead of the callback's 10 minute default
+    const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
+
+    if (!dryRun) {
+      await pMap(
+        completedRuns,
+        async (run) => {
+          await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
+            skipDequeueProcessing: true,
+            removeFromWorkerQueue: false,
+          });
+        },
+        { concurrency: 5 }
+      );
+    }
+
+    return completedRuns.map((run) => run.id);
+  }
+
+  /**
+   * Builds a report of the concurrency state of an environment and the given queues.
+   *
+   * @param environment - The environment to report on
+   * @param queues - The queues to include in the report
+   * @param verbose - When true, the key backing each reported value is included
+   * @returns The environment limits and counters, an analysis of whether the environment can
+   *   dequeue, and one report per queue
+   */
+  async generateEnvironmentReport(
+    environment: AuthenticatedEnvironment,
+    queues: ReportableQueue[],
+    verbose: boolean
+  ) {
+    const [
+      concurrencyLimit, // env limit (no burst)
+      concurrencyLimitWithBurstFactor, // env limit * burst
+      currentDequeued,
+      currentConcurrency,
+      burstFactor,
+    ] = await Promise.all([
+      this.runQueue.getEnvConcurrencyLimit(environment),
+      this.runQueue.getEnvConcurrencyLimitWithBurstFactor(environment),
+      this.runQueue.currentConcurrencyOfEnvironment(environment), // reported as "currentDequeued"
+      this.runQueue.operationalCurrentConcurrencyOfEnvironment(environment),
+      this.runQueue.getEnvConcurrencyBurstFactor(environment),
+    ]);
+
+    const envMetrics = {
+      envCurrent: currentConcurrency,
+      envLimit: concurrencyLimit,
+      envLimitWithBurst: concurrencyLimitWithBurstFactor,
+      burstFactor,
+    };
+
+    const envAnalysis = analyzeEnvironment(envMetrics);
+
+    const queueReports = await pMap(
+      queues,
+      async (queue) => {
+        return this.#generateReportForQueue(environment, queue, envMetrics, verbose);
+      },
+      { concurrency: 5 }
+    );
+
+    return {
+      concurrencyLimit: {
+        value: concurrencyLimit,
+        key: verbose ? this.runQueue.keys.envConcurrencyLimitKey(environment) : undefined,
+      },
+      concurrencyLimitWithBurstFactor: {
+        value: concurrencyLimitWithBurstFactor,
+        key: verbose
+          ? this.runQueue.keys.envConcurrencyLimitBurstFactorKey(environment)
+          : undefined,
+      },
+      currentDequeued: {
+        value: currentDequeued,
+        key: verbose ? this.runQueue.keys.envCurrentDequeuedKey(environment) : undefined,
+        label: "Env current dequeued, this is what is displayed to the user",
+      },
+      currentConcurrency: {
+        value: currentConcurrency,
+        key: verbose ? this.runQueue.keys.envCurrentConcurrencyKey(environment) : undefined,
+        label:
+          "Env current concurrency, this is what is used to determine if the environment can be dequeued from",
+      },
+      analysis: envAnalysis,
+      queues: queueReports,
+    };
+  }
+
+  /**
+   * Builds the report entry for a single queue.
+   *
+   * @param environment - The environment that owns the queue
+   * @param queue - The queue to report on
+   * @param envMetrics - The environment metrics used to analyze the queue
+   * @param verbose - When true, the key backing each reported value is included
+   */
+  async #generateReportForQueue(
+    environment: AuthenticatedEnvironment,
+    queue: ReportableQueue,
+    envMetrics: EnvInputs,
+    verbose: boolean
+  ) {
+    // The four lookups are independent of each other, so issue them concurrently
+    const [currentConcurrency, currentDequeued, concurrencyLimit, messagesDueCount] =
+      await Promise.all([
+        this.runQueue.currentConcurrencyOfQueue(environment, queue.name),
+        this.runQueue.currentDequeuedOfQueue(environment, queue.name),
+        this.runQueue.getQueueConcurrencyLimit(environment, queue.name),
+        this.runQueue.lengthOfQueueAvailableMessages(environment, queue.name),
+      ]);
+
+    const queueAnalysis = analyzeQueue({
+      paused: queue.paused === true,
+      envLimit: envMetrics.envLimit,
+      envLimitWithBurst: envMetrics.envLimitWithBurst,
+      queueLimit: typeof concurrencyLimit === "number" ? concurrencyLimit : undefined,
+      queueCurrent: currentConcurrency,
+      envCurrent: envMetrics.envCurrent,
+      dueCount: messagesDueCount,
+    });
+
+    return {
+      name: queue.name,
+      friendlyId: queue.friendlyId,
+      type: queue.type,
+      paused: queue.paused,
+      dbConcurrencyLimit: queue.concurrencyLimit,
+      key: this.runQueue.keys.queueKey(environment, queue.name),
+      analysis: queueAnalysis,
+      concurrencyLimit: {
+        value: typeof concurrencyLimit === "number" ? concurrencyLimit : null,
+        key: verbose
+          ? this.runQueue.keys.queueConcurrencyLimitKey(environment, queue.name)
+          : undefined,
+      },
+      currentConcurrency: {
+        value: currentConcurrency,
+        key: verbose
+          ? this.runQueue.keys.queueCurrentConcurrencyKey(environment, queue.name)
+          : undefined,
+      },
+      currentDequeued: {
+        value: currentDequeued,
+        key: verbose
+          ? this.runQueue.keys.queueCurrentDequeuedKey(environment, queue.name)
+          : undefined,
+      },
+    };
+  }
+
   async #handleStalledSnapshot({
     runId,
     snapshotId,
@@ -1436,13 +1646,14 @@ export class RunEngine {
   }
 
   async #concurrencySweeperCallback(
-    runIds: string[]
+    runIds: string[],
+    completedAtOffsetMs: number = 1000 * 60 * 10
   ): Promise<Array<{ id: string; orgId: string }>> {
     const runs = await this.readOnlyPrisma.taskRun.findMany({
       where: {
         id: { in: runIds },
         completedAt: {
-          lte: new Date(Date.now() - 1000 * 60 * 10), // This only finds runs that were completed more than 10 minutes ago
+          lte: new Date(Date.now() - completedAtOffsetMs), // Only finds runs completed at least completedAtOffsetMs ago (10 minutes by default)
         },
         organizationId: {
           not: null,
@@ -1483,3 +1694,114 @@ export class RunEngine {
     this.billingCache.invalidate(orgId);
   }
 }
+
+/** Environment-level metrics shared by the environment and queue analyses. */
+type EnvInputs = {
+  envCurrent: number;
+  envLimit: number;
+  envLimitWithBurst: number;
+  burstFactor?: number;
+};
+
+/**
+ * Determines whether an environment still has capacity to dequeue.
+ *
+ * @param inputs - The environment's current concurrency and limits
+ * @returns Whether the environment can dequeue, the reasons when it cannot, and the remaining
+ *   capacity (never negative)
+ */
+function analyzeEnvironment(inputs: EnvInputs) {
+  // Capacity is measured against the burst limit only, so the other inputs are not needed here
+  const { envCurrent, envLimitWithBurst } = inputs;
+
+  const reasons: string[] = [];
+  const envAvailableCapacity = Math.max(0, envLimitWithBurst - envCurrent);
+  const canDequeue = envAvailableCapacity > 0;
+
+  if (!canDequeue) {
+    reasons.push(
+      `Environment concurrency (${envCurrent}) has reached the limit with burst (${envLimitWithBurst}).`
+    );
+  }
+
+  return {
+    canDequeue,
+    reasons,
+    metrics: {
+      envAvailableCapacity,
+    },
+  };
+}
+
+/** Inputs needed to decide whether a single queue can be dequeued from. */
+type QueueInputs = {
+  paused?: boolean;
+  envLimit: number;
+  envLimitWithBurst: number;
+  queueLimit?: number; // undefined => no explicit queue limit (Lua uses a huge default)
+  queueCurrent: number;
+  envCurrent: number;
+  dueCount?: number; // undefined => the number of due messages is unknown and is not considered
+};
+
+/**
+ * Determines whether a queue can be dequeued from, and why not when it cannot.
+ *
+ * @param inputs - The queue's state together with the environment limits that apply to it
+ * @returns Whether the queue can dequeue, the reasons when it cannot, and the derived metrics
+ */
+function analyzeQueue(inputs: QueueInputs) {
+  const { paused, envLimit, envLimitWithBurst, queueLimit, queueCurrent, envCurrent, dueCount } =
+    inputs;
+
+  const reasons: string[] = [];
+
+  // Effective queue limit mirrors the Lua: min(queueLimit || 1_000_000, envLimit)
+  const queueLimitCapped = typeof queueLimit === "number" ? queueLimit : 1_000_000;
+  const effectiveQueueLimit = Math.min(queueLimitCapped, envLimit);
+
+  const envAvailable = Math.max(0, envLimitWithBurst - envCurrent);
+  const queueAvailable = Math.max(0, effectiveQueueLimit - queueCurrent);
+
+  // Mirror Lua's actualMaxCount = min(maxCount, envAvailable, queueAvailable).
+  // Here we only need to know if capacity exists at all (maxCount >= 1 assumed).
+  const hasCapacity = envAvailable > 0 && queueAvailable > 0;
+
+  // High-signal reasons (ordered)
+  if (paused) {
+    reasons.push("Queue is paused.");
+  }
+
+  if (envAvailable <= 0) {
+    reasons.push(
+      `Environment concurrency (${envCurrent}) has reached the limit with burst (${envLimitWithBurst}).`
+    );
+  }
+
+  if (queueAvailable <= 0) {
+    reasons.push(
+      `Queue concurrency (${queueCurrent}) has reached the effective queue limit (${effectiveQueueLimit}).`
+    );
+  }
+
+  // Optional visibility: no due messages (score > now or empty queue)
+  if (typeof dueCount === "number" && dueCount <= 0) {
+    reasons.push("No due messages in the queue (nothing scored ≤ now).");
+  }
+
+  // Final decision:
+  // - Not paused
+  // - Has capacity (both env and queue)
+  // - And (optionally) has work due
+  const canDequeue = !paused && hasCapacity && (typeof dueCount === "number" ? dueCount > 0 : true);
+
+  return {
+    canDequeue,
+    reasons: canDequeue ? [] : reasons,
+    metrics: {
+      effectiveQueueLimit,
+      queueAvailableCapacity: queueAvailable,
+      messagesDueCount: typeof dueCount === "number" ? dueCount : null,
+    },
+  };
+}
diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts
index 5125be560e..5e629acc07 100644
--- a/internal-packages/run-engine/src/engine/types.ts
+++ b/internal-packages/run-engine/src/engine/types.ts
@@ -150,3 +150,12 @@ export type TriggerParams = {
 };
 
 export type EngineWorker = Worker;
+
+/** The queue fields the engine needs to build a queue report. */
+export type ReportableQueue = {
+  name: string;
+  concurrencyLimit: number | null;
+  type: string;
+  paused: boolean;
+  friendlyId: string;
+};
diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts
index 81cbb0379c..aa30fc31c7 100644
--- a/internal-packages/run-engine/src/run-queue/index.ts
+++ b/internal-packages/run-engine/src/run-queue/index.ts
@@ -355,6 +355,45 @@ export class RunQueue {
     return Math.floor(limit * burstFactor);
   }
 
+  /**
+   * Get the concurrency burst factor of the environment
+   * @param env - The environment to get the burst factor of
+   * @returns The stored burst factor, or the configured default (1 when none is configured) if
+   *   nothing usable is stored
+   */
+  public async getEnvConcurrencyBurstFactor(env: MinimalAuthenticatedEnvironment) {
+    const fallback = this.options.defaultEnvConcurrencyBurstFactor ?? 1;
+    const result = await this.redis.get(this.keys.envConcurrencyLimitBurstFactorKey(env));
+
+    if (!result) {
+      return fallback;
+    }
+
+    const burstFactor = Number(result);
+
+    // A non-numeric stored value would otherwise surface as NaN
+    return Number.isFinite(burstFactor) ? burstFactor : fallback;
+  }
+
+  /**
+   * Get the members of the environment's current concurrency set
+   * @param env - The environment to get the current concurrency members of
+   * @returns The members of the set (the engine treats them as run ids)
+   */
+  public async getCurrentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) {
+    return this.redis.smembers(this.keys.envCurrentConcurrencyKey(env));
+  }
+
+  /**
+   * Get the members of a queue's current concurrency set
+   * @param env - The environment that owns the queue
+   * @param queue - The name of the queue
+   * @returns The members of the set (the engine treats them as run ids)
+   */
+  public async getCurrentConcurrencyOfQueue(env: MinimalAuthenticatedEnvironment, queue: string) {
+    return this.redis.smembers(this.keys.queueCurrentConcurrencyKey(env, queue));
+  }
+
   public async lengthOfQueue(
     env: MinimalAuthenticatedEnvironment,
     queue: string,
@@ -363,6 +402,27 @@ export class RunQueue {
     return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey));
   }
 
+  /**
+   * Count the messages in a queue that are due, i.e. scored at or before `currentTime`
+   * @param env - The environment that owns the queue
+   * @param queue - The name of the queue
+   * @param currentTime - The upper bound of the score range, defaults to now
+   * @param concurrencyKey - Optional concurrency key used to build the queue key
+   * @returns The number of messages with a score up to `currentTime`
+   */
+  public async lengthOfQueueAvailableMessages(
+    env: MinimalAuthenticatedEnvironment,
+    queue: string,
+    currentTime: Date = new Date(),
+    concurrencyKey?: string
+  ) {
+    return this.redis.zcount(
+      this.keys.queueKey(env, queue, concurrencyKey),
+      "-inf",
+      String(currentTime.getTime())
+    );
+  }
+
   public async lengthOfEnvQueue(env: MinimalAuthenticatedEnvironment) {
     return this.redis.zcard(this.keys.envQueueKey(env));
   }
@@ -419,6 +479,21 @@ export class RunQueue {
     return this.redis.scard(this.keys.queueCurrentConcurrencyKey(env, queue, concurrencyKey));
   }
 
+  /**
+   * Get the number of members in a queue's current dequeued set
+   * @param env - The environment that owns the queue
+   * @param queue - The name of the queue
+   * @param concurrencyKey - Optional concurrency key used to build the set key
+   * @returns The cardinality of the queue's current dequeued set
+   */
+  public async currentDequeuedOfQueue(
+    env: MinimalAuthenticatedEnvironment,
+    queue: string,
+    concurrencyKey?: string
+  ) {
+    return this.redis.scard(this.keys.queueCurrentDequeuedKey(env, queue, concurrencyKey));
+  }
+
   public async currentConcurrencyOfQueues(
     env: MinimalAuthenticatedEnvironment,
     queues: string[]
@@ -502,6 +577,15 @@ export class RunQueue {
     return this.redis.scard(this.keys.envCurrentDequeuedKey(env));
   }
 
+  /**
+   * Get the operational current concurrency of the environment
+   * @param env - The environment to get the current concurrency of
+   * @returns The number of members in the environment's current concurrency set
+   */
+  public async operationalCurrentConcurrencyOfEnvironment(env: MinimalAuthenticatedEnvironment) {
+    return this.redis.scard(this.keys.envCurrentConcurrencyKey(env));
+  }
+
   public async messageExists(orgId: string, messageId: string) {
     return this.redis.exists(this.keys.messageKey(orgId, messageId));
   }
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 6c7ca99a06..a7f137024e 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -1129,6 +1129,9 @@ importers:
       nanoid:
         specifier: 3.3.8
         version: 3.3.8
+      p-map:
+        specifier: ^6.0.0
+        version: 6.0.0
       redlock:
         specifier: 5.0.0-beta.2
         version: 5.0.0-beta.2(patch_hash=rwyegdki7iserrd7fgjwxkhnlu)