Skip to content

Commit 53ca9e8

Browse files
committed
Fix pause - should not wait for sleeps or waitForEvents
1 parent 205f5f2 commit 53ca9e8

File tree

13 files changed

+300
-131
lines changed

13 files changed

+300
-131
lines changed
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
---
22
"@cloudflare/workflows-shared": minor
3+
"miniflare": minor
34
---
45

56
Workflow instances now support pause, resume, restart, and terminate in local dev.
67

78
```js
8-
const handle = await env.MY_WORKFLOW.create({
9+
const instance = await env.MY_WORKFLOW.create({
910
id: "my-instance",
10-
params: { input: "data" },
1111
});
1212

13-
await handle.pause(); // pauses after the current step completes
14-
await handle.resume(); // resumes from where it left off
15-
await handle.restart(); // restarts the workflow from the beginning
16-
await handle.terminate(); // terminates the workflow immediately
13+
await instance.pause(); // pauses after the current step completes
14+
await instance.resume(); // resumes from where it left off
15+
await instance.restart(); // restarts the workflow from the beginning
16+
await instance.terminate(); // terminates the workflow immediately
1717
```

.changeset/workflows-miniflare-lifecycle-methods.md

Lines changed: 0 additions & 5 deletions
This file was deleted.

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,6 @@ describe("workflow instance lifecycle methods", () => {
157157
const handle = await env.MODERATOR.get(data.id);
158158
await handle.pause();
159159

160-
await handle.sendEvent({
161-
type: "moderation-decision",
162-
payload: { moderatorAction: "approve" },
163-
});
164-
165160
// ASSERTIONS:
166161
await expect(instance.waitForStatus("paused")).resolves.not.toThrow();
167162

fixtures/workflow-multiple/tests/index.test.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -121,13 +121,6 @@ describe("Workflows", () => {
121121
// Pause the instance
122122
await fetchJson(`http://${ip}:${port}/pause?workflowName=3&id=${id}`);
123123

124-
// Send event to unblock waitForEvent
125-
await fetchJson(
126-
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
127-
{ done: true },
128-
"POST"
129-
);
130-
131124
await vi.waitFor(
132125
async () => {
133126
const result = (await fetchJson(
@@ -141,6 +134,12 @@ describe("Workflows", () => {
141134
// Resume the instance
142135
await fetchJson(`http://${ip}:${port}/resume?workflowName=3&id=${id}`);
143136

137+
await fetchJson(
138+
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
139+
{ done: true },
140+
"POST"
141+
);
142+
144143
await vi.waitFor(
145144
async () => {
146145
const result = (await fetchJson(

fixtures/workflow/tests/index.test.ts

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -286,13 +286,6 @@ describe("Workflows", () => {
286286
// Pause the instance
287287
await fetchJson(`http://${ip}:${port}/pause?workflowName=${name}`);
288288

289-
// Send event to unblock waitForEvent
290-
await fetchJson(
291-
`http://${ip}:${port}/sendEvent?workflowName=${name}`,
292-
{ event: true },
293-
"POST"
294-
);
295-
296289
await vi.waitFor(
297290
async () => {
298291
const result = await fetchJson(
@@ -303,17 +296,23 @@ describe("Workflows", () => {
303296
{ timeout: 1500 }
304297
);
305298

299+
// Resume the instance
306300
await fetchJson(`http://${ip}:${port}/resume?workflowName=${name}`);
307301

308-
// After resume, the workflow should eventually complete
302+
await fetchJson(
303+
`http://${ip}:${port}/sendEvent?workflowName=${name}`,
304+
{ event: true },
305+
"POST"
306+
);
307+
309308
await vi.waitFor(
310309
async () => {
311310
const result = await fetchJson(
312311
`http://${ip}:${port}/get2?workflowName=${name}`
313312
);
314313
expect(result.status).toBe("complete");
315314
},
316-
{ timeout: 1500 }
315+
{ timeout: 5000 }
317316
);
318317
});
319318

packages/miniflare/test/plugins/workflows/index.spec.ts

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ export class LifecycleWorkflow extends WorkflowEntrypoint {
8888
async run(event, step) {
8989
const first = await step.do("first step", async () => "step-1-done");
9090
91-
const eventData = await step.waitForEvent("wait for signal", {
92-
type: "continue",
93-
timeout: "1 day",
91+
await step.do("long step", async () => {
92+
await scheduler.wait(500);
93+
return "long-step-done";
9494
});
9595
96-
const second = await step.do("second step", async () => "step-2-done");
96+
const second = await step.do("third step", async () => "step-3-done");
9797
9898
return "workflow-complete";
9999
}
@@ -224,19 +224,13 @@ describe("workflow instance lifecycle methods", () => {
224224

225225
await waitForStepOutput(mf, "pause-resume-test", "step-1-done");
226226

227-
// Pause the instance
227+
// Pause the instance — waits for the in-flight long step to finish, then pauses
228228
const pauseRes = await mf.dispatchFetch(
229229
"http://localhost/pause?id=pause-resume-test"
230230
);
231231
const pauseData = (await pauseRes.json()) as Record<string, unknown>;
232232
expect(pauseData).toHaveProperty("status");
233233

234-
// Send the event to unblock waitForEvent
235-
const eventRes = await mf.dispatchFetch(
236-
"http://localhost/sendEvent?id=pause-resume-test"
237-
);
238-
expect(((await eventRes.json()) as Record<string, unknown>).ok).toBe(true);
239-
240234
await waitForStatus(mf, "pause-resume-test", "paused");
241235

242236
// Resume the instance
@@ -246,7 +240,7 @@ describe("workflow instance lifecycle methods", () => {
246240
const resumeData = (await resumeRes.json()) as Record<string, unknown>;
247241
expect(resumeData).toHaveProperty("status");
248242

249-
// After resume, the workflow should complete (step-2 runs, then returns)
243+
// After resume, the workflow should complete (third step runs, then returns)
250244
const finalStatus = await waitForStatus(
251245
mf,
252246
"pause-resume-test",
@@ -299,17 +293,7 @@ describe("workflow instance lifecycle methods", () => {
299293
const restartData = (await restartRes.json()) as Record<string, unknown>;
300294
expect(restartData).toHaveProperty("status");
301295

302-
// After restart, the workflow restarts from scratch — wait for "running"
303-
await waitForStatus(mf, "restart-test", "running");
304-
305-
// Send the event to complete the restarted workflow
306-
// (workflow replays: first step is cached, then waitForEvent blocks again)
307-
await waitForStepOutput(mf, "restart-test", "step-1-done");
308-
const eventRes = await mf.dispatchFetch(
309-
"http://localhost/sendEvent?id=restart-test"
310-
);
311-
await eventRes.text(); // consume the body
312-
296+
// After restart, the workflow restarts from scratch and runs to completion
313297
const finalStatus = await waitForStatus(mf, "restart-test", "complete");
314298
expect(finalStatus.output).toBe("workflow-complete");
315299
});
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
11
# `@cloudflare/workflows-shared`
22

3-
This is a package that is used at Cloudflare to power some internal features of [Cloudflare Workflows](https://developers.cloudflare.com/workflows/), as well as their open-source equivalents here in workers-sdk and Wrangler.
4-
5-
> [!NOTE]
6-
> Since code in this package is used by the Workflows infrastructure, it is important that PRs are given careful review with regards to how they could cause a failure in production.
7-
> Ideally, there should be comprehensive tests for changes being made to give extra confidence about the behavior.
3+
This package powers the local development experience for [Cloudflare Workflows](https://developers.cloudflare.com/workflows/) in workers-sdk and Wrangler.

packages/workflows-shared/src/binding.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers";
22
import { InstanceEvent, instanceStatusName } from "./instance";
33
import {
44
isAbortError,
5+
isUserTriggeredPause,
56
isUserTriggeredRestart,
67
isUserTriggeredTerminate,
78
WorkflowError,
@@ -175,7 +176,14 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
175176
}
176177

177178
public async pause(): Promise<void> {
178-
await this.stub.changeInstanceStatus("pause");
179+
try {
180+
await this.stub.changeInstanceStatus("pause");
181+
} catch (e) {
182+
// pause causes instance abortion
183+
if (!isUserTriggeredPause(e)) {
184+
throw e;
185+
}
186+
}
179187
}
180188

181189
public async resume(): Promise<void> {

packages/workflows-shared/src/context.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ export class Context extends RpcTarget {
6767
}
6868

6969
async #checkForPendingPause(): Promise<void> {
70+
if (this.#engine.timeoutHandler.isRunningStep()) {
71+
return;
72+
}
73+
7074
const status = await this.#engine.getStatus();
7175
if (status === InstanceStatus.WaitingForPause) {
7276
await this.#state.storage.put(PAUSE_DATETIME, new Date());
@@ -643,9 +647,6 @@ export class Context extends RpcTarget {
643647

644648
// @ts-expect-error priorityQueue is initiated in init
645649
this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" });
646-
647-
// Check if a pause was requested while this sleep was running
648-
await this.#checkForPendingPause();
649650
}
650651

651652
async sleepUntil(name: string, timestamp: Date | number): Promise<void> {
@@ -832,9 +833,6 @@ export class Context extends RpcTarget {
832833
throw error;
833834
});
834835

835-
// Check if a pause was requested while we were waiting for the event
836-
await this.#checkForPendingPause();
837-
838836
return result as WorkflowStepEvent<T>;
839837
}
840838
}

packages/workflows-shared/src/engine.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,7 @@ export class Engine extends DurableObject<Env> {
553553
const currentStatus = await this.getStatus();
554554
if (currentStatus === InstanceStatus.WaitingForPause) {
555555
// Engine is still running — cancel the pending pause
556+
this.timeoutHandler.cancelWaitingPromisesByType("pause");
556557
await this.setStatus(
557558
metadata.accountId,
558559
metadata.instance.id,
@@ -639,14 +640,21 @@ export class Engine extends DurableObject<Env> {
639640
);
640641
}
641642

642-
// Set to WaitingForPause — the engine keeps running.
643-
// The Context class will check for this between steps, store
644-
// PAUSE_DATETIME, transition to Paused, and then abort.
645643
await this.setStatus(
646644
metadata.accountId,
647645
metadata.instance.id,
648646
InstanceStatus.WaitingForPause
649647
);
648+
649+
void this.timeoutHandler.waitUntilNothingIsRunning("pause", async () => {
650+
await this.ctx.storage.put(PAUSE_DATETIME, new Date());
651+
await this.setStatus(
652+
metadata.accountId,
653+
metadata.instance.id,
654+
InstanceStatus.Paused
655+
);
656+
await this.abort(ABORT_REASONS.USER_PAUSE);
657+
});
650658
}
651659

652660
async userTriggeredRestart() {

0 commit comments

Comments
 (0)