Skip to content

Commit 32ee755

Browse files
committed
[Workflows] Implement Workflows instance methods
1 parent 4bdb35e commit 32ee755

File tree

29 files changed

+2422
-163
lines changed

29 files changed

+2422
-163
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
---
4+
5+
Implement pause, resume, restart, and terminate instance lifecycle methods.
6+
7+
```js
8+
const handle = await env.MY_WORKFLOW.create({
9+
id: "my-instance",
10+
params: { input: "data" },
11+
});
12+
13+
await handle.pause(); // pauses after the current step completes
14+
await handle.resume(); // resumes from where it left off
15+
await handle.restart(); // restarts the workflow from the beginning
16+
await handle.terminate(); // terminates the workflow immediately
17+
```
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"miniflare": minor
3+
---
4+
5+
Support Workflow instance pause, resume, restart, and terminate in local dev.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@cloudflare/vitest-pool-workers": patch
3+
---
4+
5+
Allow `waitForStatus` to wait for "terminated" and "paused" states.
6+
7+
Removed the guard in `waitForStatus` that rejected "terminated" and "paused" as valid target states, preventing tests from waiting on workflow instances that were paused or terminated.

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { env, introspectWorkflow, SELF } from "cloudflare:test";
2-
import { it } from "vitest";
2+
import { describe, it } from "vitest";
33

44
const STATUS_COMPLETE = "complete";
55
const STEP_NAME = "AI content scan";
@@ -70,3 +70,101 @@ it("workflow batch should be able to reach the end and be successful", async ({
7070
await introspector.dispose();
7171
}
7272
});
73+
74+
describe("workflow instance lifecycle methods", () => {
75+
it("should terminate a workflow instance", async ({ expect }) => {
76+
// CONFIG:
77+
await using introspector = await introspectWorkflow(env.MODERATOR);
78+
await introspector.modifyAll(async (m) => {
79+
await m.disableSleeps();
80+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
81+
});
82+
83+
const res = await SELF.fetch("https://mock-worker.local/moderate");
84+
const data = (await res.json()) as { id: string; details: unknown };
85+
86+
const instances = introspector.get();
87+
expect(instances.length).toBe(1);
88+
const instance = instances[0];
89+
90+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
91+
violationScore: 50,
92+
});
93+
94+
const handle = await env.MODERATOR.get(data.id);
95+
await handle.terminate();
96+
97+
// ASSERTIONS:
98+
await expect(instance.waitForStatus("terminated")).resolves.not.toThrow();
99+
100+
// DISPOSE: ensured by `await using`
101+
});
102+
103+
it("should restart a workflow instance", async ({ expect }) => {
104+
// CONFIG:
105+
await using introspector = await introspectWorkflow(env.MODERATOR);
106+
await introspector.modifyAll(async (m) => {
107+
await m.disableSleeps();
108+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
109+
await m.mockEvent({
110+
type: "moderation-decision",
111+
payload: { moderatorAction: "approve" },
112+
});
113+
});
114+
115+
const res = await SELF.fetch("https://mock-worker.local/moderate");
116+
const data = (await res.json()) as { id: string; details: unknown };
117+
118+
const instances = introspector.get();
119+
expect(instances.length).toBe(1);
120+
const instance = instances[0];
121+
122+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
123+
violationScore: 50,
124+
});
125+
126+
const handle = await env.MODERATOR.get(data.id);
127+
await handle.restart();
128+
129+
// Mocks survive instace restart, so the restarted workflow re-runs
130+
// with the same config
131+
await expect(
132+
instance.waitForStatus(STATUS_COMPLETE)
133+
).resolves.not.toThrow();
134+
135+
// DISPOSE: ensured by `await using`
136+
});
137+
138+
it("should pause a workflow instance", async ({ expect }) => {
139+
// CONFIG:
140+
await using introspector = await introspectWorkflow(env.MODERATOR);
141+
await introspector.modifyAll(async (m) => {
142+
await m.disableSleeps();
143+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
144+
});
145+
146+
const res = await SELF.fetch("https://mock-worker.local/moderate");
147+
const data = (await res.json()) as { id: string; details: unknown };
148+
149+
const instances = introspector.get();
150+
expect(instances.length).toBe(1);
151+
const instance = instances[0];
152+
153+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
154+
violationScore: 50,
155+
});
156+
157+
const handle = await env.MODERATOR.get(data.id);
158+
await handle.pause();
159+
160+
await handle.sendEvent({
161+
type: "moderation-decision",
162+
payload: { moderatorAction: "approve" },
163+
});
164+
165+
// ASSERTIONS:
166+
await expect(instance.waitForStatus("paused")).resolves.not.toThrow();
167+
168+
// DISPOSE: ensured by `await using`
169+
});
170+
});

