Skip to content

Commit 9ab0943

Browse files
committed
Add mockStepError and cleanUp. Improve waitForStepResult and waitForStatus
1 parent ff97447 commit 9ab0943

File tree

6 files changed

+249
-78
lines changed

6 files changed

+249
-78
lines changed

packages/vitest-pool-workers/src/pool/index.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -535,14 +535,16 @@ Workflows defined in project: ${workflowClassNames.join(", ")}`);
535535

536536
// Add Workflows Engines DOs bindings to the Runner Worker
537537
for (const value of Object.values(runnerWorker.workflows ?? {})) {
538-
runnerWorker.durableObjects[
539-
`${WORKFLOW_ENGINE_BINDING}${value.name.toUpperCase()}`
540-
] = {
538+
const engineName = `${WORKFLOW_ENGINE_BINDING}${value.name.toUpperCase()}`;
539+
runnerWorker.durableObjects[engineName] = {
541540
className: "Engine",
542541
unsafeScriptName: `workflows:${value.name}`,
542+
unsafeUniqueKey: `miniflare-workflows-${value.name}`,
543543
};
544544
}
545545

546+
// console.log("Runner Worker", runnerWorker);
547+
546548
// Vite has its own define mechanism, but we can't control it from custom
547549
// pools. Our defines come from `wrangler.toml` files which are only parsed
548550
// with the rest of the pool configuration. Instead, we implement our own

packages/vitest-pool-workers/src/worker/index.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import {
2121
import { createChunkingSocket } from "../shared/chunking-socket";
2222
import type { SocketLike } from "../shared/chunking-socket";
2323
import type { VitestExecutor as VitestExecutorType } from "vitest/execute";
24-
import { env } from "cloudflare:workers";
2524

2625
function structuredSerializableStringify(value: unknown): string {
2726
return devalue.stringify(value, structuredSerializableReducers);
@@ -319,10 +318,6 @@ export class RunnerObject implements DurableObject {
319318
}
320319
}
321320

322-
// @ts-expect-error asd
323-
console.log("HIII", env["__VITEST_POOL_WORKERS_RUNNER_OBJECT"])
324-
console.log("HIII", env["USER_ENGINE_MY_WORKFLOW"])
325-
326321
export default createWorkerEntrypointWrapper("default");
327322

328323
// Re-export user Durable Object wrappers
Lines changed: 44 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,23 @@
1-
import { WAIT_FOREVER, waitUntil } from "async-wait-until";
21
import { WORKFLOW_ENGINE_BINDING } from "../shared/workflows";
32
import { internalEnv } from "./env";
4-
5-
// import { instanceStatusName } from "@cloudflare/workflows-shared";
6-
7-
export enum InstanceStatus {
8-
Queued = 0, // Queued and waiting to start
9-
Running = 1,
10-
Paused = 2, // TODO (WOR-73): Implement pause
11-
Errored = 3, // Stopped due to a user or system Error
12-
Terminated = 4, // Stopped explicitly by user
13-
Complete = 5, // Successful completion
14-
// TODO (WOR-71): Sleep
15-
}
16-
17-
export function instanceStatusName(status: InstanceStatus) {
18-
switch (status) {
19-
case InstanceStatus.Queued:
20-
return "queued";
21-
case InstanceStatus.Running:
22-
return "running";
23-
case InstanceStatus.Paused:
24-
return "paused";
25-
case InstanceStatus.Errored:
26-
return "errored";
27-
case InstanceStatus.Terminated:
28-
return "terminated";
29-
case InstanceStatus.Complete:
30-
return "complete";
31-
default:
32-
return "unknown";
33-
}
34-
}
3+
// import type { InstanceStatus } from "@cloudflare/workflows-shared/src/instance";
4+
import type { InstanceModifier } from "@cloudflare/workflows-shared/src/modifier";
355

366
export type StepSelector = {
377
name: string;
388
index?: number;
399
};
4010

4111
export type WorkflowInstanceIntrospector = {
42-
modify(fn: (m: InstanceModifier) => void): WorkflowInstanceIntrospector;
12+
modify(
13+
fn: (m: InstanceModifier) => Promise<void>
14+
): Promise<WorkflowInstanceIntrospector>;
4315

44-
waitForStepResult(step: StepSelector): Promise<any>;
16+
waitForStepResult(step: StepSelector): Promise<unknown>;
4517

46-
waitUntil(opts: { status: InstanceStatus }): Promise<void>;
18+
waitForStatus(status: string): Promise<void>;
19+
20+
cleanUp(): Promise<void>;
4721
};
4822

4923
/**
@@ -58,12 +32,17 @@ export async function introspectWorkflowInstance(
5832
throw new Error("Workflow binding and instance id are required.");
5933
}
6034

61-
console.log(`Introspecting workflow instance: ${instanceId}`);
35+
console.log(
36+
`[Vitest-Workflows] Introspecting workflow instance: ${instanceId}`
37+
);
6238

63-
await workflow.create({ id: instanceId }); // why do I need to create? Worked before without it
39+
//await workflow.create({ id: instanceId });
6440

41+
// @ts-expect-error getWorkflowName() not exposed
6542
const engineBindingName = `${WORKFLOW_ENGINE_BINDING}${(await workflow.getWorkflowName()).toUpperCase()}`;
43+
// @ts-expect-error binding created at in runner worker start
6644
const engineStubId = internalEnv[engineBindingName].idFromName(instanceId);
45+
// @ts-expect-error binding created at in runner worker start
6746
const engineStub = internalEnv[engineBindingName].get(engineStubId);
6847

6948
const instanceModifier = await engineStub.getInstanceModifier();
@@ -84,33 +63,40 @@ class WorkflowInstanceIntrospectorHandle
8463
this.instanceModifier = instanceModifier;
8564
}
8665

87-
async modify(
88-
fn: (m: InstanceModifier) => void
89-
): WorkflowInstanceIntrospector {
66+
public async modify(
67+
fn: (m: InstanceModifier) => Promise<void>
68+
): Promise<WorkflowInstanceIntrospector> {
69+
console.log("[Vitest-Workflows] I should go call a modifier");
9070
await fn(this.instanceModifier);
91-
console.log("Should allow modifications");
9271
return this;
9372
}
9473

95-
async waitForStepResult(step: StepSelector): Promise<unknown> {
96-
await waitUntil(async () => {});
74+
public async waitForStepResult(step: StepSelector): Promise<unknown> {
75+
console.log("waiting for step result of step", step.name);
76+
// @ts-expect-error waitForStepResult not exposed
77+
const stepResult = await this.engineStub.waitForStepResult(
78+
step.name,
79+
step.index
80+
);
9781

98-
console.log("Should await the step result of step", step.name);
99-
return { result: "result" };
82+
console.log("result of step", step.name, "awaited");
83+
return stepResult;
10084
}
10185

102-
async waitUntil(opts: { status: InstanceStatus }): Promise<void> {
103-
// console.log("I waited until the Engine reached the status", opts.status);
104-
await waitUntil(
105-
async () => {
106-
const currentStatus = instanceStatusName(
107-
await this.engineStub.getStatus()
108-
);
109-
console.log("status from engine", currentStatus);
110-
console.log("status user wants", opts.status);
111-
return currentStatus === opts.status;
112-
},
113-
{ timeout: WAIT_FOREVER } // Default timeout is 5s, crashes the test after that
114-
);
86+
public async waitForStatus(status: string): Promise<void> {
87+
console.log("[Vitest-Workflows] waiting for status");
88+
// @ts-expect-error waitForStatus not exposed
89+
await this.engineStub.waitForStatus(status);
90+
91+
console.log("[Vitest-Workflows] status awaited");
92+
}
93+
94+
public async cleanUp(): Promise<void> {
95+
// works with isolatedStorage = false
96+
try {
97+
await this.engineStub.abort("user called delete");
98+
} catch {
99+
// do nothing because we want to clean up this instance
100+
}
115101
}
116102
}

packages/workflows-shared/src/context.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,35 @@ export class Context extends RpcTarget {
274274
await this.#state.storage.put(stepStateKey, stepState);
275275
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
276276

277+
const mockErrorKey = `mock-error-${valueKey}`;
278+
const mockedErrorPayload =
279+
(await this.#state.storage.get<{
280+
name: string;
281+
message: string;
282+
}>(mockErrorKey)) ||
283+
(await this.#state.storage.get<{
284+
name: string;
285+
message: string;
286+
}>(`${mockErrorKey}-${stepState.attemptedCount}`));
287+
288+
// if a mocked error exists, throw it immediately
289+
if (mockedErrorPayload) {
290+
const errorToThrow = new Error(mockedErrorPayload.message);
291+
errorToThrow.name = mockedErrorPayload.name;
292+
console.log("[Context.do()] Will throw an error for step:", name);
293+
throw errorToThrow;
294+
}
295+
277296
const replaceResult = await this.#state.storage.get(
278297
`replace-result-${valueKey}`
279298
);
280299
if (replaceResult) {
300+
console.log(
301+
"[Context.do()] Will replace the result of step:",
302+
name,
303+
"with:",
304+
replaceResult
305+
);
281306
result = replaceResult;
282307
} else {
283308
result = await Promise.race([doWrapperClosure(), timeoutPromise()]);

0 commit comments

Comments
 (0)