Skip to content

Commit ca4cee5

Browse files
committed
Implement QUEUED_EXECUTING dequeuing, and allow a checkpoint to be created while the run is in the QUEUED_EXECUTING state, by saving the EXECUTING_WITH_WAITPOINTS snapshot ID as the previousSnapshotId on the QUEUED_EXECUTING snapshot.
1 parent 1924652 commit ca4cee5

File tree

10 files changed

+332
-40
lines changed

10 files changed

+332
-40
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- DropIndex
2+
DROP INDEX "SecretStore_key_idx";
3+
4+
-- AlterTable
5+
ALTER TABLE "TaskRunExecutionSnapshot" ADD COLUMN "previousSnapshotId" TEXT;
6+
7+
-- CreateIndex
8+
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);

internal-packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1964,6 +1964,9 @@ model TaskRunExecutionSnapshot {
19641964
isValid Boolean @default(true)
19651965
error String?
19661966
1967+
/// ID of the snapshot that was the run's latest when this snapshot was created (null when not recorded)
1968+
previousSnapshotId String?
1969+
19671970
/// Run
19681971
runId String
19691972
run TaskRun @relation(fields: [runId], references: [id])

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,14 +244,15 @@ export class RunEngine {
244244
heartbeatTimeouts: this.heartbeatTimeouts,
245245
});
246246

