Skip to content

Commit 66cafd2

Browse files
committed
feat(run-engine): ability to repair runs in QUEUED, SUSPENDED, and FINISHED execution status
1 parent 3acdd30 commit 66cafd2

File tree

5 files changed

+264
-52
lines changed

5 files changed

+264
-52
lines changed

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.engine.repair-queues.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
8686
const repairResults = await pMap(
8787
queues,
8888
async (queue) => {
89-
return engine.repairQueue(environment, queue.name, parsedBody.dryRun);
89+
const repair = await engine.repairQueue(
90+
environment,
91+
queue.name,
92+
parsedBody.dryRun,
93+
repairEnvironmentResults.runIds
94+
);
95+
96+
return {
97+
queue: queue.name,
98+
...repair,
99+
};
90100
},
91101
{ concurrency: 5 }
92102
);

internal-packages/run-engine/src/engine/index.ts

Lines changed: 164 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export class RunEngine {
7171
private tracer: Tracer;
7272
private meter: Meter;
7373
private heartbeatTimeouts: HeartbeatTimeouts;
74+
private repairSnapshotTimeoutMs: number;
7475

7576
prisma: PrismaClient;
7677
readOnlyPrisma: PrismaReplicaClient;
@@ -191,6 +192,9 @@ export class RunEngine {
191192
heartbeatSnapshot: async ({ payload }) => {
192193
await this.#handleStalledSnapshot(payload);
193194
},
195+
repairSnapshot: async ({ payload }) => {
196+
await this.#handleRepairSnapshot(payload);
197+
},
194198
expireRun: async ({ payload }) => {
195199
await this.ttlSystem.expireRun({ runId: payload.runId });
196200
},
@@ -241,6 +245,8 @@ export class RunEngine {
241245
...(options.heartbeatTimeoutsMs ?? {}),
242246
};
243247

248+
this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 60_000;
249+
244250
const resources: SystemResources = {
245251
prisma: this.prisma,
246252
worker: this.worker,
@@ -1172,81 +1178,77 @@ export class RunEngine {
11721178
async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) {
11731179
const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment);
11741180

1175-
const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
1181+
return this.#repairRuns(runIds, dryRun);
1182+
}
11761183

1177-
if (dryRun) {
1178-
return {
1179-
runIds,
1180-
completedRunIds: completedRuns.map((r) => r.id),
1181-
dryRun,
1182-
};
1183-
}
1184+
async repairQueue(
1185+
environment: AuthenticatedEnvironment,
1186+
queue: string,
1187+
dryRun: boolean,
1188+
ignoreRunIds: string[]
1189+
) {
1190+
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);
1191+
1192+
const runIdsToRepair = runIds.filter((runId) => !ignoreRunIds.includes(runId));
1193+
1194+
return this.#repairRuns(runIdsToRepair, dryRun);
1195+
}
11841196

1185-
if (completedRuns.length === 0) {
1197+
async #repairRuns(runIds: string[], dryRun: boolean) {
1198+
if (runIds.length === 0) {
11861199
return {
11871200
runIds,
1188-
completedRunIds: [],
1201+
repairs: [],
11891202
dryRun,
11901203
};
11911204
}
11921205

1193-
await pMap(
1194-
completedRuns,
1195-
async (run) => {
1196-
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
1197-
skipDequeueProcessing: true,
1198-
removeFromWorkerQueue: false,
1199-
});
1206+
const repairs = await pMap(
1207+
runIds,
1208+
async (runId) => {
1209+
return this.#repairRun(runId, dryRun);
12001210
},
12011211
{ concurrency: 5 }
12021212
);
12031213

12041214
return {
12051215
runIds,
1206-
completedRunIds: completedRuns.map((r) => r.id),
1216+
repairs,
12071217
dryRun,
12081218
};
12091219
}
12101220

1211-
async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) {
1212-
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);
1213-
1214-
const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
1215-
1216-
if (dryRun) {
1217-
return {
1218-
queue,
1219-
runIds,
1220-
completedRunIds: completedRuns.map((r) => r.id),
1221-
dryRun,
1222-
};
1223-
}
1221+
async #repairRun(runId: string, dryRun: boolean) {
1222+
const snapshot = await getLatestExecutionSnapshot(this.prisma, runId);
1223+
1224+
if (
1225+
snapshot.executionStatus === "QUEUED" ||
1226+
snapshot.executionStatus === "SUSPENDED" ||
1227+
snapshot.executionStatus === "FINISHED"
1228+
) {
1229+
if (!dryRun) {
1230+
// Schedule the repair job
1231+
await this.worker.enqueueOnce({
1232+
id: `repair-in-progress-run:${runId}`,
1233+
job: "repairSnapshot",
1234+
payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus },
1235+
availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs),
1236+
});
1237+
}
12241238

1225-
if (completedRuns.length === 0) {
12261239
return {
1227-
queue,
1228-
runIds,
1229-
completedRunIds: [],
1230-
dryRun,
1240+
action: "repairSnapshot",
1241+
runId,
1242+
snapshotStatus: snapshot.executionStatus,
1243+
snapshotId: snapshot.id,
12311244
};
12321245
}
12331246

