diff --git a/.changeset/nice-carrots-sink.md b/.changeset/nice-carrots-sink.md
new file mode 100644
index 000000000000..bcdcd7100309
--- /dev/null
+++ b/.changeset/nice-carrots-sink.md
@@ -0,0 +1,5 @@
+---
+"miniflare": patch
+---
+
+Include the Workflow binding name in the Workflows plugin.
diff --git a/.changeset/public-women-exist.md b/.changeset/public-women-exist.md
new file mode 100644
index 000000000000..c23d3c257ed7
--- /dev/null
+++ b/.changeset/public-women-exist.md
@@ -0,0 +1,11 @@
+---
+"@cloudflare/vitest-pool-workers": minor
+---
+
+Add Workflows test support to the `cloudflare:test` module.
+
+The `cloudflare:test` module has two new APIs:
+
+- `introspectWorkflowInstance`
+- `introspectWorkflow`
+  which allow you to change the behavior of one or more Workflow instances created during tests.
diff --git a/fixtures/vitest-pool-workers-examples/README.md b/fixtures/vitest-pool-workers-examples/README.md
index dd4c3797cb96..020f74460ab1 100644
--- a/fixtures/vitest-pool-workers-examples/README.md
+++ b/fixtures/vitest-pool-workers-examples/README.md
@@ -10,6 +10,7 @@ This directory contains example projects tested with `@cloudflare/vitest-pool-wo
 | [πŸ“¦ kv-r2-caches](kv-r2-caches) | Isolated tests using KV, R2 and the Cache API |
 | [πŸ“š d1](d1) | Isolated tests using D1 with migrations |
 | [πŸ“Œ durable-objects](durable-objects) | Isolated tests using Durable Objects with direct access |
+| [πŸ” workflows](workflows) | Tests using Workflows |
 | [πŸš₯ queues](queues) | Tests using Queue producers and consumers |
 | [🚰 pipelines](pipelines) | Tests using Pipelines |
 | [πŸš€ hyperdrive](hyperdrive) | Tests using Hyperdrive with a Vitest managed TCP server |
diff --git a/fixtures/vitest-pool-workers-examples/workflows/README.md b/fixtures/vitest-pool-workers-examples/workflows/README.md
index 476d2235b63a..bf98b73fc6ae 100644
--- a/fixtures/vitest-pool-workers-examples/workflows/README.md
+++ b/fixtures/vitest-pool-workers-examples/workflows/README.md
@@ -1,4 +1,9 @@
 # πŸ” workflows
 
-Unit testing of workflows themselves is not possible yet - but you can still
-trigger them in unit tests.
+This Worker includes a `ModeratorWorkflow` that serves as a template for an automated content moderation process.
+The test suite uses Workflow mocking to validate the logic of each step.
+
+| Test | Overview |
+| ------------------------------------------------ | ------------------------------------------------------------------------------------------- |
+| [integration.test.ts](test/integration.test.ts) | Tests on the Worker's endpoints, ensuring that Workflows are created and run correctly. |
+| [unit.test.ts](test/unit.test.ts) | Tests on the internal logic of each Workflow step. It uses mocking to test steps in isolation. 
| diff --git a/fixtures/vitest-pool-workers-examples/workflows/src/env.d.ts b/fixtures/vitest-pool-workers-examples/workflows/src/env.d.ts index 7b694d43d5f1..983e36c3f1ac 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/src/env.d.ts +++ b/fixtures/vitest-pool-workers-examples/workflows/src/env.d.ts @@ -1,3 +1,6 @@ -interface Env { - TEST_WORKFLOW: Workflow; +declare namespace Cloudflare { + interface Env { + MODERATOR: Workflow; + } } +interface Env extends Cloudflare.Env {} diff --git a/fixtures/vitest-pool-workers-examples/workflows/src/index.ts b/fixtures/vitest-pool-workers-examples/workflows/src/index.ts index 293c94d18207..7f753b4a7322 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/src/index.ts +++ b/fixtures/vitest-pool-workers-examples/workflows/src/index.ts @@ -1,28 +1,89 @@ import { - WorkerEntrypoint, WorkflowEntrypoint, WorkflowEvent, WorkflowStep, } from "cloudflare:workers"; -export class TestWorkflow extends WorkflowEntrypoint { +export class ModeratorWorkflow extends WorkflowEntrypoint { async run(_event: Readonly>, step: WorkflowStep) { - console.log("ola"); - return "test-workflow"; + await step.sleep("sleep for a while", "10 seconds"); + + // Get an initial analysis from an AI model + const aiResult = await step.do("AI content scan", async () => { + // Call to an workers-ai to scan the text content and return a violation score + + // Simulated score: + const violationScore = Math.floor(Math.random() * 100); + + return { violationScore: violationScore }; + }); + + // Triage based on the AI score + if (aiResult.violationScore < 10) { + await step.do("auto approve content", async () => { + // API call to set app content status to "approved" + return { status: "auto_approved" }; + }); + return { status: "auto_approved" }; + } + if (aiResult.violationScore > 90) { + await step.do("auto reject content", async () => { + // API call to set app content status to "rejected" + return { status: "auto_rejected" }; + }); + return { status: "auto_rejected" }; + } + + // If the score is ambiguous, require human review + type EventPayload = { + moderatorAction: string; + }; + const eventPayload = await step.waitForEvent("human review", { + type: "moderation-decision", + timeout: "1 day", + }); + + if (eventPayload) { + // The moderator responded in time. 
+ const decision = eventPayload.payload.moderatorAction; // e.g., "approve" or "reject" + await step.do("apply moderator decision", async () => { + // API call to update content status based on the decision + return { status: "moderated", decision: decision }; + }); + return { status: "moderated", decision: decision }; + } } } -export default class TestNamedEntrypoint extends WorkerEntrypoint { - async fetch(request: Request) { - const maybeId = new URL(request.url).searchParams.get("id"); +export default { + async fetch(request: Request, env: Env) { + const url = new URL(request.url); + const maybeId = url.searchParams.get("id"); if (maybeId !== null) { - const instance = await this.env.TEST_WORKFLOW.get(maybeId); + const instance = await env.MODERATOR.get(maybeId); return Response.json(await instance.status()); } - const workflow = await this.env.TEST_WORKFLOW.create(); + if (url.pathname === "/moderate") { + const workflow = await env.MODERATOR.create(); + return Response.json({ + id: workflow.id, + details: await workflow.status(), + }); + } - return new Response(JSON.stringify({ id: workflow.id })); - } -} + if (url.pathname === "/moderate-batch") { + const workflows = await env.MODERATOR.createBatch([ + {}, + { id: "321" }, + {}, + ]); + + const ids = workflows.map((workflow) => workflow.id); + return Response.json({ ids: ids }); + } + + return new Response("Not found", { status: 404 }); + }, +}; diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/env.d.ts b/fixtures/vitest-pool-workers-examples/workflows/test/env.d.ts index 12d2e823e4f2..bd9eb32999df 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/test/env.d.ts +++ b/fixtures/vitest-pool-workers-examples/workflows/test/env.d.ts @@ -1,6 +1,4 @@ declare module "cloudflare:test" { // Controls the type of `import("cloudflare:test").env` - interface ProvidedEnv extends Env { - TEST_WORKFLOW: Workflow; - } + interface ProvidedEnv extends Env {} } diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/integration-self.test.ts b/fixtures/vitest-pool-workers-examples/workflows/test/integration-self.test.ts deleted file mode 100644 index 0e200b8a10fa..000000000000 --- a/fixtures/vitest-pool-workers-examples/workflows/test/integration-self.test.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { SELF } from "cloudflare:test"; -import { expect, it, vi } from "vitest"; - -it("should be able to trigger a workflow", async () => { - const res = await SELF.fetch("https://mock-worker.local"); - - expect(res.status).toBe(200); -}); - -it("workflow should reach the end and be successful", async () => { - const res = await SELF.fetch("https://mock-worker.local"); - - const json = await res.json<{ id: string }>(); - - await vi.waitUntil(async () => { - const res = await SELF.fetch(`https://mock-worker.local?id=${json.id}`); - - const statusJson = await res.json<{ status: string }>(); - console.log(statusJson); - return statusJson.status === "complete"; - }, 1000); -}); diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts b/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts new file mode 100644 index 000000000000..3cf76ef55edd --- /dev/null +++ b/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts @@ -0,0 +1,62 @@ +import { env, introspectWorkflow, SELF } from "cloudflare:test"; +import { expect, it } from "vitest"; + +const STATUS_COMPLETE = "complete"; +const STEP_NAME = "AI content scan"; +const mockResult = { violationScore: 0 }; + +// This 
example implicitly disposes the Workflow instance
+it("workflow should be able to reach the end and be successful", async () => {
+	// CONFIG with `await using` to ensure Workflow instances cleanup:
+	await using introspector = await introspectWorkflow(env.MODERATOR);
+	await introspector.modifyAll(async (m) => {
+		await m.disableSleeps();
+		await m.mockStepResult({ name: STEP_NAME }, mockResult);
+	});
+
+	await SELF.fetch(`https://mock-worker.local/moderate`);
+
+	const instances = introspector.get();
+	expect(instances.length).toBe(1);
+
+	// ASSERTIONS:
+	const instance = instances[0];
+	expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+		mockResult
+	);
+	await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
+
+	// DISPOSE: ensured by `await using`
+});
+
+// This example explicitly disposes the Workflow instances
+it("workflow batch should be able to reach the end and be successful", async () => {
+	// CONFIG:
+	let introspector = await introspectWorkflow(env.MODERATOR);
+	try {
+		await introspector.modifyAll(async (m) => {
+			await m.disableSleeps();
+			await m.mockStepResult({ name: STEP_NAME }, mockResult);
+		});
+
+		await SELF.fetch(`https://mock-worker.local/moderate-batch`);
+
+		const instances = introspector.get();
+		expect(instances.length).toBe(3);
+
+		// ASSERTIONS:
+		for (const instance of instances) {
+			expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+				mockResult
+			);
+			await expect(
+				instance.waitForStatus(STATUS_COMPLETE)
+			).resolves.not.toThrow();
+		}
+	} finally {
+		// DISPOSE:
+		// The Workflow introspector should be disposed at the end of each test if the `await using` syntax is not used.
+		// It also disposes all intercepted instances.
+		await introspector.dispose();
+	}
+});
diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts b/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts
new file mode 100644
index 000000000000..2b0adb578b71
--- /dev/null
+++ b/fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts
@@ -0,0 +1,159 @@
+import { env, introspectWorkflowInstance } from "cloudflare:test";
+import { expect, it } from "vitest";
+
+const INSTANCE_ID = "12345678910";
+const STATUS_COMPLETE = "complete";
+const STATUS_ERROR = "errored";
+const STEP_NAME = "AI content scan";
+
+// This example implicitly disposes the Workflow instance
+it("should mock a non-violation score and complete", async () => {
+	const mockResult = { violationScore: 0 };
+
+	// CONFIG with `await using` to ensure Workflow instances cleanup:
+	await using instance = await introspectWorkflowInstance(
+		env.MODERATOR,
+		INSTANCE_ID
+	);
+	await instance.modify(async (m) => {
+		await m.disableSleeps();
+		await m.mockStepResult({ name: STEP_NAME }, mockResult);
+	});
+
+	await env.MODERATOR.create({
+		id: INSTANCE_ID,
+	});
+
+	// ASSERTIONS:
+	expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+		mockResult
+	);
+	expect(
+		await instance.waitForStepResult({ name: "auto approve content" })
+	).toEqual({ status: "auto_approved" });
+
+	await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
+
+	// DISPOSE: ensured by `await using`
+});
+
+// This example explicitly disposes the Workflow instance
+it("should mock the violation score calculation to fail 2 times and then complete", async () => {
+	const mockResult = { violationScore: 0 };
+
+	// CONFIG:
+	const instance = await introspectWorkflowInstance(env.MODERATOR, INSTANCE_ID);
+
+	try {
+		await instance.modify(async (m) => {
+			await m.disableSleeps();
+			await m.mockStepError(
+				{ name: STEP_NAME },
+				new Error("Something went wrong!"),
+				2
+			);
+			await m.mockStepResult({ name: STEP_NAME }, mockResult);
+		});
+
+		await env.MODERATOR.create({
+			id: INSTANCE_ID,
+		});
+
+		// ASSERTIONS:
+		expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+			mockResult
+		);
+		expect(
+			await instance.waitForStepResult({ name: "auto approve content" })
+		).toEqual({ status: "auto_approved" });
+
+		await expect(
+			instance.waitForStatus(STATUS_COMPLETE)
+		).resolves.not.toThrow();
+	} finally {
+		// DISPOSE:
+		// The Workflow introspector should be disposed at the end of each test if the `await using` syntax is not used.
+		// It also disposes all intercepted instances.
+		await instance.dispose();
+	}
+});
+
+it("should mock a violation score and complete", async () => {
+	const mockResult = { violationScore: 99 };
+
+	await using instance = await introspectWorkflowInstance(
+		env.MODERATOR,
+		INSTANCE_ID
+	);
+	await instance.modify(async (m) => {
+		await m.disableSleeps();
+		await m.mockStepResult({ name: STEP_NAME }, mockResult);
+	});
+
+	await env.MODERATOR.create({
+		id: INSTANCE_ID,
+	});
+
+	expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+		mockResult
+	);
+	expect(
+		await instance.waitForStepResult({ name: "auto reject content" })
+	).toEqual({ status: "auto_rejected" });
+
+	await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
+});
+
+it("should be reviewed, accepted and complete", async () => {
+	const mockResult = { violationScore: 50 };
+
+	await using instance = await introspectWorkflowInstance(
+		env.MODERATOR,
+		INSTANCE_ID
+	);
+	await instance.modify(async (m) => {
+		await m.disableSleeps();
+		await m.mockStepResult({ name: STEP_NAME }, mockResult);
+		await m.mockEvent({
+			type: "moderation-decision",
+			payload: { moderatorAction: "approve" },
+		});
+	});
+
+	await env.MODERATOR.create({
+		id: INSTANCE_ID,
+	});
+
+	expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+		mockResult
+	);
+	expect(
+		await instance.waitForStepResult({ name: "apply moderator decision" })
+	).toEqual({ status: "moderated", decision: "approve" });
+
+	await expect(instance.waitForStatus(STATUS_COMPLETE)).resolves.not.toThrow();
+});
+
+it("should force human review to timeout and error", async () => {
+	const mockResult = { violationScore: 50 };
+
+	await using instance = await introspectWorkflowInstance(
+		env.MODERATOR,
+		INSTANCE_ID
+	);
+	await instance.modify(async (m) => {
+		await m.disableSleeps();
+		await m.mockStepResult({ name: STEP_NAME }, mockResult);
+		await m.forceEventTimeout({ name: "human review" });
+	});
+
+	await env.MODERATOR.create({
+		id: INSTANCE_ID,
+	});
+
+	expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual(
+		mockResult
+	);
+
+	await expect(instance.waitForStatus(STATUS_ERROR)).resolves.not.toThrow();
+});
diff --git a/fixtures/vitest-pool-workers-examples/workflows/vitest.config.ts b/fixtures/vitest-pool-workers-examples/workflows/vitest.config.ts
index 91a0d7265364..23119b5bc991 100644
--- a/fixtures/vitest-pool-workers-examples/workflows/vitest.config.ts
+++ b/fixtures/vitest-pool-workers-examples/workflows/vitest.config.ts
@@ -9,8 +9,6 @@ export default defineWorkersProject({
 		poolOptions: {
 			workers: {
 				singleWorker: true,
-				// FIXME(lduarte): currently for the workflow binding to work, isolateStorage must be disabled.
- isolatedStorage: false, wrangler: { configPath: "./wrangler.jsonc", }, diff --git a/fixtures/vitest-pool-workers-examples/workflows/wrangler.jsonc b/fixtures/vitest-pool-workers-examples/workflows/wrangler.jsonc index 01249aef9d96..49738dc2c1dd 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/wrangler.jsonc +++ b/fixtures/vitest-pool-workers-examples/workflows/wrangler.jsonc @@ -4,9 +4,9 @@ "compatibility_date": "2025-02-04", "workflows": [ { - "binding": "TEST_WORKFLOW", - "class_name": "TestWorkflow", - "name": "test-workflow", + "binding": "MODERATOR", + "class_name": "ModeratorWorkflow", + "name": "moderator-workflow", }, ], } diff --git a/packages/miniflare/src/plugins/workflows/index.ts b/packages/miniflare/src/plugins/workflows/index.ts index 20a39fdbd43a..9b66ded4b18d 100644 --- a/packages/miniflare/src/plugins/workflows/index.ts +++ b/packages/miniflare/src/plugins/workflows/index.ts @@ -82,7 +82,7 @@ export const WORKFLOWS_PLUGIN: Plugin< // this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later const services = Object.entries(options.workflows ?? {}).map( - ([_bindingName, workflow]) => { + ([bindingName, workflow]) => { // NOTE(lduarte): the engine unique namespace key must be unique per workflow definition // otherwise workerd will crash because there's two equal DO namespaces const uniqueKey = `miniflare-workflows-${workflow.name}`; @@ -124,6 +124,10 @@ export const WORKFLOWS_PLUGIN: Plugin< entrypoint: workflow.className, }, }, + { + name: "BINDING_NAME", + json: JSON.stringify(bindingName), + }, ], }, }; diff --git a/packages/vitest-pool-workers/package.json b/packages/vitest-pool-workers/package.json index 63205ef79e6e..0c07fe231da6 100644 --- a/packages/vitest-pool-workers/package.json +++ b/packages/vitest-pool-workers/package.json @@ -66,6 +66,7 @@ "@cloudflare/mock-npm-registry": "workspace:*", "@cloudflare/workers-tsconfig": "workspace:*", "@cloudflare/workers-types": "catalog:default", + "@cloudflare/workflows-shared": "workspace:*", "@types/node": "catalog:default", "@types/semver": "^7.5.1", "@vitest/runner": "catalog:default", diff --git a/packages/vitest-pool-workers/scripts/bundle.mjs b/packages/vitest-pool-workers/scripts/bundle.mjs index 24ec1e20f6cb..6a528ba5f3fa 100644 --- a/packages/vitest-pool-workers/scripts/bundle.mjs +++ b/packages/vitest-pool-workers/scripts/bundle.mjs @@ -85,8 +85,32 @@ const commonOptions = { platform: "node", target: "esnext", bundle: true, - packages: "external", - external: ["cloudflare:*"], + external: [ + // Node.js built-ins (handled automatically by esbuild but listed for completeness) + "node:*", + "cloudflare:*", + "workerd:*", + // Virtual/runtime modules + "__VITEST_POOL_WORKERS_DEFINES", + "__VITEST_POOL_WORKERS_USER_OBJECT", + // All npm packages (previously handled by packages: "external") + "birpc", + "cjs-module-lexer", + "devalue", + "miniflare", + "semver", + "semver/*", + "wrangler", + "zod", + "undici", + "undici/*", + // Peer dependencies + "vitest", + "vitest/*", + "@vitest/runner", + "@vitest/snapshot", + "@vitest/snapshot/*", + ], sourcemap: true, sourcesContent: false, logLevel: watch ? 
"info" : "warning", diff --git a/packages/vitest-pool-workers/src/pool/index.ts b/packages/vitest-pool-workers/src/pool/index.ts index 9411cd878355..e7ad2e1c2f31 100644 --- a/packages/vitest-pool-workers/src/pool/index.ts +++ b/packages/vitest-pool-workers/src/pool/index.ts @@ -493,15 +493,6 @@ function buildProjectWorkerOptions( fixupWorkflowBindingsToSelf(runnerWorker, relativeWranglerConfigPath) ).sort(); - if ( - workflowClassNames.length !== 0 && - project.options.isolatedStorage === true - ) { - throw new Error(`Project ${project.relativePath} has Workflows defined and \`isolatedStorage\` set to true. -Please set \`isolatedStorage\` to false in order to run projects with Workflows. -Workflows defined in project: ${workflowClassNames.join(", ")}`); - } - const wrappers = [ 'import { createWorkerEntrypointWrapper, createDurableObjectWrapper, createWorkflowEntrypointWrapper } from "cloudflare:test-internal";', ]; diff --git a/packages/vitest-pool-workers/src/pool/loopback.ts b/packages/vitest-pool-workers/src/pool/loopback.ts index d79498ab9b8e..1b4ec7bed935 100644 --- a/packages/vitest-pool-workers/src/pool/loopback.ts +++ b/packages/vitest-pool-workers/src/pool/loopback.ts @@ -9,6 +9,7 @@ import { Mutex, R2_PLUGIN_NAME, Response, + WORKFLOWS_PLUGIN_NAME, } from "miniflare"; import { isFileNotFoundError, WORKER_NAME_PREFIX } from "./helpers"; import type { Awaitable, Miniflare, Request, WorkerOptions } from "miniflare"; @@ -314,6 +315,7 @@ const PLUGIN_PRODUCT_NAMES: Record = { [DURABLE_OBJECTS_PLUGIN_NAME]: "Durable Objects", [KV_PLUGIN_NAME]: "KV", [R2_PLUGIN_NAME]: "R2", + [WORKFLOWS_PLUGIN_NAME]: "Workflows", }; const LIST_FORMAT = new Intl.ListFormat("en-US"); @@ -347,6 +349,23 @@ function checkAllStorageOperationsResolved( "\x1b[2m" ); lines.push("\x1b[22m" + separator, ""); + + if ( + failedProducts.includes( + PLUGIN_PRODUCT_NAMES[WORKFLOWS_PLUGIN_NAME] ?? 
WORKFLOWS_PLUGIN_NAME + ) + ) { + console.warn( + [ + "", + separator, + `Workflows are being created in ${source}.`, + "Even with isolated storage, Workflows are required to be manually disposed at the end of each test.", + "See https://developers.cloudflare.com/workers/testing/vitest-integration/test-apis/ for more details.", + "", + ].join("\n") + ); + } console.error(lines.join("\n")); return false; } diff --git a/packages/vitest-pool-workers/src/worker/lib/cloudflare/test-internal.ts b/packages/vitest-pool-workers/src/worker/lib/cloudflare/test-internal.ts index 818f2c420df4..afc69277687f 100644 --- a/packages/vitest-pool-workers/src/worker/lib/cloudflare/test-internal.ts +++ b/packages/vitest-pool-workers/src/worker/lib/cloudflare/test-internal.ts @@ -7,3 +7,4 @@ export * from "../../env"; export * from "../../events"; export * from "../../fetch-mock"; export * from "../../wait-until"; +export * from "../../workflows"; diff --git a/packages/vitest-pool-workers/src/worker/lib/cloudflare/test.ts b/packages/vitest-pool-workers/src/worker/lib/cloudflare/test.ts index bc19e10b5f9b..26246a242e26 100644 --- a/packages/vitest-pool-workers/src/worker/lib/cloudflare/test.ts +++ b/packages/vitest-pool-workers/src/worker/lib/cloudflare/test.ts @@ -18,4 +18,6 @@ export { getQueueResult, applyD1Migrations, createPagesEventContext, + introspectWorkflowInstance, + introspectWorkflow, } from "cloudflare:test-internal"; diff --git a/packages/vitest-pool-workers/src/worker/workflows.ts b/packages/vitest-pool-workers/src/worker/workflows.ts new file mode 100644 index 000000000000..6a5753535507 --- /dev/null +++ b/packages/vitest-pool-workers/src/worker/workflows.ts @@ -0,0 +1,265 @@ +import { + instanceStatusName, + InstanceStatus as InstanceStatusNumber, +} from "@cloudflare/workflows-shared/src/instance"; +import { runInRunnerObject } from "./durable-objects"; +import { env, internalEnv } from "./env"; +import type { WorkflowBinding } from "@cloudflare/workflows-shared/src/binding"; +import type { + StepSelector, + WorkflowInstanceModifier, +} from "@cloudflare/workflows-shared/src/modifier"; + +type ModifierCallback = (m: WorkflowInstanceModifier) => Promise; + +// See public facing `cloudflare:test` types for docs +export interface WorkflowInstanceIntrospector { + modify(fn: ModifierCallback): Promise; + + waitForStepResult(step: StepSelector): Promise; + + waitForStatus(status: string): Promise; + + dispose(): Promise; +} + +// Note(osilva): `introspectWorkflowInstance()` doesn’t need to be async, but we keep it that way +// to avoid potential breaking changes later and to stay consistent with `introspectWorkflow`. + +// In the "cloudflare:test" module, the exposed type is `Workflow`. Here we use `WorkflowBinding` +// (which implements `Workflow`) to access unsafe functions. +export async function introspectWorkflowInstance( + workflow: WorkflowBinding, + instanceId: string +): Promise { + if (!workflow || !instanceId) { + throw new Error( + "[WorkflowIntrospector] Workflow binding and instance id are required." 
+ ); + } + return new WorkflowInstanceIntrospectorHandle(workflow, instanceId); +} + +class WorkflowInstanceIntrospectorHandle + implements WorkflowInstanceIntrospector +{ + #workflow: WorkflowBinding; + #instanceId: string; + #instanceModifier: WorkflowInstanceModifier; + + constructor(workflow: WorkflowBinding, instanceId: string) { + this.#workflow = workflow; + this.#instanceId = instanceId; + + this.#instanceModifier = workflow.unsafeGetInstanceModifier( + instanceId + ) as WorkflowInstanceModifier; + } + + async modify(fn: ModifierCallback): Promise { + await fn(this.#instanceModifier); + + return this; + } + + async waitForStepResult(step: StepSelector): Promise { + const stepResult = await this.#workflow.unsafeWaitForStepResult( + this.#instanceId, + step.name, + step.index + ); + + return stepResult; + } + + async waitForStatus(status: InstanceStatus["status"]): Promise { + if ( + status === instanceStatusName(InstanceStatusNumber.Terminated) || + status === instanceStatusName(InstanceStatusNumber.Paused) + ) { + throw new Error( + `[WorkflowIntrospector] InstanceStatus '${status}' is not implemented yet and cannot be waited.` + ); + } + + if (status === instanceStatusName(InstanceStatusNumber.Queued)) { + // we currently don't have a queue mechanism, but it would happen before it + // starts running, so waiting for it to be queued should always return + return; + } + await this.#workflow.unsafeWaitForStatus(this.#instanceId, status); + } + + async dispose(): Promise { + await this.#workflow.unsafeAbort(this.#instanceId, "Instance dispose"); + } + + async [Symbol.asyncDispose](): Promise { + await this.dispose(); + } +} + +// See public facing `cloudflare:test` types for docs +export interface WorkflowIntrospector { + modifyAll(fn: ModifierCallback): Promise; + + get(): WorkflowInstanceIntrospector[]; + + dispose(): Promise; +} + +// Note(osilva): `introspectWorkflow` could be sync with some changes, but we keep it async +// to avoid potential breaking changes later. + +// In the "cloudflare:test" module, the exposed type is `Workflow`. Here we use `WorkflowBinding` +// (which implements `Workflow`) to access unsafe functions. 
+export async function introspectWorkflow( + workflow: WorkflowBinding +): Promise { + if (!workflow) { + throw new Error("[WorkflowIntrospector] Workflow binding is required."); + } + + const modifierCallbacks: ModifierCallback[] = []; + const instanceIntrospectors: WorkflowInstanceIntrospector[] = []; + + const bindingName = await workflow.unsafeGetBindingName(); + const internalOriginalWorkflow = internalEnv[bindingName] as Workflow; + const externalOriginalWorkflow = env[bindingName] as Workflow; + + const introspectAndModifyInstance = async (instanceId: string) => { + try { + await runInRunnerObject(internalEnv, async () => { + const introspector = await introspectWorkflowInstance( + workflow, + instanceId + ); + instanceIntrospectors.push(introspector); + // Apply any stored modifier functions + for (const callback of modifierCallbacks) { + await introspector.modify(callback); + } + }); + } catch (error) { + console.error( + `[WorkflowIntrospector] Error during introspection for instance ${instanceId}:`, + error + ); + throw new Error( + `[WorkflowIntrospector] Failed to introspect Workflow instance ${instanceId}.` + ); + } + }; + + const createWorkflowProxyGetHandler = < + T extends Workflow, + >(): ProxyHandler["get"] => { + return (target, property) => { + if (property === "create") { + return new Proxy(target[property], { + async apply(func, thisArg, argArray) { + const hasId = Object.hasOwn(argArray[0] ?? {}, "id"); + if (!hasId) { + argArray = [{ id: crypto.randomUUID(), ...(argArray[0] ?? {}) }]; + } + const instanceId = (argArray[0] as { id: string }).id; + + await introspectAndModifyInstance(instanceId); + + return target[property](...argArray); + }, + }); + } + + if (property === "createBatch") { + return new Proxy(target[property], { + async apply(func, thisArg, argArray) { + for (const [index, arg] of argArray[0]?.entries() ?? []) { + const hasId = Object.hasOwn(arg, "id"); + if (!hasId) { + argArray[0][index] = { id: crypto.randomUUID(), ...arg }; + } + } + + await Promise.all( + argArray[0].map((options: { id: string }) => + introspectAndModifyInstance(options.id) + ) + ); + + const createPromises = (argArray[0] ?? 
[]).map( + (arg: WorkflowInstanceCreateOptions) => target["create"](arg) + ); + return Promise.all(createPromises); + }, + }); + } + // @ts-expect-error index signature + return target[property]; + }; + }; + + const dispose = () => { + internalEnv[bindingName] = internalOriginalWorkflow; + env[bindingName] = externalOriginalWorkflow; + }; + + // Create a single handler instance to be reused + const proxyGetHandler = createWorkflowProxyGetHandler(); + + // Apply the proxies using the shared handler logic + internalEnv[bindingName] = new Proxy(internalOriginalWorkflow, { + get: proxyGetHandler, + }); + env[bindingName] = new Proxy(externalOriginalWorkflow, { + get: proxyGetHandler, + }); + + return new WorkflowIntrospectorHandle( + workflow, + modifierCallbacks, + instanceIntrospectors, + dispose + ); +} + +class WorkflowIntrospectorHandle implements WorkflowIntrospector { + workflow: WorkflowBinding; + #modifierCallbacks: ModifierCallback[]; + #instanceIntrospectors: WorkflowInstanceIntrospector[]; + #disposeCallback: () => void; + + constructor( + workflow: WorkflowBinding, + modifierCallbacks: ModifierCallback[], + instanceIntrospectors: WorkflowInstanceIntrospector[], + disposeCallback: () => void + ) { + this.workflow = workflow; + this.#modifierCallbacks = modifierCallbacks; + this.#instanceIntrospectors = instanceIntrospectors; + this.#disposeCallback = disposeCallback; + } + + async modifyAll(fn: ModifierCallback): Promise { + this.#modifierCallbacks.push(fn); + } + + get(): WorkflowInstanceIntrospector[] { + return this.#instanceIntrospectors; + } + + async dispose(): Promise { + // also disposes all instance introspectors + await Promise.all( + this.#instanceIntrospectors.map((introspector) => introspector.dispose()) + ); + this.#modifierCallbacks = []; + this.#instanceIntrospectors = []; + this.#disposeCallback(); + } + + async [Symbol.asyncDispose](): Promise { + await this.dispose(); + } +} diff --git a/packages/vitest-pool-workers/types/cloudflare-test.d.ts b/packages/vitest-pool-workers/types/cloudflare-test.d.ts index e677bda34070..5e69a6189d90 100644 --- a/packages/vitest-pool-workers/types/cloudflare-test.d.ts +++ b/packages/vitest-pool-workers/types/cloudflare-test.d.ts @@ -127,6 +127,373 @@ declare module "cloudflare:test" { migrationsTableName?: string ): Promise; + /** + * Creates an introspector for a specific Workflow instance, used to + * modify its behavior and await outcomes. + * This is the primary entry point for testing individual Workflow instances. + * + * @param workflow - The Workflow binding, e.g., `env.MY_WORKFLOW`. + * @param instanceId - The known ID of the Workflow instance to target. + * @returns A `WorkflowInstanceIntrospector` to control the instance's behavior. + * + * @remarks + * ### Dispose + * + * The introspector must be disposed after the test to remove mocks and release + * resources. This can be handled in two ways: + * + * 1. **Implicit dispose**: With the `await using` syntax. + * `await using instance = await introspectWorkflowInstance(...)` + * + * 2. **Explicit dispose**: Manually call `await instance.dispose()` at the end of the + * test. + * + * @example + * // Full test of a Workflow instance using implicit dispose + * it("should disable all sleeps and complete", async () => { + * // 1. CONFIGURATION + * // `await using` ensures .dispose() is automatically called at the end of the block. 
+ * await using instance = await introspectWorkflowInstance(env.MY_WORKFLOW, "123456"); + * + * await instance.modify(async (m) => { + * await m.disableSleeps(); + * }); + * + * // 2. EXECUTION + * await env.MY_WORKFLOW.create({ id: "123456" }); + * + * // 3. ASSERTION + * await instance.waitForStatus("complete"); + * + * // 4. DISPOSE is implicit and automatic here. + * }); + */ + export function introspectWorkflowInstance( + workflow: Workflow, + instanceId: string + ): Promise; + + /** + * Provides methods to control a single Workflow instance. + */ + export interface WorkflowInstanceIntrospector { + /** + * Applies modifications to the Workflow instance's behavior. + * Takes a callback function to apply modifications. + * + * @param fn - An async callback that receives a `WorkflowInstanceModifier` object. + * @returns The `WorkflowInstanceIntrospector` instance for chaining. + */ + modify( + fn: (m: WorkflowInstanceModifier) => Promise + ): Promise; + + /** + * Waits for a specific step to complete and return a result. + * If the step has already completed, this promise resolves immediately. + * + * @param step - An object specifying the step `name` and optional `index` (1-based). + * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + * @returns A promise that resolves with the step's result, + * or rejects with an error if the step fails. + */ + waitForStepResult(step: { name: string; index?: number }): Promise; + + /** + * Waits for the Workflow instance to reach a specific InstanceStatus status + * (e.g., 'running', 'complete'). + * If the instance is already in the target status, this promise resolves immediately. + * Throws an error if the Workflow instance reaches a finite state + * (e.g., complete, errored) that is different from the target status. + * + * @param status - The target `InstanceStatus` to wait for. + */ + waitForStatus(status: InstanceStatus["status"]): Promise; + + /** + * Disposes the Workflow instance introspector. + * + * This is crucial for ensuring test isolation by preventing state from + * leaking between tests. It should be called at the end of each test. + */ + dispose(): Promise; + + /** + * An alias for {@link dispose} to support automatic disposal with the `using` keyword. + * + * @see {@link dispose} + * @example + * ```ts + * it('my workflow test', async () => { + * await using instance = await introspectWorkflowInstance(env.WORKFLOW, "123456"); + * + * // ... your test logic ... + * + * // .dispose() is automatically called here at the end of the scope + * }); + * ``` + */ + [Symbol.asyncDispose](): Promise; + } + + /** + * Provides methods to mock or alter the behavior of a Workflow instance's + * steps, events, and sleeps. + */ + interface WorkflowInstanceModifier { + /** + * Disables sleeps, causing `step.sleep()` and `step.sleepUntil()` to + * resolve immediately. + * + * @example Disable all sleeps: + * ```ts + * await instance.modify(m => { + * m.disableSleeps(); + * }); + * ``` + * + * @example Disable a specific set of sleeps by their step names: + * ```ts + * await instance.modify(m => { + * m.disableSleeps([{ name: "sleep1" }, { name: "sleep5" }, { name: "sleep7" }]); + * }); + * ``` + * + * @param steps - Optional array of specific steps to disable sleeps for. + * If omitted, **all sleeps** in the Workflow will be disabled. + * A step is an object specifying the step `name` and optional `index` (1-based). 
+ * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + */ + disableSleeps(steps?: { name: string; index?: number }[]): Promise; + + /** + * Mocks the result of a `step.do()`, causing it to return a specified + * value instantly without executing the step's actual implementation. + * + * If called multiple times for the same step, an error will be thrown. + * + * @param step - An object specifying the step `name` and optional `index` (1-based). + * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + * @param stepResult - The mock value to be returned by the step. + * + * @example Mock the result of the third step named "fetch-data": + * ```ts + * await instance.modify(m => { + * m.mockStepResult( + * { name: "fetch-data", index: 3 }, + * { success: true, data: [1, 2, 3] } + * ); + * }); + * ``` + */ + mockStepResult( + step: { name: string; index?: number }, + stepResult: unknown + ): Promise; + + /** + * Forces a `step.do()` to throw an error, simulating a failure without + * executing the step's actual implementation. Useful for testing retry logic + * and error handling. + * + * @example Mock a step that errors 3 times before succeeding: + * ```ts + * // This example assumes the "fetch-data" step is configured with at least 3 retries. + * await instance.modify(m => { + * m.mockStepError( + * { name: "fetch-data" }, + * new Error("Failed!"), + * 3 + * ); + * m.mockStepResult( + * { name: "fetch-data" }, + * { success: true, data: [1, 2, 3] } + * ); + * }); + * ``` + * + * @param step - An object specifying the step `name` and optional `index` (1-based). + * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + * @param error - The `Error` object to be thrown. + * @param times - Optional number of times to throw the error. If a step has + * retries configured, it will fail this many times before potentially + * succeeding on a subsequent attempt. If omitted, it will throw on **every attempt**. + */ + mockStepError( + step: { name: string; index?: number }, + error: Error, + times?: number + ): Promise; + + /** + * Forces a `step.do()` to fail by timing out immediately, without executing + * the step's actual implementation. Default step timeout is 10 minutes. + * + * @example Mock a step that times out 3 times before succeeding: + * ```ts + * // This example assumes the "fetch-data" step is configured with at least 3 retries. + * await instance.modify(m => { + * m.forceStepTimeout( + * { name: "fetch-data" }, + * 3 + * ); + * m.mockStepResult( + * { name: "fetch-data" }, + * { success: true, data: [1, 2, 3] } + * ); + * }); + * ``` + * + * @param step - An object specifying the step `name` and optional `index` (1-based). + * If multiple steps share the same name, `index` targets a specific one. + * Defaults to the first step found (`index: 1`). + * @param times - Optional number of times the step will time out. Useful for + * testing retry logic. If omitted, it will time out on **every attempt**. + */ + forceStepTimeout(step: { name: string; index?: number }, times?: number); + + /** + * Sends a mock event to the Workflow instance. This causes a `step.waitForEvent()` + * to resolve with the provided payload, as long as the step's timeout has not + * yet expired. Default event timeout is 24 hours. 
+	 *
+	 * @example Mock a step event:
+	 * ```ts
+	 * await instance.modify(m => {
+	 *   m.mockEvent(
+	 *     { type: "user-approval", payload: { approved: true } },
+	 *   );
+	 * });
+	 * ```
+	 *
+	 * @param event - The event to send, including its `type` and `payload`.
+	 */
+	mockEvent(event: { type: string; payload: unknown }): Promise<void>;
+
+	/**
+	 * Forces a `step.waitForEvent()` to time out instantly, causing the step to fail.
+	 * This simulates a scenario where an expected event never arrives.
+	 * Default event timeout is 24 hours.
+	 *
+	 * @example Mock a step to time out:
+	 * ```ts
+	 * await instance.modify(m => {
+	 *   m.forceEventTimeout(
+	 *     { name: "user-approval" },
+	 *   );
+	 * });
+	 * ```
+	 *
+	 * @param step - An object specifying the step `name` and optional `index` (1-based).
+	 * If multiple steps share the same name, `index` targets a specific one.
+	 * Defaults to the first step found (`index: 1`).
+	 */
+	forceEventTimeout(step: { name: string; index?: number }): Promise<void>;
+	}
+
+	/**
+	 * Creates an **introspector** for a Workflow, where instance IDs are unknown
+	 * beforehand. This allows for defining modifications that will apply to
+	 * **all subsequently created instances**.
+	 *
+	 * This is the primary entry point for testing Workflow instances where the ID
+	 * is unknown before their creation.
+	 *
+	 * @param workflow - The Workflow binding, e.g., `env.MY_WORKFLOW`.
+	 * @returns A `WorkflowIntrospector` to control the instances' behavior.
+	 *
+	 * @remarks
+	 * ### Dispose
+	 *
+	 * The introspector must be disposed after the test to remove mocks and release
+	 * resources. This can be handled in two ways:
+	 *
+	 * 1. **Implicit dispose**: With the `await using` syntax.
+	 *    `await using introspector = await introspectWorkflow(...)`
+	 *
+	 * 2. **Explicit dispose**: Manually call `await introspector.dispose()` at the end of the
+	 *    test.
+	 *
+	 * @example
+	 * ```ts
+	 * // Full test of a Workflow instance using implicit dispose
+	 * it("should disable all sleeps and complete", async () => {
+	 *   // 1. CONFIGURATION
+	 *   await using introspector = await introspectWorkflow(env.MY_WORKFLOW);
+	 *   await introspector.modifyAll(async (m) => {
+	 *     await m.disableSleeps();
+	 *   });
+	 *
+	 *   // 2. EXECUTION
+	 *   await env.MY_WORKFLOW.create();
+	 *
+	 *   // 3. ASSERTION
+	 *   const instances = introspector.get();
+	 *   for (const instance of instances) {
+	 *     await instance.waitForStatus("complete");
+	 *   }
+	 *
+	 *   // 4. DISPOSE is implicit and automatic here.
+	 * });
+	 * ```
+	 */
+	export function introspectWorkflow(
+		workflow: Workflow
+	): Promise<WorkflowIntrospector>;
+
+	/**
+	 * Provides methods to control all instances created by a Workflow binding.
+	 */
+	export interface WorkflowIntrospector {
+		/**
+		 * Applies modifications to all Workflow instances created after calling
+		 * `introspectWorkflow`. Takes a callback function to apply modifications.
+		 *
+		 * @param fn - An async callback that receives a `WorkflowInstanceModifier` object.
+		 */
+		modifyAll(
+			fn: (m: WorkflowInstanceModifier) => Promise<void>
+		): Promise<void>;
+
+		/**
+		 * Returns all `WorkflowInstanceIntrospector`s from Workflow instances
+		 * created after calling `introspectWorkflow`.
+		 */
+		get(): WorkflowInstanceIntrospector[];
+
+		/**
+		 *
+		 * Disposes the introspector and every `WorkflowInstanceIntrospector` from Workflow
+		 * instances created after calling `introspectWorkflow`.
+		 *
+		 * This function is essential for test isolation, ensuring that results from one
+		 * test do not leak into the next. It should be called at the end of each test or in an `afterEach` hook. 
+ * + * **Note:** After dispose, `introspectWorkflow()` must be called again to begin + * a new introspection. + * + */ + dispose(): Promise; + + /** + * An alias for {@link dispose} to support automatic disposal with the `using` keyword. + * This is an alternative to calling `dispose()` in an `afterEach` hook. + * + * @see {@link dispose} + * @example + * it('my workflow test', async () => { + * await using workflowIntrospector = await introspectWorkflow(env.WORKFLOW); + * + * // ... your test logic ... + * + * // .dispose() is automatically called here at the end of the scope + * }); + */ + [Symbol.asyncDispose](): Promise; + } + // Only require `params` and `data` to be specified if they're non-empty interface EventContextInitBase { request: Request; diff --git a/packages/workflows-shared/package.json b/packages/workflows-shared/package.json index 3143dd3a1d58..c9cb3f8faaf0 100644 --- a/packages/workflows-shared/package.json +++ b/packages/workflows-shared/package.json @@ -35,7 +35,7 @@ }, "dependencies": { "heap-js": "^2.5.0", - "itty-time": "^1.0.6", + "itty-time": "^2.0.2", "mime": "^3.0.0", "zod": "^3.22.3" }, diff --git a/packages/workflows-shared/src/binding.ts b/packages/workflows-shared/src/binding.ts index 3965a28bb10b..a5ca1737fa19 100644 --- a/packages/workflows-shared/src/binding.ts +++ b/packages/workflows-shared/src/binding.ts @@ -10,6 +10,7 @@ import type { type Env = { ENGINE: DurableObjectNamespace; + BINDING_NAME: string; }; // this.env.WORKFLOW is WorkflowBinding @@ -78,6 +79,50 @@ export class WorkflowBinding extends WorkerEntrypoint implements Workflow { return await Promise.all(batch.map((val) => this.create(val))); } + + public unsafeGetBindingName(): string { + return this.env.BINDING_NAME; + } + + public unsafeGetInstanceModifier(instanceId: string): unknown { + const stubId = this.env.ENGINE.idFromName(instanceId); + const stub = this.env.ENGINE.get(stubId); + + const instanceModifier = stub.getInstanceModifier(); + + return instanceModifier; + } + + public async unsafeWaitForStepResult( + instanceId: string, + name: string, + index?: number + ): Promise { + const stubId = this.env.ENGINE.idFromName(instanceId); + const stub = this.env.ENGINE.get(stubId); + + return await stub.waitForStepResult(name, index); + } + + public async unsafeAbort(instanceId: string, reason?: string): Promise { + const stubId = this.env.ENGINE.idFromName(instanceId); + const stub = this.env.ENGINE.get(stubId); + + try { + await stub.unsafeAbort(reason); + } catch { + // do nothing because we want to dispose this instance + } + } + + public async unsafeWaitForStatus( + instanceId: string, + status: string + ): Promise { + const stubId = this.env.ENGINE.idFromName(instanceId); + const stub = this.env.ENGINE.get(stubId); + return await stub.waitForStatus(status); + } } export class WorkflowHandle extends RpcTarget implements WorkflowInstance { diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index d11aace6fcbd..bbc610a4f8ba 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -29,9 +29,9 @@ const defaultConfig: Required = { retries: { limit: 5, delay: 1000, - backoff: "constant", + backoff: "exponential", }, - timeout: "15 minutes", + timeout: "10 minutes", }; export interface UserErrorField { @@ -242,7 +242,10 @@ export class Context extends RpcTarget { try { const timeoutPromise = async () => { const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; - const timeout = 
ms(config.timeout); + let timeout = ms(config.timeout); + if (forceStepTimeout) { + timeout = 0; + } // @ts-expect-error priorityQueue is initiated in init await this.#engine.priorityQueue.add({ hash: priorityQueueHash, @@ -274,7 +277,45 @@ export class Context extends RpcTarget { await this.#state.storage.put(stepStateKey, stepState); const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; - result = await Promise.race([doWrapperClosure(), timeoutPromise()]); + const mockErrorKey = `mock-step-error-${valueKey}`; + const persistentMockError = await this.#state.storage.get<{ + name: string; + message: string; + }>(mockErrorKey); + const transientMockError = await this.#state.storage.get<{ + name: string; + message: string; + }>(`${mockErrorKey}-${stepState.attemptedCount}`); + const mockErrorPayload = persistentMockError || transientMockError; + + // if a mocked error exists, throw it immediately + if (mockErrorPayload) { + const errorToThrow = new Error(mockErrorPayload.message); + errorToThrow.name = mockErrorPayload.name; + throw errorToThrow; + } + + const replaceResult = await this.#state.storage.get( + `replace-result-${valueKey}` + ); + + const forceStepTimeoutKey = `force-step-timeout-${valueKey}`; + const persistentStepTimeout = + await this.#state.storage.get(forceStepTimeoutKey); + const transientStepTimeout = await this.#state.storage.get( + `${forceStepTimeoutKey}-${stepState.attemptedCount}` + ); + const forceStepTimeout = persistentStepTimeout || transientStepTimeout; + + if (forceStepTimeout) { + result = await timeoutPromise(); + } else if (replaceResult) { + result = replaceResult; + await this.#state.storage.delete(`replace-result-${valueKey}`); + // if there is a timeout to be forced we dont want to race with closure + } else { + result = await Promise.race([doWrapperClosure(), timeoutPromise()]); + } // if we reach here, means that the clouse ran successfully and we can remove the timeout from the PQ // @ts-expect-error priorityQueue is initiated in init @@ -457,6 +498,14 @@ export class Context extends RpcTarget { const sleepLogWrittenKey = `${cacheKey}-log-written`; const maybeResult = await this.#state.storage.get(sleepKey); + const sleepNameCountHash = await computeHash( + name + this.#getCount("sleep-" + name) + ); + const disableThisSleep = await this.#state.storage.get(sleepNameCountHash); + const disableAllSleeps = await this.#state.storage.get("disableAllSleeps"); + + const disableSleep = disableAllSleeps || disableThisSleep; + if (maybeResult != undefined) { // @ts-expect-error priorityQueue is initiated in init const entryPQ = this.#engine.priorityQueue.getFirst( @@ -464,7 +513,9 @@ export class Context extends RpcTarget { ); // in case the engine dies while sleeping and wakes up before the retry period if (entryPQ !== undefined) { - await scheduler.wait(entryPQ.targetTimestamp - Date.now()); + await scheduler.wait( + disableSleep ? 0 : entryPQ.targetTimestamp - Date.now() + ); // @ts-expect-error priorityQueue is initiated in init this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" }); } @@ -502,11 +553,12 @@ export class Context extends RpcTarget { // @ts-expect-error priorityQueue is initiated in init await this.#engine.priorityQueue.add({ hash: cacheKey, - targetTimestamp: Date.now() + duration, + targetTimestamp: Date.now() + (disableSleep ? 0 : duration), type: "sleep", }); + // this probably will never finish except if sleep is less than the grace period - await scheduler.wait(duration); + await scheduler.wait(disableSleep ? 
0 : duration); this.#engine.writeLog( InstanceEvent.SLEEP_COMPLETE, @@ -605,6 +657,9 @@ export class Context extends RpcTarget { const timeoutEntryPQ = this.#engine.priorityQueue.getFirst( (a) => a.hash === cacheKey && a.type === "timeout" ); + const forceEventTimeout = await this.#state.storage.get( + `force-event-timeout-${waitForEventKey}` + ); if ( (timeoutEntryPQ === undefined && this.#engine.priorityQueue !== undefined && @@ -613,7 +668,8 @@ export class Context extends RpcTarget { type: "timeout", })) || (timeoutEntryPQ !== undefined && - timeoutEntryPQ.targetTimestamp < Date.now()) + timeoutEntryPQ.targetTimestamp < Date.now()) || + forceEventTimeout ) { this.#engine.writeLog( InstanceEvent.WAIT_TIMED_OUT, @@ -680,7 +736,6 @@ export class Context extends RpcTarget { : timeoutPromise(ms(options.timeout), true), ]) .then(async (event) => { - console.log(event); this.#engine.writeLog( InstanceEvent.WAIT_COMPLETE, cacheKey, diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 5d401a822dbf..3c352c00357d 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -4,8 +4,11 @@ import { INSTANCE_METADATA, InstanceEvent, InstanceStatus, + instanceStatusName, InstanceTrigger, + toInstanceStatus, } from "./instance"; +import { computeHash } from "./lib/cache"; import { WorkflowFatalError } from "./lib/errors"; import { ENGINE_TIMEOUT, @@ -13,6 +16,7 @@ import { startGracePeriod, } from "./lib/gracePeriodSemaphore"; import { TimePriorityQueue } from "./lib/timePriorityQueue"; +import { WorkflowInstanceModifier } from "./modifier"; import type { Event } from "./context"; import type { InstanceMetadata, RawInstanceLog } from "./instance"; import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers"; @@ -133,6 +137,11 @@ export class Engine extends DurableObject { target, JSON.stringify(metadata) ); + + // Wake any waiters if this is a terminal step event + if (group) { + this.handleStepResultWaiter(group, event, metadata); + } } readLogsFromStep(_cacheKey: string): RawInstanceLog[] { @@ -191,12 +200,172 @@ export class Engine extends DurableObject { status: InstanceStatus ): Promise { await this.ctx.storage.put(ENGINE_STATUS_KEY, status); + + // check if anyone is waiting for this status + this.handleStatusWaiter(status); + } + + private statusWaiters: Map< + InstanceStatus, + { resolve: () => void; reject: (e: unknown) => void } + > = new Map(); + async waitForStatus(status: string): Promise { + const targetStatus = toInstanceStatus(status); + const currentStatus = + await this.ctx.storage.get(ENGINE_STATUS_KEY); + + // if the workflow has already reached the desired state, resolve immediately + if (currentStatus === targetStatus) { + return; + } + + // if it hasn't reached the desired state, create a new promise and add its resolver to the waiters map + return new Promise((resolve, reject) => { + this.statusWaiters.set(targetStatus, { resolve, reject }); + // immediately reconcile against current status in case it's already finite + this.handleStatusWaiter(currentStatus as InstanceStatus); + }); + } + + handleStatusWaiter(status: InstanceStatus): void { + const waiter = this.statusWaiters.get(status); + + // resolve if it reached the desired status + if (waiter) { + waiter.resolve(); + this.statusWaiters.delete(status); + return; + } + + switch (status) { + case InstanceStatus.Errored: { + // if it reaches final status "errored", then it can't be waiting for it to complete or terminate + 
const unreachableStatuses = [ + InstanceStatus.Complete, + InstanceStatus.Terminated, + ]; + + this.rejectUnreachableStatus(status, unreachableStatuses); + break; + } + case InstanceStatus.Terminated: { + // if it reaches final status "terminated", then it can't be waiting for it to complete or error + const unreachableStatuses = [ + InstanceStatus.Complete, + InstanceStatus.Errored, + ]; + + this.rejectUnreachableStatus(status, unreachableStatuses); + break; + } + case InstanceStatus.Complete: { + // if it reaches final status "complete", then it can't be waiting for it to terminate or error + const unreachableStatuses = [ + InstanceStatus.Terminated, + InstanceStatus.Errored, + ]; + + this.rejectUnreachableStatus(status, unreachableStatuses); + break; + } + default: + break; + } + } + + rejectUnreachableStatus( + reachedStatus: number, + unreachableStatuses: number[] + ): void { + if (unreachableStatuses) { + for (const unreachableStatus of unreachableStatuses) { + const waiter = this.statusWaiters.get(unreachableStatus); + if (waiter) { + waiter.reject( + new Error( + `[WorkflowIntrospector] The Workflow instance ${this.instanceId} has reached status '${instanceStatusName(reachedStatus)}'. This is a finite status that prevents it from ever reaching the expected status of '${instanceStatusName(unreachableStatus)}'.` + ) + ); + this.statusWaiters.delete(unreachableStatus); + return; + } + } + } + } + + private stepResultWaiters: Map< + string, + { resolve: (v: unknown) => void; reject: (e: unknown) => void } + > = new Map(); + async waitForStepResult( + stepName: string, + stepCount?: number + ): Promise { + const hash = await computeHash(stepName); + const count = stepCount ?? 1; + const cacheKey = `${hash}-${count}`; + + // read latest log from step + const rows = [ + ...this.ctx.storage.sql.exec<{ + event: InstanceEvent; + metadata: string; + }>( + "SELECT event, metadata FROM states WHERE groupKey = ? ORDER BY id DESC LIMIT 1", + cacheKey + ), + ]; + + if (rows.length > 0) { + const { event, metadata } = rows[0]; + const parsed = JSON.parse(metadata); + if (event === InstanceEvent.STEP_SUCCESS) { + return parsed?.result; + } + if (event === InstanceEvent.STEP_FAILURE) { + throw parsed?.error ?? parsed; + } + } + + // if it hasn't completed the step, create a new promise to later resolve/reject + return new Promise((resolve, reject) => { + this.stepResultWaiters.set(cacheKey, { resolve, reject }); + }); + } + + handleStepResultWaiter( + group: string, + event: InstanceEvent, + metadata: Record + ) { + const waiter = this.stepResultWaiters.get(group); + if (!waiter) { + return; + } + if (event === InstanceEvent.STEP_SUCCESS) { + const result = metadata?.result; + waiter.resolve(result); + this.stepResultWaiters.delete(group); + } else if (event === InstanceEvent.STEP_FAILURE) { + const error = metadata?.error ?? 
new Error("Step failed"); + waiter.reject(error); + this.stepResultWaiters.delete(group); + } } async abort(_reason: string) { // TODO: Maybe don't actually kill but instead check a flag and return early if true } + // Called by the dispose function when introspecting the instance in tests + // TODO: Ideally this abort should be done by `abortAllDurableObjects` from worked called by vitest-pool-workers + async unsafeAbort(reason?: string) { + await this.ctx.storage.sync(); + await this.ctx.storage.deleteAll(); + + this.ctx.abort(reason); + } + async storeEventMap() { // TODO: this can be more efficient, but oh well await this.ctx.blockConcurrencyWhile(async () => { @@ -267,8 +436,14 @@ export class Engine extends DurableObject { } } } else { + const mockEvent = await this.ctx.storage.get(`mock-event-${event.type}`); + if (mockEvent) { + return; + } + const metadata = await this.ctx.storage.get(INSTANCE_METADATA); + if (metadata === undefined) { throw new Error("Engine was never started"); } @@ -283,6 +458,10 @@ export class Engine extends DurableObject { } } + getInstanceModifier(): WorkflowInstanceModifier { + return new WorkflowInstanceModifier(this, this.ctx); + } + async userTriggeredTerminate() {} async init( @@ -364,6 +543,7 @@ export class Engine extends DurableObject { }); }; this.isRunning = true; + void workflowRunningHandler(); try { const target = this.env.USER_WORKFLOW; diff --git a/packages/workflows-shared/src/index.ts b/packages/workflows-shared/src/index.ts index 2bd6abf17a36..0a25c6d5da48 100644 --- a/packages/workflows-shared/src/index.ts +++ b/packages/workflows-shared/src/index.ts @@ -2,3 +2,4 @@ export * from "./binding"; export * from "./context"; export * from "./engine"; export * from "./instance"; +export * from "./modifier"; diff --git a/packages/workflows-shared/src/instance.ts b/packages/workflows-shared/src/instance.ts index 99e622290596..0b6bcd1b79e8 100644 --- a/packages/workflows-shared/src/instance.ts +++ b/packages/workflows-shared/src/instance.ts @@ -83,7 +83,9 @@ export function toInstanceStatus(status: string): InstanceStatus { case "unknown": throw new Error("unknown cannot be parsed into a InstanceStatus"); default: - throw new Error(`${status} was not handled`); + throw new Error( + `${status} was not handled because it's not a valid InstanceStatus` + ); } } diff --git a/packages/workflows-shared/src/modifier.ts b/packages/workflows-shared/src/modifier.ts new file mode 100644 index 000000000000..743dd2aec438 --- /dev/null +++ b/packages/workflows-shared/src/modifier.ts @@ -0,0 +1,172 @@ +import { RpcTarget } from "cloudflare:workers"; +import { computeHash } from "./lib/cache"; +import type { Event } from "./context"; +import type { Engine } from "./engine"; + +export type StepSelector = { + name: string; + index?: number; +}; + +type UserEvent = { + type: string; + payload: unknown; +}; + +export class WorkflowInstanceModifier extends RpcTarget { + #engine: Engine; + #state: DurableObjectState; + + constructor(engine: Engine, state: DurableObjectState) { + super(); + this.#engine = engine; + this.#state = state; + } + + async #getWaitForEventCacheKey(step: StepSelector): Promise { + let count = 1; + if (step.index) { + count = step.index; + } + const name = `${step.name}-${count}`; + const hash = await computeHash(name); + const cacheKey = `${hash}-${count}`; + const waitForEventKey = `${cacheKey}-value`; + + return waitForEventKey; + } + + async #getStepCacheKey(step: StepSelector): Promise { + const hash = await computeHash(step.name); + let 
+ async #getStepCacheKey(step: StepSelector): Promise<string> { + const hash = await computeHash(step.name); + let count = 1; + if (step.index) { + count = step.index; + } + const cacheKey = `${hash}-${count}`; + const valueKey = `${cacheKey}-value`; + + return valueKey; + } + + #getAndIncrementCounter = async (valueKey: string, by: number) => { + const counterKey = `failure-index-${valueKey}`; + const next = (await this.#state.storage.get<number>(counterKey)) ?? 1; + await this.#state.storage.put(counterKey, next + by); + return next; + }; + + async #getSleepStepDisableKey(step: StepSelector): Promise<string> { + let count = 1; + if (step.index) { + count = step.index; + } + const sleepNameCountHash = await computeHash(step.name + count); + + return sleepNameCountHash; + } + + async disableSleeps(steps?: StepSelector[]): Promise<void> { + if (!steps) { + await this.#state.storage.put("disableAllSleeps", true); + } else { + for (const step of steps) { + const sleepDisableKey = await this.#getSleepStepDisableKey(step); + await this.#state.storage.put(sleepDisableKey, true); + } + } + } + + // step.do() flow: it first checks whether a result or error is already in the cache and, if so, returns it immediately. + // If nothing is in the cache, it checks the remaining attempts and runs the user's code against the defined timeout. + // Since `step.do()` performs this initial cache check, directly changing the `valueKey` would cause it to + // assume the value was pre-cached, preventing it from writing any logs about the step's execution state. + // Storing the mocked value under a separate key is crucial because it ensures all execution logs for the step are + // generated, rather than the step being skipped due to a premature cache hit. + async mockStepResult(step: StepSelector, stepResult: unknown): Promise<void> { + const valueKey = await this.#getStepCacheKey(step); + + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { + throw new Error( + `[WorkflowIntrospector] Trying to mock step '${step.name}' multiple times!` + ); + } + + await this.#state.storage.put(`replace-result-${valueKey}`, stepResult); + } +
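+ // Example (hypothetical test usage; in practice this modifier is reached through the introspection helpers exposed by `cloudflare:test`, whose exact call shape may differ): + //   await modifier.mockStepResult({ name: "AI content scan" }, { violationScore: 50 }); + // The next run of `step.do("AI content scan", ...)` in this instance is then expected to resolve with the mocked value instead of executing the step callback.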
+ // Same logic as `mockStepResult`, but stores an error instead of a value. + async mockStepError( + step: StepSelector, + error: Error, + times?: number + ): Promise<void> { + const valueKey = await this.#getStepCacheKey(step); + const serializableError = { + name: error.name, + message: error.message, + }; + + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { + throw new Error( + `[WorkflowIntrospector] Trying to mock error on step '${step.name}' after mocking its result!` + ); + } + + if (times) { + const start = await this.#getAndIncrementCounter(valueKey, times); + const mockErrorsPuts = Array.from({ length: times }, (_, i) => { + const attempt = start + i; + const mockErrorKey = `mock-step-error-${valueKey}-${attempt}`; + return this.#state.storage.put(mockErrorKey, serializableError); + }); + + await Promise.all(mockErrorsPuts); + } else { + const mockErrorKey = `mock-step-error-${valueKey}`; + await this.#state.storage.put(mockErrorKey, serializableError); + } + } + + async forceStepTimeout(step: StepSelector, times?: number) { + const valueKey = await this.#getStepCacheKey(step); + + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { + throw new Error( + `[WorkflowIntrospector] Trying to force timeout on step '${step.name}' after mocking its result!` + ); + } + + if (times) { + const start = await this.#getAndIncrementCounter(valueKey, times); + const forceTimeouts = Array.from({ length: times }, (_, i) => { + const attempt = start + i; + const forceStepTimeoutKey = `force-step-timeout-${valueKey}-${attempt}`; + return this.#state.storage.put(forceStepTimeoutKey, true); + }); + + await Promise.all(forceTimeouts); + } else { + const forceStepTimeoutKey = `force-step-timeout-${valueKey}`; + await this.#state.storage.put(forceStepTimeoutKey, true); + } + } + + async mockEvent(event: UserEvent): Promise<void> { + const myEvent: Event = { + timestamp: new Date(), + payload: event.payload, + type: event.type, + }; + + await this.#state.storage.put(`mock-event-${event.type}`, true); + await this.#engine.receiveEvent(myEvent); + } + + async forceEventTimeout(step: StepSelector): Promise<void> { + const waitForEventKey = await this.#getWaitForEventCacheKey(step); + await this.#state.storage.put( + `force-event-timeout-${waitForEventKey}`, + true + ); + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 75bf9d8c64c3..57c3ce044858 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -3279,6 +3279,9 @@ importers: '@cloudflare/workers-types': specifier: catalog:default version: 4.20250906.0 + '@cloudflare/workflows-shared': + specifier: workspace:* + version: link:../workflows-shared '@types/node': specifier: ^20.19.9 version: 20.19.9 @@ -3530,8 +3533,8 @@ importers: specifier: ^2.5.0 version: 2.5.0 itty-time: - specifier: ^1.0.6 - version: 1.0.6 + specifier: ^2.0.2 + version: 2.0.2 mime: specifier: ^3.0.0 version: 3.0.0 @@ -10103,6 +10106,9 @@ packages: itty-time@1.0.6: resolution: {integrity: sha512-+P8IZaLLBtFv8hCkIjcymZOp4UJ+xW6bSlQsXGqrkmJh7vSiMFSlNne0mCYagEE0N7HDNR5jJBRxwN0oYv61Rw==} + itty-time@2.0.2: + resolution: {integrity: sha512-95U9Xo3Qggz/e82ewZQJw8x76HgiMvbrijzoPy6nCTae4FZEwbcVJgrB57Kvddf8Q5pQdgJKTX6euFJpDr6Low==} + jackspeak@3.4.3: resolution: {integrity: sha512-OGlZQpz2yfahA/Rd1Y8Cd9SIEsqvXkLVoSw/cgwhnhFMDbsQFeZYoJJ7bIZBS9BcamUW96asq/npPWugM+RQBw==} @@ -20577,6 +20583,8 @@ snapshots: itty-time@1.0.6: {} + itty-time@2.0.2: {} + jackspeak@3.4.3: dependencies: '@isaacs/cliui': 8.0.2