Skip to content

Commit e3049c8

Browse files
committed
Eliminate race conditions in continueRunIfUnblocked to prevent stuck runs
1 parent 08d84eb commit e3049c8

File tree

1 file changed

+192
-133
lines changed

1 file changed

+192
-133
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 192 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -499,190 +499,249 @@ export class WaitpointSystem {
499499
runId,
500500
});
501501

502-
// 1. Get any blocking waitpoints
503-
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
504-
where: { taskRunId: runId },
505-
select: {
506-
id: true,
507-
batchId: true,
508-
batchIndex: true,
509-
waitpoint: {
510-
select: { id: true, status: true },
511-
},
512-
},
513-
});
514-
515502
await this.$.raceSimulationSystem.waitForRacepoint({ runId });
516503

517-
// 2. There are blockers still, so do nothing
518-
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
519-
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
520-
runId,
521-
blockingWaitpoints,
504+
return await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
505+
// 1. Get any blocking waitpoints
506+
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
507+
where: { taskRunId: runId },
508+
select: {
509+
id: true,
510+
batchId: true,
511+
batchIndex: true,
512+
waitpoint: {
513+
select: { id: true, status: true },
514+
},
515+
},
522516
});
523-
return "blocked";
524-
}
525517

526-
// 3. Get the run with environment
527-
const run = await this.$.prisma.taskRun.findFirst({
528-
where: {
529-
id: runId,
530-
},
531-
include: {
532-
runtimeEnvironment: {
533-
select: {
534-
id: true,
535-
type: true,
536-
maximumConcurrencyLimit: true,
537-
project: { select: { id: true } },
538-
organization: { select: { id: true } },
518+
// 2. There are blockers still, so do nothing
519+
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
520+
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
521+
runId,
522+
blockingWaitpoints,
523+
});
524+
return "blocked";
525+
}
526+
527+
// 3. Get the run with environment
528+
const run = await this.$.prisma.taskRun.findFirst({
529+
where: {
530+
id: runId,
531+
},
532+
include: {
533+
runtimeEnvironment: {
534+
select: {
535+
id: true,
536+
type: true,
537+
maximumConcurrencyLimit: true,
538+
project: { select: { id: true } },
539+
organization: { select: { id: true } },
540+
},
539541
},
540542
},
541-
},
542-
});
543-
544-
if (!run) {
545-
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
546-
runId,
547543
});
548-
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
549-
}
550544

551-
//4. Continue the run whether it's executing or not
552-
await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
553-
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
554-
555-
if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
556-
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
545+
if (!run) {
546+
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
557547
runId,
558-
snapshot,
559548
});
560-
return "skipped";
549+
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
561550
}
562551

563-
//run is still executing, send a message to the worker
564-
if (isExecuting(snapshot.executionStatus)) {
565-
const result = await this.$.runQueue.reacquireConcurrency(
566-
run.runtimeEnvironment.organization.id,
567-
runId
568-
);
569-
570-
if (result) {
571-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
572-
this.$.prisma,
573-
{
574-
run: {
575-
id: runId,
576-
status: snapshot.runStatus,
577-
attemptNumber: snapshot.attemptNumber,
578-
},
552+
//4. Act on the latest execution status: continue the run if it is EXECUTING_WITH_WAITPOINTS or SUSPENDED, skip the other known statuses
553+
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
554+
555+
switch (snapshot.executionStatus) {
556+
case "RUN_CREATED": {
557+
this.$.logger.info(`continueRunIfUnblocked: run is run created, skipping`, {
558+
runId,
559+
snapshot,
560+
executionStatus: snapshot.executionStatus,
561+
});
562+
563+
return "skipped";
564+
}
565+
case "QUEUED": {
566+
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
567+
runId,
568+
snapshot,
569+
executionStatus: snapshot.executionStatus,
570+
});
571+
572+
return "skipped";
573+
}
574+
case "PENDING_EXECUTING": {
575+
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
576+
runId,
577+
snapshot,
578+
executionStatus: snapshot.executionStatus,
579+
});
580+
581+
return "skipped";
582+
}
583+
case "QUEUED_EXECUTING": {
584+
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
585+
runId,
586+
snapshot,
587+
executionStatus: snapshot.executionStatus,
588+
});
589+
590+
return "skipped";
591+
}
592+
case "EXECUTING": {
593+
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
594+
runId,
595+
snapshot,
596+
executionStatus: snapshot.executionStatus,
597+
});
598+
599+
return "skipped";
600+
}
601+
case "PENDING_CANCEL":
602+
case "FINISHED": {
603+
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
604+
runId,
605+
snapshot,
606+
executionStatus: snapshot.executionStatus,
607+
});
608+
return "skipped";
609+
}
610+
case "EXECUTING_WITH_WAITPOINTS": {
611+
const result = await this.$.runQueue.reacquireConcurrency(
612+
run.runtimeEnvironment.organization.id,
613+
runId
614+
);
615+
616+
if (result) {
617+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
618+
this.$.prisma,
619+
{
620+
run: {
621+
id: runId,
622+
status: snapshot.runStatus,
623+
attemptNumber: snapshot.attemptNumber,
624+
},
625+
snapshot: {
626+
executionStatus: "EXECUTING",
627+
description: "Run was continued, whilst still executing.",
628+
},
629+
previousSnapshotId: snapshot.id,
630+
environmentId: snapshot.environmentId,
631+
environmentType: snapshot.environmentType,
632+
projectId: snapshot.projectId,
633+
organizationId: snapshot.organizationId,
634+
batchId: snapshot.batchId ?? undefined,
635+
completedWaitpoints: blockingWaitpoints.map((b) => ({
636+
id: b.waitpoint.id,
637+
index: b.batchIndex ?? undefined,
638+
})),
639+
}
640+
);
641+
642+
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
643+
644+
this.$.logger.debug(
645+
`continueRunIfUnblocked: run was still executing, sending notification`,
646+
{
647+
runId,
648+
snapshot,
649+
newSnapshot,
650+
}
651+
);
652+
653+
await sendNotificationToWorker({
654+
runId,
655+
snapshot: newSnapshot,
656+
eventBus: this.$.eventBus,
657+
});
658+
} else {
659+
// Because we cannot reacquire the concurrency, we need to enqueue the run again
660+
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
661+
const newSnapshot = await this.enqueueSystem.enqueueRun({
662+
run,
663+
env: run.runtimeEnvironment,
579664
snapshot: {
580-
executionStatus: "EXECUTING",
581-
description: "Run was continued, whilst still executing.",
665+
status: "QUEUED_EXECUTING",
666+
description: "Run can continue, but is waiting for concurrency",
582667
},
583668
previousSnapshotId: snapshot.id,
584-
environmentId: snapshot.environmentId,
585-
environmentType: snapshot.environmentType,
586-
projectId: snapshot.projectId,
587-
organizationId: snapshot.organizationId,
588669
batchId: snapshot.batchId ?? undefined,
589670
completedWaitpoints: blockingWaitpoints.map((b) => ({
590671
id: b.waitpoint.id,
591672
index: b.batchIndex ?? undefined,
592673
})),
593-
}
594-
);
595-
596-
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
674+
});
597675

