Skip to content

Commit a2b9408

Browse files
committed
Address review comments
1 parent 53ca9e8 commit a2b9408

File tree

3 files changed

+46
-32
lines changed

3 files changed

+46
-32
lines changed

packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,43 +74,37 @@ class InstanceImpl implements WorkflowInstance {
7474
}
7575

7676
public async pause(): Promise<void> {
77-
const instance = await this.getInstance();
77+
using instance = await this.getInstance();
7878
await instance.pause();
79-
instance[Symbol.dispose]();
8079
}
8180

8281
public async resume(): Promise<void> {
83-
const instance = await this.getInstance();
82+
using instance = await this.getInstance();
8483
await instance.resume();
85-
instance[Symbol.dispose]();
8684
}
8785

8886
public async terminate(): Promise<void> {
89-
const instance = await this.getInstance();
87+
using instance = await this.getInstance();
9088
await instance.terminate();
91-
instance[Symbol.dispose]();
9289
}
9390

9491
public async restart(): Promise<void> {
95-
const instance = await this.getInstance();
92+
using instance = await this.getInstance();
9693
await instance.restart();
97-
instance[Symbol.dispose]();
9894
}
9995

10096
public async status(): Promise<InstanceStatus> {
101-
const instance = await this.getInstance();
97+
using instance = await this.getInstance();
10298
using res = (await instance.status()) as InstanceStatus & Disposable;
103-
instance[Symbol.dispose]();
10499
return structuredClone(res);
105100
}
106101

107102
public async sendEvent(args: {
108103
payload: unknown;
109104
type: string;
110105
}): Promise<void> {
111-
const instance = await this.getInstance();
106+
using instance = await this.getInstance();
112107
await instance.sendEvent(args);
113-
instance[Symbol.dispose]();
114108
}
115109
}
116110

packages/workflows-shared/src/binding.ts

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,20 +219,31 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
219219
public async status(): Promise<
220220
InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] }
221221
> {
222-
// If the stub is stale (e.g. after pause/restart/terminate aborted
223-
// the DO), refresh it transparently so callers never see the error.
224-
let status: EngineInstanceStatus;
222+
// Both getStatus() and readLogs() must use the same fresh stub.
223+
// After pause/restart/terminate aborts the DO, the stub goes stale
224+
const fetchStatusAndLogs = async () => {
225+
const status = await this.stub.getStatus();
226+
227+
// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
228+
const logs = await (this.stub.readLogs() as unknown as Promise<
229+
EngineLogs & Disposable
230+
>);
231+
232+
return { status, logs };
233+
};
234+
235+
let result: {
236+
status: EngineInstanceStatus;
237+
logs: EngineLogs & Disposable;
238+
};
225239
try {
226-
status = await this.stub.getStatus();
240+
result = await fetchStatusAndLogs();
227241
} catch {
228242
this.stub = this.getStub();
229-
status = await this.stub.getStatus();
243+
result = await fetchStatusAndLogs();
230244
}
231-
232-
// NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise<EngineLogs>
233-
using logs = await (this.stub.readLogs() as unknown as Promise<
234-
EngineLogs & Disposable
235-
>);
245+
// Dispose the RPC handle when the method scope exits
246+
using logs = result.logs;
236247

237248
const filteredLogs = logs.logs.filter(
238249
(log) =>
@@ -255,7 +266,7 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
255266
)?.metadata.error;
256267

257268
return {
258-
status: instanceStatusName(status),
269+
status: instanceStatusName(result.status),
259270
__LOCAL_DEV_STEP_OUTPUTS: stepOutputs,
260271
output: workflowOutput,
261272
error: workflowError,

packages/workflows-shared/src/engine.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { computeHash } from "./lib/cache";
1212
import {
1313
ABORT_REASONS,
1414
createWorkflowError,
15+
isAbortError,
1516
isUserTriggeredPause,
1617
WorkflowFatalError,
1718
} from "./lib/errors";
@@ -646,15 +647,22 @@ export class Engine extends DurableObject<Env> {
646647
InstanceStatus.WaitingForPause
647648
);
648649

649-
void this.timeoutHandler.waitUntilNothingIsRunning("pause", async () => {
650-
await this.ctx.storage.put(PAUSE_DATETIME, new Date());
651-
await this.setStatus(
652-
metadata.accountId,
653-
metadata.instance.id,
654-
InstanceStatus.Paused
655-
);
656-
await this.abort(ABORT_REASONS.USER_PAUSE);
657-
});
650+
void this.timeoutHandler
651+
.waitUntilNothingIsRunning("pause", async () => {
652+
await this.ctx.storage.put(PAUSE_DATETIME, new Date());
653+
await this.setStatus(
654+
metadata.accountId,
655+
metadata.instance.id,
656+
InstanceStatus.Paused
657+
);
658+
await this.abort(ABORT_REASONS.USER_PAUSE);
659+
})
660+
.catch((e) => {
661+
// Expected: abort rejects the promise chain when it kills the DO
662+
if (!isAbortError(e)) {
663+
throw e;
664+
}
665+
});
658666
}
659667

660668
async userTriggeredRestart() {
@@ -808,6 +816,7 @@ export class Engine extends DurableObject<Env> {
808816
InstanceStatus.Errored, // TODO (WOR-85): Remove this once upgrade story is done
809817
InstanceStatus.Terminated,
810818
InstanceStatus.Complete,
819+
InstanceStatus.Paused,
811820
].includes(status)
812821
) {
813822
return;

0 commit comments

Comments
 (0)