diff --git a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts index 53b2463347..30d60197f9 100644 --- a/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts +++ b/apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts @@ -86,7 +86,17 @@ export async function action({ request, params }: ActionFunctionArgs) { const repairResults = await pMap( queues, async (queue) => { - return engine.repairQueue(environment, queue.name, parsedBody.dryRun); + const repair = await engine.repairQueue( + environment, + queue.name, + parsedBody.dryRun, + repairEnvironmentResults.runIds + ); + + return { + queue: queue.name, + ...repair, + }; }, { concurrency: 5 } ); diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a2f21040de..0c38d1bf44 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -71,6 +71,7 @@ export class RunEngine { private tracer: Tracer; private meter: Meter; private heartbeatTimeouts: HeartbeatTimeouts; + private repairSnapshotTimeoutMs: number; prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; @@ -191,6 +192,9 @@ export class RunEngine { heartbeatSnapshot: async ({ payload }) => { await this.#handleStalledSnapshot(payload); }, + repairSnapshot: async ({ payload }) => { + await this.#handleRepairSnapshot(payload); + }, expireRun: async ({ payload }) => { await this.ttlSystem.expireRun({ runId: payload.runId }); }, @@ -241,6 +245,8 @@ export class RunEngine { ...(options.heartbeatTimeoutsMs ?? {}), }; + this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 
60_000; + const resources: SystemResources = { prisma: this.prisma, worker: this.worker, @@ -1174,81 +1180,77 @@ export class RunEngine { async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) { const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment); - const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000); + return this.#repairRuns(runIds, dryRun); + } - if (dryRun) { - return { - runIds, - completedRunIds: completedRuns.map((r) => r.id), - dryRun, - }; - } + async repairQueue( + environment: AuthenticatedEnvironment, + queue: string, + dryRun: boolean, + ignoreRunIds: string[] + ) { + const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue); + + const runIdsToRepair = runIds.filter((runId) => !ignoreRunIds.includes(runId)); + + return this.#repairRuns(runIdsToRepair, dryRun); + } - if (completedRuns.length === 0) { + async #repairRuns(runIds: string[], dryRun: boolean) { + if (runIds.length === 0) { return { runIds, - completedRunIds: [], + repairs: [], dryRun, }; } - await pMap( - completedRuns, - async (run) => { - await this.runQueue.acknowledgeMessage(run.orgId, run.id, { - skipDequeueProcessing: true, - removeFromWorkerQueue: false, - }); + const repairs = await pMap( + runIds, + async (runId) => { + return this.#repairRun(runId, dryRun); }, { concurrency: 5 } ); return { runIds, - completedRunIds: completedRuns.map((r) => r.id), + repairs, dryRun, }; } - async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) { - const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue); - - const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000); - - if (dryRun) { - return { - queue, - runIds, - completedRunIds: completedRuns.map((r) => r.id), - dryRun, - }; - } + async #repairRun(runId: string, dryRun: boolean) { + const snapshot = await getLatestExecutionSnapshot(this.prisma, runId); + + if ( + 
snapshot.executionStatus === "QUEUED" || + snapshot.executionStatus === "SUSPENDED" || + snapshot.executionStatus === "FINISHED" + ) { + if (!dryRun) { + // Schedule the repair job + await this.worker.enqueueOnce({ + id: `repair-in-progress-run:${runId}`, + job: "repairSnapshot", + payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus }, + availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs), + }); + } - if (completedRuns.length === 0) { return { - queue, - runIds, - completedRunIds: [], - dryRun, + action: "repairSnapshot", + runId, + snapshotStatus: snapshot.executionStatus, + snapshotId: snapshot.id, }; } - await pMap( - completedRuns, - async (run) => { - await this.runQueue.acknowledgeMessage(run.orgId, run.id, { - skipDequeueProcessing: true, - removeFromWorkerQueue: false, - }); - }, - { concurrency: 5 } - ); - return { - queue, - runIds, - completedRunIds: completedRuns.map((r) => r.id), - dryRun, + action: "ignore", + runId, + snapshotStatus: snapshot.executionStatus, + snapshotId: snapshot.id, }; } @@ -1650,6 +1652,117 @@ export class RunEngine { }); } + async #handleRepairSnapshot({ + runId, + snapshotId, + executionStatus, + }: { + runId: string; + snapshotId: string; + executionStatus: string; + }) { + return await this.runLock.lock("handleRepairSnapshot", [runId], async () => { + const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId); + + if (latestSnapshot.id !== snapshotId) { + this.logger.log( + "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.", + { + runId, + snapshotId, + latestSnapshotExecutionStatus: latestSnapshot.executionStatus, + repairExecutionStatus: executionStatus, + } + ); + + return; + } + + // Okay, so this means we haven't transitioned to a new status yet, so we need to do something + switch (latestSnapshot.executionStatus) { + case "EXECUTING": + case "EXECUTING_WITH_WAITPOINTS": + case "PENDING_CANCEL": + case "PENDING_EXECUTING": + 
case "QUEUED_EXECUTING": + case "RUN_CREATED": { + // Do nothing; + return; + } + case "QUEUED": { + this.logger.log("RunEngine.handleRepairSnapshot QUEUED", { + runId, + snapshotId, + }); + + //it will automatically be requeued X times depending on the queue retry settings + const gotRequeued = await this.runQueue.nackMessage({ + orgId: latestSnapshot.organizationId, + messageId: runId, + }); + + if (!gotRequeued) { + this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", { + runId, + snapshot: latestSnapshot, + }); + } else { + this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", { + runId, + snapshot: latestSnapshot, + }); + } + + break; + } + case "FINISHED": + case "SUSPENDED": { + this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED", { + runId, + snapshotId, + }); + + const taskRun = await this.prisma.taskRun.findFirst({ + where: { id: runId }, + select: { + queue: true, + }, + }); + + if (!taskRun) { + this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", { + runId, + snapshotId, + }); + return; + } + + // We need to clear this run from the current concurrency sets + await this.runQueue.clearMessageFromConcurrencySets({ + runId, + orgId: latestSnapshot.organizationId, + queue: taskRun.queue, + env: { + id: latestSnapshot.environmentId, + type: latestSnapshot.environmentType, + project: { + id: latestSnapshot.projectId, + }, + organization: { + id: latestSnapshot.organizationId, + }, + }, + }); + + break; + } + default: { + assertNever(latestSnapshot.executionStatus); + } + } + }); + } + async #concurrencySweeperCallback( runIds: string[], completedAtOffsetMs: number = 1000 * 60 * 10 diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 5e629acc07..040cb3cd09 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -71,6 +71,7 @@ export type 
RunEngineOptions = { /** If not set then checkpoints won't ever be used */ retryWarmStartThresholdMs?: number; heartbeatTimeoutsMs?: Partial; + repairSnapshotTimeoutMs?: number; treatProductionExecutionStallsAsOOM?: boolean; suspendedHeartbeatRetriesConfig?: { maxCount?: number; diff --git a/internal-packages/run-engine/src/engine/workerCatalog.ts b/internal-packages/run-engine/src/engine/workerCatalog.ts index 81918ac119..2ed6f5076b 100644 --- a/internal-packages/run-engine/src/engine/workerCatalog.ts +++ b/internal-packages/run-engine/src/engine/workerCatalog.ts @@ -16,6 +16,14 @@ export const workerCatalog = { }), visibilityTimeoutMs: 30_000, }, + repairSnapshot: { + schema: z.object({ + runId: z.string(), + snapshotId: z.string(), + executionStatus: z.string(), + }), + visibilityTimeoutMs: 30_000, + }, expireRun: { schema: z.object({ runId: z.string(), diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index e854c73ce9..7a4066e706 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -40,6 +40,7 @@ import { OutputPayload, OutputPayloadV2, RunQueueKeyProducer, + RunQueueKeyProducerEnvironment, RunQueueSelectionStrategy, } from "./types.js"; import { WorkerQueueResolver } from "./workerQueueResolver.js"; @@ -930,6 +931,15 @@ export class RunQueue { }); } + public async clearMessageFromConcurrencySets(params: { + runId: string; + orgId: string; + queue: string; + env: RunQueueKeyProducerEnvironment; + }) { + return this.#callClearMessageFromConcurrencySets(params); + } + async quit() { this.abortController.abort(); @@ -1799,6 +1809,45 @@ export class RunQueue { ); } + async #callClearMessageFromConcurrencySets({ + runId, + orgId, + queue, + env, + }: { + runId: string; + orgId: string; + queue: string; + env: RunQueueKeyProducerEnvironment; + }) { + const messageId = runId; + const messageKey = this.keys.messageKey(orgId, 
messageId); + const queueCurrentConcurrencyKey = this.keys.queueCurrentConcurrencyKey(env, queue); + const envCurrentConcurrencyKey = this.keys.envCurrentConcurrencyKey(env); + const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKey(env, queue); + const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKey(env); + + this.logger.debug("Calling clearMessageFromConcurrencySets", { + messageKey, + queue, + env, + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + messageId, + service: this.name, + }); + + return this.redis.clearMessageFromConcurrencySets( + queueCurrentConcurrencyKey, + envCurrentConcurrencyKey, + queueCurrentDequeuedKey, + envCurrentDequeuedKey, + messageId + ); + } + async #callNackMessage({ message, retryAt }: { message: OutputPayload; retryAt?: number }) { const messageId = message.runId; const messageKey = this.keys.messageKey(message.orgId, message.runId); @@ -2640,6 +2689,26 @@ end return results `, }); + + this.redis.defineCommand("clearMessageFromConcurrencySets", { + numberOfKeys: 4, + lua: ` +-- Keys: +local queueCurrentConcurrencyKey = KEYS[1] +local envCurrentConcurrencyKey = KEYS[2] +local queueCurrentDequeuedKey = KEYS[3] +local envCurrentDequeuedKey = KEYS[4] + +-- Args: +local messageId = ARGV[1] + +-- Update the concurrency keys +redis.call('SREM', queueCurrentConcurrencyKey, messageId) +redis.call('SREM', envCurrentConcurrencyKey, messageId) +redis.call('SREM', queueCurrentDequeuedKey, messageId) +redis.call('SREM', envCurrentDequeuedKey, messageId) +`, + }); } } @@ -2724,6 +2793,17 @@ declare module "@internal/redis" { callback?: Callback ): Result; + clearMessageFromConcurrencySets( + // keys + queueCurrentConcurrencyKey: string, + envCurrentConcurrencyKey: string, + queueCurrentDequeuedKey: string, + envCurrentDequeuedKey: string, + // args + messageId: string, + callback?: Callback + ): Result; + nackMessage( // keys masterQueueKey: string,