Skip to content

Commit a2879e1

Browse files
LuisDuarte1edmundhung
authored andcommitted
backport workflow changes from #8775
1 parent aa8770f commit a2879e1

File tree

10 files changed

+545
-9
lines changed

10 files changed

+545
-9
lines changed

.changeset/sweet-clocks-take.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@cloudflare/workflows-shared": patch
3+
---
4+
5+
Add `waitForEvent` and `sendEvent` support for local dev

fixtures/workflow/src/index.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
WorkerEntrypoint,
3+
WorkflowBackoff,
34
WorkflowEntrypoint,
45
WorkflowEvent,
56
WorkflowStep,
@@ -31,8 +32,33 @@ export class Demo extends WorkflowEntrypoint<{}, Params> {
3132
}
3233
}
3334

35+
export class Demo2 extends WorkflowEntrypoint<{}, Params> {
36+
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
37+
const { timestamp, payload } = event;
38+
39+
const result = await step.do("First step", async function () {
40+
return {
41+
output: "First step result",
42+
};
43+
});
44+
45+
await step.waitForEvent("event-1 provider", {
46+
type: "event-1",
47+
});
48+
49+
const result2 = await step.do("Second step", async function () {
50+
return {
51+
output: "Second step result",
52+
};
53+
});
54+
55+
return payload ?? "no-payload";
56+
}
57+
}
58+
3459
type Env = {
3560
WORKFLOW: Workflow;
61+
WORKFLOW2: Workflow;
3662
};
3763
export default class extends WorkerEntrypoint<Env> {
3864
async fetch(req: Request) {
@@ -43,13 +69,30 @@ export default class extends WorkerEntrypoint<Env> {
4369
return new Response(null, { status: 404 });
4470
}
4571

72+
console.log(url.pathname);
4673
let handle: WorkflowInstance;
4774
if (url.pathname === "/create") {
4875
if (id === null) {
4976
handle = await this.env.WORKFLOW.create();
5077
} else {
5178
handle = await this.env.WORKFLOW.create({ id });
5279
}
80+
} else if (url.pathname === "/createDemo2") {
81+
console.log("I'm here", id);
82+
if (id === null) {
83+
handle = await this.env.WORKFLOW2.create();
84+
} else {
85+
handle = await this.env.WORKFLOW2.create({ id });
86+
}
87+
} else if (url.pathname === "/sendEvent") {
88+
handle = await this.env.WORKFLOW2.get(id);
89+
90+
await handle.sendEvent({
91+
type: "event-1",
92+
payload: await req.json(),
93+
});
94+
} else if (url.pathname === "/get2") {
95+
handle = await this.env.WORKFLOW2.get(id);
5396
} else {
5497
handle = await this.env.WORKFLOW.get(id);
5598
}

fixtures/workflow/tests/index.test.ts

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { rm } from "fs/promises";
22
import { resolve } from "path";
33
import { fetch } from "undici";
4-
import { afterAll, beforeAll, describe, it, vi } from "vitest";
4+
import { afterAll, beforeAll, describe, it, test, vi } from "vitest";
55
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";
66

77
describe("Workflows", () => {
@@ -26,11 +26,13 @@ describe("Workflows", () => {
2626
await stop?.();
2727
});
2828

29-
async function fetchJson(url: string) {
29+
async function fetchJson(url: string, body?: unknown, method?: string) {
3030
const response = await fetch(url, {
3131
headers: {
3232
"MF-Disable-Pretty-Error": "1",
3333
},
34+
method: method ?? "GET",
35+
body: body !== undefined ? JSON.stringify(body) : undefined,
3436
});
3537
const text = await response.text();
3638

@@ -98,4 +100,84 @@ describe("Workflows", () => {
98100
name: "Error",
99101
});
100102
});
103+
104+
test("batchCreate should create multiple instances and run them seperatly", async ({
105+
expect,
106+
}) => {
107+
await expect(fetchJson(`http://${ip}:${port}/createBatch`)).resolves
108+
.toMatchInlineSnapshot(`
109+
[
110+
"batch-1",
111+
"batch-2",
112+
]
113+
`);
114+
115+
await Promise.all([
116+
vi.waitFor(
117+
async () => {
118+
await expect(
119+
fetchJson(`http://${ip}:${port}/status?workflowName=batch-1`)
120+
).resolves.toStrictEqual({
121+
status: "complete",
122+
__LOCAL_DEV_STEP_OUTPUTS: [
123+
{ output: "First step result" },
124+
{ output: "Second step result" },
125+
],
126+
output: "1",
127+
});
128+
},
129+
{ timeout: 5000 }
130+
),
131+
vi.waitFor(
132+
async () => {
133+
await expect(
134+
fetchJson(`http://${ip}:${port}/status?workflowName=batch-2`)
135+
).resolves.toStrictEqual({
136+
status: "complete",
137+
__LOCAL_DEV_STEP_OUTPUTS: [
138+
{ output: "First step result" },
139+
{ output: "Second step result" },
140+
],
141+
output: "2",
142+
});
143+
},
144+
{ timeout: 5000 }
145+
),
146+
]);
147+
});
148+
149+
test("waitForEvent should work", async ({ expect }) => {
150+
await fetchJson(`http://${ip}:${port}/createDemo2?workflowName=something`);
151+
152+
await fetchJson(
153+
`http://${ip}:${port}/sendEvent?workflowName=something`,
154+
{ event: true },
155+
"POST"
156+
);
157+
158+
await vi.waitFor(
159+
async () => {
160+
await expect(
161+
fetchJson(`http://${ip}:${port}/get2?workflowName=something`)
162+
).resolves.toMatchInlineSnapshot(`
163+
{
164+
"__LOCAL_DEV_STEP_OUTPUTS": [
165+
{
166+
"output": "First step result",
167+
},
168+
{
169+
"event": true,
170+
},
171+
{
172+
"output": "Second step result",
173+
},
174+
],
175+
"output": {},
176+
"status": "complete",
177+
}
178+
`);
179+
},
180+
{ timeout: 5000 }
181+
);
182+
});
101183
});

fixtures/workflow/wrangler.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,9 @@ compatibility_date = "2024-10-22"
77
binding = "WORKFLOW"
88
name = "my-workflow"
99
class_name = "Demo"
10+
11+
12+
[[workflows]]
13+
binding = "WORKFLOW2"
14+
name = "my-workflow2"
15+
class_name = "Demo2"

packages/workflows-shared/src/binding.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
4141
terminate: handle.terminate.bind(handle),
4242
restart: handle.restart.bind(handle),
4343
status: handle.status.bind(handle),
44+
sendEvent: handle.sendEvent.bind(handle),
4445
};
4546
}
4647

