Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
const repairResults = await pMap(
queues,
async (queue) => {
return engine.repairQueue(environment, queue.name, parsedBody.dryRun);
const repair = await engine.repairQueue(
environment,
queue.name,
parsedBody.dryRun,
repairEnvironmentResults.runIds
);

return {
queue: queue.name,
...repair,
};
},
{ concurrency: 5 }
);
Expand Down
215 changes: 164 additions & 51 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class RunEngine {
private tracer: Tracer;
private meter: Meter;
private heartbeatTimeouts: HeartbeatTimeouts;
private repairSnapshotTimeoutMs: number;

prisma: PrismaClient;
readOnlyPrisma: PrismaReplicaClient;
Expand Down Expand Up @@ -191,6 +192,9 @@ export class RunEngine {
heartbeatSnapshot: async ({ payload }) => {
await this.#handleStalledSnapshot(payload);
},
repairSnapshot: async ({ payload }) => {
await this.#handleRepairSnapshot(payload);
},
expireRun: async ({ payload }) => {
await this.ttlSystem.expireRun({ runId: payload.runId });
},
Expand Down Expand Up @@ -241,6 +245,8 @@ export class RunEngine {
...(options.heartbeatTimeoutsMs ?? {}),
};

this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 60_000;

const resources: SystemResources = {
prisma: this.prisma,
worker: this.worker,
Expand Down Expand Up @@ -1174,81 +1180,77 @@ export class RunEngine {
async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) {
const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment);

const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
return this.#repairRuns(runIds, dryRun);
}

if (dryRun) {
return {
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
};
}
/**
 * Repairs the runs that `runQueue.getCurrentConcurrencyOfQueue` reports for
 * the given queue, skipping any run ids the caller asks to ignore.
 *
 * @param environment - Environment the queue belongs to.
 * @param queue - Name of the queue to inspect.
 * @param dryRun - Forwarded to `#repairRuns`; when true no repair job is scheduled.
 * @param ignoreRunIds - Run ids to leave out of this queue-level pass (for
 *   example ids already handled by an environment-level repair). Defaults to
 *   an empty list so the three-argument call form keeps working.
 * @returns Whatever `#repairRuns` returns for the remaining run ids.
 */
async repairQueue(
  environment: AuthenticatedEnvironment,
  queue: string,
  dryRun: boolean,
  ignoreRunIds: string[] = []
) {
  const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);

  // A Set keeps the exclusion check O(1) per run id instead of rescanning
  // the ignore list for every element.
  const ignored = new Set(ignoreRunIds);
  const runIdsToRepair = runIds.filter((runId) => !ignored.has(runId));

  return this.#repairRuns(runIdsToRepair, dryRun);
}

if (completedRuns.length === 0) {
async #repairRuns(runIds: string[], dryRun: boolean) {
if (runIds.length === 0) {
return {
runIds,
completedRunIds: [],
repairs: [],
dryRun,
};
}

await pMap(
completedRuns,
async (run) => {
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
skipDequeueProcessing: true,
removeFromWorkerQueue: false,
});
const repairs = await pMap(
runIds,
async (runId) => {
return this.#repairRun(runId, dryRun);
},
{ concurrency: 5 }
);

return {
runIds,
completedRunIds: completedRuns.map((r) => r.id),
repairs,
dryRun,
};
}

async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) {
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);

const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);

if (dryRun) {
return {
queue,
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
};
}
async #repairRun(runId: string, dryRun: boolean) {
const snapshot = await getLatestExecutionSnapshot(this.prisma, runId);

if (
snapshot.executionStatus === "QUEUED" ||
snapshot.executionStatus === "SUSPENDED" ||
snapshot.executionStatus === "FINISHED"
) {
if (!dryRun) {
// Schedule the repair job
await this.worker.enqueueOnce({
id: `repair-in-progress-run:${runId}`,
job: "repairSnapshot",
payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus },
availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs),
});
}

if (completedRuns.length === 0) {
return {
queue,
runIds,
completedRunIds: [],
dryRun,
action: "repairSnapshot",
runId,
snapshotStatus: snapshot.executionStatus,
snapshotId: snapshot.id,
};
}

await pMap(
completedRuns,
async (run) => {
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
skipDequeueProcessing: true,
removeFromWorkerQueue: false,
});
},
{ concurrency: 5 }
);

