Skip to content

Commit e886bdd

Browse files
committed
Add introspectWorkflow API with proxies
1 parent 44dc7cf commit e886bdd

File tree

5 files changed

+203
-65
lines changed

5 files changed

+203
-65
lines changed

packages/miniflare/src/plugins/workflows/index.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ export const WORKFLOWS_PLUGIN: Plugin<
8282

8383
// this creates one miniflare service per workflow that the user's script has. we should dedupe engine definition later
8484
const services = Object.entries(options.workflows ?? {}).map<Service>(
85-
([_bindingName, workflow]) => {
85+
([bindingName, workflow]) => {
8686
// NOTE(lduarte): the engine unique namespace key must be unique per workflow definition
8787
// otherwise workerd will crash because there's two equal DO namespaces
8888
const uniqueKey = `miniflare-workflows-${workflow.name}`;
@@ -106,7 +106,9 @@ export const WORKFLOWS_PLUGIN: Plugin<
106106
className: "Engine",
107107
enableSql: true,
108108
uniqueKey,
109-
preventEviction: true,
109+
// Note(osilva): the engine should not be prevented from eviction if we wish
110+
// to abort it during tests (vitest-pool-workers)
111+
// preventEviction: true,
110112
},
111113
],
112114
durableObjectStorage: {
@@ -128,6 +130,10 @@ export const WORKFLOWS_PLUGIN: Plugin<
128130
name: "WORKFLOW_NAME",
129131
json: JSON.stringify(workflow.name),
130132
},
133+
{
134+
name: "BINDING_NAME",
135+
json: JSON.stringify(bindingName),
136+
},
131137
],
132138
},
133139
};

packages/vitest-pool-workers/src/worker/lib/cloudflare/test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ export {
1919
applyD1Migrations,
2020
createPagesEventContext,
2121
introspectWorkflowInstance,
22+
introspectWorkflow,
2223
} from "cloudflare:test-internal";

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 179 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,98 +2,86 @@ import {
22
instanceStatusName,
33
InstanceStatus as InstanceStatusNumber,
44
} from "@cloudflare/workflows-shared/src/instance";
5-
import { WorkflowIntrospectorError } from "@cloudflare/workflows-shared/src/modifier";
65
import { WORKFLOW_ENGINE_BINDING } from "../shared/workflows";
7-
import { internalEnv } from "./env";
8-
import type { InstanceModifier } from "@cloudflare/workflows-shared/src/modifier";
6+
import { runInRunnerObject } from "./durable-objects";
7+
import { env, internalEnv } from "./env";
8+
import type {
9+
StepSelector,
10+
WorkflowInstanceModifier,
11+
} from "@cloudflare/workflows-shared/src/modifier";
912

10-
export type StepSelector = {
11-
name: string;
12-
index?: number;
13-
};
13+
type ModifierCallback = (m: WorkflowInstanceModifier) => Promise<void>;
1414

15-
export type WorkflowInstanceIntrospector = {
16-
modify(
17-
fn: (m: InstanceModifier) => Promise<void>
18-
): Promise<WorkflowInstanceIntrospector>;
15+
// See public facing `cloudflare:test` types for docs
16+
export interface WorkflowInstanceIntrospector {
17+
modify(fn: ModifierCallback): Promise<WorkflowInstanceIntrospector>;
1918

2019
waitForStepResult(step: StepSelector): Promise<unknown>;
2120

2221
waitForStatus(status: string): Promise<void>;
2322

2423
cleanUp(): Promise<void>;
25-
};
24+
}
2625

27-
/**
28-
* Entry point that targets a single Workflow instance
29-
* This would allow to apply test rules and mocks to the given instance
30-
*/
3126
export async function introspectWorkflowInstance(
3227
workflow: Workflow,
3328
instanceId: string
3429
): Promise<WorkflowInstanceIntrospector> {
3530
if (!workflow || !instanceId) {
36-
throw new Error("Workflow binding and instance id are required.");
31+
throw new Error(
32+
"[WorkflowIntrospector] Workflow binding and instance id are required."
33+
);
3734
}
3835

39-
console.log(
40-
`[Vitest-Workflows] Introspecting workflow instance: ${instanceId}`
41-
);
42-
4336
// @ts-expect-error getWorkflowName() not exposed
4437
const engineBindingName = `${WORKFLOW_ENGINE_BINDING}${(await workflow.getWorkflowName()).toUpperCase()}`;
38+
4539
// @ts-expect-error DO binding created in runner worker start
4640
const engineStubId = internalEnv[engineBindingName].idFromName(instanceId);
4741
// @ts-expect-error DO binding created in runner worker start
4842
const engineStub = internalEnv[engineBindingName].get(engineStubId);
4943

50-
const instanceModifier = await engineStub.getInstanceModifier();
44+
const instanceModifier = engineStub.getInstanceModifier();
5145

5246
return new WorkflowInstanceIntrospectorHandle(engineStub, instanceModifier);
5347
}
5448

