Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
const repairResults = await pMap(
queues,
async (queue) => {
return engine.repairQueue(environment, queue.name, parsedBody.dryRun);
const repair = await engine.repairQueue(
environment,
queue.name,
parsedBody.dryRun,
repairEnvironmentResults.runIds
);

return {
queue: queue.name,
...repair,
};
},
{ concurrency: 5 }
);
Expand Down
215 changes: 164 additions & 51 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class RunEngine {
private tracer: Tracer;
private meter: Meter;
private heartbeatTimeouts: HeartbeatTimeouts;
private repairSnapshotTimeoutMs: number;

prisma: PrismaClient;
readOnlyPrisma: PrismaReplicaClient;
Expand Down Expand Up @@ -191,6 +192,9 @@ export class RunEngine {
heartbeatSnapshot: async ({ payload }) => {
await this.#handleStalledSnapshot(payload);
},
repairSnapshot: async ({ payload }) => {
await this.#handleRepairSnapshot(payload);
},
expireRun: async ({ payload }) => {
await this.ttlSystem.expireRun({ runId: payload.runId });
},
Expand Down Expand Up @@ -241,6 +245,8 @@ export class RunEngine {
...(options.heartbeatTimeoutsMs ?? {}),
};

this.repairSnapshotTimeoutMs = options.repairSnapshotTimeoutMs ?? 60_000;

const resources: SystemResources = {
prisma: this.prisma,
worker: this.worker,
Expand Down Expand Up @@ -1174,81 +1180,77 @@ export class RunEngine {
async repairEnvironment(environment: AuthenticatedEnvironment, dryRun: boolean) {
const runIds = await this.runQueue.getCurrentConcurrencyOfEnvironment(environment);

const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);
return this.#repairRuns(runIds, dryRun);
}

if (dryRun) {
return {
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
};
}
async repairQueue(
environment: AuthenticatedEnvironment,
queue: string,
dryRun: boolean,
ignoreRunIds: string[]
) {
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);

const runIdsToRepair = runIds.filter((runId) => !ignoreRunIds.includes(runId));

return this.#repairRuns(runIdsToRepair, dryRun);
}

if (completedRuns.length === 0) {
async #repairRuns(runIds: string[], dryRun: boolean) {
if (runIds.length === 0) {
return {
runIds,
completedRunIds: [],
repairs: [],
dryRun,
};
}

await pMap(
completedRuns,
async (run) => {
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
skipDequeueProcessing: true,
removeFromWorkerQueue: false,
});
const repairs = await pMap(
runIds,
async (runId) => {
return this.#repairRun(runId, dryRun);
},
{ concurrency: 5 }
);

return {
runIds,
completedRunIds: completedRuns.map((r) => r.id),
repairs,
dryRun,
};
}

async repairQueue(environment: AuthenticatedEnvironment, queue: string, dryRun: boolean) {
const runIds = await this.runQueue.getCurrentConcurrencyOfQueue(environment, queue);

const completedRuns = await this.#concurrencySweeperCallback(runIds, 5000);

if (dryRun) {
return {
queue,
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
};
}
async #repairRun(runId: string, dryRun: boolean) {
const snapshot = await getLatestExecutionSnapshot(this.prisma, runId);

if (
snapshot.executionStatus === "QUEUED" ||
snapshot.executionStatus === "SUSPENDED" ||
snapshot.executionStatus === "FINISHED"
) {
if (!dryRun) {
// Schedule the repair job
await this.worker.enqueueOnce({
id: `repair-in-progress-run:${runId}`,
job: "repairSnapshot",
payload: { runId, snapshotId: snapshot.id, executionStatus: snapshot.executionStatus },
availableAt: new Date(Date.now() + this.repairSnapshotTimeoutMs),
});
}

if (completedRuns.length === 0) {
return {
queue,
runIds,
completedRunIds: [],
dryRun,
action: "repairSnapshot",
runId,
snapshotStatus: snapshot.executionStatus,
snapshotId: snapshot.id,
};
}

await pMap(
completedRuns,
async (run) => {
await this.runQueue.acknowledgeMessage(run.orgId, run.id, {
skipDequeueProcessing: true,
removeFromWorkerQueue: false,
});
},
{ concurrency: 5 }
);

return {
queue,
runIds,
completedRunIds: completedRuns.map((r) => r.id),
dryRun,
action: "ignore",
runId,
snapshotStatus: snapshot.executionStatus,
snapshotId: snapshot.id,
};
}

Expand Down Expand Up @@ -1650,6 +1652,117 @@ export class RunEngine {
});
}

async #handleRepairSnapshot({
runId,
snapshotId,
executionStatus,
}: {
runId: string;
snapshotId: string;
executionStatus: string;
}) {
return await this.runLock.lock("handleRepairSnapshot", [runId], async () => {
const latestSnapshot = await getLatestExecutionSnapshot(this.prisma, runId);

if (latestSnapshot.id !== snapshotId) {
this.logger.log(
"RunEngine.handleRepairSnapshot no longer the latest snapshot, stopping the repair.",
{
runId,
snapshotId,
latestSnapshotExecutionStatus: latestSnapshot.executionStatus,
repairExecutionStatus: executionStatus,
}
);

return;
}

// Okay, so this means we haven't transitioned to a new status yes, so we need to do something
switch (latestSnapshot.executionStatus) {
case "EXECUTING":
case "EXECUTING_WITH_WAITPOINTS":
case "PENDING_CANCEL":
case "PENDING_EXECUTING":
case "QUEUED_EXECUTING":
case "RUN_CREATED": {
// Do nothing;
return;
}
case "QUEUED": {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED", {
runId,
snapshotId,
});

//it will automatically be requeued X times depending on the queue retry settings
const gotRequeued = await this.runQueue.nackMessage({
orgId: latestSnapshot.organizationId,
messageId: runId,
});

if (!gotRequeued) {
this.logger.error("RunEngine.handleRepairSnapshot QUEUED repair failed", {
runId,
snapshot: latestSnapshot,
});
} else {
this.logger.log("RunEngine.handleRepairSnapshot QUEUED repair successful", {
runId,
snapshot: latestSnapshot,
});
}

break;
}
case "FINISHED":
case "SUSPENDED": {
this.logger.log("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED", {
runId,
snapshotId,
});

const taskRun = await this.prisma.taskRun.findFirst({
where: { id: runId },
select: {
queue: true,
},
});

if (!taskRun) {
this.logger.error("RunEngine.handleRepairSnapshot SUSPENDED/FINISHED task run not found", {
runId,
snapshotId,
});
return;
}

// We need to clear this run from the current concurrency sets
await this.runQueue.clearMessageFromConcurrencySets({
runId,
orgId: latestSnapshot.organizationId,
queue: taskRun.queue,
env: {
id: latestSnapshot.environmentId,
type: latestSnapshot.environmentType,
project: {
id: latestSnapshot.projectId,
},
organization: {
id: latestSnapshot.organizationId,
},
},
});

break;
}
default: {
assertNever(latestSnapshot.executionStatus);
}
}
});
}

async #concurrencySweeperCallback(
runIds: string[],
completedAtOffsetMs: number = 1000 * 60 * 10
Expand Down
1 change: 1 addition & 0 deletions internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export type RunEngineOptions = {
/** If not set then checkpoints won't ever be used */
retryWarmStartThresholdMs?: number;
heartbeatTimeoutsMs?: Partial<HeartbeatTimeouts>;
repairSnapshotTimeoutMs?: number;
treatProductionExecutionStallsAsOOM?: boolean;
suspendedHeartbeatRetriesConfig?: {
maxCount?: number;
Expand Down
8 changes: 8 additions & 0 deletions internal-packages/run-engine/src/engine/workerCatalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ export const workerCatalog = {
}),
visibilityTimeoutMs: 30_000,
},
repairSnapshot: {
schema: z.object({
runId: z.string(),
snapshotId: z.string(),
executionStatus: z.string(),
}),
visibilityTimeoutMs: 30_000,
},
expireRun: {
schema: z.object({
runId: z.string(),
Expand Down
Loading
Loading