return {
queue,
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
action: "ignore",
runId,
snapshotStatus: snapshot.executionStatus,
snapshotId: snapshot.id,
};
}

Expand Down Expand Up @@ -1650,6 +1652,117 @@ export class RunEngine {
});
}

/**
 * Worker handler for the `repairSnapshot` job.
 *
 * Runs under the run lock for `runId`. It re-reads the latest execution
 * snapshot and only acts when that snapshot is still the one the job was
 * scheduled for, i.e. the run has not moved on since the repair was requested.
 *
 * - QUEUED: nacks the run's message so the queue can requeue it.
 * - SUSPENDED / FINISHED: removes the run from the current concurrency sets.
 *   FINISHED is handled here because `#repairRun` schedules this job for
 *   finished runs too; treating it as a no-op would leave the repair without
 *   any effect.
 * - Every other status: nothing to repair.
 *
 * @param runId - Run being repaired.
 * @param snapshotId - Snapshot id that was the latest when the job was scheduled.
 * @param executionStatus - Status recorded when the job was scheduled (used for logging only).
 */
async #handleRepairSnapshot({
  runId,
  snapshotId,
  executionStatus,
}: {
  runId: string;
  snapshotId: string;
  executionStatus: string;
}) {
  return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
    const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);

    if (latestSnapshot.id !== snapshotId) {
      this.logger.log(
        "RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
        {
          runId,
          snapshotId,
          latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
          repairExecutionStatus: executionStatus,
        }
      );

      return;
    }

    // Okay, so this means we haven't transitioned to a new status yet, so we need to do something
    switch (latestSnapshot.executionStatus) {
      case "EXECUTING":
      case "EXECUTING_WITH_WAITPOINTS":
      case "PENDING_CANCEL":
      case "PENDING_EXECUTING":
      case "QUEUED_EXECUTING":
      case "RUN_CREATED": {
        // Do nothing;
        return;
      }
      case "QUEUED": {
        this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
          runId,
          snapshotId,
        });

        //it will automatically be requeued X times depending on the queue retry settings
        const gotRequeued = await this.runQueue.nackMessage({
          orgId: latestSnapshot.organizationId,
          messageId: runId,
        });

        if (!gotRequeued) {
          this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
            runId,
            snapshot: latestSnapshot,
          });
        } else {
          this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
            runId,
            snapshot: latestSnapshot,
          });
        }

        break;
      }
      case "FINISHED":
      case "SUSPENDED": {
        // Narrowed to "FINISHED" | "SUSPENDED"; for SUSPENDED the log
        // messages below are identical to the previous literal strings.
        const status = latestSnapshot.executionStatus;

        this.logger.log(`RunEngine.handleRepairSnapshot ${status}`, {
          runId,
          snapshotId,
        });

        // The queue name is needed to address the queue-level concurrency set.
        const taskRun = await this.prisma.taskRun.findFirst({
          where: { id: runId },
          select: {
            queue: true,
          },
        });

        if (!taskRun) {
          this.logger.error(`RunEngine.handleRepairSnapshot ${status} task run not found`, {
            runId,
            snapshotId,
          });
          return;
        }

        // We need to clear this run from the current concurrency sets
        await this.runQueue.clearMessageFromConcurrencySets({
          runId,
          orgId: latestSnapshot.organizationId,
          queue: taskRun.queue,
          env: {
            id: latestSnapshot.environmentId,
            type: latestSnapshot.environmentType,
            project: {
              id: latestSnapshot.projectId,
            },
            organization: {
              id: latestSnapshot.organizationId,
            },
          },
        });

        break;
      }
      default: {
        // Compile-time exhaustiveness check over the execution status union.
        assertNever(latestSnapshot.executionStatus);
      }
    }
  });
}
Comment on lines 1655 to 1764
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Handle FINISHED runs: attempt ack or clear sets

Currently FINISHED is grouped under a “do nothing” branch. Add explicit FINISHED handling to ack the message if present, else clear concurrency sets, to actually repair stuck finished runs.

