Skip to content

Commit 8729f3d

Browse files
authored
[Workflows] Fix: implement workflows instance methods (#12881)
1 parent 3a1c149 commit 8729f3d

File tree

34 files changed

+3271
-340
lines changed

34 files changed

+3271
-340
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
"miniflare": minor
4+
---
5+
6+
Workflow instances now support pause, resume, restart, and terminate in local dev.
7+
8+
```js
9+
const instance = await env.MY_WORKFLOW.create({
10+
id: "my-instance",
11+
});
12+
13+
await instance.pause(); // pauses after the current step completes
14+
await instance.resume(); // resumes from where it left off
15+
await instance.restart(); // restarts the workflow from the beginning
16+
await instance.terminate(); // terminates the workflow immediately
17+
```
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@cloudflare/vitest-pool-workers": patch
3+
---
4+
5+
Workflows testing util `waitForStatus` now supports waiting for "terminated" and "paused" states.

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { env, introspectWorkflow, SELF } from "cloudflare:test";
2-
import { it } from "vitest";
2+
import { describe, it } from "vitest";
33

44
const STATUS_COMPLETE = "complete";
55
const STEP_NAME = "AI content scan";
@@ -70,3 +70,96 @@ it("workflow batch should be able to reach the end and be successful", async ({
7070
await introspector.dispose();
7171
}
7272
});
73+
74+
describe("workflow instance lifecycle methods", () => {
75+
it("should terminate a workflow instance", async ({ expect }) => {
76+
// CONFIG:
77+
await using introspector = await introspectWorkflow(env.MODERATOR);
78+
await introspector.modifyAll(async (m) => {
79+
await m.disableSleeps();
80+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
81+
});
82+
83+
const res = await SELF.fetch("https://mock-worker.local/moderate");
84+
const data = (await res.json()) as { id: string; details: unknown };
85+
86+
const instances = introspector.get();
87+
expect(instances.length).toBe(1);
88+
const instance = instances[0];
89+
90+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
91+
violationScore: 50,
92+
});
93+
94+
const handle = await env.MODERATOR.get(data.id);
95+
await handle.terminate();
96+
97+
// ASSERTIONS:
98+
await expect(instance.waitForStatus("terminated")).resolves.not.toThrow();
99+
100+
// DISPOSE: ensured by `await using`
101+
});
102+
103+
it("should restart a workflow instance", async ({ expect }) => {
104+
// CONFIG:
105+
await using introspector = await introspectWorkflow(env.MODERATOR);
106+
await introspector.modifyAll(async (m) => {
107+
await m.disableSleeps();
108+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
109+
await m.mockEvent({
110+
type: "moderation-decision",
111+
payload: { moderatorAction: "approve" },
112+
});
113+
});
114+
115+
const res = await SELF.fetch("https://mock-worker.local/moderate");
116+
const data = (await res.json()) as { id: string; details: unknown };
117+
118+
const instances = introspector.get();
119+
expect(instances.length).toBe(1);
120+
const instance = instances[0];
121+
122+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
123+
violationScore: 50,
124+
});
125+
126+
const handle = await env.MODERATOR.get(data.id);
127+
await handle.restart();
128+
129+
// Mocks survive instace restart, so the restarted workflow re-runs
130+
// with the same config
131+
await expect(
132+
instance.waitForStatus(STATUS_COMPLETE)
133+
).resolves.not.toThrow();
134+
135+
// DISPOSE: ensured by `await using`
136+
});
137+
138+
it("should pause a workflow instance", async ({ expect }) => {
139+
// CONFIG:
140+
await using introspector = await introspectWorkflow(env.MODERATOR);
141+
await introspector.modifyAll(async (m) => {
142+
await m.disableSleeps();
143+
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
144+
});
145+
146+
const res = await SELF.fetch("https://mock-worker.local/moderate");
147+
const data = (await res.json()) as { id: string; details: unknown };
148+
149+
const instances = introspector.get();
150+
expect(instances.length).toBe(1);
151+
const instance = instances[0];
152+
153+
expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
154+
violationScore: 50,
155+
});
156+
157+
const handle = await env.MODERATOR.get(data.id);
158+
await handle.pause();
159+
160+
// ASSERTIONS:
161+
await expect(instance.waitForStatus("paused")).resolves.not.toThrow();
162+
163+
// DISPOSE: ensured by `await using`
164+
});
165+
});

