Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .changeset/add-step-context.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
"@cloudflare/workflows-shared": minor
---

Adds step context with attempt count to step.do() callbacks.

Workflow step callbacks now receive a context object containing the current attempt number (1-indexed).
This allows developers to access which retry attempt is currently executing.

Example:

```ts
await step.do("my-step", async (ctx) => {
// ctx.attempt is 1 on first try, 2 on first retry, etc.
console.log(`Attempt ${ctx.attempt}`);
});
```
28 changes: 20 additions & 8 deletions packages/workflows-shared/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export type StepState = {
attemptedCount: number;
};

export type WorkflowStepContext = {
attempt: number;
};

export class Context extends RpcTarget {
#engine: Engine;
#state: DurableObjectState;
Expand All @@ -63,25 +67,30 @@ export class Context extends RpcTarget {
return val;
}

do(name: string, callback: () => Promise<unknown>): Promise<unknown>;
do(
name: string,
callback: (ctx: WorkflowStepContext) => Promise<unknown>
): Promise<unknown>;
do(
name: string,
config: WorkflowStepConfig,
callback: () => Promise<unknown>
callback: (ctx: WorkflowStepContext) => Promise<unknown>
): Promise<unknown>;

async do<T>(
name: string,
configOrCallback: WorkflowStepConfig | (() => Promise<T>),
callback?: () => Promise<T>
configOrCallback:
| WorkflowStepConfig
| ((ctx: WorkflowStepContext) => Promise<T>),
callback?: (ctx: WorkflowStepContext) => Promise<T>
): Promise<unknown | void | undefined> {
let closure, stepConfig;
let closure: (ctx: WorkflowStepContext) => Promise<T>, stepConfig;
// If a user passes in a config, we'd like it to be the second arg so the callback is always last
if (callback) {
closure = callback;
stepConfig = configOrCallback as WorkflowStepConfig;
} else {
closure = configOrCallback as () => Promise<T>;
closure = configOrCallback as (ctx: WorkflowStepContext) => Promise<T>;
stepConfig = {};
}

Expand Down Expand Up @@ -192,7 +201,7 @@ export class Context extends RpcTarget {
}

const doWrapper = async (
doWrapperClosure: () => Promise<unknown>
doWrapperClosure: (ctx: WorkflowStepContext) => Promise<unknown>
): Promise<unknown | void | undefined> => {
const stepState = ((await this.#state.storage.get(
stepStateKey
Expand Down Expand Up @@ -314,7 +323,10 @@ export class Context extends RpcTarget {
await this.#state.storage.delete(`replace-result-${valueKey}`);
// if there is a timeout to be forced we dont want to race with closure
} else {
result = await Promise.race([doWrapperClosure(), timeoutPromise()]);
result = await Promise.race([
doWrapperClosure({ attempt: stepState.attemptedCount }),
timeoutPromise(),
]);
}

// if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ
Expand Down
28 changes: 1 addition & 27 deletions packages/workflows-shared/tests/binding.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,7 @@ import {
} from "cloudflare:test";
import { describe, expect, it, vi } from "vitest";
import { WorkflowBinding } from "../src/binding";
import type { Engine } from "../src/engine";
import type { ProvidedEnv } from "cloudflare:test";
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";

async function setWorkflowEntrypoint(
stub: DurableObjectStub<Engine>,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
) {
const ctx = createExecutionContext();
await runInDurableObject(stub, (instance) => {
// @ts-expect-error this is only a stub for WorkflowEntrypoint
instance.env.USER_WORKFLOW = new (class {
constructor(
// eslint-disable-next-line @typescript-eslint/no-shadow
protected ctx: ExecutionContext,
// eslint-disable-next-line @typescript-eslint/no-shadow
protected env: ProvidedEnv
) {}
public async run(
event: Readonly<WorkflowEvent<unknown>>,
step: WorkflowStep
): Promise<unknown> {
return await callback(event, step);
}
})(ctx, env);
});
}
import { setWorkflowEntrypoint } from "./utils";

describe("WorkflowBinding", () => {
it("should not call dispose when sending an event to an instance", async () => {
Expand Down
46 changes: 46 additions & 0 deletions packages/workflows-shared/tests/context.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { describe, expect, it } from "vitest";
import { runWorkflow } from "./utils";

describe("Context", () => {
it("should provide attempt count 1 on first successful attempt", async () => {
let receivedAttempt: number | undefined;

await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => {
// TODO: remove after types are updated
// @ts-expect-error WorkflowStep types
const result = await step.do("a successful step", async (ctx) => {
receivedAttempt = ctx.attempt;
return "success";
});
return result;
});

expect(receivedAttempt).toBe(1);
});

it("should provide attempt count to callback", async () => {
const receivedAttempts: number[] = [];

await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => {
const result = await step.do(
"retrying step",
{
retries: {
limit: 2,
delay: 0,
},
},
// TODO: remove after types are updated
// @ts-expect-error WorkflowStep types
async (ctx) => {
receivedAttempts.push(ctx.attempt);
throw new Error(`Throwing`);
}
);
return result;
});

// Should have received attempts 1, 2, and 3
expect(receivedAttempts).toEqual([1, 2, 3]);
});
});
74 changes: 2 additions & 72 deletions packages/workflows-shared/tests/engine.test.ts
Original file line number Diff line number Diff line change
@@ -1,84 +1,14 @@
import {
createExecutionContext,
env,
runInDurableObject,
} from "cloudflare:test";
import { env, runInDurableObject } from "cloudflare:test";
import { NonRetryableError } from "cloudflare:workflows";
import { describe, expect, it, vi } from "vitest";
import { InstanceEvent, InstanceStatus } from "../src";
import { runWorkflow, runWorkflowDefer } from "./utils";
import type {
DatabaseInstance,
DatabaseVersion,
DatabaseWorkflow,
Engine,
EngineLogs,
} from "../src/engine";
import type { ProvidedEnv } from "cloudflare:test";
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";

async function setWorkflowEntrypoint(
stub: DurableObjectStub<Engine>,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
) {
const ctx = createExecutionContext();
await runInDurableObject(stub, (instance) => {
// @ts-expect-error this is only a stub for WorkflowEntrypoint
instance.env.USER_WORKFLOW = new (class {
constructor(
// eslint-disable-next-line @typescript-eslint/no-shadow
protected ctx: ExecutionContext,
// eslint-disable-next-line @typescript-eslint/no-shadow
protected env: ProvidedEnv
) {}
public async run(
event: Readonly<WorkflowEvent<unknown>>,
step: WorkflowStep
): Promise<unknown> {
return await callback(event, step);
}
})(ctx, env);
});
}

async function runWorkflow(
instanceId: string,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
): Promise<DurableObjectStub<Engine>> {
const engineId = env.ENGINE.idFromName(instanceId);
const engineStub = env.ENGINE.get(engineId);

await setWorkflowEntrypoint(engineStub, callback);

await engineStub.init(
12346,
{} as DatabaseWorkflow,
{} as DatabaseVersion,
{} as DatabaseInstance,
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
);

return engineStub;
}

async function runWorkflowDefer(
instanceId: string,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
): Promise<DurableObjectStub<Engine>> {
const engineId = env.ENGINE.idFromName(instanceId);
const engineStub = env.ENGINE.get(engineId);

await setWorkflowEntrypoint(engineStub, callback);

void engineStub.init(
12346,
{} as DatabaseWorkflow,
{} as DatabaseVersion,
{} as DatabaseInstance,
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
);

return engineStub;
}

describe("Engine", () => {
it("should not retry after NonRetryableError is thrown", async () => {
Expand Down
77 changes: 77 additions & 0 deletions packages/workflows-shared/tests/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import {
createExecutionContext,
env,
runInDurableObject,
} from "cloudflare:test";
import type {
DatabaseInstance,
DatabaseVersion,
DatabaseWorkflow,
Engine,
} from "../src/engine";
import type { ProvidedEnv } from "cloudflare:test";
import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers";

export async function setWorkflowEntrypoint(
stub: DurableObjectStub<Engine>,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
) {
const ctx = createExecutionContext();
await runInDurableObject(stub, (instance) => {
// @ts-expect-error this is only a stub for WorkflowEntrypoint
instance.env.USER_WORKFLOW = new (class {
constructor(
// eslint-disable-next-line @typescript-eslint/no-shadow
protected ctx: ExecutionContext,
// eslint-disable-next-line @typescript-eslint/no-shadow
protected env: ProvidedEnv
) {}
public async run(
event: Readonly<WorkflowEvent<unknown>>,
step: WorkflowStep
): Promise<unknown> {
return await callback(event, step);
}
})(ctx, env);
});
}

export async function runWorkflow(
instanceId: string,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
): Promise<DurableObjectStub<Engine>> {
const engineId = env.ENGINE.idFromName(instanceId);
const engineStub = env.ENGINE.get(engineId);

await setWorkflowEntrypoint(engineStub, callback);

await engineStub.init(
12346,
{} as DatabaseWorkflow,
{} as DatabaseVersion,
{} as DatabaseInstance,
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
);

return engineStub;
}

export async function runWorkflowDefer(
instanceId: string,
callback: (event: unknown, step: WorkflowStep) => Promise<unknown>
): Promise<DurableObjectStub<Engine>> {
const engineId = env.ENGINE.idFromName(instanceId);
const engineStub = env.ENGINE.get(engineId);

await setWorkflowEntrypoint(engineStub, callback);

void engineStub.init(
12346,
{} as DatabaseWorkflow,
{} as DatabaseVersion,
{} as DatabaseInstance,
{ payload: {}, timestamp: new Date(), instanceId: "some-instance-id" }
);

return engineStub;
}
Loading