-      switch (latestSnapshot.executionStatus) {
-        case "EXECUTING":
-        case "EXECUTING_WITH_WAITPOINTS":
-        case "FINISHED":
+      switch (latestSnapshot.executionStatus) {
+        case "EXECUTING":
+        case "EXECUTING_WITH_WAITPOINTS":
         case "PENDING_CANCEL":
         case "PENDING_EXECUTING":
         case "QUEUED_EXECUTING":
         case "RUN_CREATED": {
           // Do nothing;
           return;
         }
         case "QUEUED": {
           ...
           break;
         }
+        case "FINISHED": {
+          this.logger.log("RunEngine.handleRepairSnapshot FINISHED", {
+            runId,
+            snapshotId,
+          });
+
+          // If the message still exists, ack to fully clean up queue and concurrency artifacts.
+          const hasMessage = await this.runQueue.messageExists(
+            latestSnapshot.organizationId,
+            runId
+          );
+          if (hasMessage) {
+            await this.runQueue.acknowledgeMessage(latestSnapshot.organizationId, runId, {
+              skipDequeueProcessing: true,
+              removeFromWorkerQueue: true,
+            });
+            break;
+          }
+
+          // Fallback: ensure concurrency sets are cleared if the message is gone.
+          const taskRun = await this.prisma.taskRun.findFirst({
+            where: { id: runId },
+            select: { queue: true },
+          });
+          if (!taskRun) {
+            this.logger.error("RunEngine.handleRepairSnapshot FINISHED task run not found", {
+              runId,
+              snapshotId,
+            });
+            return;
+          }
+          await this.runQueue.clearMessageFromConcurrencySets({
+            runId,
+            orgId: latestSnapshot.organizationId,
+            queue: taskRun.queue,
+            env: {
+              id: latestSnapshot.environmentId,
+              type: latestSnapshot.environmentType,
+              project: { id: latestSnapshot.projectId },
+              organization: { id: latestSnapshot.organizationId },
+            },
+          });
+          break;
+        }
         case "SUSPENDED": {
           ...
           break;
         }
         default: {
           assertNever(latestSnapshot.executionStatus);
         }
       }

This makes FINISHED repair effective without relying on the periodic sweeper.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async #handleRepairSnapshot({
runId,
snapshotId,
executionStatus,
}: {
runId: string;
snapshotId: string;
executionStatus: string;
}) {
return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);
if (latestSnapshot.id !== snapshotId) {
this.logger.log(
"RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
{
runId,
snapshotId,
latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
repairExecutionStatus: executionStatus,
}
);
return;
}
// Okay, so this means we haven't transitioned to a new status yes, so we need to do something
switch (latestSnapshot.executionStatus) {
case "EXECUTING":
case "EXECUTING_WITH_WAITPOINTS":
case "FINISHED":
case "PENDING_CANCEL":
case "PENDING_EXECUTING":
case "QUEUED_EXECUTING":
case "RUN_CREATED": {
// Do nothing;
return;
}
case "QUEUED": {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
runId,
snapshotId,
});
//it will automatically be requeued X times depending on the queue retry settings
const gotRequeued = await this.runQueue.nackMessage({
orgId: latestSnapshot.organizationId,
messageId: runId,
});
if (!gotRequeued) {
this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
runId,
snapshot: latestSnapshot,
});
} else {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
runId,
snapshot: latestSnapshot,
});
}
break;
}
case "SUSPENDED": {
this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", {
runId,
snapshotId,
});
const taskRun = await this.prisma.taskRun.findFirst({
where: { id: runId },
select: {
queue: true,
},
});
if (!taskRun) {
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", {
runId,
snapshotId,
});
return;
}
// We need to clear this run from the current concurrency sets
await this.runQueue.clearMessageFromConcurrencySets({
runId,
orgId: latestSnapshot.organizationId,
queue: taskRun.queue,
env: {
id: latestSnapshot.environmentId,
type: latestSnapshot.environmentType,
project: {
id: latestSnapshot.projectId,
},
organization: {
id: latestSnapshot.organizationId,
},
},
});
break;
}
default: {
assertNever(latestSnapshot.executionStatus);
}
}
});
}
// Okay, so this means we haven't transitioned to a new status yet, so we need to do something
switch (latestSnapshot.executionStatus) {
case "EXECUTING":
case "EXECUTING_WITH_WAITPOINTS":
case "PENDING_CANCEL":
case "PENDING_EXECUTING":
case "QUEUED_EXECUTING":
case "RUN_CREATED": {
// Do nothing;
return;
}
case "QUEUED": {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
runId,
snapshotId,
});
//it will automatically be requeued X times depending on the queue retry settings
const gotRequeued = await this.runQueue.nackMessage({
orgId: latestSnapshot.organizationId,
messageId: runId,
});
if (!gotRequeued) {
this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
runId,
snapshot: latestSnapshot,
});
} else {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
runId,
snapshot: latestSnapshot,
});
}
break;
}
case "FINISHED": {
this.logger.log("RunEngine.handleRepairSnapshot FINISHED", {
runId,
snapshotId,
});
// If the message still exists, ack to fully clean up queue and concurrency artifacts.
const hasMessage = await this.runQueue.messageExists(
latestSnapshot.organizationId,
runId
);
if (hasMessage) {
await this.runQueue.acknowledgeMessage(latestSnapshot.organizationId, runId, {
skipDequeueProcessing: true,
removeFromWorkerQueue: true,
});
break;
}
// Fallback: ensure concurrency sets are cleared if the message is gone.
const taskRun = await this.prisma.taskRun.findFirst({
where: { id: runId },
select: { queue: true },
});
if (!taskRun) {
this.logger.error("RunEngine.handleRepairSnapshot FINISHED task run not found", {
runId,
snapshotId,
});
return;
}
await this.runQueue.clearMessageFromConcurrencySets({
runId,
orgId: latestSnapshot.organizationId,
queue: taskRun.queue,
env: {
id: latestSnapshot.environmentId,
type: latestSnapshot.environmentType,
project: { id: latestSnapshot.projectId },
organization: { id: latestSnapshot.organizationId },
},
});
break;
}
case "SUSPENDED": {
this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", {
runId,
snapshotId,
});
const taskRun = await this.prisma.taskRun.findFirst({
where: { id: runId },
select: {
queue: true,
},
});
if (!taskRun) {
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", {
runId,
snapshotId,
});
return;
}
// We need to clear this run from the current concurrency sets
await this.runQueue.clearMessageFromConcurrencySets({
runId,
orgId: latestSnapshot.organizationId,
queue: taskRun.queue,
env: {
id: latestSnapshot.environmentId,
type: latestSnapshot.environmentType,
project: {
id: latestSnapshot.projectId,
},
organization: {
id: latestSnapshot.organizationId,
},
},
});
break;
}
default: {
assertNever(latestSnapshot.executionStatus);
}
}
🤖 Prompt for AI Agents
internal-packages/run-engine/src/engine/index.ts around lines 1655 to 1764: the
FINISHED case is currently lumped into the "do nothing" branch; implement
explicit FINISHED handling that first checks whether the queue message still
exists (await this.runQueue.messageExists(latestSnapshot.organizationId, runId))
and, if it does, acknowledges it (await
this.runQueue.acknowledgeMessage(latestSnapshot.organizationId, runId, {
skipDequeueProcessing: true, removeFromWorkerQueue: true })); if the message is
gone, fall back to clearing the run from concurrency sets (fetch taskRun.queue
like in SUSPENDED, log and return if not found, then call
this.runQueue.clearMessageFromConcurrencySets with the same env shape used in
SUSPENDED); log success/failure paths similarly to the QUEUED/SUSPENDED branches.


async #concurrencySweeperCallback(
runIds: string[],
completedAtOffsetMs: number = 1000 * 60 * 10
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export type RunEngineOptions = {
/** If not set then checkpoints won't ever be used */
retryWarmStartThresholdMs?: number;
heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
repairSnapshotTimeoutMs?: number;
treatProductionExecutionStallsAsOOM?: boolean;
suspendedHeartbeatRetriesConfig?: {
maxCount?: number;
Expand Down
8 changes: 8 additions & 0 deletions internal-packages/run-engine/src/engine/workerCatalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ export const workerCatalog = {
}),
visibilityTimeoutMs: 30_000,
},
repairSnapshot: {
schema: z.object({
runId: z.string(),
snapshotId: z.string(),
executionStatus: z.string(),
}),
visibilityTimeoutMs: 30_000,
},
expireRun: {
schema: z.object({
runId: z.string(),
Expand Down
Loading
Loading