Skip to content

Commit 3c92208

Browse files
committed
Fix flaky tests
1 parent 45dad75 commit 3c92208

File tree

14 files changed

+777
-453
lines changed

14 files changed

+777
-453
lines changed

packages/workflows-shared/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,14 @@
3939
"zod": "^3.22.3"
4040
},
4141
"devDependencies": {
42-
"@cloudflare/vitest-pool-workers": "catalog:vitest-3",
42+
"@cloudflare/lint-config-shared": "workspace:*",
43+
"@cloudflare/vitest-pool-workers": "^0.13.1",
4344
"@cloudflare/workers-tsconfig": "workspace:*",
4445
"@cloudflare/workers-types": "catalog:default",
4546
"@types/mime": "^3.0.4",
4647
"esbuild": "catalog:default",
4748
"typescript": "catalog:default",
48-
"vitest": "catalog:vitest-3"
49+
"vitest": "catalog:default"
4950
},
5051
"engines": {
5152
"node": ">=18.0.0"

packages/workflows-shared/src/binding.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers";
22
import { InstanceEvent, instanceStatusName } from "./instance";
33
import {
4-
isAbortError,
54
isUserTriggeredPause,
65
isUserTriggeredRestart,
76
isUserTriggeredTerminate,
@@ -58,11 +57,9 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> {
5857
val[Symbol.dispose]();
5958
}
6059
})
61-
.catch((e) => {
62-
// Suppress abort errors since they're expected
63-
if (!isAbortError(e)) {
64-
throw e;
65-
}
60+
.catch(() => {
61+
// Suppress all rejections: create() should queue and
62+
// return immediately
6663
});
6764

6865
this.ctx.waitUntil(initPromise);

packages/workflows-shared/src/context.ts

Lines changed: 104 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ export class Context extends RpcTarget {
7272
}
7373

7474
const status = await this.#engine.getStatus();
75+
76+
if (status === InstanceStatus.Paused) {
77+
throw new Error(ABORT_REASONS.USER_PAUSE);
78+
}
79+
7580
if (status === InstanceStatus.WaitingForPause) {
7681
await this.#state.storage.put(PAUSE_DATETIME, new Date());
7782
const metadata =
@@ -83,7 +88,7 @@ export class Context extends RpcTarget {
8388
InstanceStatus.Paused
8489
);
8590
}
86-
await this.#engine.abort(ABORT_REASONS.USER_PAUSE);
91+
throw new Error(ABORT_REASONS.USER_PAUSE);
8792
}
8893
}
8994

@@ -511,8 +516,34 @@ export class Context extends RpcTarget {
511516
type: "retry",
512517
});
513518
await this.#engine.timeoutHandler.release(this.#engine);
514-
// this may never finish because of the grace period - but waker will take of it
515-
await scheduler.wait(durationMs);
519+
// Race retry wait against the pause signal so pause
520+
// takes effect immediately during retries
521+
{
522+
const retryPauseSignal = this.#engine.pauseController.signal;
523+
let pausedDuringRetry = false;
524+
await Promise.race([
525+
scheduler.wait(durationMs),
526+
new Promise<void>((resolve) => {
527+
if (retryPauseSignal.aborted) {
528+
resolve();
529+
return;
530+
}
531+
retryPauseSignal.addEventListener("abort", () => resolve(), {
532+
once: true,
533+
});
534+
}),
535+
]);
536+
const retryStatus = await this.#engine.getStatus();
537+
if (
538+
retryStatus === InstanceStatus.Paused ||
539+
retryStatus === InstanceStatus.WaitingForPause
540+
) {
541+
pausedDuringRetry = true;
542+
}
543+
if (pausedDuringRetry) {
544+
throw new Error(ABORT_REASONS.USER_PAUSE);
545+
}
546+
}
516547

517548
// if it ever reaches here, we can try to remove it from the priority queue since it's no longer useful
518549
// @ts-expect-error priorityQueue is initiated in init
@@ -634,8 +665,37 @@ export class Context extends RpcTarget {
634665
type: "sleep",
635666
});
636667