247-
this.checkpointSystem = new CheckpointSystem({
247+
this.enqueueSystem = new EnqueueSystem({
248248
resources,
249249
executionSnapshotSystem: this.executionSnapshotSystem,
250250
});
251251

252-
this.enqueueSystem = new EnqueueSystem({
252+
this.checkpointSystem = new CheckpointSystem({
253253
resources,
254254
executionSnapshotSystem: this.executionSnapshotSystem,
255+
enqueueSystem: this.enqueueSystem,
255256
});
256257

257258
this.delayedRunSystem = new DelayedRunSystem({

internal-packages/run-engine/src/engine/systems/checkpointSystem.ts

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,23 @@ import {
1010
} from "./executionSnapshotSystem.js";
1111
import { SystemResources } from "./systems.js";
1212
import { ServiceValidationError } from "../errors.js";
13+
import { EnqueueSystem } from "./enqueueSystem.js";
1314

1415
export type CheckpointSystemOptions = {
1516
resources: SystemResources;
1617
executionSnapshotSystem: ExecutionSnapshotSystem;
18+
enqueueSystem: EnqueueSystem;
1719
};
1820

1921
export class CheckpointSystem {
2022
private readonly $: SystemResources;
2123
private readonly executionSnapshotSystem: ExecutionSnapshotSystem;
24+
private readonly enqueueSystem: EnqueueSystem;
2225

2326
constructor(private readonly options: CheckpointSystemOptions) {
2427
this.$ = options.resources;
2528
this.executionSnapshotSystem = options.executionSnapshotSystem;
29+
this.enqueueSystem = options.enqueueSystem;
2630
}
2731

2832
/**
@@ -48,7 +52,8 @@ export class CheckpointSystem {
4852

4953
return await this.$.runLock.lock([runId], 5_000, async () => {
5054
const snapshot = await getLatestExecutionSnapshot(prisma, runId);
51-
if (snapshot.id !== snapshotId) {
55+
56+
if (snapshot.id !== snapshotId && snapshot.previousSnapshotId !== snapshotId) {
5257
this.$.eventBus.emit("incomingCheckpointDiscarded", {
5358
time: new Date(),
5459
run: {
@@ -104,15 +109,11 @@ export class CheckpointSystem {
104109
data: {
105110
status: "WAITING_TO_RESUME",
106111
},
107-
select: {
108-
id: true,
109-
status: true,
110-
attemptNumber: true,
112+
include: {
111113
runtimeEnvironment: {
112-
select: {
113-
id: true,
114-
projectId: true,
115-
organizationId: true,
114+
include: {
115+
project: true,
116+
organization: true,
116117
},
117118
},
118119
},
@@ -139,35 +140,73 @@ export class CheckpointSystem {
139140
},
140141
});
141142

142-
//create a new execution snapshot, with the checkpoint
143-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
144-
run,
145-
snapshot: {
146-
executionStatus: "SUSPENDED",
147-
description: "Run was suspended after creating a checkpoint.",
148-
},
149-
environmentId: snapshot.environmentId,
150-
environmentType: snapshot.environmentType,
151-
checkpointId: taskRunCheckpoint.id,
152-
workerId,
153-
runnerId,
154-
});
143+
if (snapshot.executionStatus === "QUEUED_EXECUTING") {
144+
// Enqueue the run again
145+
const newSnapshot = await this.enqueueSystem.enqueueRun({
146+
run,
147+
env: run.runtimeEnvironment,
148+
timestamp: run.createdAt.getTime() - run.priorityMs,
149+
snapshot: {
150+
status: "QUEUED",
151+
description:
152+
"Run was QUEUED, because it was queued and executing and a checkpoint was created",
153+
},
154+
previousSnapshotId: snapshot.id,
155+
batchId: snapshot.batchId ?? undefined,
156+
completedWaitpoints: snapshot.completedWaitpoints.map((waitpoint) => ({
157+
id: waitpoint.id,
158+
index: waitpoint.index,
159+
})),
160+
checkpointId: taskRunCheckpoint.id,
161+
});
155162

156-
// Refill the token bucket for the release concurrency queue
157-
await this.$.releaseConcurrencyQueue.refillTokens(
158-
{
159-
orgId: run.runtimeEnvironment.organizationId,
160-
projectId: run.runtimeEnvironment.projectId,
161-
envId: run.runtimeEnvironment.id,
162-
},
163-
1
164-
);
163+
// Refill the token bucket for the release concurrency queue
164+
await this.$.releaseConcurrencyQueue.refillTokens(
165+
{
166+
orgId: run.runtimeEnvironment.organizationId,
167+
projectId: run.runtimeEnvironment.projectId,
168+
envId: run.runtimeEnvironment.id,
169+
},
170+
1
171+
);
165172

166-
return {
167-
ok: true as const,
168-
...executionResultFromSnapshot(newSnapshot),
169-
checkpoint: taskRunCheckpoint,
170-
} satisfies CreateCheckpointResult;
173+
return {
174+
ok: true as const,
175+
...executionResultFromSnapshot(newSnapshot),
176+
checkpoint: taskRunCheckpoint,
177+
} satisfies CreateCheckpointResult;
178+
} else {
179+
//create a new execution snapshot, with the checkpoint
180+
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
181+
run,
182+
snapshot: {
183+
executionStatus: "SUSPENDED",
184+
description: "Run was suspended after creating a checkpoint.",
185+
},
186+
previousSnapshotId: snapshot.id,
187+
environmentId: snapshot.environmentId,
188+
environmentType: snapshot.environmentType,
189+
checkpointId: taskRunCheckpoint.id,
190+
workerId,
191+
runnerId,
192+
});
193+
194+
// Refill the token bucket for the release concurrency queue
195+
await this.$.releaseConcurrencyQueue.refillTokens(
196+
{
197+
orgId: run.runtimeEnvironment.organizationId,
198+
projectId: run.runtimeEnvironment.projectId,
199+
envId: run.runtimeEnvironment.id,
200+
},
201+
1
202+
);
203+
204+
return {
205+
ok: true as const,
206+
...executionResultFromSnapshot(newSnapshot),
207+
checkpoint: taskRunCheckpoint,
208+
} satisfies CreateCheckpointResult;
209+
}
171210
});
172211
}
173212

@@ -229,6 +268,7 @@ export class CheckpointSystem {
229268
executionStatus: "EXECUTING",
230269
description: "Run was continued after being suspended",
231270
},
271+
previousSnapshotId: snapshot.id,
232272
environmentId: snapshot.environmentId,
233273
environmentType: snapshot.environmentType,
234274
completedWaitpoints: snapshot.completedWaitpoints,

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ export class DequeueSystem {
102102
description:
103103
"Tried to dequeue a run that is not in a valid state to be dequeued.",
104104
},
105+
previousSnapshotId: snapshot.id,
105106
environmentId: snapshot.environmentId,
106107
environmentType: snapshot.environmentType,
107108
checkpointId: snapshot.checkpointId ?? undefined,
@@ -142,6 +143,7 @@ export class DequeueSystem {
142143
executionStatus: "EXECUTING",
143144
description: "Run was continued, whilst still executing.",
144145
},
146+
previousSnapshotId: snapshot.id,
145147
environmentId: snapshot.environmentId,
146148
environmentType: snapshot.environmentType,
147149
batchId: snapshot.batchId ?? undefined,
@@ -427,6 +429,7 @@ export class DequeueSystem {
427429
executionStatus: "PENDING_EXECUTING",
428430
description: "Run was dequeued for execution",
429431
},
432+
previousSnapshotId: snapshot.id,
430433
environmentId: snapshot.environmentId,
431434
environmentType: snapshot.environmentType,
432435
checkpointId: snapshot.checkpointId ?? undefined,

internal-packages/run-engine/src/engine/systems/enqueueSystem.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export class EnqueueSystem {
2323
timestamp,
2424
tx,
2525
snapshot,
26+
previousSnapshotId,
2627
batchId,
2728
checkpointId,
2829
completedWaitpoints,
@@ -37,6 +38,7 @@ export class EnqueueSystem {
3738
status?: Extract<TaskRunExecutionStatus, "QUEUED" | "QUEUED_EXECUTING">;
3839
description?: string;
3940
};
41+
previousSnapshotId?: string;
4042
batchId?: string;
4143
checkpointId?: string;
4244
completedWaitpoints?: {
@@ -45,16 +47,17 @@ export class EnqueueSystem {
4547
}[];
4648
workerId?: string;
4749
runnerId?: string;
48-
}): Promise<void> {
50+
}) {
4951
const prisma = tx ?? this.$.prisma;
5052

51-
await this.$.runLock.lock([run.id], 5000, async () => {
53+
return await this.$.runLock.lock([run.id], 5000, async () => {
5254
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {
5355
run: run,
5456
snapshot: {
5557
executionStatus: snapshot?.status ?? "QUEUED",
5658
description: snapshot?.description ?? "Run was QUEUED",
5759
},
60+
previousSnapshotId,
5861
batchId,
5962
environmentId: env.id,
6063
environmentType: env.type,
@@ -85,6 +88,8 @@ export class EnqueueSystem {
8588
attempt: 0,
8689
},
8790
});
91+
92+
return newSnapshot;
8893
});
8994
}
9095
}

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ export class ExecutionSnapshotSystem {
154154
{
155155
run,
156156
snapshot,
157+
previousSnapshotId,
157158
batchId,
158159
environmentId,
159160
environmentType,
@@ -168,6 +169,7 @@ export class ExecutionSnapshotSystem {
168169
executionStatus: TaskRunExecutionStatus;
169170
description: string;
170171
};
172+
previousSnapshotId?: string;
171173
batchId?: string;
172174
environmentId: string;
173175
environmentType: RuntimeEnvironmentType;
@@ -186,6 +188,7 @@ export class ExecutionSnapshotSystem {
186188
engine: "V2",
187189
executionStatus: snapshot.executionStatus,
188190
description: snapshot.description,
191+
previousSnapshotId,
189192
runId: run.id,
190193
runStatus: run.status,
191194
attemptNumber: run.attemptNumber ?? undefined,

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ export class RunAttemptSystem {
213213
isWarmStart ? " (warm start)" : ""
214214
}`,
215215
},
216+
previousSnapshotId: latestSnapshot.id,
216217
environmentId: latestSnapshot.environmentId,
217218
environmentType: latestSnapshot.environmentType,
218219
workerId,
@@ -733,6 +734,7 @@ export class RunAttemptSystem {
733734
executionStatus: "PENDING_EXECUTING",
734735
description: "Attempt failed with a short delay, starting a new attempt",
735736
},
737+
previousSnapshotId: latestSnapshot.id,
736738
environmentId: latestSnapshot.environmentId,
737739
environmentType: latestSnapshot.environmentType,
738740
workerId,
@@ -983,6 +985,7 @@ export class RunAttemptSystem {
983985
executionStatus: "PENDING_CANCEL",
984986
description: "Run was cancelled",
985987
},
988+
previousSnapshotId: latestSnapshot.id,
986989
environmentId: latestSnapshot.environmentId,
987990
environmentType: latestSnapshot.environmentType,
988991
workerId,
@@ -1005,6 +1008,7 @@ export class RunAttemptSystem {
10051008
executionStatus: "FINISHED",
10061009
description: "Run was cancelled, not finished",
10071010
},
1011+
previousSnapshotId: latestSnapshot.id,
10081012
environmentId: latestSnapshot.environmentId,
10091013
environmentType: latestSnapshot.environmentType,
10101014
workerId,
@@ -1114,6 +1118,7 @@ export class RunAttemptSystem {
11141118
executionStatus: "FINISHED",
11151119
description: "Run failed",
11161120
},
1121+
previousSnapshotId: snapshotId,
11171122
environmentId: run.runtimeEnvironment.id,
11181123
environmentType: run.runtimeEnvironment.type,
11191124
workerId,

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ export class WaitpointSystem {
400400
executionStatus: newStatus,
401401
description: "Run was blocked by a waitpoint.",
402402
},
403+
previousSnapshotId: snapshot.id,
403404
environmentId: snapshot.environmentId,
404405
environmentType: snapshot.environmentType,
405406
batchId: batch?.id ?? snapshot.batchId ?? undefined,
@@ -511,6 +512,7 @@ export class WaitpointSystem {
511512
executionStatus: "EXECUTING",
512513
description: "Run was continued, whilst still executing.",
513514
},
515+
previousSnapshotId: snapshot.id,
514516
environmentId: snapshot.environmentId,
515517
environmentType: snapshot.environmentType,
516518
batchId: snapshot.batchId ?? undefined,
@@ -537,6 +539,7 @@ export class WaitpointSystem {
537539
status: "QUEUED_EXECUTING",
538540
description: "Run can continue, but is waiting for concurrency",
539541
},
542+
previousSnapshotId: snapshot.id,
540543
batchId: snapshot.batchId ?? undefined,
541544
completedWaitpoints: blockingWaitpoints.map((b) => ({
542545
id: b.waitpoint.id,

0 commit comments

Comments
 (0)