598-
this.$.logger.debug(
599-
`continueRunIfUnblocked: run was still executing, sending notification`,
600-
{
676+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
601677
runId,
602678
snapshot,
603679
newSnapshot,
604-
}
605-
);
680+
});
681+
}
606682

607-
await sendNotificationToWorker({
608-
runId,
609-
snapshot: newSnapshot,
610-
eventBus: this.$.eventBus,
611-
});
612-
} else {
613-
// Because we cannot reacquire the concurrency, we need to enqueue the run again
614-
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
683+
break;
684+
}
685+
case "SUSPENDED": {
686+
if (!snapshot.checkpointId) {
687+
this.$.logger.error(`continueRunIfUnblocked: run is suspended, but has no checkpoint`, {
688+
runId,
689+
snapshot,
690+
});
691+
throw new Error(
692+
`continueRunIfUnblocked: run is suspended, but has no checkpoint: ${runId}`
693+
);
694+
}
695+
696+
//put it back in the queue, with the original timestamp (w/ priority)
697+
//this prioritizes dequeuing waiting runs over new runs
615698
const newSnapshot = await this.enqueueSystem.enqueueRun({
616699
run,
617700
env: run.runtimeEnvironment,
618701
snapshot: {
619-
status: "QUEUED_EXECUTING",
620-
description: "Run can continue, but is waiting for concurrency",
702+
status: "QUEUED",
703+
description: "Run was QUEUED, because all waitpoints are completed",
621704
},
622-
previousSnapshotId: snapshot.id,
623705
batchId: snapshot.batchId ?? undefined,
624706
completedWaitpoints: blockingWaitpoints.map((b) => ({
625707
id: b.waitpoint.id,
626708
index: b.batchIndex ?? undefined,
627709
})),
710+
checkpointId: snapshot.checkpointId ?? undefined,
628711
});
629712

630-
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
713+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
631714
runId,
632715
snapshot,
633716
newSnapshot,
634717
});
718+
719+
break;
635720
}
636-
} else {
637-
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
638-
// TODO: We're screwed, should probably fail the run immediately
639-
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
640-
runId: run.id,
641-
snapshot,
642-
blockingWaitpoints,
643-
});
644-
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
721+
default: {
722+
throw new Error(
723+
`continueRunIfUnblocked: invalid execution status: ${snapshot.executionStatus}`
724+
);
645725
}
726+
}
646727

647-
//put it back in the queue, with the original timestamp (w/ priority)
648-
//this prioritizes dequeuing waiting runs over new runs
649-
const newSnapshot = await this.enqueueSystem.enqueueRun({
650-
run,
651-
env: run.runtimeEnvironment,
652-
snapshot: {
653-
description: "Run was QUEUED, because all waitpoints are completed",
728+
if (blockingWaitpoints.length > 0) {
729+
//5. Remove the blocking waitpoints
730+
await this.$.prisma.taskRunWaitpoint.deleteMany({
731+
where: {
732+
taskRunId: runId,
733+
id: { in: blockingWaitpoints.map((b) => b.id) },
654734
},
655-
batchId: snapshot.batchId ?? undefined,
656-
completedWaitpoints: blockingWaitpoints.map((b) => ({
657-
id: b.waitpoint.id,
658-
index: b.batchIndex ?? undefined,
659-
})),
660-
checkpointId: snapshot.checkpointId ?? undefined,
661735
});
662736

663-
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
737+
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
664738
runId,
665-
snapshot,
666-
newSnapshot,
739+
blockingWaitpoints,
667740
});
668741
}
669-
});
670-
671-
if (blockingWaitpoints.length > 0) {
672-
//5. Remove the blocking waitpoints
673-
await this.$.prisma.taskRunWaitpoint.deleteMany({
674-
where: {
675-
taskRunId: runId,
676-
id: { in: blockingWaitpoints.map((b) => b.id) },
677-
},
678-
});
679-
680-
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
681-
runId,
682-
});
683-
}
684742

685-
return "unblocked";
743+
return "unblocked";
744+
}); // end of runlock
686745
}
687746

688747
public async createRunAssociatedWaitpoint(

0 commit comments

Comments
 (0)