From dde1dd27190597f655994bc7bab8a7faf2cd4f24 Mon Sep 17 00:00:00 2001 From: Olga Silva Date: Mon, 19 Jan 2026 14:51:58 +0000 Subject: [PATCH] [Workflows] Add step context with attempt count --- .changeset/add-step-context.md | 17 ++++ packages/workflows-shared/src/context.ts | 28 +++++-- .../workflows-shared/tests/binding.test.ts | 28 +------ .../workflows-shared/tests/context.test.ts | 46 +++++++++++ .../workflows-shared/tests/engine.test.ts | 74 +----------------- packages/workflows-shared/tests/utils.ts | 77 +++++++++++++++++++ 6 files changed, 163 insertions(+), 107 deletions(-) create mode 100644 .changeset/add-step-context.md create mode 100644 packages/workflows-shared/tests/context.test.ts create mode 100644 packages/workflows-shared/tests/utils.ts diff --git a/.changeset/add-step-context.md b/.changeset/add-step-context.md new file mode 100644 index 000000000000..c57994d15a6b --- /dev/null +++ b/.changeset/add-step-context.md @@ -0,0 +1,17 @@ +--- +"@cloudflare/workflows-shared": minor +--- + +Adds step context with attempt count to step.do() callbacks. + +Workflow step callbacks now receive a context object containing the current attempt number (1-indexed). +This allows developers to access which retry attempt is currently executing. + +Example: + +```ts +await step.do("my-step", async (ctx) => { + // ctx.attempt is 1 on first try, 2 on first retry, etc. + console.log(`Attempt ${ctx.attempt}`); +}); +``` diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index b63448b3a7cd..5ced5eae4ccf 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -42,6 +42,10 @@ export type StepState = { attemptedCount: number; }; +export type WorkflowStepContext = { + attempt: number; +}; + export class Context extends RpcTarget { #engine: Engine; #state: DurableObjectState; @@ -63,25 +67,30 @@ export class Context extends RpcTarget { return val; } - do(name: string, callback: () => Promise): Promise; + do( + name: string, + callback: (ctx: WorkflowStepContext) => Promise + ): Promise; do( name: string, config: WorkflowStepConfig, - callback: () => Promise + callback: (ctx: WorkflowStepContext) => Promise ): Promise; async do( name: string, - configOrCallback: WorkflowStepConfig | (() => Promise), - callback?: () => Promise + configOrCallback: + | WorkflowStepConfig + | ((ctx: WorkflowStepContext) => Promise), + callback?: (ctx: WorkflowStepContext) => Promise ): Promise { - let closure, stepConfig; + let closure: (ctx: WorkflowStepContext) => Promise, stepConfig; // If a user passes in a config, we'd like it to be the second arg so the callback is always last if (callback) { closure = callback; stepConfig = configOrCallback as WorkflowStepConfig; } else { - closure = configOrCallback as () => Promise; + closure = configOrCallback as (ctx: WorkflowStepContext) => Promise; stepConfig = {}; } @@ -192,7 +201,7 @@ export class Context extends RpcTarget { } const doWrapper = async ( - doWrapperClosure: () => Promise + doWrapperClosure: (ctx: WorkflowStepContext) => Promise ): Promise => { const stepState = ((await this.#state.storage.get( stepStateKey @@ -314,7 +323,10 @@ export class Context extends RpcTarget { await this.#state.storage.delete(`replace-result-${valueKey}`); // if there is a timeout to be forced we dont want to race with closure } else { - result = await Promise.race([doWrapperClosure(), timeoutPromise()]); + result = await Promise.race([ + doWrapperClosure({ attempt: stepState.attemptedCount }), + timeoutPromise(), + ]); } // if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ diff --git a/packages/workflows-shared/tests/binding.test.ts b/packages/workflows-shared/tests/binding.test.ts index f7e278e0d417..24cd827a54cd 100644 --- a/packages/workflows-shared/tests/binding.test.ts +++ b/packages/workflows-shared/tests/binding.test.ts @@ -5,33 +5,7 @@ import { } from "cloudflare:test"; import { describe, expect, it, vi } from "vitest"; import { WorkflowBinding } from "../src/binding"; -import type { Engine } from "../src/engine"; -import type { ProvidedEnv } from "cloudflare:test"; -import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers"; - -async function setWorkflowEntrypoint( - stub: DurableObjectStub, - callback: (event: unknown, step: WorkflowStep) => Promise -) { - const ctx = createExecutionContext(); - await runInDurableObject(stub, (instance) => { - // @ts-expect-error this is only a stub for WorkflowEntrypoint - instance.env.USER_WORKFLOW = new (class { - constructor( - // eslint-disable-next-line @typescript-eslint/no-shadow - protected ctx: ExecutionContext, - // eslint-disable-next-line @typescript-eslint/no-shadow - protected env: ProvidedEnv - ) {} - public async run( - event: Readonly>, - step: WorkflowStep - ): Promise { - return await callback(event, step); - } - })(ctx, env); - }); -} +import { setWorkflowEntrypoint } from "./utils"; describe("WorkflowBinding", () => { it("should not call dispose when sending an event to an instance", async () => { diff --git a/packages/workflows-shared/tests/context.test.ts b/packages/workflows-shared/tests/context.test.ts new file mode 100644 index 000000000000..762649fdccd1 --- /dev/null +++ b/packages/workflows-shared/tests/context.test.ts @@ -0,0 +1,46 @@ +import { describe, expect, it } from "vitest"; +import { runWorkflow } from "./utils"; + +describe("Context", () => { + it("should provide attempt count 1 on first successful attempt", async () => { + let receivedAttempt: number | undefined; + + await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => { + // TODO: remove after types are updated + // @ts-expect-error WorkflowStep types + const result = await step.do("a successful step", async (ctx) => { + receivedAttempt = ctx.attempt; + return "success"; + }); + return result; + }); + + expect(receivedAttempt).toBe(1); + }); + + it("should provide attempt count to callback", async () => { + const receivedAttempts: number[] = []; + + await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => { + const result = await step.do( + "retrying step", + { + retries: { + limit: 2, + delay: 0, + }, + }, + // TODO: remove after types are updated + // @ts-expect-error WorkflowStep types + async (ctx) => { + receivedAttempts.push(ctx.attempt); + throw new Error(`Throwing`); + } + ); + return result; + }); + + // Should have received attempts 1, 2, and 3 + expect(receivedAttempts).toEqual([1, 2, 3]); + }); +}); diff --git a/packages/workflows-shared/tests/engine.test.ts b/packages/workflows-shared/tests/engine.test.ts index f9117d906671..cce6d9fc767a 100644 --- a/packages/workflows-shared/tests/engine.test.ts +++ b/packages/workflows-shared/tests/engine.test.ts @@ -1,84 +1,14 @@ -import { - createExecutionContext, - env, - runInDurableObject, -} from "cloudflare:test"; +import { env, runInDurableObject } from "cloudflare:test"; import { NonRetryableError } from "cloudflare:workflows"; import { describe, expect, it, vi } from "vitest"; import { InstanceEvent, InstanceStatus } from "../src"; +import { runWorkflow, runWorkflowDefer } from "./utils"; import type { DatabaseInstance, DatabaseVersion, DatabaseWorkflow, - Engine, EngineLogs, } from "../src/engine"; -import type { ProvidedEnv } from "cloudflare:test"; -import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers"; - -async function setWorkflowEntrypoint( - stub: DurableObjectStub, - callback: (event: unknown, step: WorkflowStep) => Promise -) { - const ctx = createExecutionContext(); - await runInDurableObject(stub, (instance) => { - // @ts-expect-error this is only a stub for WorkflowEntrypoint - instance.env.USER_WORKFLOW = new (class { - constructor( - // eslint-disable-next-line @typescript-eslint/no-shadow - protected ctx: ExecutionContext, - // eslint-disable-next-line @typescript-eslint/no-shadow - protected env: ProvidedEnv - ) {} - public async run( - event: Readonly>, - step: WorkflowStep - ): Promise { - return await callback(event, step); - } - })(ctx, env); - }); -} - -async function runWorkflow( - instanceId: string, - callback: (event: unknown, step: WorkflowStep) => Promise -): Promise> { - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - await setWorkflowEntrypoint(engineStub, callback); - - await engineStub.init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - {} as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId: "some-instance-id" } - ); - - return engineStub; -} - -async function runWorkflowDefer( - instanceId: string, - callback: (event: unknown, step: WorkflowStep) => Promise -): Promise> { - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - await setWorkflowEntrypoint(engineStub, callback); - - void engineStub.init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - {} as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId: "some-instance-id" } - ); - - return engineStub; -} describe("Engine", () => { it("should not retry after NonRetryableError is thrown", async () => { diff --git a/packages/workflows-shared/tests/utils.ts b/packages/workflows-shared/tests/utils.ts new file mode 100644 index 000000000000..4e58933d5b0c --- /dev/null +++ b/packages/workflows-shared/tests/utils.ts @@ -0,0 +1,77 @@ +import { + createExecutionContext, + env, + runInDurableObject, +} from "cloudflare:test"; +import type { + DatabaseInstance, + DatabaseVersion, + DatabaseWorkflow, + Engine, +} from "../src/engine"; +import type { ProvidedEnv } from "cloudflare:test"; +import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers"; + +export async function setWorkflowEntrypoint( + stub: DurableObjectStub, + callback: (event: unknown, step: WorkflowStep) => Promise +) { + const ctx = createExecutionContext(); + await runInDurableObject(stub, (instance) => { + // @ts-expect-error this is only a stub for WorkflowEntrypoint + instance.env.USER_WORKFLOW = new (class { + constructor( + // eslint-disable-next-line @typescript-eslint/no-shadow + protected ctx: ExecutionContext, + // eslint-disable-next-line @typescript-eslint/no-shadow + protected env: ProvidedEnv + ) {} + public async run( + event: Readonly>, + step: WorkflowStep + ): Promise { + return await callback(event, step); + } + })(ctx, env); + }); +} + +export async function runWorkflow( + instanceId: string, + callback: (event: unknown, step: WorkflowStep) => Promise +): Promise> { + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = env.ENGINE.get(engineId); + + await setWorkflowEntrypoint(engineStub, callback); + + await engineStub.init( + 12346, + {} as DatabaseWorkflow, + {} as DatabaseVersion, + {} as DatabaseInstance, + { payload: {}, timestamp: new Date(), instanceId: "some-instance-id" } + ); + + return engineStub; +} + +export async function runWorkflowDefer( + instanceId: string, + callback: (event: unknown, step: WorkflowStep) => Promise +): Promise> { + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = env.ENGINE.get(engineId); + + await setWorkflowEntrypoint(engineStub, callback); + + void engineStub.init( + 12346, + {} as DatabaseWorkflow, + {} as DatabaseVersion, + {} as DatabaseInstance, + { payload: {}, timestamp: new Date(), instanceId: "some-instance-id" } + ); + + return engineStub; +}