Skip to content

Commit 938c5a1

Browse files
committed
add ability to delay checkpoints
1 parent 8f3f373 commit 938c5a1

File tree

1 file changed

+38
-18
lines changed

1 file changed

+38
-18
lines changed

apps/coordinator/src/checkpointer.ts

Lines changed: 38 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ export class Checkpointer {
8989
#logger = new SimpleStructuredLogger("checkpointer");
9090
#abortControllers = new Map<string, AbortController>();
9191
#failedCheckpoints = new Map<string, unknown>();
92-
#waitingForRetry = new Set<string>();
92+
#waitingToCheckpoint = new Set<string>();
9393

9494
private registryHost: string;
9595
private registryNamespace: string;
@@ -189,7 +189,10 @@ export class Checkpointer {
189189
}
190190
}
191191

192-
async checkpointAndPush(opts: CheckpointAndPushOptions): Promise<CheckpointData | undefined> {
192+
async checkpointAndPush(
193+
opts: CheckpointAndPushOptions,
194+
delayMs?: number
195+
): Promise<CheckpointData | undefined> {
193196
const start = performance.now();
194197
this.#logger.log(`checkpointAndPush() start`, { start, opts });
195198

@@ -203,7 +206,7 @@ export class Checkpointer {
203206
}
204207

205208
try {
206-
const result = await this.#checkpointAndPushWithBackoff(opts);
209+
const result = await this.#checkpointAndPushWithBackoff(opts, delayMs);
207210

208211
const end = performance.now();
209212
this.#logger.log(`checkpointAndPush() end`, {
@@ -226,8 +229,8 @@ export class Checkpointer {
226229
}
227230
}
228231

229-
isCheckpointing(runId: string) {
230-
return this.#abortControllers.has(runId) || this.#waitingForRetry.has(runId);
232+
#isCheckpointing(runId: string) {
233+
return this.#abortControllers.has(runId) || this.#waitingToCheckpoint.has(runId);
231234
}
232235

233236
cancelCheckpoint(runId: string): boolean {
@@ -238,8 +241,8 @@ export class Checkpointer {
238241
return true;
239242
}
240243

241-
if (this.#waitingForRetry.has(runId)) {
242-
this.#waitingForRetry.delete(runId);
244+
if (this.#waitingToCheckpoint.has(runId)) {
245+
this.#waitingToCheckpoint.delete(runId);
243246
return true;
244247
}
245248

@@ -261,13 +264,30 @@ export class Checkpointer {
261264
return true;
262265
}
263266

264-
async #checkpointAndPushWithBackoff({
265-
runId,
266-
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
267-
projectRef,
268-
deploymentVersion,
269-
attemptNumber,
270-
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
267+
async #checkpointAndPushWithBackoff(
268+
{
269+
runId,
270+
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
271+
projectRef,
272+
deploymentVersion,
273+
attemptNumber,
274+
}: CheckpointAndPushOptions,
275+
delayMs?: number
276+
): Promise<CheckpointAndPushResult> {
277+
if (delayMs && delayMs > 0) {
278+
this.#logger.log("Delaying checkpoint", { runId, delayMs });
279+
280+
this.#waitingToCheckpoint.add(runId);
281+
await setTimeout(delayMs);
282+
283+
if (!this.#waitingToCheckpoint.has(runId)) {
284+
this.#logger.log("Checkpoint canceled during initial delay", { runId });
285+
return { success: false, reason: "CANCELED" };
286+
} else {
287+
this.#waitingToCheckpoint.delete(runId);
288+
}
289+
}
290+
271291
this.#logger.log("Checkpointing with backoff", {
272292
runId,
273293
leaveRunning,
@@ -290,14 +310,14 @@ export class Checkpointer {
290310
delay,
291311
});
292312

293-
this.#waitingForRetry.add(runId);
313+
this.#waitingToCheckpoint.add(runId);
294314
await setTimeout(delay.milliseconds);
295315

296-
if (!this.#waitingForRetry.has(runId)) {
316+
if (!this.#waitingToCheckpoint.has(runId)) {
297317
this.#logger.log("Checkpoint canceled while waiting for retry", { runId });
298318
return { success: false, reason: "CANCELED" };
299319
} else {
300-
this.#waitingForRetry.delete(runId);
320+
this.#waitingToCheckpoint.delete(runId);
301321
}
302322
}
303323

@@ -386,7 +406,7 @@ export class Checkpointer {
386406
return { success: false, reason: "NO_SUPPORT" };
387407
}
388408

389-
if (this.isCheckpointing(runId)) {
409+
if (this.#isCheckpointing(runId)) {
390410
this.#logger.error("Checkpoint procedure already in progress", { options });
391411
return { success: false, reason: "IN_PROGRESS" };
392412
}

0 commit comments

Comments
 (0)