1234-
await pMap(
1235-
completedRuns,
1236-
async (run) => {
1237-
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
1238-
skipDequeueProcessing: true,
1239-
removeFromWorkerQueue: false,
1240-
});
1241-
},
1242-
{ concurrency: 5 }
1243-
);
1244-
12451247
return {
1246-
queue,
1247-
runIds,
1248-
completedRunIds: completedRuns.map((r) => r.id),
1249-
dryRun,
1248+
action: "ignore",
1249+
runId,
1250+
snapshotStatus: snapshot.executionStatus,
1251+
snapshotId: snapshot.id,
12501252
};
12511253
}
12521254

@@ -1642,6 +1644,117 @@ export class RunEngine {
16421644
});
16431645
}
16441646

1647+
async #handleRepairSnapshot({
1648+
runId,
1649+
snapshotId,
1650+
executionStatus,
1651+
}: {
1652+
runId: string;
1653+
snapshotId: string;
1654+
executionStatus: string;
1655+
}) {
1656+
return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
1657+
const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);
1658+
1659+
if (latestSnapshot.id !== snapshotId) {
1660+
this.logger.log(
1661+
"RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
1662+
{
1663+
runId,
1664+
snapshotId,
1665+
latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
1666+
repairExecutionStatus: executionStatus,
1667+
}
1668+
);
1669+
1670+
return;
1671+
}
1672+
1673+
// Okay, so this means we haven't transitioned to a new status yes, so we need to do something
1674+
switch (latestSnapshot.executionStatus) {
1675+
case "EXECUTING":
1676+
case "EXECUTING_WITH_WAITPOINTS":
1677+
case "FINISHED":
1678+
case "PENDING_CANCEL":
1679+
case "PENDING_EXECUTING":
1680+
case "QUEUED_EXECUTING":
1681+
case "RUN_CREATED": {
1682+
// Do nothing;
1683+
return;
1684+
}
1685+
case "QUEUED": {
1686+
this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
1687+
runId,
1688+
snapshotId,
1689+
});
1690+
1691+
//it will automatically be requeued X times depending on the queue retry settings
1692+
const gotRequeued = await this.runQueue.nackMessage({
1693+
orgId: latestSnapshot.organizationId,
1694+
messageId: runId,
1695+
});
1696+
1697+
if (!gotRequeued) {
1698+
this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
1699+
runId,
1700+
snapshot: latestSnapshot,
1701+
});
1702+
} else {
1703+
this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
1704+
runId,
1705+
snapshot: latestSnapshot,
1706+
});
1707+
}
1708+
1709+
break;
1710+
}
1711+
case "SUSPENDED": {
1712+
this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED", {
1713+
runId,
1714+
snapshotId,
1715+
});
1716+
1717+
const taskRun = await this.prisma.taskRun.findFirst({
1718+
where: { id: runId },
1719+
select: {
1720+
queue: true,
1721+
},
1722+
});
1723+
1724+
if (!taskRun) {
1725+
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED task run not found", {
1726+
runId,
1727+
snapshotId,
1728+
});
1729+
return;
1730+
}
1731+
1732+
// We need to clear this run from the current concurrency sets
1733+
await this.runQueue.clearMessageFromConcurrencySets({
1734+
runId,
1735+
orgId: latestSnapshot.organizationId,
1736+
queue: taskRun.queue,
1737+
env: {
1738+
id: latestSnapshot.environmentId,
1739+
type: latestSnapshot.environmentType,
1740+
project: {
1741+
id: latestSnapshot.projectId,
1742+
},
1743+
organization: {
1744+
id: latestSnapshot.organizationId,
1745+
},
1746+
},
1747+
});
1748+
1749+
break;
1750+
}
1751+
default: {
1752+
assertNever(latestSnapshot.executionStatus);
1753+
}
1754+
}
1755+
});
1756+
}
1757+
16451758
async #concurrencySweeperCallback(
16461759
runIds: string[],
16471760
completedAtOffsetMs: number = 1000 * 60 * 10

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export type RunEngineOptions = {
7171
/** If not set then checkpoints won't ever be used */
7272
retryWarmStartThresholdMs?: number;
7373
heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
74+
repairSnapshotTimeoutMs?: number;
7475
treatProductionExecutionStallsAsOOM?: boolean;
7576
suspendedHeartbeatRetriesConfig?: {
7677
maxCount?: number;

internal-packages/run-engine/src/engine/workerCatalog.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ export const workerCatalog = {
1616
}),
1717
visibilityTimeoutMs: 30_000,
1818
},
19+
repairSnapshot: {
20+
schema: z.object({
21+
runId: z.string(),
22+
snapshotId: z.string(),
23+
executionStatus: z.string(),
24+
}),
25+
visibilityTimeoutMs: 30_000,
26+
},
1927
expireRun: {
2028
schema: z.object({
2129
runId: z.string(),

0 commit comments

Comments
 (0)