Skip to content

Commit 74a6fec

Browse files
committed
Address review comments
1 parent 8e574e3 commit 74a6fec

File tree

9 files changed

+94
-49
lines changed

9 files changed

+94
-49
lines changed

packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,43 +74,37 @@ class InstanceImpl implements WorkflowInstance {
7474
}
7575

7676
public async pause(): Promise<void> {
77-
const instance = await this.getInstance();
77+
using instance = await this.getInstance();
7878
await instance.pause();
79-
instance[Symbol.dispose]();
8079
}
8180

8281
public async resume(): Promise<void> {
83-
const instance = await this.getInstance();
82+
using instance = await this.getInstance();
8483
await instance.resume();
85-
instance[Symbol.dispose]();
8684
}
8785

8886
public async terminate(): Promise<void> {
89-
const instance = await this.getInstance();
87+
using instance = await this.getInstance();
9088
await instance.terminate();
91-
instance[Symbol.dispose]();
9289
}
9390

9491
public async restart(): Promise<void> {
95-
const instance = await this.getInstance();
92+
using instance = await this.getInstance();
9693
await instance.restart();
97-
instance[Symbol.dispose]();
9894
}
9995

10096
public async status(): Promise<InstanceStatus> {
101-
const instance = await this.getInstance();
97+
using instance = await this.getInstance();
10298
using res = (await instance.status()) as InstanceStatus & Disposable;
103-
instance[Symbol.dispose]();
10499
return structuredClone(res);
105100
}
106101

107102
public async sendEvent(args: {
108103
payload: unknown;
109104
type: string;
110105
}): Promise<void> {
111-
const instance = await this.getInstance();
106+
using instance = await this.getInstance();
112107
await instance.sendEvent(args);
113-
instance[Symbol.dispose]();
114108
}
115109
}
116110

packages/workflows-shared/src/binding.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,20 +219,31 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
219219
public async status(): Promise<
220220
InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] }
221221
> {
222-
// If the stub is stale (e.g. after pause/restart/terminate aborted
223-
// the DO), refresh it transparently so callers never see the error.
224-
let status: EngineInstanceStatus;
222+
// Both getStatus() and readLogs() must use the same fresh stub.
223+
// After pause/restart/terminate aborts the DO, the stub goes stale
224+
const fetchStatusAndLogs = async () => {
225+
const status = await this.stub.getStatus();
226+
227+
// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
228+
const logs = await (this.stub.readLogs() as unknown as Promise<
229+
EngineLogs & Disposable
230+
>);
231+
232+
return { status, logs };
233+
};
234+
235+
let result: {
236+
status: EngineInstanceStatus;
237+
logs: EngineLogs & Disposable;
238+
};
225239
try {
226-
status = await this.stub.getStatus();
240+
result = await fetchStatusAndLogs();
227241
} catch {
228242
this.stub = this.getStub();
229-
status = await this.stub.getStatus();
243+
result = await fetchStatusAndLogs();
230244
}
231-
232-
// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
233-
using logs = await (this.stub.readLogs() as unknown as Promise<
234-
EngineLogs & Disposable
235-
>);
245+
// Dispose the RPC handle when the method scope exits
246+
using logs = result.logs;
236247

237248
const filteredLogs = logs.logs.filter(
238249
(log) =>
@@ -255,7 +266,7 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
255266
)?.metadata.error;
256267

257268
return {
258-
status: instanceStatusName(status),
269+
status: instanceStatusName(result.status),
259270
__LOCAL_DEV_STEP_OUTPUTS: stepOutputs,
260271
output: workflowOutput,
261272
error: workflowError,

packages/workflows-shared/src/engine.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { computeHash } from "./lib/cache";
1212
import {
1313
ABORT_REASONS,
1414
createWorkflowError,
15+
isAbortError,
1516
isUserTriggeredPause,
1617
WorkflowFatalError,
1718
} from "./lib/errors";
@@ -646,15 +647,22 @@ export class Engine extends DurableObject<Env> {
646647
InstanceStatus.WaitingForPause
647648
);
648649

649-
void this.timeoutHandler.waitUntilNothingIsRunning("pause", async () => {
650-
await this.ctx.storage.put(PAUSE_DATETIME, new Date());
651-
await this.setStatus(
652-
metadata.accountId,
653-
metadata.instance.id,
654-
InstanceStatus.Paused
655-
);
656-
await this.abort(ABORT_REASONS.USER_PAUSE);
657-
});
650+
void this.timeoutHandler
651+
.waitUntilNothingIsRunning("pause", async () => {
652+
await this.ctx.storage.put(PAUSE_DATETIME, new Date());
653+
await this.setStatus(
654+
metadata.accountId,
655+
metadata.instance.id,
656+
InstanceStatus.Paused
657+
);
658+
await this.abort(ABORT_REASONS.USER_PAUSE);
659+
})
660+
.catch((e) => {
661+
// Expected: abort rejects the promise chain when it kills the DO
662+
if (!isAbortError(e)) {
663+
throw e;
664+
}
665+
});
658666
}
659667

660668
async userTriggeredRestart() {
@@ -808,6 +816,7 @@ export class Engine extends DurableObject<Env> {
808816
InstanceStatus.Errored, // TODO (WOR-85): Remove this once upgrade story is done
809817
InstanceStatus.Terminated,
810818
InstanceStatus.Complete,
819+
InstanceStatus.Paused,
811820
].includes(status)
812821
) {
813822
return;

packages/workflows-shared/src/lib/gracePeriodSemaphore.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,11 @@ export class GracePeriodSemaphore {
7878
});
7979
} catch {
8080
// If the promise gets rejected (e.g. resume cancels the pause),
81-
// allow steps to run again
81+
// allow steps to run again and unblock any that were waiting
82+
for (const promise of this.#waitingSteps) {
83+
promise.resolveCallback(undefined);
84+
}
85+
this.#waitingSteps = [];
8286
this.#canInitiateSteps = true;
8387
return;
8488
}
@@ -109,6 +113,12 @@ export class GracePeriodSemaphore {
109113
);
110114

