Skip to content

Commit 1924652

Browse files
committed
Implement checkpoint tests, handle dequeuing QUEUED_EXECUTING runs
1 parent 9bb4c77 commit 1924652

File tree

5 files changed

+992
-3
lines changed

5 files changed

+992
-3
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,7 @@ export class RunEngine {
10091009
batch,
10101010
workerId,
10111011
runnerId,
1012+
tx,
10121013
}: {
10131014
runId: string;
10141015
waitpoints: string | string[];
@@ -1033,6 +1034,7 @@ export class RunEngine {
10331034
batch,
10341035
workerId,
10351036
runnerId,
1037+
tx,
10361038
});
10371039
}
10381040

internal-packages/run-engine/src/engine/statuses.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export function isCheckpointable(status: TaskRunExecutionStatus): boolean {
2626
//executing
2727
"EXECUTING",
2828
"EXECUTING_WITH_WAITPOINTS",
29+
"QUEUED_EXECUTING",
2930
];
3031
return checkpointableStatuses.includes(status);
3132
}

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { RunEngineOptions } from "../types.js";
1010
import { ExecutionSnapshotSystem, getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
1111
import { RunAttemptSystem } from "./runAttemptSystem.js";
1212
import { SystemResources } from "./systems.js";
13+
import { sendNotificationToWorker } from "../eventBus.js";
1314

1415
export type DequeueSystemOptions = {
1516
resources: SystemResources;
@@ -128,6 +129,38 @@ export class DequeueSystem {
128129
return null;
129130
}
130131

132+
if (snapshot.executionStatus === "QUEUED_EXECUTING") {
133+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
134+
prisma,
135+
{
136+
run: {
137+
id: runId,
138+
status: snapshot.runStatus,
139+
attemptNumber: snapshot.attemptNumber,
140+
},
141+
snapshot: {
142+
executionStatus: "EXECUTING",
143+
description: "Run was continued, whilst still executing.",
144+
},
145+
environmentId: snapshot.environmentId,
146+
environmentType: snapshot.environmentType,
147+
batchId: snapshot.batchId ?? undefined,
148+
completedWaitpoints: snapshot.completedWaitpoints.map((waitpoint) => ({
149+
id: waitpoint.id,
150+
index: waitpoint.index,
151+
})),
152+
}
153+
);
154+
155+
await sendNotificationToWorker({
156+
runId,
157+
snapshot: newSnapshot,
158+
eventBus: this.$.eventBus,
159+
});
160+
161+
return null;
162+
}
163+
131164
const result = await getRunWithBackgroundWorkerTasks(
132165
prisma,
133166
runId,

0 commit comments

Comments
 (0)