Skip to content

Commit d5925a6

Browse files
committed
Fix flaky tests
1 parent cf25336 commit d5925a6

File tree

14 files changed

+775
-451
lines changed

14 files changed

+775
-451
lines changed

packages/workflows-shared/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,14 @@
4141
},
4242
"devDependencies": {
4343
"@cloudflare/eslint-config-shared": "workspace:*",
44-
"@cloudflare/vitest-pool-workers": "catalog:vitest-3",
44+
"@cloudflare/vitest-pool-workers": "^0.13.1",
4545
"@cloudflare/workers-tsconfig": "workspace:*",
4646
"@cloudflare/workers-types": "catalog:default",
4747
"@types/mime": "^3.0.4",
4848
"esbuild": "catalog:default",
4949
"eslint": "catalog:default",
5050
"typescript": "catalog:default",
51-
"vitest": "catalog:vitest-3"
51+
"vitest": "catalog:default"
5252
},
5353
"engines": {
5454
"node": ">=18.0.0"

packages/workflows-shared/src/binding.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers";
22
import { InstanceEvent, instanceStatusName } from "./instance";
33
import {
4-
isAbortError,
54
isUserTriggeredPause,
65
isUserTriggeredRestart,
76
isUserTriggeredTerminate,
@@ -58,11 +57,9 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> {
5857
val[Symbol.dispose]();
5958
}
6059
})
61-
.catch((e) => {
62-
// Suppress abort errors since they're expected
63-
if (!isAbortError(e)) {
64-
throw e;
65-
}
60+
.catch(() => {
61+
// Suppress all rejections: create() should queue and
62+
// return immediately
6663
});
6764

6865
this.ctx.waitUntil(initPromise);

packages/workflows-shared/src/context.ts

Lines changed: 89 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ export class Context extends RpcTarget {
7272
}
7373

7474
const status = await this.#engine.getStatus();
75+
76+
if (status === InstanceStatus.Paused) {
77+
throw new Error(ABORT_REASONS.USER_PAUSE);
78+
}
79+
7580
if (status === InstanceStatus.WaitingForPause) {
7681
await this.#state.storage.put(PAUSE_DATETIME, new Date());
7782
const metadata =
@@ -83,7 +88,7 @@ export class Context extends RpcTarget {
8388
InstanceStatus.Paused
8489
);
8590
}
86-
await this.#engine.abort(ABORT_REASONS.USER_PAUSE);
91+
throw new Error(ABORT_REASONS.USER_PAUSE);
8792
}
8893
}
8994

@@ -511,8 +516,34 @@ export class Context extends RpcTarget {
511516
type: "retry",
512517
});
513518
await this.#engine.timeoutHandler.release(this.#engine);
514-
// this may never finish because of the grace period - but waker will take care of it
515-
await scheduler.wait(durationMs);
519+
// Race retry wait against the pause signal so pause
520+
// takes effect immediately during retries
521+
{
522+
const retryPauseSignal = this.#engine.pauseController.signal;
523+
let pausedDuringRetry = false;
524+
await Promise.race([
525+
scheduler.wait(durationMs),
526+
new Promise<void>((resolve) => {
527+
if (retryPauseSignal.aborted) {
528+
resolve();
529+
return;
530+
}
531+
retryPauseSignal.addEventListener("abort", () => resolve(), {
532+
once: true,
533+
});
534+
}),
535+
]);
536+
const retryStatus = await this.#engine.getStatus();
537+
if (
538+
retryStatus === InstanceStatus.Paused ||
539+
retryStatus === InstanceStatus.WaitingForPause
540+
) {
541+
pausedDuringRetry = true;
542+
}
543+
if (pausedDuringRetry) {
544+
throw new Error(ABORT_REASONS.USER_PAUSE);
545+
}
546+
}
516547

517548
// if it ever reaches here, we can try to remove it from the priority queue since it's no longer useful
518549
// @ts-expect-error priorityQueue is initiated in init
@@ -634,8 +665,37 @@ export class Context extends RpcTarget {
634665
type: "sleep",
635666
});
636667

637-
// this probably will never finish except if sleep is less than the grace period
638-
await scheduler.wait(disableSleep ? 0 : duration);
668+
// Race the sleep against the pause signal
669+
const pauseSignal = this.#engine.pauseController.signal;
670+
const sleepDuration = disableSleep ? 0 : duration;
671+
672+
let pausedDuringSleep = false;
673+
await Promise.race([
674+
scheduler.wait(sleepDuration),
675+
new Promise<void>((resolve) => {
676+
if (pauseSignal.aborted) {
677+
resolve();
678+
return;
679+
}
680+
pauseSignal.addEventListener("abort", () => resolve(), {
681+
once: true,
682+
});
683+
}),
684+
]);
685+
686+
// Check if we were paused during the sleep
687+
const statusAfterSleep = await this.#engine.getStatus();
688+
if (
689+
statusAfterSleep === InstanceStatus.Paused ||
690+
statusAfterSleep === InstanceStatus.WaitingForPause
691+
) {
692+
pausedDuringSleep = true;
693+
}
694+
695+
if (pausedDuringSleep) {
696+
// Throw pause error
697+
throw new Error(ABORT_REASONS.USER_PAUSE);
698+
}
639699

640700
this.#engine.writeLog(
641701
InstanceEvent.SLEEP_COMPLETE,
@@ -806,11 +866,27 @@ export class Context extends RpcTarget {
806866
this.#engine.waiters.set(options.type, callbacks);
807867
});
808868