fixtures/workflow-multiple/src/index.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,32 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> {
5757
}
5858
}
5959

60+
export class Demo3 extends WorkflowEntrypoint<{}, Params> {
61+
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
62+
const result = await step.do("First step", async function () {
63+
return {
64+
output: "First step result",
65+
};
66+
});
67+
68+
await step.waitForEvent("wait for signal", {
69+
type: "continue",
70+
});
71+
72+
const result2 = await step.do("Second step", async function () {
73+
return {
74+
output: "workflow3",
75+
};
76+
});
77+
78+
return "i'm workflow3";
79+
}
80+
}
81+
6082
type Env = {
6183
WORKFLOW: Workflow;
6284
WORKFLOW2: Workflow;
85+
WORKFLOW3: Workflow;
6386
};
6487

6588
export default class extends WorkerEntrypoint<Env> {
@@ -71,8 +94,15 @@ export default class extends WorkerEntrypoint<Env> {
7194
if (url.pathname === "/favicon.ico") {
7295
return new Response(null, { status: 404 });
7396
}
74-
let workflowToUse =
75-
workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW;
97+
98+
let workflowToUse: Workflow;
99+
if (workflowName === "3") {
100+
workflowToUse = this.env.WORKFLOW3;
101+
} else if (workflowName === "2") {
102+
workflowToUse = this.env.WORKFLOW2;
103+
} else {
104+
workflowToUse = this.env.WORKFLOW;
105+
}
76106

77107
let handle: WorkflowInstance;
78108
if (url.pathname === "/create") {
@@ -81,6 +111,25 @@ export default class extends WorkerEntrypoint<Env> {
81111
} else {
82112
handle = await workflowToUse.create({ id });
83113
}
114+
} else if (url.pathname === "/pause") {
115+
handle = await workflowToUse.get(id);
116+
await handle.pause();
117+
} else if (url.pathname === "/resume") {
118+
handle = await workflowToUse.get(id);
119+
await handle.resume();
120+
} else if (url.pathname === "/restart") {
121+
handle = await workflowToUse.get(id);
122+
await handle.restart();
123+
} else if (url.pathname === "/terminate") {
124+
handle = await workflowToUse.get(id);
125+
await handle.terminate();
126+
} else if (url.pathname === "/sendEvent") {
127+
handle = await workflowToUse.get(id);
128+
await handle.sendEvent({
129+
type: "continue",
130+
payload: await req.json(),
131+
});
132+
return Response.json({ ok: true });
84133
} else {
85134
handle = await workflowToUse.get(id);
86135
}

fixtures/workflow-multiple/tests/index.test.ts

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import { randomUUID } from "crypto";
12
import { rm } from "fs/promises";
23
import { resolve } from "path";
34
import { fetch } from "undici";
4-
import { afterAll, beforeAll, describe, it, vi } from "vitest";
5+
import { afterAll, beforeAll, describe, it, test, vi } from "vitest";
56
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";
67

78
describe("Workflows", () => {
@@ -26,11 +27,13 @@ describe("Workflows", () => {
2627
await stop?.();
2728
});
2829

29-
async function fetchJson(url: string) {
30+
async function fetchJson(url: string, body?: unknown, method?: string) {
3031
const response = await fetch(url, {
3132
headers: {
3233
"MF-Disable-Pretty-Error": "1",
3334
},
35+
method: method ?? "GET",
36+
body: body !== undefined ? JSON.stringify(body) : undefined,
3437
});
3538
const text = await response.text();
3639

@@ -92,4 +95,150 @@ describe("Workflows", () => {
9295
),
9396
]);
9497
});
98+
99+
describe("instance lifecycle methods (workflow3)", () => {
100+
test("pause and resume a workflow", async ({ expect }) => {
101+
const id = randomUUID();
102+
103+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
104+
105+
await vi.waitFor(
106+
async () => {
107+
const result = (await fetchJson(
108+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
109+
)) as {
110+
status: {
111+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
112+
};
113+
};
114+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
115+
output: "First step result",
116+
});
117+
},
118+
{ timeout: 5000 }
119+
);
120+
121+
// Pause the instance
122+
await fetchJson(`http://${ip}:${port}/pause?workflowName=3&id=${id}`);
123+
124+
await vi.waitFor(
125+
async () => {
126+
const result = (await fetchJson(
127+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
128+
)) as { status: { status: string } };
129+
expect(result.status.status).toBe("paused");
130+
},
131+
{ timeout: 5000 }
132+
);
133+
134+
// Resume the instance
135+
await fetchJson(`http://${ip}:${port}/resume?workflowName=3&id=${id}`);
136+
137+
await fetchJson(
138+
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
139+
{ done: true },
140+
"POST"
141+
);
142+
143+
await vi.waitFor(
144+
async () => {
145+
const result = (await fetchJson(
146+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
147+
)) as { status: { status: string; output: string } };
148+
expect(result.status.status).toBe("complete");
149+
expect(result.status.output).toBe("i'm workflow3");
150+
},
151+
{ timeout: 5000 }
152+
);
153+
});
154+
155+
test("terminate a running workflow", async ({ expect }) => {
156+
const id = randomUUID();
157+
158+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
159+
160+
await vi.waitFor(
161+
async () => {
162+
const result = (await fetchJson(
163+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
164+
)) as {
165+
status: {
166+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
167+
};
168+
};
169+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
170+
output: "First step result",
171+
});
172+
},
173+
{ timeout: 5000 }
174+
);
175+
176+
// Terminate
177+
await fetchJson(`http://${ip}:${port}/terminate?workflowName=3&id=${id}`);
178+
179+
await vi.waitFor(
180+
async () => {
181+
const result = (await fetchJson(
182+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
183+
)) as { status: { status: string } };
184+
expect(result.status.status).toBe("terminated");
185+
},
186+
{ timeout: 5000 }
187+
);
188+
});
189+
190+
test("restart a running workflow", async ({ expect }) => {
191+
const id = randomUUID();
192+
193+
await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);
194+
195+
await vi.waitFor(
196+
async () => {
197+
const result = (await fetchJson(
198+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
199+
)) as {
200+
status: {
201+
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
202+
};
203+
};
204+
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
205+
output: "First step result",
206+
});
207+
},
208+
{ timeout: 5000 }
209+
);
210+
211+
// Restart the instance
212+
await fetchJson(`http://${ip}:${port}/restart?workflowName=3&id=${id}`);
213+
214+
// After restart, wait for it to be running again
215+
await vi.waitFor(
216+
async () => {
217+
const result = (await fetchJson(
218+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
219+
)) as { status: { status: string } };
220+
expect(result.status.status).toBe("running");
221+
},
222+
{ timeout: 5000 }
223+
);
224+
225+
// Send event to complete the restarted workflow
226+
await fetchJson(
227+
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
228+
{ done: true },
229+
"POST"
230+
);
231+
232+
await vi.waitFor(
233+
async () => {
234+
const result = (await fetchJson(
235+
`http://${ip}:${port}/status?workflowName=3&id=${id}`
236+
)) as { status: { status: string; output: string } };
237+
expect(result.status.status).toBe("complete");
238+
expect(result.status.output).toBe("i'm workflow3");
239+
},
240+
{ timeout: 5000 }
241+
);
242+
});
243+
});
95244
});

fixtures/workflow-multiple/wrangler.jsonc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,10 @@
1313
"name": "my-workflow-2",
1414
"class_name": "Demo2",
1515
},
16+
{
17+
"binding": "WORKFLOW3",
18+
"name": "my-workflow-3",
19+
"class_name": "Demo3",
20+
},
1621
],
1722
}

0 commit comments

Comments
 (0)