@@ -63,6 +64,7 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
6364
terminate: handle.terminate.bind(handle),
6465
restart: handle.restart.bind(handle),
6566
status: handle.status.bind(handle),
67+
sendEvent: handle.sendEvent.bind(handle),
6668
};
6769
}
6870
public async createBatch(
@@ -113,9 +115,16 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
113115
.at(0);
114116

115117
const filteredLogs = logs.filter(
116-
(log) => log.event === InstanceEvent.STEP_SUCCESS
118+
(log) =>
119+
log.event === InstanceEvent.STEP_SUCCESS ||
120+
log.event === InstanceEvent.WAIT_COMPLETE
121+
);
122+
123+
const stepOutputs = filteredLogs.map((log) =>
124+
log.event === InstanceEvent.STEP_SUCCESS
125+
? log.metadata.result
126+
: log.metadata.payload
117127
);
118-
const stepOutputs = filteredLogs.map((log) => log.metadata.result);
119128

120129
const workflowOutput =
121130
workflowSuccessEvent !== undefined
@@ -129,4 +138,15 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance {
129138
output: workflowOutput,
130139
}; // output, error
131140
}
141+
142+
public async sendEvent(args: {
143+
payload: unknown;
144+
type: string;
145+
}): Promise<void> {
146+
await this.stub.receiveEvent({
147+
payload: args.payload,
148+
type: args.type,
149+
timestamp: new Date(),
150+
});
151+
}
132152
}

0 commit comments

Comments
 (0)