869+
// Race event, timeout, and pause signal. If paused during waitForEvent,
870+
// throw a pause error so the workflow stops cleanly via init()'s catch
871+
const pauseSignal = this.#engine.pauseController.signal;
872+
const pausePromise = new Promise<never>((_, reject) => {
873+
if (pauseSignal.aborted) {
874+
reject(new Error(ABORT_REASONS.USER_PAUSE));
875+
return;
876+
}
877+
pauseSignal.addEventListener(
878+
"abort",
879+
() => reject(new Error(ABORT_REASONS.USER_PAUSE)),
880+
{ once: true }
881+
);
882+
});
883+
809884
const result = await Promise.race([
810885
eventPromise,
811886
timeoutEntryPQ !== undefined
812887
? timeoutPromise(timeoutEntryPQ.targetTimestamp - Date.now(), false)
813888
: timeoutPromise(ms(options.timeout), true),
889+
pausePromise,
814890
])
815891
.then(async (event) => {
816892
this.#engine.writeLog(
@@ -823,6 +899,14 @@ export class Context extends RpcTarget {
823899
return event;
824900
})
825901
.catch(async (error) => {
902+
// Pause errors propagate directly to init()'s catch —
903+
// don't log them as timeouts or persist to storage
904+
if (
905+
error instanceof Error &&
906+
error.message === ABORT_REASONS.USER_PAUSE
907+
) {
908+
throw error;
909+
}
826910
this.#engine.writeLog(
827911
InstanceEvent.WAIT_TIMED_OUT,
828912
cacheKey,

packages/workflows-shared/src/engine.ts

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import {
1313
ABORT_REASONS,
1414
createWorkflowError,
1515
isAbortError,
16-
isUserTriggeredPause,
1716
WorkflowFatalError,
1817
} from "./lib/errors";
1918
import {
@@ -99,6 +98,8 @@ export class Engine extends DurableObject<Env> {
9998
timeoutHandler: GracePeriodSemaphore;
10099
priorityQueue: TimePriorityQueue | undefined;
101100
stepLimit: number;
101+
engineAbortController: AbortController = new AbortController();
102+
pauseController: AbortController = new AbortController();
102103

103104
waiters: Map<string, Array<(event: Event | PromiseLike<Event>) => void>> =
104105
new Map();
@@ -425,6 +426,11 @@ export class Engine extends DurableObject<Env> {
425426

426427
async abort(reason: string) {
427428
await this.ctx.storage.sync();
429+
430+
// Clean up pending JS operations
431+
this.timeoutHandler.dispose();
432+
this.engineAbortController.abort(new Error(reason));
433+
428434
this.ctx.abort(reason);
429435
}
430436

@@ -655,14 +661,13 @@ export class Engine extends DurableObject<Env> {
655661
metadata.instance.id,
656662
InstanceStatus.Paused
657663
);
658-
await this.abort(ABORT_REASONS.USER_PAUSE);
664+
// Signal the pause controller to interrupt any active
665+
// scheduler.wait (sleep/waitForEvent). The workflow will
666+
// throw a pause error at the next step boundary via
667+
// #checkForPendingPause()
668+
this.pauseController.abort(ABORT_REASONS.USER_PAUSE);
659669
})
660-
.catch((e) => {
661-
// Expected: abort rejects the promise chain when it kills the DO
662-
if (!isAbortError(e)) {
663-
throw e;
664-
}
665-
});
670+
.catch(() => {});
666671
}
667672

668673
async userTriggeredRestart() {
@@ -774,6 +779,11 @@ export class Engine extends DurableObject<Env> {
774779

775780
await this.ctx.storage.put(ENGINE_STATUS_KEY, InstanceStatus.Queued);
776781

782+
// Reset pause state for the new run. The DO stays alive across
783+
// pause/resume, so we need to clear stale in-memory state from the previous run.
784+
this.pauseController = new AbortController();
785+
this.waiters = new Map();
786+
777787
void this.init(accountId, workflow, version, instance, event);
778788
}
779789

@@ -879,7 +889,7 @@ export class Engine extends DurableObject<Env> {
879889
});
880890
this.isRunning = false;
881891
} catch (err) {
882-
if (isUserTriggeredPause(err)) {
892+
if (isAbortError(err)) {
883893
this.isRunning = false;
884894
return;
885895
}

packages/workflows-shared/src/lib/gracePeriodSemaphore.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { ms } from "itty-time";
2-
import { ABORT_REASONS } from "./errors";
32
import type { Engine } from "../engine";
43
import type { WorkflowSleepDuration } from "cloudflare:workers";
54

@@ -121,6 +120,22 @@ export class GracePeriodSemaphore {
121120
this.#waitingSteps = [];
122121
}
123122

123+
dispose() {
124+
// Reject all waiting step promises so they stop blocking
125+
for (const promise of this.#waitingSteps) {
126+
promise.rejectCallback();
127+
}
128+
this.#waitingSteps = [];
129+
130+
// Reject all waiting promises
131+
for (const promise of this.#waitingPromises) {
132+
promise.rejectCallback();
133+
}
134+
this.#waitingPromises = [];
135+
136+
this.#canInitiateSteps = false;
137+
}
138+
124139
isRunningStep() {
125140
return this.#counter > 0;
126141
}
@@ -166,7 +181,10 @@ export const startGracePeriod: GracePeriodCallback = async (
166181

167182
// Ensure next alarm is set before we abort
168183
await engine.priorityQueue?.handleNextAlarm();
169-
await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
184+
// await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
170185
};
171-
void gracePeriodHandler();
186+
void gracePeriodHandler().catch(() => {
187+
// Swallow — the engine is shutting down (abort kills the context,
188+
// which rejects the scheduler.wait inside gracePeriodHandler)
189+
});
172190
};

0 commit comments

Comments
 (0)