Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
324 changes: 191 additions & 133 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { EnqueueSystem } from "./enqueueSystem.js";
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
import { SystemResources } from "./systems.js";
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
import { assertNever } from "assert-never";

export type WaitpointSystemOptions = {
resources: SystemResources;
Expand Down Expand Up @@ -499,190 +500,247 @@ export class WaitpointSystem {
runId,
});

// 1. Get the any blocking waitpoints
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
where: { taskRunId: runId },
select: {
id: true,
batchId: true,
batchIndex: true,
waitpoint: {
select: { id: true, status: true },
},
},
});

await this.$.raceSimulationSystem.waitForRacepoint({ runId });

// 2. There are blockers still, so do nothing
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
runId,
blockingWaitpoints,
return await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
// 1. Get the any blocking waitpoints
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
where: { taskRunId: runId },
select: {
id: true,
batchId: true,
batchIndex: true,
waitpoint: {
select: { id: true, status: true },
},
},
});
return "blocked";
}

// 3. Get the run with environment
const run = await this.$.prisma.taskRun.findFirst({
where: {
id: runId,
},
include: {
runtimeEnvironment: {
select: {
id: true,
type: true,
maximumConcurrencyLimit: true,
project: { select: { id: true } },
organization: { select: { id: true } },
// 2. There are blockers still, so do nothing
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
runId,
blockingWaitpoints,
});
return "blocked";
}

// 3. Get the run with environment
const run = await this.$.prisma.taskRun.findFirst({
where: {
id: runId,
},
include: {
runtimeEnvironment: {
select: {
id: true,
type: true,
maximumConcurrencyLimit: true,
project: { select: { id: true } },
organization: { select: { id: true } },
},
},
},
},
});

if (!run) {
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
runId,
});
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
}

//4. Continue the run whether it's executing or not
await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
if (!run) {
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
runId,
snapshot,
});
return "skipped";
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
}

//run is still executing, send a message to the worker
if (isExecuting(snapshot.executionStatus)) {
const result = await this.$.runQueue.reacquireConcurrency(
run.runtimeEnvironment.organization.id,
runId
);

if (result) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
this.$.prisma,
{
run: {
id: runId,
status: snapshot.runStatus,
attemptNumber: snapshot.attemptNumber,
},
//4. Continue the run whether it's executing or not
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

switch (snapshot.executionStatus) {
case "RUN_CREATED": {
this.$.logger.info(`continueRunIfUnblocked: run is run created, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "QUEUED": {
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "PENDING_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "QUEUED_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "PENDING_CANCEL":
case "FINISHED": {
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});
return "skipped";
}
case "EXECUTING_WITH_WAITPOINTS": {
const result = await this.$.runQueue.reacquireConcurrency(
run.runtimeEnvironment.organization.id,
runId
);

if (result) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
this.$.prisma,
{
run: {
id: runId,
status: snapshot.runStatus,
attemptNumber: snapshot.attemptNumber,
},
snapshot: {
executionStatus: "EXECUTING",
description: "Run was continued, whilst still executing.",
},
previousSnapshotId: snapshot.id,
environmentId: snapshot.environmentId,
environmentType: snapshot.environmentType,
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
runId,
snapshot,
newSnapshot,
}
);

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
} else {
// Because we cannot reacquire the concurrency, we need to enqueue the run again
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
executionStatus: "EXECUTING",
description: "Run was continued, whilst still executing.",
status: "QUEUED_EXECUTING",
description: "Run can continue, but is waiting for concurrency",
},
previousSnapshotId: snapshot.id,
environmentId: snapshot.environmentId,
environmentType: snapshot.environmentType,
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
});

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
runId,
snapshot,
newSnapshot,
}
);
});
}

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
} else {
// Because we cannot reacquire the concurrency, we need to enqueue the run again
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
break;
}
case "SUSPENDED": {
if (!snapshot.checkpointId) {
this.$.logger.error(`continueRunIfUnblocked: run is suspended, but has no checkpoint`, {
runId,
snapshot,
});
throw new Error(
`continueRunIfUnblocked: run is suspended, but has no checkpoint: ${runId}`
);
}

//put it back in the queue, with the original timestamp (w/ priority)
//this prioritizes dequeuing waiting runs over new runs
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
status: "QUEUED_EXECUTING",
description: "Run can continue, but is waiting for concurrency",
status: "QUEUED",
description: "Run was QUEUED, because all waitpoints are completed",
},
previousSnapshotId: snapshot.id,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
checkpointId: snapshot.checkpointId ?? undefined,
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
runId,
snapshot,
newSnapshot,
});

break;
}
} else {
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
// TODO: We're screwed, should probably fail the run immediately
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
runId: run.id,
snapshot,
blockingWaitpoints,
});
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
default: {
assertNever(snapshot.executionStatus);
}
}

//put it back in the queue, with the original timestamp (w/ priority)
//this prioritizes dequeuing waiting runs over new runs
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
description: "Run was QUEUED, because all waitpoints are completed",
if (blockingWaitpoints.length > 0) {
//5. Remove the blocking waitpoints
await this.$.prisma.taskRunWaitpoint.deleteMany({
where: {
taskRunId: runId,
id: { in: blockingWaitpoints.map((b) => b.id) },
},
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
checkpointId: snapshot.checkpointId ?? undefined,
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
runId,
snapshot,
newSnapshot,
blockingWaitpoints,
});
}
});

if (blockingWaitpoints.length > 0) {
//5. Remove the blocking waitpoints
await this.$.prisma.taskRunWaitpoint.deleteMany({
where: {
taskRunId: runId,
id: { in: blockingWaitpoints.map((b) => b.id) },
},
});

this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
runId,
});
}

return "unblocked";
return "unblocked";
}); // end of runlock
}

public async createRunAssociatedWaitpoint(
Expand Down