fixtures/workflow-multiple/src/index.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,32 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> {
5757
}
5858
}
5959

60+
export class Demo3 extends WorkflowEntrypoint<{}, Params> {
61+
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
62+
const result = await step.do("First step", async function () {
63+
return {
64+
output: "First step result",
65+
};
66+
});
67+
68+
await step.waitForEvent("wait for signal", {
69+
type: "continue",
70+
});
71+
72+
const result2 = await step.do("Second step", async function () {
73+
return {
74+
output: "workflow3",
75+
};
76+
});
77+
78+
return "i'm workflow3";
79+
}
80+
}
81+
6082
type Env = {
6183
WORKFLOW: Workflow;
6284
WORKFLOW2: Workflow;
85+
WORKFLOW3: Workflow;
6386
};
6487

6588
export default class extends WorkerEntrypoint<Env> {
@@ -71,8 +94,15 @@ export default class extends WorkerEntrypoint<Env> {
7194
if (url.pathname === "/favicon.ico") {
7295
return new Response(null, { status: 404 });
7396
}
74-
let workflowToUse =
75-
workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW;
97+
98+
let workflowToUse: Workflow;
99+
if (workflowName === "3") {
100+
workflowToUse = this.env.WORKFLOW3;
101+
} else if (workflowName === "2") {
102+
workflowToUse = this.env.WORKFLOW2;
103+
} else {
104+
workflowToUse = this.env.WORKFLOW;
105+
}
76106

77107
let handle: WorkflowInstance;
78108
if (url.pathname === "/create") {
@@ -81,6 +111,25 @@ export default class extends WorkerEntrypoint<Env> {
81111
} else {
82112
handle = await workflowToUse.create({ id });
83113
}
114+
} else if (url.pathname === "/pause") {
115+
handle = await workflowToUse.get(id);
116+
await handle.pause();
117+
} else if (url.pathname === "/resume") {
118+
handle = await workflowToUse.get(id);
119+
await handle.resume();
120+
} else if (url.pathname === "/restart") {
121+
handle = await workflowToUse.get(id);
122+
await handle.restart();
123+
} else if (url.pathname === "/terminate") {
124+
handle = await workflowToUse.get(id);
125+
await handle.terminate();
126+
} else if (url.pathname === "/sendEvent") {
127+
handle = await workflowToUse.get(id);
128+
await handle.sendEvent({
129+
type: "continue",
130+
payload: await req.json(),
131+
});
132+
return Response.json({ ok: true });
84133
} else {
85134
handle = await workflowToUse.get(id);
86135
}

fixtures/workflow-multiple/tests/index.test.ts

Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { randomUUID } from "crypto";
12
import { rm } from "fs/promises";
23
import { resolve } from "path";
34
import { fetch } from "undici";
4-
import { afterAll, beforeAll, describe, it, vi } from "vitest";
5+
import { afterAll, beforeAll, describe, it, test, vi } from "vitest";
56
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";
67

78
describe("Workflows", () => {
@@ -26,11 +27,13 @@ describe("Workflows", () => {
2627
await stop?.();
2728
});
2829

29-
async function fetchJson(url: string) {
30+
async function fetchJson(url: string, body?: unknown, method?: string) {
3031
const response = await fetch(url, {
3132
headers: {
3233
"MF-Disable-Pretty-Error": "1",
3334
},
35+
method: method ?? "GET",
36+
body: body !== undefined ? JSON.stringify(body) : undefined,
3437
});
3538
const text = await response.text();
3639

@@ -92,4 +95,151 @@ describe("Workflows", () => {
9295
),
9396
]);
9497
});
98+
99+
describe("instance lifecycle methods (workflow3)", () => {
100+
test("pause and resume a workflow", async ({ expect }) => {
101+
const id = randomUUID();
102+
103+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
104+
105+
await vi.waitFor(
106+
async () => {
107+
const result = (await fetchJson(
108+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
109+
)) as {
110+
status: {
111+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
112+
};
113+
};
114+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
115+
output: "First step result",
116+
});
117+
},
118+
{ timeout: 5000 }
119+
);
120+
121+
// Pause the instance
122+
await fetchJson(`http://${ip}:${port}/pause?workflowName=3&id=${id}`);
123+
124+
// Send event to unblock waitForEvent
125+
await fetchJson(
126+
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
127+
{ done: true },
128+
"POST"
129+
);
130+
131+
await vi.waitFor(
132+
async () => {
133+
const result = (await fetchJson(
134+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
135+
)) as { status: { status: string } };
136+
expect(result.status.status).toBe("paused");
137+
},
138+
{ timeout: 5000 }
139+
);
140+
141+
// Resume the instance
142+
await fetchJson(`http://${ip}:${port}/resume?workflowName=3&id=${id}`);
143+
144+
await vi.waitFor(
145+
async () => {
146+
const result = (await fetchJson(
147+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
148+
)) as { status: { status: string; output: string } };
149+
expect(result.status.status).toBe("complete");
150+
expect(result.status.output).toBe("i'm workflow3");
151+
},
152+
{ timeout: 5000 }
153+
);
154+
});
155+
156+
test("terminate a running workflow", async ({ expect }) => {
157+
const id = randomUUID();
158+
159+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
160+
161+
await vi.waitFor(
162+
async () => {
163+
const result = (await fetchJson(
164+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
165+
)) as {
166+
status: {
167+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
168+
};
169+
};
170+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
171+
output: "First step result",
172+
});
173+
},
174+
{ timeout: 5000 }
175+
);
176+
177+
// Terminate
178+
await fetchJson(`http://${ip}:${port}/terminate?workflowName=3&id=${id}`);
179+
180+
await vi.waitFor(
181+
async () => {
182+
const result = (await fetchJson(
183+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
184+
)) as { status: { status: string } };
185+
expect(result.status.status).toBe("terminated");
186+
},
187+
{ timeout: 5000 }
188+
);
189+
});
190+
191+
test("restart a running workflow", async ({ expect }) => {
192+
const id = randomUUID();
193+
194+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
195+
196+
await vi.waitFor(
197+
async () => {
198+
const result = (await fetchJson(
199+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
200+
)) as {
201+
status: {
202+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
203+
};
204+
};
205+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
206+
output: "First step result",
207+
});
208+
},
209+
{ timeout: 5000 }
210+
);
211+
212+
// Restart the instance
213+
await fetchJson(`http://${ip}:${port}/restart?workflowName=3&id=${id}`);
214+
215+
// After restart, wait for it to be running again
216+
await vi.waitFor(
217+
async () => {
218+
const result = (await fetchJson(
219+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
220+
)) as { status: { status: string } };
221+
expect(result.status.status).toBe("running");
222+
},
223+
{ timeout: 5000 }
224+
);
225+
226+
// Send event to complete the restarted workflow
227+
await fetchJson(
228+
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
229+
{ done: true },
230+
"POST"
231+
);
232+
233+
await vi.waitFor(
234+
async () => {
235+
const result = (await fetchJson(
236+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
237+
)) as { status: { status: string; output: string } };
238+
expect(result.status.status).toBe("complete");
239+
expect(result.status.output).toBe("i'm workflow3");
240+
},
241+
{ timeout: 5000 }
242+
);
243+
});
244+
});
95245
});

0 commit comments

Comments
 (0)