Skip to content

Commit d4c5984

Browse files
committed
[Workflows] Add step context with attempt count
1 parent 27ea230 commit d4c5984

File tree

6 files changed

+169
-106
lines changed

6 files changed

+169
-106
lines changed

.changeset/add-step-context.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
---
4+
5+
Adds step context with attempt count to step.do() callbacks.
6+
7+
Workflow step callbacks now receive a context object containing the current attempt number (1-indexed).
8+
This allows developers to access which retry attempt is currently executing.
9+
10+
Example:
11+
12+
```ts
13+
await step.do("my-step", async (ctx) => {
14+
// ctx.attempt is 1 on first try, 2 on first retry, etc.
15+
console.log(`Attempt ${ctx.attempt}`);
16+
});
17+
```

packages/workflows-shared/src/context.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ export type StepState = {
4242
attemptedCount: number;
4343
};
4444

45+
export type WorkflowStepContext = {
46+
attempt: number;
47+
};
48+
4549
export class Context extends RpcTarget {
4650
#engine: Engine;
4751
#state: DurableObjectState;
@@ -63,25 +67,30 @@ export class Context extends RpcTarget {
6367
return val;
6468
}
6569

66-
do(name: string, callback: () => Promise<unknown>): Promise<unknown>;
70+
do(
71+
name: string,
72+
callback: (ctx: WorkflowStepContext) => Promise<unknown>
73+
): Promise<unknown>;
6774
do(
6875
name: string,
6976
config: WorkflowStepConfig,
70-
callback: () => Promise<unknown>
77+
callback: (ctx: WorkflowStepContext) => Promise<unknown>
7178
): Promise<unknown>;
7279

7380
async do<T>(
7481
name: string,
75-
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
76-
callback?: () => Promise<T>
82+
configOrCallback:
83+
| WorkflowStepConfig
84+
| ((ctx: WorkflowStepContext) => Promise<T>),
85+
callback?: (ctx: WorkflowStepContext) => Promise<T>
7786
): Promise<unknown | void | undefined> {
78-
let closure, stepConfig;
87+
let closure: (ctx: WorkflowStepContext) => Promise<T>, stepConfig;
7988
// If a user passes in a config, we'd like it to be the second arg so the callback is always last
8089
if (callback) {
8190
closure = callback;
8291
stepConfig = configOrCallback as WorkflowStepConfig;
8392
} else {
84-
closure = configOrCallback as () => Promise<T>;
93+
closure = configOrCallback as (ctx: WorkflowStepContext) => Promise<T>;
8594
stepConfig = {};
8695
}
8796

@@ -192,7 +201,7 @@ export class Context extends RpcTarget {
192201
}
193202

194203
const doWrapper = async (
195-
doWrapperClosure: () => Promise<unknown>
204+
doWrapperClosure: (ctx: WorkflowStepContext) => Promise<unknown>
196205
): Promise<unknown | void | undefined> => {
197206
const stepState = ((await this.#state.storage.get(
198207
stepStateKey
@@ -314,7 +323,10 @@ export class Context extends RpcTarget {
314323
await this.#state.storage.delete(`replace-result-${valueKey}`);
315324
// if there is a timeout to be forced we dont want to race with closure
316325
} else {
317-
result = await Promise.race([doWrapperClosure(), timeoutPromise()]);
326+
result = await Promise.race([
327+
doWrapperClosure({ attempt: stepState.attemptedCount }),
328+
timeoutPromise(),
329+
]);
318330
}
319331

320332
// if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ

packages/workflows-shared/tests/binding.test.ts

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,7 @@ import {
55
} from "cloudflare:test";
66
import { describe, expect, it, vi } from "vitest";
77
import { WorkflowBinding } from "../src/binding";
8-
import type { Engine } from "../src/engine";
9-
import type { ProvidedEnv } from "cloudflare:test";
10-
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
11-
12-
async function setWorkflowEntrypoint(
13-
stub: DurableObjectStub<Engine>,
14-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
15-
) {
16-
const ctx = createExecutionContext();
17-
await runInDurableObject(stub, (instance) => {
18-
// @ts-expect-error this is only a stub for WorkflowEntrypoint
19-
instance.env.USER_WORKFLOW = new (class {
20-
constructor(
21-
// eslint-disable-next-line @typescript-eslint/no-shadow
22-
protected ctx: ExecutionContext,
23-
// eslint-disable-next-line @typescript-eslint/no-shadow
24-
protected env: ProvidedEnv
25-
) {}
26-
public async run(
27-
event: Readonly<WorkflowEvent<unknown>>,
28-
step: WorkflowStep
29-
): Promise<unknown> {
30-
return await callback(event, step);
31-
}
32-
})(ctx, env);
33-
});
34-
}
8+
import { setWorkflowEntrypoint } from "./utils";
359

3610
describe("WorkflowBinding", () => {
3711
it("should not call dispose when sending an event to an instance", async () => {
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { describe, expect, it } from "vitest";
2+
import { runWorkflow } from "./utils";
3+
4+
describe("Context", () => {
5+
it("should provide attempt count 1 on first successful attempt", async () => {
6+
let receivedAttempt: number | undefined;
7+
8+
const engineStub = await runWorkflow(
9+
"MOCK-INSTANCE-ID",
10+
async (_event, step) => {
11+
// TODO: remove after types are updated
12+
// @ts-expect-error WorkflowStep types
13+
const result = await step.do("a successful step", async (ctx) => {
14+
receivedAttempt = ctx.attempt;
15+
return "success";
16+
});
17+
return result;
18+
}
19+
);
20+
21+
expect(receivedAttempt).toBe(1);
22+
});
23+
24+
it("should provide attempt count to callback", async () => {
25+
const receivedAttempts: number[] = [];
26+
27+
const engineStub = await runWorkflow(
28+
"MOCK-INSTANCE-ID",
29+
async (_event, step) => {
30+
const result = await step.do(
31+
"retrying step",
32+
{
33+
retries: {
34+
limit: 2,
35+
delay: 0,
36+
},
37+
},
38+
// TODO: remove after types are updated
39+
// @ts-expect-error WorkflowStep types
40+
async (ctx) => {
41+
receivedAttempts.push(ctx.attempt);
42+
throw new Error(`Throwing`);
43+
}
44+
);
45+
return result;
46+
}
47+
);
48+
49+
// Should have received attempts 1, 2, and 3
50+
expect(receivedAttempts).toEqual([1, 2, 3]);
51+
});
52+
});

packages/workflows-shared/tests/engine.test.ts

Lines changed: 2 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,15 @@
1-
import {
2-
createExecutionContext,
3-
env,
4-
runInDurableObject,
5-
} from "cloudflare:test";
1+
import { env, runInDurableObject } from "cloudflare:test";
62
import { NonRetryableError } from "cloudflare:workflows";
73
import { describe, expect, it, vi } from "vitest";
84
import { InstanceEvent, InstanceStatus } from "../src";
5+
import { runWorkflow, runWorkflowDefer } from "./utils";
96
import type {
107
DatabaseInstance,
118
DatabaseVersion,
129
DatabaseWorkflow,
1310
Engine,
1411
EngineLogs,
1512
} from "../src/engine";
16-
import type { ProvidedEnv } from "cloudflare:test";
17-
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
18-
19-
async function setWorkflowEntrypoint(
20-
stub: DurableObjectStub<Engine>,
21-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
22-
) {
23-
const ctx = createExecutionContext();
24-
await runInDurableObject(stub, (instance) => {
25-
// @ts-expect-error this is only a stub for WorkflowEntrypoint
26-
instance.env.USER_WORKFLOW = new (class {
27-
constructor(
28-
// eslint-disable-next-line @typescript-eslint/no-shadow
29-
protected ctx: ExecutionContext,
30-
// eslint-disable-next-line @typescript-eslint/no-shadow
31-
protected env: ProvidedEnv
32-
) {}
33-
public async run(
34-
event: Readonly<WorkflowEvent<unknown>>,
35-
step: WorkflowStep
36-
): Promise<unknown> {
37-
return await callback(event, step);
38-
}
39-
})(ctx, env);
40-
});
41-
}
42-
43-
async function runWorkflow(
44-
instanceId: string,
45-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
46-
): Promise<DurableObjectStub<Engine>> {
47-
const engineId = env.ENGINE.idFromName(instanceId);
48-
const engineStub = env.ENGINE.get(engineId);
49-
50-
await setWorkflowEntrypoint(engineStub, callback);
51-
52-
await engineStub.init(
53-
12346,
54-
{} as DatabaseWorkflow,
55-
{} as DatabaseVersion,
56-
{} as DatabaseInstance,
57-
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
58-
);
59-
60-
return engineStub;
61-
}
62-
63-
async function runWorkflowDefer(
64-
instanceId: string,
65-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
66-
): Promise<DurableObjectStub<Engine>> {
67-
const engineId = env.ENGINE.idFromName(instanceId);
68-
const engineStub = env.ENGINE.get(engineId);
69-
70-
await setWorkflowEntrypoint(engineStub, callback);
71-
72-
void engineStub.init(
73-
12346,
74-
{} as DatabaseWorkflow,
75-
{} as DatabaseVersion,
76-
{} as DatabaseInstance,
77-
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
78-
);
79-
80-
return engineStub;
81-
}
8213

8314
describe("Engine", () => {
8415
it("should not retry after NonRetryableError is thrown", async () => {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import {
2+
createExecutionContext,
3+
env,
4+
runInDurableObject,
5+
} from "cloudflare:test";
6+
import type {
7+
DatabaseInstance,
8+
DatabaseVersion,
9+
DatabaseWorkflow,
10+
Engine,
11+
} from "../src/engine";
12+
import type { ProvidedEnv } from "cloudflare:test";
13+
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
14+
15+
export async function setWorkflowEntrypoint(
16+
stub: DurableObjectStub<Engine>,
17+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
18+
) {
19+
const ctx = createExecutionContext();
20+
await runInDurableObject(stub, (instance) => {
21+
// @ts-expect-error this is only a stub for WorkflowEntrypoint
22+
instance.env.USER_WORKFLOW = new (class {
23+
constructor(
24+
// eslint-disable-next-line @typescript-eslint/no-shadow
25+
protected ctx: ExecutionContext,
26+
// eslint-disable-next-line @typescript-eslint/no-shadow
27+
protected env: ProvidedEnv
28+
) {}
29+
public async run(
30+
event: Readonly<WorkflowEvent<unknown>>,
31+
step: WorkflowStep
32+
): Promise<unknown> {
33+
return await callback(event, step);
34+
}
35+
})(ctx, env);
36+
});
37+
}
38+
39+
export async function runWorkflow(
40+
instanceId: string,
41+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
42+
): Promise<DurableObjectStub<Engine>> {
43+
const engineId = env.ENGINE.idFromName(instanceId);
44+
const engineStub = env.ENGINE.get(engineId);
45+
46+
await setWorkflowEntrypoint(engineStub, callback);
47+
48+
await engineStub.init(
49+
12346,
50+
{} as DatabaseWorkflow,
51+
{} as DatabaseVersion,
52+
{} as DatabaseInstance,
53+
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
54+
);
55+
56+
return engineStub;
57+
}
58+
59+
export async function runWorkflowDefer(
60+
instanceId: string,
61+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
62+
): Promise<DurableObjectStub<Engine>> {
63+
const engineId = env.ENGINE.idFromName(instanceId);
64+
const engineStub = env.ENGINE.get(engineId);
65+
66+
await setWorkflowEntrypoint(engineStub, callback);
67+
68+
void engineStub.init(
69+
12346,
70+
{} as DatabaseWorkflow,
71+
{} as DatabaseVersion,
72+
{} as DatabaseInstance,
73+
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
74+
);
75+
76+
return engineStub;
77+
}

0 commit comments

Comments
 (0)