Skip to content

Commit 124435e

Browse files
authored
Eliminate race conditions in continueRunIfUnblocked to prevent stuck runs (#2217)
* Eliminate race conditions in continueRunIfUnblocked to prevent stuck runs * Use assertNever for typecheck friendliness
1 parent 4a79b3b commit 124435e

File tree

1 file changed

+191
-133
lines changed

1 file changed

+191
-133
lines changed

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 191 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import { EnqueueSystem } from "./enqueueSystem.js";
1515
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
1616
import { SystemResources } from "./systems.js";
1717
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
18+
import { assertNever } from "assert-never";
1819

1920
export type WaitpointSystemOptions = {
2021
resources: SystemResources;
@@ -499,190 +500,247 @@ export class WaitpointSystem {
499500
runId,
500501
});
501502

502-
// 1. Get the any blocking waitpoints
503-
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
504-
where: { taskRunId: runId },
505-
select: {
506-
id: true,
507-
batchId: true,
508-
batchIndex: true,
509-
waitpoint: {
510-
select: { id: true, status: true },
511-
},
512-
},
513-
});
514-
515503
await this.$.raceSimulationSystem.waitForRacepoint({ runId });
516504

517-
// 2. There are blockers still, so do nothing
518-
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
519-
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
520-
runId,
521-
blockingWaitpoints,
505+
return await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
506+
// 1. Get the any blocking waitpoints
507+
const blockingWaitpoints = await this.$.prisma.taskRunWaitpoint.findMany({
508+
where: { taskRunId: runId },
509+
select: {
510+
id: true,
511+
batchId: true,
512+
batchIndex: true,
513+
waitpoint: {
514+
select: { id: true, status: true },
515+
},
516+
},
522517
});
523-
return "blocked";
524-
}
525518

