diff --git a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts index 52a8094e4b..0ddd3c33bb 100644 --- a/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/waitpointSystem.ts @@ -15,6 +15,7 @@ import { EnqueueSystem } from "./enqueueSystem.js"; import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js"; +import { assertNever } from "assert-never"; export type WaitpointSystemOptions = { resources: SystemResources; @@ -499,190 +500,247 @@ export class WaitpointSystem { runId, }); - // 1. Get the any blocking waitpoints - const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({ - where: { taskRunId: runId }, - select: { - id: true, - batchId: true, - batchIndex: true, - waitpoint: { - select: { id: true, status: true }, - }, - }, - }); - await this.$.raceSimulationSystem.waitForRacepoint({ runId }); - // 2. There are blockers still, so do nothing - if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) { - this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, { - runId, - blockingWaitpoints, + return await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => { + // 1. Get the any blocking waitpoints + const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({ + where: { taskRunId: runId }, + select: { + id: true, + batchId: true, + batchIndex: true, + waitpoint: { + select: { id: true, status: true }, + }, + }, }); - return "blocked"; - } - // 3. Get the run with environment - const run = await this.$.prisma.taskRun.findFirst({ - where: { - id: runId, - }, - include: { - runtimeEnvironment: { - select: { - id: true, - type: true, - maximumConcurrencyLimit: true, - project: { select: { id: true } }, - organization: { select: { id: true } }, + // 2. There are blockers still, so do nothing + if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) { + this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, { + runId, + blockingWaitpoints, + }); + return "blocked"; + } + + // 3. Get the run with environment + const run = await this.$.prisma.taskRun.findFirst({ + where: { + id: runId, + }, + include: { + runtimeEnvironment: { + select: { + id: true, + type: true, + maximumConcurrencyLimit: true, + project: { select: { id: true } }, + organization: { select: { id: true } }, + }, }, }, - }, - }); - - if (!run) { - this.$.logger.error(`continueRunIfUnblocked: run not found`, { - runId, }); - throw new Error(`continueRunIfUnblocked: run not found: ${runId}`); - } - //4. Continue the run whether it's executing or not - await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => { - const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); - - if (isFinishedOrPendingFinished(snapshot.executionStatus)) { - this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, { + if (!run) { + this.$.logger.error(`continueRunIfUnblocked: run not found`, { runId, - snapshot, }); - return "skipped"; + throw new Error(`continueRunIfUnblocked: run not found: ${runId}`); } - //run is still executing, send a message to the worker - if (isExecuting(snapshot.executionStatus)) { - const result = await this.$.runQueue.reacquireConcurrency( - run.runtimeEnvironment.organization.id, - runId - ); - - if (result) { - const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( - this.$.prisma, - { - run: { - id: runId, - status: snapshot.runStatus, - attemptNumber: snapshot.attemptNumber, - }, + //4. Continue the run whether it's executing or not + const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId); + + switch (snapshot.executionStatus) { + case "RUN_CREATED": { + this.$.logger.info(`continueRunIfUnblocked: run is run created, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + + return "skipped"; + } + case "QUEUED": { + this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + + return "skipped"; + } + case "PENDING_EXECUTING": { + this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + + return "skipped"; + } + case "QUEUED_EXECUTING": { + this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + + return "skipped"; + } + case "EXECUTING": { + this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + + return "skipped"; + } + case "PENDING_CANCEL": + case "FINISHED": { + this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, { + runId, + snapshot, + executionStatus: snapshot.executionStatus, + }); + return "skipped"; + } + case "EXECUTING_WITH_WAITPOINTS": { + const result = await this.$.runQueue.reacquireConcurrency( + run.runtimeEnvironment.organization.id, + runId + ); + + if (result) { + const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot( + this.$.prisma, + { + run: { + id: runId, + status: snapshot.runStatus, + attemptNumber: snapshot.attemptNumber, + }, + snapshot: { + executionStatus: "EXECUTING", + description: "Run was continued, whilst still executing.", + }, + previousSnapshotId: snapshot.id, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + batchId: snapshot.batchId ?? undefined, + completedWaitpoints: blockingWaitpoints.map((b) => ({ + id: b.waitpoint.id, + index: b.batchIndex ?? undefined, + })), + } + ); + + await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot); + + this.$.logger.debug( + `continueRunIfUnblocked: run was still executing, sending notification`, + { + runId, + snapshot, + newSnapshot, + } + ); + + await sendNotificationToWorker({ + runId, + snapshot: newSnapshot, + eventBus: this.$.eventBus, + }); + } else { + // Because we cannot reacquire the concurrency, we need to enqueue the run again + // and because the run is still executing, we need to set the status to QUEUED_EXECUTING + const newSnapshot = await this.enqueueSystem.enqueueRun({ + run, + env: run.runtimeEnvironment, snapshot: { - executionStatus: "EXECUTING", - description: "Run was continued, whilst still executing.", + status: "QUEUED_EXECUTING", + description: "Run can continue, but is waiting for concurrency", }, previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, batchId: snapshot.batchId ?? undefined, completedWaitpoints: blockingWaitpoints.map((b) => ({ id: b.waitpoint.id, index: b.batchIndex ?? undefined, })), - } - ); - - await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot); + }); - this.$.logger.debug( - `continueRunIfUnblocked: run was still executing, sending notification`, - { + this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, { runId, snapshot, newSnapshot, - } - ); + }); + } - await sendNotificationToWorker({ - runId, - snapshot: newSnapshot, - eventBus: this.$.eventBus, - }); - } else { - // Because we cannot reacquire the concurrency, we need to enqueue the run again - // and because the run is still executing, we need to set the status to QUEUED_EXECUTING + break; + } + case "SUSPENDED": { + if (!snapshot.checkpointId) { + this.$.logger.error(`continueRunIfUnblocked: run is suspended, but has no checkpoint`, { + runId, + snapshot, + }); + throw new Error( + `continueRunIfUnblocked: run is suspended, but has no checkpoint: ${runId}` + ); + } + + //put it back in the queue, with the original timestamp (w/ priority) + //this prioritizes dequeuing waiting runs over new runs const newSnapshot = await this.enqueueSystem.enqueueRun({ run, env: run.runtimeEnvironment, snapshot: { - status: "QUEUED_EXECUTING", - description: "Run can continue, but is waiting for concurrency", + status: "QUEUED", + description: "Run was QUEUED, because all waitpoints are completed", }, - previousSnapshotId: snapshot.id, batchId: snapshot.batchId ?? undefined, completedWaitpoints: blockingWaitpoints.map((b) => ({ id: b.waitpoint.id, index: b.batchIndex ?? undefined, })), + checkpointId: snapshot.checkpointId ?? undefined, }); - this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, { + this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, { runId, snapshot, newSnapshot, }); + + break; } - } else { - if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) { - // TODO: We're screwed, should probably fail the run immediately - this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, { - runId: run.id, - snapshot, - blockingWaitpoints, - }); - throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`); + default: { + assertNever(snapshot.executionStatus); } + } - //put it back in the queue, with the original timestamp (w/ priority) - //this prioritizes dequeuing waiting runs over new runs - const newSnapshot = await this.enqueueSystem.enqueueRun({ - run, - env: run.runtimeEnvironment, - snapshot: { - description: "Run was QUEUED, because all waitpoints are completed", + if (blockingWaitpoints.length > 0) { + //5. Remove the blocking waitpoints + await this.$.prisma.taskRunWaitpoint.deleteMany({ + where: { + taskRunId: runId, + id: { in: blockingWaitpoints.map((b) => b.id) }, }, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: blockingWaitpoints.map((b) => ({ - id: b.waitpoint.id, - index: b.batchIndex ?? undefined, - })), - checkpointId: snapshot.checkpointId ?? undefined, }); - this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, { + this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { runId, - snapshot, - newSnapshot, + blockingWaitpoints, }); } - }); - - if (blockingWaitpoints.length > 0) { - //5. Remove the blocking waitpoints - await this.$.prisma.taskRunWaitpoint.deleteMany({ - where: { - taskRunId: runId, - id: { in: blockingWaitpoints.map((b) => b.id) }, - }, - }); - - this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, { - runId, - }); - } - return "unblocked"; + return "unblocked"; + }); // end of runlock } public async createRunAssociatedWaitpoint(