111115
this.#canInitiateSteps = true;
116+
117+
// Unblock any steps that were waiting to acquire while the pause was pending
118+
for (const promise of this.#waitingSteps) {
119+
promise.resolveCallback(undefined);
120+
}
121+
this.#waitingSteps = [];
112122
}
113123

114124
isRunningStep() {

packages/workflows-shared/tests/engine.test.ts

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ describe("Engine", () => {
3636
{} as DatabaseInstance,
3737
{ payload: {}, timestamp: new Date(), instanceId }
3838
)
39-
.catch((e) => {
39+
.catch((e: unknown) => {
4040
// NonRetryableError aborts
4141
if (!isAbortError(e)) {
4242
throw e;
@@ -45,10 +45,20 @@ describe("Engine", () => {
4545

4646
await vi.waitUntil(
4747
async () => {
48-
const logs = (await env.ENGINE.get(engineId).readLogs()) as EngineLogs;
49-
return logs.logs.some(
50-
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
51-
);
48+
try {
49+
const logs = (await env.ENGINE.get(
50+
engineId
51+
).readLogs()) as EngineLogs;
52+
return logs.logs.some(
53+
(val) => val.event === InstanceEvent.WORKFLOW_FAILURE
54+
);
55+
} catch (e) {
56+
// DO may still be aborting — retry
57+
if (isAbortError(e)) {
58+
return false;
59+
}
60+
throw e;
61+
}
5262
},
5363
{ timeout: 2000 }
5464
);
@@ -532,9 +542,16 @@ describe("Engine", () => {
532542
);
533543

534544
// Request pause while long-step is in flight
535-
await runInDurableObject(engineStub, async (engine) => {
536-
await engine.changeInstanceStatus("pause");
537-
});
545+
// Abort may propagate if the step finishes and pause completes during this call
546+
try {
547+
await runInDurableObject(engineStub, async (engine) => {
548+
await engine.changeInstanceStatus("pause");
549+
});
550+
} catch (e) {
551+
if (!isAbortError(e)) {
552+
throw e;
553+
}
554+
}
538555

539556
await vi.waitUntil(
540557
async () =>
@@ -610,9 +627,15 @@ describe("Engine", () => {
610627
);
611628

612629
// Request pause while both slow steps are in flight
613-
await runInDurableObject(engineStub, async (engine) => {
614-
await engine.changeInstanceStatus("pause");
615-
});
630+
try {
631+
await runInDurableObject(engineStub, async (engine) => {
632+
await engine.changeInstanceStatus("pause");
633+
});
634+
} catch (e) {
635+
if (!isAbortError(e)) {
636+
throw e;
637+
}
638+
}
616639

617640
await vi.waitUntil(
618641
async () =>

packages/workflows-shared/tests/env.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
declare namespace Cloudflare {
44
interface Env {
5-
ENGINE: DurableObjectNamespace<import("../src/index").Engine>;
5+
ENGINE: DurableObject<import("../src/index").Engine>;
66
USER_WORKFLOW: import("cloudflare:workers").WorkflowEntrypoint;
77
}
88
}

packages/workflows-shared/tests/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"compilerOptions": {
44
"moduleResolution": "bundler",
55
"types": [
6-
"@cloudflare/workers-types/experimental",
6+
"@cloudflare/workers-types",
77
"@cloudflare/vitest-pool-workers"
88
]
99
},

packages/workflows-shared/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"lib": ["es2022"],
66
"module": "esnext",
77
"moduleResolution": "bundler",
8-
"types": ["@cloudflare/workers-types/experimental"],
8+
"types": ["@cloudflare/workers-types"],
99
"noEmit": true,
1010
"isolatedModules": true,
1111
"allowSyntheticDefaultImports": true,

packages/workflows-shared/vitest.config.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ export default defineWorkersProject({
88
test: {
99
poolOptions: {
1010
workers: {
11-
singleWorker: true,
12-
isolatedStorage: true,
1311
main: "tests/test-entry.ts",
1412
miniflare: {
1513
compatibilityDate: "2025-02-04",

0 commit comments

Comments
 (0)