5549
class WorkflowInstanceIntrospectorHandle
5650
implements WorkflowInstanceIntrospector
5751
{
58-
engineStub: DurableObjectStub;
59-
instanceModifier: InstanceModifier;
52+
#engineStub: DurableObjectStub;
53+
#instanceModifier: WorkflowInstanceModifier;
6054
constructor(
6155
engineStub: DurableObjectStub,
62-
instanceModifier: InstanceModifier
56+
instanceModifier: WorkflowInstanceModifier
6357
) {
64-
this.engineStub = engineStub;
65-
this.instanceModifier = instanceModifier;
58+
this.#engineStub = engineStub;
59+
this.#instanceModifier = instanceModifier;
6660
}
6761

68-
async modify(
69-
fn: (m: InstanceModifier) => Promise<void>
70-
): Promise<WorkflowInstanceIntrospector> {
71-
console.log("[Vitest-Workflows] I should go call a modifier");
72-
await fn(this.instanceModifier);
62+
async modify(fn: ModifierCallback): Promise<WorkflowInstanceIntrospector> {
63+
await fn(this.#instanceModifier);
64+
7365
return this;
7466
}
7567

7668
async waitForStepResult(step: StepSelector): Promise<unknown> {
77-
console.log("waiting for step result of step", step.name);
78-
// @ts-expect-error waitForStepResult not exposed
79-
const stepResult = await this.engineStub.waitForStepResult(
69+
// @ts-expect-error DO binding created in runner worker start
70+
const stepResult = await this.#engineStub.waitForStepResult(
8071
step.name,
8172
step.index
8273
);
8374

84-
console.log("result of step", step.name, "awaited");
8575
return stepResult;
8676
}
8777

8878
async waitForStatus(status: InstanceStatus["status"]): Promise<void> {
89-
console.log("[Vitest-Workflows] waiting for status");
90-
9179
if (
9280
status === instanceStatusName(InstanceStatusNumber.Terminated) ||
9381
status === instanceStatusName(InstanceStatusNumber.Paused)
9482
) {
95-
throw new WorkflowIntrospectorError(
96-
`InstanceStatus '${status}' is not implemented yet and cannot be waited.`
83+
throw new Error(
84+
`[WorkflowIntrospector] InstanceStatus '${status}' is not implemented yet and cannot be waited.`
9785
);
9886
}
9987

@@ -102,19 +90,165 @@ class WorkflowInstanceIntrospectorHandle
10290
// starts running, so waiting for it to be queued should always return
10391
return;
10492
}
105-
// @ts-expect-error waitForStatus not exposed
106-
await this.engineStub.waitForStatus(status);
107-
108-
console.log("[Vitest-Workflows] status awaited");
93+
// @ts-expect-error DO binding created in runner worker start
94+
await this.#engineStub.waitForStatus(status);
10995
}
11096

11197
async cleanUp(): Promise<void> {
11298
// this cleans state with isolatedStorage = false
11399
try {
114100
// @ts-expect-error DO binding created in runner worker start
115-
await this.engineStub.unsafeAbort("user called delete");
101+
await this.#engineStub._unsafeAbort("Instance clean up");
116102
} catch {
117103
// do nothing because we want to clean up this instance
118104
}
119105
}
120106
}
107+
108+
// See public facing `cloudflare:test` types for docs
109+
export interface WorkflowIntrospector {
110+
modifyAll(fn: ModifierCallback): void;
111+
112+
get(): WorkflowInstanceIntrospector[];
113+
114+
cleanUp(): void;
115+
}
116+
117+
export async function introspectWorkflow(
118+
workflow: Workflow
119+
): Promise<WorkflowIntrospectorHandle> {
120+
if (!workflow) {
121+
throw new Error("[WorkflowIntrospector] Workflow binding is required.");
122+
}
123+
124+
const modifierCallbacks: ModifierCallback[] = [];
125+
const instanceIntrospectors: WorkflowInstanceIntrospector[] = [];
126+
// @ts-expect-error getBindingName not exposed
127+
const bindingName = await workflow.getBindingName();
128+
const internalOriginalWorkflow = internalEnv[bindingName] as Workflow;
129+
const externalOriginalWorkflow = env[bindingName] as Workflow;
130+
131+
const introspectAndModifyInstance = async (instanceId: string) => {
132+
try {
133+
await runInRunnerObject(internalEnv, async () => {
134+
const introspector = await introspectWorkflowInstance(
135+
workflow,
136+
instanceId
137+
);
138+
instanceIntrospectors.push(introspector);
139+
// Apply any stored modifier functions
140+
for (const callback of modifierCallbacks) {
141+
await introspector.modify(callback);
142+
}
143+
});
144+
} catch (error) {
145+
console.error(
146+
`[WorkflowIntrospector] Error during introspection for instance ${instanceId}:`,
147+
error
148+
);
149+
throw new Error(
150+
`[WorkflowIntrospector] Failed to introspect Workflow instance ${instanceId}.`
151+
);
152+
}
153+
};
154+
155+
const createWorkflowProxyGetHandler = <
156+
T extends Workflow,
157+
>(): ProxyHandler<T>["get"] => {
158+
return (target, property) => {
159+
if (property === "create") {
160+
return new Proxy(target[property], {
161+
async apply(func, thisArg, argArray) {
162+
const hasId = Object.hasOwn(argArray[0] ?? {}, "id");
163+
if (!hasId) {
164+
argArray = [{ id: crypto.randomUUID(), ...(argArray[0] ?? {}) }];
165+
}
166+
const instanceId = (argArray[0] as { id: string }).id;
167+
168+
await introspectAndModifyInstance(instanceId);
169+
170+
return target[property](...argArray);
171+
},
172+
});
173+
}
174+
175+
if (property === "createBatch") {
176+
return new Proxy(target[property], {
177+
async apply(func, thisArg, argArray) {
178+
for (const [index, arg] of argArray[0]?.entries() ?? []) {
179+
const hasId = Object.hasOwn(arg, "id");
180+
if (!hasId) {
181+
argArray[0][index] = { id: crypto.randomUUID(), ...arg };
182+
}
183+
}
184+
185+
await Promise.all(
186+
argArray[0].map((options: { id: string }) =>
187+
introspectAndModifyInstance(options.id)
188+
)
189+
);
190+
return target[property](argArray[0]);
191+
},
192+
});
193+
}
194+
// @ts-expect-error index signature
195+
return target[property];
196+
};
197+
};
198+
199+
const cleanup = () => {
200+
internalEnv[bindingName] = internalOriginalWorkflow;
201+
env[bindingName] = externalOriginalWorkflow;
202+
};
203+
204+
// Create a single handler instance to be reused
205+
const proxyGetHandler = createWorkflowProxyGetHandler();
206+
207+
// Apply the proxies using the shared handler logic
208+
internalEnv[bindingName] = new Proxy(internalOriginalWorkflow, {
209+
get: proxyGetHandler,
210+
});
211+
env[bindingName] = new Proxy(externalOriginalWorkflow, {
212+
get: proxyGetHandler,
213+
});
214+
215+
return new WorkflowIntrospectorHandle(
216+
workflow,
217+
modifierCallbacks,
218+
instanceIntrospectors,
219+
cleanup
220+
);
221+
}
222+
223+
class WorkflowIntrospectorHandle implements WorkflowIntrospector {
224+
workflow: Workflow;
225+
#modifierCallbacks: ModifierCallback[];
226+
#instanceIntrospectors: WorkflowInstanceIntrospector[];
227+
#cleanupCallback: () => void;
228+
229+
constructor(
230+
workflow: Workflow,
231+
modifierCallbacks: ModifierCallback[],
232+
instanceIntrospectors: WorkflowInstanceIntrospector[],
233+
cleanupCallback: () => void
234+
) {
235+
this.workflow = workflow;
236+
this.#modifierCallbacks = modifierCallbacks;
237+
this.#instanceIntrospectors = instanceIntrospectors;
238+
this.#cleanupCallback = cleanupCallback;
239+
}
240+
241+
modifyAll(fn: ModifierCallback): void {
242+
this.#modifierCallbacks.push(fn);
243+
}
244+
245+
get(): WorkflowInstanceIntrospector[] {
246+
return this.#instanceIntrospectors;
247+
}
248+
249+
cleanUp(): void {
250+
this.#modifierCallbacks = [];
251+
this.#instanceIntrospectors = [];
252+
this.#cleanupCallback();
253+
}
254+
}

packages/workflows-shared/src/binding.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import type {
1111
type Env = {
1212
ENGINE: DurableObjectNamespace<Engine>;
1313
WORKFLOW_NAME: string;
14+
BINDING_NAME: string;
1415
};
1516

1617
// this.env.WORKFLOW is WorkflowBinding
@@ -79,9 +80,13 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
7980

8081
return await Promise.all(batch.map((val) => this.create(val)));
8182
}
82-
public async getWorkflowName(): Promise<string> {
83+
public getWorkflowName(): string {
8384
return this.env.WORKFLOW_NAME;
8485
}
86+
87+
public getBindingName(): string {
88+
return this.env.BINDING_NAME;
89+
}
8590
}
8691

8792
export class WorkflowHandle extends RpcTarget implements WorkflowInstance {

0 commit comments

Comments
 (0)