Skip to content

Commit a467471

Browse files
committed
Fix flaky tests
1 parent 6230a76 commit a467471

File tree

14 files changed

+484
-397
lines changed

14 files changed

+484
-397
lines changed

packages/workflows-shared/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@
4141
},
4242
"devDependencies": {
4343
"@cloudflare/eslint-config-shared": "workspace:*",
44-
"@cloudflare/vitest-pool-workers": "catalog:vitest-3",
44+
"@cloudflare/vitest-pool-workers": "^0.13.1",
4545
"@cloudflare/workers-tsconfig": "workspace:*",
4646
"@cloudflare/workers-types": "catalog:default",
4747
"@types/mime": "^3.0.4",
4848
"esbuild": "catalog:default",
4949
"eslint": "catalog:default",
5050
"typescript": "catalog:default",
51-
"vitest": "catalog:vitest-3"
51+
"vitest": "catalog:default"
5252
},
5353
"engines": {
5454
"node": ">=18.0.0"

packages/workflows-shared/src/binding.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers";
22
import { InstanceEvent, instanceStatusName } from "./instance";
33
import {
4-
isAbortError,
54
isUserTriggeredPause,
65
isUserTriggeredRestart,
76
isUserTriggeredTerminate,
@@ -58,11 +57,9 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> {
5857
val[Symbol.dispose]();
5958
}
6059
})
61-
.catch((e) => {
62-
// Suppress abort errors since they're expected
63-
if (!isAbortError(e)) {
64-
throw e;
65-
}
60+
.catch(() => {
61+
// Suppress all rejections: create() should queue and
62+
// return immediately
6663
});
6764

6865
this.ctx.waitUntil(initPromise);

packages/workflows-shared/src/context.ts

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { RpcTarget } from "cloudflare:workers";
22
import { ms } from "itty-time";
3+
import { abortableWait } from "./engine";
34
import { INSTANCE_METADATA, InstanceEvent, InstanceStatus } from "./instance";
45
import { computeHash } from "./lib/cache";
56
import {
@@ -279,7 +280,10 @@ export class Context extends RpcTarget {
279280
// complete sleep if it didn't finish for some reason
280281
if (retryEntryPQ !== undefined) {
281282
await this.#engine.timeoutHandler.release(this.#engine);
282-
await scheduler.wait(retryEntryPQ.targetTimestamp - Date.now());
283+
await abortableWait(
284+
retryEntryPQ.targetTimestamp - Date.now(),
285+
this.#engine.engineAbortController.signal
286+
);
283287
await this.#engine.timeoutHandler.acquire(this.#engine);
284288
// @ts-expect-error priorityQueue is initiated in init
285289
this.#engine.priorityQueue.remove({
@@ -298,6 +302,11 @@ export class Context extends RpcTarget {
298302
}
299303
const { accountId, instance } = instanceMetadata;
300304

305+
// AbortController to cancel the timeout when the step callback
306+
// wins the Promise.race — matches production's pattern of
307+
// aborting the losing side to prevent dangling promises.
308+
const stepAbortController = new AbortController();
309+
301310
try {
302311
const timeoutPromise = async () => {
303312
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
@@ -311,7 +320,13 @@ export class Context extends RpcTarget {
311320
targetTimestamp: Date.now() + timeout,
312321
type: "timeout",
313322
});
314-
await scheduler.wait(timeout);
323+
await abortableWait(
324+
timeout,
325+
AbortSignal.any([
326+
stepAbortController.signal,
327+
this.#engine.engineAbortController.signal,
328+
])
329+
);
315330
// if we reach here, means that we can try to delete the timeout from the PQ
316331
// because we managed to wait in the same lifetime
317332
// @ts-expect-error priorityQueue is initiated in init
@@ -378,6 +393,9 @@ export class Context extends RpcTarget {
378393
]);
379394
}
380395

396+
// Cancel the timeout so its scheduler.wait doesn't dangle
397+
stepAbortController.abort("step finished");
398+
381399
// if we reach here, it means that the closure ran successfully and we can remove the timeout from the PQ
382400
// @ts-expect-error priorityQueue is initiated in init
383401
await this.#engine.priorityQueue.remove({
@@ -448,6 +466,9 @@ export class Context extends RpcTarget {
448466
}
449467
);
450468
} catch (e) {
469+
// Cancel the timeout so its scheduler.wait doesn't dangle
470+
stepAbortController.abort("step errored");
471+
451472
const error = e as Error;
452473
// if we reach here, it means that the closure ran but errored out and we can remove the timeout from the PQ
453474
// @ts-expect-error priorityQueue is initiated in init
@@ -512,7 +533,10 @@ export class Context extends RpcTarget {
512533
});
513534
await this.#engine.timeoutHandler.release(this.#engine);
514535
// this may never finish because of the grace period - but the waker will take care of it
515-
await scheduler.wait(durationMs);
536+
await abortableWait(
537+
durationMs,
538+
this.#engine.engineAbortController.signal
539+
);
516540

517541
// if it ever reaches here, we can try to remove it from the priority queue since it's no longer useful
518542
// @ts-expect-error priorityQueue is initiated in init
@@ -590,8 +614,9 @@ export class Context extends RpcTarget {
590614
);
591615
// in case the engine dies while sleeping and wakes up before the retry period
592616
if (entryPQ !== undefined) {
593-
await scheduler.wait(
594-
disableSleep ? 0 : entryPQ.targetTimestamp - Date.now()
617+
await abortableWait(
618+
disableSleep ? 0 : entryPQ.targetTimestamp - Date.now(),
619+
this.#engine.engineAbortController.signal
595620
);
596621
// @ts-expect-error priorityQueue is initiated in init
597622
this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" });
@@ -635,7 +660,10 @@ export class Context extends RpcTarget {
635660
});
636661

637662
// this probably will never finish except if sleep is less than the grace period
638-
await scheduler.wait(disableSleep ? 0 : duration);
663+
await abortableWait(
664+
disableSleep ? 0 : duration,
665+
this.#engine.engineAbortController.signal
666+
);
639667

640668
this.#engine.writeLog(
641669
InstanceEvent.SLEEP_COMPLETE,
@@ -771,7 +799,10 @@ export class Context extends RpcTarget {
771799
type: "timeout",
772800
});
773801
}
774-
await scheduler.wait(timeoutToWait);
802+
await abortableWait(
803+
timeoutToWait,
804+
this.#engine.engineAbortController.signal
805+
);
775806
// if we reach here, means that we can try to delete the timeout from the PQ
776807
// because we managed to wait in the same lifetime
777808

packages/workflows-shared/src/engine.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,34 @@ export const DEFAULT_STEP_LIMIT = 10_000;
8989

9090
const PAUSE_DATETIME = "PAUSE_DATETIME";
9191

92+
/**
93+
* Race `scheduler.wait` against an AbortSignal. Native `scheduler.wait` in
94+
* workerd doesn't guarantee timer cleanup when the signal fires, so we race
95+
* it with a promise that rejects on abort. This ensures pending waits are
96+
* interrupted when the engine aborts — preventing dangling timers from
97+
* hitting a broken gate and creating cascading exceptions.
98+
*/
99+
export function abortableWait(
100+
delay: number,
101+
signal: AbortSignal
102+
): Promise<void> {
103+
if (signal.aborted) {
104+
return Promise.reject(signal.reason);
105+
}
106+
return Promise.race([
107+
scheduler.wait(delay),
108+
new Promise<void>((_, reject) => {
109+
signal.addEventListener(
110+
"abort",
111+
() => {
112+
reject(signal.reason);
113+
},
114+
{ once: true }
115+
);
116+
}),
117+
]);
118+
}
119+
92120
export class Engine extends DurableObject<Env> {
93121
logs: Array<unknown> = [];
94122

@@ -99,6 +127,7 @@ export class Engine extends DurableObject<Env> {
99127
timeoutHandler: GracePeriodSemaphore;
100128
priorityQueue: TimePriorityQueue | undefined;
101129
stepLimit: number;
130+
engineAbortController: AbortController = new AbortController();
102131

103132
waiters: Map<string, Array<(event: Event | PromiseLike<Event>) => void>> =
104133
new Map();
@@ -425,7 +454,17 @@ export class Engine extends DurableObject<Env> {
425454

426455
async abort(reason: string) {
427456
await this.ctx.storage.sync();
428-
this.ctx.abort(reason);
457+
458+
// Dispose the semaphore so steps blocked on acquire() are rejected
459+
this.timeoutHandler.dispose();
460+
461+
// Abort all active scheduler.wait timers via the shared signal
462+
const abortError = new Error(reason);
463+
this.engineAbortController.abort(abortError);
464+
465+
await this.ctx.blockConcurrencyWhile(async () => {
466+
throw abortError;
467+
});
429468
}
430469

431470
// Called by the dispose function when introspecting the instance in tests
@@ -434,7 +473,12 @@ export class Engine extends DurableObject<Env> {
434473
await this.ctx.storage.sync();
435474
await this.ctx.storage.deleteAll();
436475

437-
this.ctx.abort(reason);
476+
this.timeoutHandler.dispose();
477+
const abortError = new Error(reason ?? "unsafeAbort");
478+
this.engineAbortController.abort(abortError);
479+
await this.ctx.blockConcurrencyWhile(async () => {
480+
throw abortError;
481+
});
438482
}
439483

440484
async storeEventMap() {

packages/workflows-shared/src/lib/gracePeriodSemaphore.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ms } from "itty-time";
2-
import { ABORT_REASONS } from "./errors";
2+
import { abortableWait } from "../engine";
33
import type { Engine } from "../engine";
44
import type { WorkflowSleepDuration } from "cloudflare:workers";
55

@@ -121,6 +121,22 @@ export class GracePeriodSemaphore {
121121
this.#waitingSteps = [];
122122
}
123123

124+
dispose() {
125+
// Reject all waiting step promises so they stop blocking
126+
for (const promise of this.#waitingSteps) {
127+
promise.rejectCallback();
128+
}
129+
this.#waitingSteps = [];
130+
131+
// Reject all waiting promises (e.g. pause)
132+
for (const promise of this.#waitingPromises) {
133+
promise.rejectCallback();
134+
}
135+
this.#waitingPromises = [];
136+
137+
this.#canInitiateSteps = false;
138+
}
139+
124140
isRunningStep() {
125141
return this.#counter > 0;
126142
}
@@ -154,7 +170,12 @@ export const startGracePeriod: GracePeriodCallback = async (
154170
}
155171

156172
latestGracePeriodTimestamp = thisTimestamp;
157-
await scheduler.wait(timeoutMs);
173+
try {
174+
await abortableWait(timeoutMs, engine.engineAbortController.signal);
175+
} catch {
176+
// Engine aborted — grace period is no longer relevant
177+
return;
178+
}
158179
if (
159180
thisTimestamp !== latestGracePeriodTimestamp ||
160181
engine.timeoutHandler.isRunningStep()
@@ -166,7 +187,8 @@ export const startGracePeriod: GracePeriodCallback = async (
166187

167188
// Ensure next alarm is set before we abort
168189
await engine.priorityQueue?.handleNextAlarm();
169-
await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
190+
// TODO: possibly handle alarms
191+
// await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
170192
};
171193
void gracePeriodHandler();
172194
};

0 commit comments

Comments
 (0)