637-
// this probably will never finish except if sleep is less than the grace period
638-
await scheduler.wait(disableSleep ? 0 : duration);
668+
// Race the sleep against the pause signal
669+
const pauseSignal = this.#engine.pauseController.signal;
670+
const sleepDuration = disableSleep ? 0 : duration;
671+
672+
let pausedDuringSleep = false;
673+
await Promise.race([
674+
scheduler.wait(sleepDuration),
675+
new Promise<void>((resolve) => {
676+
if (pauseSignal.aborted) {
677+
resolve();
678+
return;
679+
}
680+
pauseSignal.addEventListener("abort", () => resolve(), {
681+
once: true,
682+
});
683+
}),
684+
]);
685+
686+
// Check if we were paused during the sleep
687+
const statusAfterSleep = await this.#engine.getStatus();
688+
if (
689+
statusAfterSleep === InstanceStatus.Paused ||
690+
statusAfterSleep === InstanceStatus.WaitingForPause
691+
) {
692+
pausedDuringSleep = true;
693+
}
694+
695+
if (pausedDuringSleep) {
696+
// Throw pause error
697+
throw new Error(ABORT_REASONS.USER_PAUSE);
698+
}
639699

640700
this.#engine.writeLog(
641701
InstanceEvent.SLEEP_COMPLETE,
@@ -806,33 +866,49 @@ export class Context extends RpcTarget {
806866
this.#engine.waiters.set(options.type, callbacks);
807867
});
808868

809-
const result = await Promise.race([
869+
// Race event, timeout, and pause signal. The pause promise resolves
870+
// when the race settles via event/timeout before the pause signal fires
871+
const pauseSignal = this.#engine.pauseController.signal;
872+
const pausePromise = new Promise<void>((resolve) => {
873+
if (pauseSignal.aborted) {
874+
resolve();
875+
return;
876+
}
877+
pauseSignal.addEventListener("abort", () => resolve(), {
878+
once: true,
879+
});
880+
});
881+
882+
const raceResult = await Promise.race([
810883
eventPromise,
811884
timeoutEntryPQ !== undefined
812885
? timeoutPromise(timeoutEntryPQ.targetTimestamp - Date.now(), false)
813886
: timeoutPromise(ms(options.timeout), true),
814-
])
815-
.then(async (event) => {
816-
this.#engine.writeLog(
817-
InstanceEvent.WAIT_COMPLETE,
818-
cacheKey,
819-
waitForEventNameWithCounter,
820-
event as Event
821-
);
822-
await this.#state.storage.put(waitForEventKey, event);
823-
return event;
824-
})
825-
.catch(async (error) => {
826-
this.#engine.writeLog(
827-
InstanceEvent.WAIT_TIMED_OUT,
828-
cacheKey,
829-
waitForEventNameWithCounter,
830-
error
831-
);
832-
await this.#state.storage.put(errorKey, error);
833-
throw error;
834-
});
887+
pausePromise,
888+
]).catch(async (error) => {
889+
this.#engine.writeLog(
890+
InstanceEvent.WAIT_TIMED_OUT,
891+
cacheKey,
892+
waitForEventNameWithCounter,
893+
error
894+
);
895+
await this.#state.storage.put(errorKey, error);
896+
throw error;
897+
});
898+
899+
// Pause signal won the race — throw to stop the workflow
900+
if (raceResult === undefined) {
901+
throw new Error(ABORT_REASONS.USER_PAUSE);
902+
}
903+
904+
this.#engine.writeLog(
905+
InstanceEvent.WAIT_COMPLETE,
906+
cacheKey,
907+
waitForEventNameWithCounter,
908+
raceResult as Event
909+
);
910+
await this.#state.storage.put(waitForEventKey, raceResult);
835911

836-
return result as WorkflowStepEvent<T>;
912+
return raceResult as WorkflowStepEvent<T>;
837913
}
838914
}