526-
// 3. Get the run with environment
527-
const run = await this.$.prisma.taskRun.findFirst({
528-
where: {
529-
id: runId,
530-
},
531-
include: {
532-
runtimeEnvironment: {
533-
select: {
534-
id: true,
535-
type: true,
536-
maximumConcurrencyLimit: true,
537-
project: { select: { id: true } },
538-
organization: { select: { id: true } },
519+
// 2. There are blockers still, so do nothing
520+
if (blockingWaitpoints.some((w) => w.waitpoint.status !== "COMPLETED")) {
521+
this.$.logger.debug(`continueRunIfUnblocked: blocking waitpoints still exist`, {
522+
runId,
523+
blockingWaitpoints,
524+
});
525+
return "blocked";
526+
}
527+
528+
// 3. Get the run with environment
529+
const run = await this.$.prisma.taskRun.findFirst({
530+
where: {
531+
id: runId,
532+
},
533+
include: {
534+
runtimeEnvironment: {
535+
select: {
536+
id: true,
537+
type: true,
538+
maximumConcurrencyLimit: true,
539+
project: { select: { id: true } },
540+
organization: { select: { id: true } },
541+
},
539542
},
540543
},
541-
},
542-
});
543-
544-
if (!run) {
545-
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
546-
runId,
547544
});
548-
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
549-
}
550545

551-
//4. Continue the run whether it's executing or not
552-
await this.$.runLock.lock("continueRunIfUnblocked", [runId], async () => {
553-
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
554-
555-
if (isFinishedOrPendingFinished(snapshot.executionStatus)) {
556-
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
546+
if (!run) {
547+
this.$.logger.error(`continueRunIfUnblocked: run not found`, {
557548
runId,
558-
snapshot,
559549
});
560-
return "skipped";
550+
throw new Error(`continueRunIfUnblocked: run not found: ${runId}`);
561551
}
562552

563-
//run is still executing, send a message to the worker
564-
if (isExecuting(snapshot.executionStatus)) {
565-
const result = await this.$.runQueue.reacquireConcurrency(
566-
run.runtimeEnvironment.organization.id,
567-
runId
568-
);
569-
570-
if (result) {
571-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
572-
this.$.prisma,
573-
{
574-
run: {
575-
id: runId,
576-
status: snapshot.runStatus,
577-
attemptNumber: snapshot.attemptNumber,
578-
},
553+
//4. Continue the run whether it's executing or not
554+
const snapshot = await getLatestExecutionSnapshot(this.$.prisma, runId);
555+
556+
switch (snapshot.executionStatus) {
557+
case "RUN_CREATED": {
558+
this.$.logger.info(`continueRunIfUnblocked: run is run created, skipping`, {
559+
runId,
560+
snapshot,
561+
executionStatus: snapshot.executionStatus,
562+
});
563+
564+
return "skipped";
565+
}
566+
case "QUEUED": {
567+
this.$.logger.info(`continueRunIfUnblocked: run is queued, skipping`, {
568+
runId,
569+
snapshot,
570+
executionStatus: snapshot.executionStatus,
571+
});
572+
573+
return "skipped";
574+
}
575+
case "PENDING_EXECUTING": {
576+
this.$.logger.info(`continueRunIfUnblocked: run is pending executing, skipping`, {
577+
runId,
578+
snapshot,
579+
executionStatus: snapshot.executionStatus,
580+
});
581+
582+
return "skipped";
583+
}
584+
case "QUEUED_EXECUTING": {
585+
this.$.logger.info(`continueRunIfUnblocked: run is already queued executing, skipping`, {
586+
runId,
587+
snapshot,
588+
executionStatus: snapshot.executionStatus,
589+
});
590+
591+
return "skipped";
592+
}
593+
case "EXECUTING": {
594+
this.$.logger.info(`continueRunIfUnblocked: run is already executing, skipping`, {
595+
runId,
596+
snapshot,
597+
executionStatus: snapshot.executionStatus,
598+
});
599+
600+
return "skipped";
601+
}
602+
case "PENDING_CANCEL":
603+
case "FINISHED": {
604+
this.$.logger.debug(`continueRunIfUnblocked: run is finished, skipping`, {
605+
runId,
606+
snapshot,
607+
executionStatus: snapshot.executionStatus,
608+
});
609+
return "skipped";
610+
}
611+
case "EXECUTING_WITH_WAITPOINTS": {
612+
const result = await this.$.runQueue.reacquireConcurrency(
613+
run.runtimeEnvironment.organization.id,
614+
runId
615+
);
616+
617+
if (result) {
618+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
619+
this.$.prisma,
620+
{
621+
run: {
622+
id: runId,
623+
status: snapshot.runStatus,
624+
attemptNumber: snapshot.attemptNumber,
625+
},
626+
snapshot: {
627+
executionStatus: "EXECUTING",
628+
description: "Run was continued, whilst still executing.",
629+
},
630+
previousSnapshotId: snapshot.id,
631+
environmentId: snapshot.environmentId,
632+
environmentType: snapshot.environmentType,
633+
projectId: snapshot.projectId,
634+
organizationId: snapshot.organizationId,
635+
batchId: snapshot.batchId ?? undefined,
636+
completedWaitpoints: blockingWaitpoints.map((b) => ({
637+
id: b.waitpoint.id,
638+
index: b.batchIndex ?? undefined,
639+
})),
640+
}
641+
);
642+
643+
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
644+
645+
this.$.logger.debug(
646+
`continueRunIfUnblocked: run was still executing, sending notification`,
647+
{
648+
runId,
649+
snapshot,
650+
newSnapshot,
651+
}
652+
);
653+
654+
await sendNotificationToWorker({
655+
runId,
656+
snapshot: newSnapshot,
657+
eventBus: this.$.eventBus,
658+
});
659+
} else {
660+
// Because we cannot reacquire the concurrency, we need to enqueue the run again
661+
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
662+
const newSnapshot = await this.enqueueSystem.enqueueRun({
663+
run,
664+
env: run.runtimeEnvironment,
579665
snapshot: {
580-
executionStatus: "EXECUTING",
581-
description: "Run was continued, whilst still executing.",
666+
status: "QUEUED_EXECUTING",
667+
description: "Run can continue, but is waiting for concurrency",
582668
},
583669
previousSnapshotId: snapshot.id,
584-
environmentId: snapshot.environmentId,
585-
environmentType: snapshot.environmentType,
586-
projectId: snapshot.projectId,
587-
organizationId: snapshot.organizationId,
588670
batchId: snapshot.batchId ?? undefined,
589671
completedWaitpoints: blockingWaitpoints.map((b) => ({
590672
id: b.waitpoint.id,
591673
index: b.batchIndex ?? undefined,
592674
})),
593-
}
594-
);
595-
596-
await this.releaseConcurrencySystem.refillTokensForSnapshot(snapshot);
675+
});
597676

598-
this.$.logger.debug(
599-
`continueRunIfUnblocked: run was still executing, sending notification`,
600-
{
677+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
601678
runId,
602679
snapshot,
603680
newSnapshot,
604-
}
605-
);
681+
});
682+
}
606683

607-
await sendNotificationToWorker({
608-
runId,
609-
snapshot: newSnapshot,
610-
eventBus: this.$.eventBus,
611-
});
612-
} else {
613-
// Because we cannot reacquire the concurrency, we need to enqueue the run again
614-
// and because the run is still executing, we need to set the status to QUEUED_EXECUTING
684+
break;
685+
}
686+
case "SUSPENDED": {
687+
if (!snapshot.checkpointId) {
688+
this.$.logger.error(`continueRunIfUnblocked: run is suspended, but has no checkpoint`, {
689+
runId,
690+
snapshot,
691+
});
692+
throw new Error(
693+
`continueRunIfUnblocked: run is suspended, but has no checkpoint: ${runId}`
694+
);
695+
}
696+
697+
//put it back in the queue, with the original timestamp (w/ priority)
698+
//this prioritizes dequeuing waiting runs over new runs
615699
const newSnapshot = await this.enqueueSystem.enqueueRun({
616700
run,
617701
env: run.runtimeEnvironment,
618702
snapshot: {
619-
status: "QUEUED_EXECUTING",
620-
description: "Run can continue, but is waiting for concurrency",
703+
status: "QUEUED",
704+
description: "Run was QUEUED, because all waitpoints are completed",
621705
},
622-
previousSnapshotId: snapshot.id,
623706
batchId: snapshot.batchId ?? undefined,
624707
completedWaitpoints: blockingWaitpoints.map((b) => ({
625708
id: b.waitpoint.id,
626709
index: b.batchIndex ?? undefined,
627710
})),
711+
checkpointId: snapshot.checkpointId ?? undefined,
628712
});
629713

630-
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED_EXECUTING`, {
714+
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
631715
runId,
632716
snapshot,
633717
newSnapshot,
634718
});
719+
720+
break;
635721
}
636-
} else {
637-
if (snapshot.executionStatus !== "RUN_CREATED" && !snapshot.checkpointId) {
638-
// TODO: We're screwed, should probably fail the run immediately
639-
this.$.logger.error(`continueRunIfUnblocked: run has no checkpoint`, {
640-
runId: run.id,
641-
snapshot,
642-
blockingWaitpoints,
643-
});
644-
throw new Error(`continueRunIfUnblocked: run has no checkpoint: ${run.id}`);
722+
default: {
723+
assertNever(snapshot.executionStatus);
645724
}
725+
}
646726

647-
//put it back in the queue, with the original timestamp (w/ priority)
648-
//this prioritizes dequeuing waiting runs over new runs
649-
const newSnapshot = await this.enqueueSystem.enqueueRun({
650-
run,
651-
env: run.runtimeEnvironment,
652-
snapshot: {
653-
description: "Run was QUEUED, because all waitpoints are completed",
727+
if (blockingWaitpoints.length > 0) {
728+
//5. Remove the blocking waitpoints
729+
await this.$.prisma.taskRunWaitpoint.deleteMany({
730+
where: {
731+
taskRunId: runId,
732+
id: { in: blockingWaitpoints.map((b) => b.id) },
654733
},
655-
batchId: snapshot.batchId ?? undefined,
656-
completedWaitpoints: blockingWaitpoints.map((b) => ({
657-
id: b.waitpoint.id,
658-
index: b.batchIndex ?? undefined,
659-
})),
660-
checkpointId: snapshot.checkpointId ?? undefined,
661734
});
662735

663-
this.$.logger.debug(`continueRunIfUnblocked: run goes to QUEUED`, {
736+
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
664737
runId,
665-
snapshot,
666-
newSnapshot,
738+
blockingWaitpoints,
667739
});
668740
}
669-
});
670-
671-
if (blockingWaitpoints.length > 0) {
672-
//5. Remove the blocking waitpoints
673-
await this.$.prisma.taskRunWaitpoint.deleteMany({
674-
where: {
675-
taskRunId: runId,
676-
id: { in: blockingWaitpoints.map((b) => b.id) },
677-
},
678-
});
679-
680-
this.$.logger.debug(`continueRunIfUnblocked: removed blocking waitpoints`, {
681-
runId,
682-
});
683-
}
684741

685-
return "unblocked";
742+
return "unblocked";
743+
}); // end of runlock
686744
}
687745

688746
public async createRunAssociatedWaitpoint(

0 commit comments

Comments
 (0)