Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 192 additions & 133 deletions internal-packages/run-engine/src/engine/systems/waitpointSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -499,190 +499,249 @@ export class WaitpointSystem {
runId,
});

// 1. Get the any blocking waitpoints
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
where: { taskRunId: runId },
select: {
id: true,
batchId: true,
batchIndex: true,
waitpoint: {
select: { id: true, status: true },
},
},
});

await this.$.raceSimulationSystem.waitForRacepoint({ runId });

// 2. There are blockers still, so do nothing
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
runId,
blockingWaitpoints,
return await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
// 1. Get the any blocking waitpoints
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
where: { taskRunId: runId },
select: {
id: true,
batchId: true,
batchIndex: true,
waitpoint: {
select: { id: true, status: true },
},
},
});
return "blocked";
}

// 3. Get the run with environment
const run = await this.$.prisma.taskRun.findFirst({
where: {
id: runId,
},
include: {
runtimeEnvironment: {
select: {
id: true,
type: true,
maximumConcurrencyLimit: true,
project: { select: { id: true } },
organization: { select: { id: true } },
// 2. There are blockers still, so do nothing
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
runId,
blockingWaitpoints,
});
return "blocked";
}

// 3. Get the run with environment
const run = await this.$.prisma.taskRun.findFirst({
where: {
id: runId,
},
include: {
runtimeEnvironment: {
select: {
id: true,
type: true,
maximumConcurrencyLimit: true,
project: { select: { id: true } },
organization: { select: { id: true } },
},
},
},
},
});

if (!run) {
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
runId,
});
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
}

//4. Continue the run whether it's executing or not
await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
if (!run) {
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
runId,
snapshot,
});
return "skipped";
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
}

//run is still executing, send a message to the worker
if (isExecuting(snapshot.executionStatus)) {
const result = await this.$.runQueue.reacquireConcurrency(
run.runtimeEnvironment.organization.id,
runId
);

if (result) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
this.$.prisma,
{
run: {
id: runId,
status: snapshot.runStatus,
attemptNumber: snapshot.attemptNumber,
},
//4. Continue the run whether it's executing or not
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);

switch (snapshot.executionStatus) {
case "RUN_CREATED": {
this.$.logger.info(`continueRunIfUnblocked: run is run created, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "QUEUED": {
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "PENDING_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "QUEUED_EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "EXECUTING": {
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});

return "skipped";
}
case "PENDING_CANCEL":
case "FINISHED": {
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
runId,
snapshot,
executionStatus: snapshot.executionStatus,
});
return "skipped";
}
case "EXECUTING_WITH_WAITPOINTS": {
const result = await this.$.runQueue.reacquireConcurrency(
run.runtimeEnvironment.organization.id,
runId
);

if (result) {
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
this.$.prisma,
{
run: {
id: runId,
status: snapshot.runStatus,
attemptNumber: snapshot.attemptNumber,
},
snapshot: {
executionStatus: "EXECUTING",
description: "Run was continued, whilst still executing.",
},
previousSnapshotId: snapshot.id,
environmentId: snapshot.environmentId,
environmentType: snapshot.environmentType,
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
runId,
snapshot,
newSnapshot,
}
);

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
} else {
// Because we cannot reacquire the concurrency, we need to enqueue the run again
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
executionStatus: "EXECUTING",
description: "Run was continued, whilst still executing.",
status: "QUEUED_EXECUTING",
description: "Run can continue, but is waiting for concurrency",
},
previousSnapshotId: snapshot.id,
environmentId: snapshot.environmentId,
environmentType: snapshot.environmentType,
projectId: snapshot.projectId,
organizationId: snapshot.organizationId,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
}
);

await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
});

this.$.logger.debug(
`continueRunIfUnblocked: run was still executing, sending notification`,
{
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
runId,
snapshot,
newSnapshot,
}
);
});
}

await sendNotificationToWorker({
runId,
snapshot: newSnapshot,
eventBus: this.$.eventBus,
});
} else {
// Because we cannot reacquire the concurrency, we need to enqueue the run again
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
break;
}
case "SUSPENDED": {
if (!snapshot.checkpointId) {
this.$.logger.error(`continueRunIfUnblocked: run is suspended, but has no checkpoint`, {
runId,
snapshot,
});
throw new Error(
`continueRunIfUnblocked: run is suspended, but has no checkpoint: ${runId}`
);
}

//put it back in the queue, with the original timestamp (w/ priority)
//this prioritizes dequeuing waiting runs over new runs
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
status: "QUEUED_EXECUTING",
description: "Run can continue, but is waiting for concurrency",
status: "QUEUED",
description: "Run was QUEUED, because all waitpoints are completed",
},
previousSnapshotId: snapshot.id,
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
checkpointId: snapshot.checkpointId ?? undefined,
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
runId,
snapshot,
newSnapshot,
});

break;
}
} else {
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
// TODO: We're screwed, should probably fail the run immediately
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
runId: run.id,
snapshot,
blockingWaitpoints,
});
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
default: {
throw new Error(
`continueRunIfUnblocked: invalid execution status: ${snapshot.executionStatus}`
);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ericallam we don't want an assertNever here to catch at build time?

}
}

//put it back in the queue, with the original timestamp (w/ priority)
//this prioritizes dequeuing waiting runs over new runs
const newSnapshot = await this.enqueueSystem.enqueueRun({
run,
env: run.runtimeEnvironment,
snapshot: {
description: "Run was QUEUED, because all waitpoints are completed",
if (blockingWaitpoints.length > 0) {
//5. Remove the blocking waitpoints
await this.$.prisma.taskRunWaitpoint.deleteMany({
where: {
taskRunId: runId,
id: { in: blockingWaitpoints.map((b) => b.id) },
},
batchId: snapshot.batchId ?? undefined,
completedWaitpoints: blockingWaitpoints.map((b) => ({
id: b.waitpoint.id,
index: b.batchIndex ?? undefined,
})),
checkpointId: snapshot.checkpointId ?? undefined,
});

this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
runId,
snapshot,
newSnapshot,
blockingWaitpoints,
});
}
});

if (blockingWaitpoints.length > 0) {
//5. Remove the blocking waitpoints
await this.$.prisma.taskRunWaitpoint.deleteMany({
where: {
taskRunId: runId,
id: { in: blockingWaitpoints.map((b) => b.id) },
},
});

this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
runId,
});
}

return "unblocked";
return "unblocked";
}); // end of runlock
}

public async createRunAssociatedWaitpoint(
Expand Down