packages/workflows-shared/src/engine.ts

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import {
1313
ABORT_REASONS,
1414
createWorkflowError,
1515
isAbortError,
16-
isUserTriggeredPause,
1716
WorkflowFatalError,
1817
} from "./lib/errors";
1918
import {
@@ -99,6 +98,8 @@ export class Engine extends DurableObject<Env> {
9998
timeoutHandler: GracePeriodSemaphore;
10099
priorityQueue: TimePriorityQueue | undefined;
101100
stepLimit: number;
101+
engineAbortController: AbortController = new AbortController();
102+
pauseController: AbortController = new AbortController();
102103

103104
waiters: Map<string, Array<(event: Event | PromiseLike<Event>) => void>> =
104105
new Map();
@@ -425,6 +426,11 @@ export class Engine extends DurableObject<Env> {
425426

426427
async abort(reason: string) {
427428
await this.ctx.storage.sync();
429+
430+
// Clean up pending JS operations
431+
this.timeoutHandler.dispose();
432+
this.engineAbortController.abort(new Error(reason));
433+
428434
this.ctx.abort(reason);
429435
}
430436

@@ -655,14 +661,13 @@ export class Engine extends DurableObject<Env> {
655661
metadata.instance.id,
656662
InstanceStatus.Paused
657663
);
658-
await this.abort(ABORT_REASONS.USER_PAUSE);
664+
// Signal the pause controller to interrupt any active
665+
// scheduler.wait (sleep/waitForEvent). The workflow will
666+
// throw a pause error at the next step boundary via
667+
// #checkForPendingPause()
668+
this.pauseController.abort(ABORT_REASONS.USER_PAUSE);
659669
})
660-
.catch((e) => {
661-
// Expected: abort rejects the promise chain when it kills the DO
662-
if (!isAbortError(e)) {
663-
throw e;
664-
}
665-
});
670+
.catch(() => {});
666671
}
667672

668673
async userTriggeredRestart() {
@@ -765,15 +770,24 @@ export class Engine extends DurableObject<Env> {
765770
const pausedDate = await this.ctx.storage.get<Date>(PAUSE_DATETIME);
766771
if (pausedDate !== undefined) {
767772
const offset = Date.now() - new Date(pausedDate).valueOf();
768-
const pq = new TimePriorityQueue(this.ctx, metadata);
769-
pq.offsetAll(offset);
773+
if (this.priorityQueue) {
774+
this.priorityQueue.offsetAll(offset);
775+
} else {
776+
const pq = new TimePriorityQueue(this.ctx, metadata);
777+
pq.offsetAll(offset);
778+
}
770779
}
771780
await this.ctx.storage.delete(PAUSE_DATETIME);
772781

773782
const { accountId, workflow, version, instance, event } = metadata;
774783

775784
await this.ctx.storage.put(ENGINE_STATUS_KEY, InstanceStatus.Queued);
776785

786+
// Reset pause state for the new run. The DO stays alive across
787+
// pause/resume, so we need to clear stale in-memory state from the previous run.
788+
this.pauseController = new AbortController();
789+
this.waiters = new Map();
790+
777791
void this.init(accountId, workflow, version, instance, event);
778792
}
779793

@@ -879,7 +893,7 @@ export class Engine extends DurableObject<Env> {
879893
});
880894
this.isRunning = false;
881895
} catch (err) {
882-
if (isUserTriggeredPause(err)) {
896+
if (isAbortError(err)) {
883897
this.isRunning = false;
884898
return;
885899
}

packages/workflows-shared/src/lib/gracePeriodSemaphore.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { ms } from "itty-time";
2-
import { ABORT_REASONS } from "./errors";
32
import type { Engine } from "../engine";
43
import type { WorkflowSleepDuration } from "cloudflare:workers";
54

@@ -121,6 +120,22 @@ export class GracePeriodSemaphore {
121120
this.#waitingSteps = [];
122121
}
123122

123+
dispose() {
124+
// Reject all waiting step promises so they stop blocking
125+
for (const promise of this.#waitingSteps) {
126+
promise.rejectCallback();
127+
}
128+
this.#waitingSteps = [];
129+
130+
// Reject all waiting promises
131+
for (const promise of this.#waitingPromises) {
132+
promise.rejectCallback();
133+
}
134+
this.#waitingPromises = [];
135+
136+
this.#canInitiateSteps = false;
137+
}
138+
124139
isRunningStep() {
125140
return this.#counter > 0;
126141
}
@@ -166,7 +181,10 @@ export const startGracePeriod: GracePeriodCallback = async (
166181

167182
// Ensure next alarm is set before we abort
168183
await engine.priorityQueue?.handleNextAlarm();
169-
await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
184+
// await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE);
170185
};
171-
void gracePeriodHandler();
186+
void gracePeriodHandler().catch(() => {
187+
// Swallow — the engine is shutting down (abort kills the context,
188+
// which rejects the scheduler.wait inside gracePeriodHandler)
189+
});
172190
};

0 commit comments

Comments
 (0)