Skip to content

Commit f235827

Browse files
authored
[Workflows] Add step context with attempt count (#11970)
1 parent ec2459e commit f235827

File tree

6 files changed

+170
-108
lines changed

6 files changed

+170
-108
lines changed

.changeset/add-step-context.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
"@cloudflare/workflows-shared": minor
3+
---
4+
5+
Adds step context with attempt count to step.do() callbacks.
6+
7+
Workflow step callbacks now receive a context object containing the current attempt number (1-indexed).
8+
This allows developers to access which retry attempt is currently executing.
9+
10+
Example:
11+
12+
```ts
13+
await step.do("my-step", async (ctx) => {
14+
// ctx.attempt is 1 on first try, 2 on first retry, etc.
15+
console.log(`Attempt ${ctx.attempt}`);
16+
});
17+
```

packages/workflows-shared/src/context.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ export type StepState = {
4242
attemptedCount: number;
4343
};
4444

45+
export type WorkflowStepContext = {
46+
attempt: number;
47+
};
48+
4549
export class Context extends RpcTarget {
4650
#engine: Engine;
4751
#state: DurableObjectState;
@@ -64,25 +68,30 @@ export class Context extends RpcTarget {
6468
return val;
6569
}
6670

67-
do(name: string, callback: () => Promise<unknown>): Promise<unknown>;
71+
do(
72+
name: string,
73+
callback: (ctx: WorkflowStepContext) => Promise<unknown>
74+
): Promise<unknown>;
6875
do(
6976
name: string,
7077
config: WorkflowStepConfig,
71-
callback: () => Promise<unknown>
78+
callback: (ctx: WorkflowStepContext) => Promise<unknown>
7279
): Promise<unknown>;
7380

7481
async do<T>(
7582
name: string,
76-
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
77-
callback?: () => Promise<T>
83+
configOrCallback:
84+
| WorkflowStepConfig
85+
| ((ctx: WorkflowStepContext) => Promise<T>),
86+
callback?: (ctx: WorkflowStepContext) => Promise<T>
7887
): Promise<unknown | void | undefined> {
79-
let closure, stepConfig;
88+
let closure: (ctx: WorkflowStepContext) => Promise<T>, stepConfig;
8089
// If a user passes in a config, we'd like it to be the second arg so the callback is always last
8190
if (callback) {
8291
closure = callback;
8392
stepConfig = configOrCallback as WorkflowStepConfig;
8493
} else {
85-
closure = configOrCallback as () => Promise<T>;
94+
closure = configOrCallback as (ctx: WorkflowStepContext) => Promise<T>;
8695
stepConfig = {};
8796
}
8897

@@ -126,7 +135,11 @@ export class Context extends RpcTarget {
126135
const stepNameWithCounter = `${name}-${count}`;
127136
const stepStateKey = `${cacheKey}-metadata`;
128137

129-
const maybeMap = await this.#state.storage.get([valueKey, configKey]);
138+
const maybeMap = await this.#state.storage.get([
139+
valueKey,
140+
configKey,
141+
errorKey,
142+
]);
130143

131144
// Check cache
132145
const maybeResult = maybeMap.get(valueKey);
@@ -202,7 +215,7 @@ export class Context extends RpcTarget {
202215
}
203216

204217
const doWrapper = async (
205-
doWrapperClosure: () => Promise<unknown>
218+
doWrapperClosure: (ctx: WorkflowStepContext) => Promise<unknown>
206219
): Promise<unknown | void | undefined> => {
207220
const stepState = ((await this.#state.storage.get(
208221
stepStateKey
@@ -324,7 +337,10 @@ export class Context extends RpcTarget {
324337
await this.#state.storage.delete(`replace-result-${valueKey}`);
325338
// if there is a timeout to be forced we dont want to race with closure
326339
} else {
327-
result = await Promise.race([doWrapperClosure(), timeoutPromise()]);
340+
result = await Promise.race([
341+
doWrapperClosure({ attempt: stepState.attemptedCount }),
342+
timeoutPromise(),
343+
]);
328344
}
329345

330346
// if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ

packages/workflows-shared/tests/binding.test.ts

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,33 +5,7 @@ import {
55
} from "cloudflare:test";
66
import { describe, it, vi } from "vitest";
77
import { WorkflowBinding } from "../src/binding";
8-
import type { Engine } from "../src/engine";
9-
import type { ProvidedEnv } from "cloudflare:test";
10-
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
11-
12-
async function setWorkflowEntrypoint(
13-
stub: DurableObjectStub<Engine>,
14-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
15-
) {
16-
const ctx = createExecutionContext();
17-
await runInDurableObject(stub, (instance) => {
18-
// @ts-expect-error this is only a stub for WorkflowEntrypoint
19-
instance.env.USER_WORKFLOW = new (class {
20-
constructor(
21-
// eslint-disable-next-line @typescript-eslint/no-shadow
22-
protected ctx: ExecutionContext,
23-
// eslint-disable-next-line @typescript-eslint/no-shadow
24-
protected env: ProvidedEnv
25-
) {}
26-
public async run(
27-
event: Readonly<WorkflowEvent<unknown>>,
28-
step: WorkflowStep
29-
): Promise<unknown> {
30-
return await callback(event, step);
31-
}
32-
})(ctx, env);
33-
});
34-
}
8+
import { setWorkflowEntrypoint } from "./utils";
359

3610
describe("WorkflowBinding", () => {
3711
it("should not call dispose when sending an event to an instance", async ({
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import { describe, it } from "vitest";
2+
import { runWorkflow } from "./utils";
3+
4+
describe("Context", () => {
5+
it("should provide attempt count 1 on first successful attempt", async ({
6+
expect,
7+
}) => {
8+
let receivedAttempt: number | undefined;
9+
10+
await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => {
11+
// TODO: remove after types are updated
12+
// @ts-expect-error WorkflowStep types
13+
const result = await step.do("a successful step", async (ctx) => {
14+
receivedAttempt = ctx.attempt;
15+
return "success";
16+
});
17+
return result;
18+
});
19+
20+
expect(receivedAttempt).toBe(1);
21+
});
22+
23+
it("should provide attempt count to callback", async ({ expect }) => {
24+
const receivedAttempts: number[] = [];
25+
26+
await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => {
27+
const result = await step.do(
28+
"retrying step",
29+
{
30+
retries: {
31+
limit: 2,
32+
delay: 0,
33+
},
34+
},
35+
// TODO: remove after types are updated
36+
// @ts-expect-error WorkflowStep types
37+
async (ctx) => {
38+
receivedAttempts.push(ctx.attempt);
39+
throw new Error(`Throwing`);
40+
}
41+
);
42+
return result;
43+
});
44+
45+
// Should have received attempts 1, 2, and 3
46+
expect(receivedAttempts).toEqual([1, 2, 3]);
47+
});
48+
});

packages/workflows-shared/tests/engine.test.ts

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,14 @@
1-
import {
2-
createExecutionContext,
3-
env,
4-
runInDurableObject,
5-
} from "cloudflare:test";
1+
import { env, runInDurableObject } from "cloudflare:test";
62
import { NonRetryableError } from "cloudflare:workflows";
73
import { describe, it, vi } from "vitest";
84
import { DEFAULT_STEP_LIMIT, InstanceEvent, InstanceStatus } from "../src";
5+
import { runWorkflow, runWorkflowDefer, setWorkflowEntrypoint } from "./utils";
96
import type {
107
DatabaseInstance,
118
DatabaseVersion,
129
DatabaseWorkflow,
13-
Engine,
1410
EngineLogs,
1511
} from "../src/engine";
16-
import type { ProvidedEnv } from "cloudflare:test";
17-
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
18-
19-
async function setWorkflowEntrypoint(
20-
stub: DurableObjectStub<Engine>,
21-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
22-
) {
23-
const ctx = createExecutionContext();
24-
await runInDurableObject(stub, (instance) => {
25-
// @ts-expect-error this is only a stub for WorkflowEntrypoint
26-
instance.env.USER_WORKFLOW = new (class {
27-
constructor(
28-
// eslint-disable-next-line @typescript-eslint/no-shadow
29-
protected ctx: ExecutionContext,
30-
// eslint-disable-next-line @typescript-eslint/no-shadow
31-
protected env: ProvidedEnv
32-
) {}
33-
public async run(
34-
event: Readonly<WorkflowEvent<unknown>>,
35-
step: WorkflowStep
36-
): Promise<unknown> {
37-
return await callback(event, step);
38-
}
39-
})(ctx, env);
40-
});
41-
}
42-
43-
async function runWorkflow(
44-
instanceId: string,
45-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
46-
): Promise<DurableObjectStub<Engine>> {
47-
const engineId = env.ENGINE.idFromName(instanceId);
48-
const engineStub = env.ENGINE.get(engineId);
49-
50-
await setWorkflowEntrypoint(engineStub, callback);
51-
52-
await engineStub.init(
53-
12346,
54-
{} as DatabaseWorkflow,
55-
{} as DatabaseVersion,
56-
{} as DatabaseInstance,
57-
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
58-
);
59-
60-
return engineStub;
61-
}
62-
63-
async function runWorkflowDefer(
64-
instanceId: string,
65-
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
66-
): Promise<DurableObjectStub<Engine>> {
67-
const engineId = env.ENGINE.idFromName(instanceId);
68-
const engineStub = env.ENGINE.get(engineId);
69-
70-
await setWorkflowEntrypoint(engineStub, callback);
71-
72-
void engineStub.init(
73-
12346,
74-
{} as DatabaseWorkflow,
75-
{} as DatabaseVersion,
76-
{} as DatabaseInstance,
77-
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
78-
);
79-
80-
return engineStub;
81-
}
8212

8313
describe("Engine", () => {
8414
it("should not retry after NonRetryableError is thrown", async ({
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import {
2+
createExecutionContext,
3+
env,
4+
runInDurableObject,
5+
} from "cloudflare:test";
6+
import type {
7+
DatabaseInstance,
8+
DatabaseVersion,
9+
DatabaseWorkflow,
10+
Engine,
11+
} from "../src/engine";
12+
import type { ProvidedEnv } from "cloudflare:test";
13+
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";
14+
15+
export async function setWorkflowEntrypoint(
16+
stub: DurableObjectStub<Engine>,
17+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
18+
) {
19+
const ctx = createExecutionContext();
20+
await runInDurableObject(stub, (instance) => {
21+
// @ts-expect-error this is only a stub for WorkflowEntrypoint
22+
instance.env.USER_WORKFLOW = new (class {
23+
constructor(
24+
// eslint-disable-next-line @typescript-eslint/no-shadow
25+
protected ctx: ExecutionContext,
26+
// eslint-disable-next-line @typescript-eslint/no-shadow
27+
protected env: ProvidedEnv
28+
) {}
29+
public async run(
30+
event: Readonly<WorkflowEvent<unknown>>,
31+
step: WorkflowStep
32+
): Promise<unknown> {
33+
return await callback(event, step);
34+
}
35+
})(ctx, env);
36+
});
37+
}
38+
39+
export async function runWorkflow(
40+
instanceId: string,
41+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
42+
): Promise<DurableObjectStub<Engine>> {
43+
const engineId = env.ENGINE.idFromName(instanceId);
44+
const engineStub = env.ENGINE.get(engineId);
45+
46+
await setWorkflowEntrypoint(engineStub, callback);
47+
48+
await engineStub.init(
49+
12346,
50+
{} as DatabaseWorkflow,
51+
{} as DatabaseVersion,
52+
{} as DatabaseInstance,
53+
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
54+
);
55+
56+
return engineStub;
57+
}
58+
59+
export async function runWorkflowDefer(
60+
instanceId: string,
61+
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
62+
): Promise<DurableObjectStub<Engine>> {
63+
const engineId = env.ENGINE.idFromName(instanceId);
64+
const engineStub = env.ENGINE.get(engineId);
65+
66+
await setWorkflowEntrypoint(engineStub, callback);
67+
68+
void engineStub.init(
69+
12346,
70+
{} as DatabaseWorkflow,
71+
{} as DatabaseVersion,
72+
{} as DatabaseInstance,
73+
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
74+
);
75+
76+
return engineStub;
77+
}

0 commit comments

Comments
 (0)