Skip to content

Commit ca977cc

Browse files
committed
check edge cases for wairForStatus - prevents user from waiting indefinitely
1 parent 9712184 commit ca977cc

File tree

3 files changed

+95
-13
lines changed

3 files changed

+95
-13
lines changed

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import {
2+
instanceStatusName,
3+
InstanceStatus as InstanceStatusNumber,
4+
} from "@cloudflare/workflows-shared/src/instance";
5+
import { WorkflowIntrospectorError } from "@cloudflare/workflows-shared/src/modifier";
16
import { WORKFLOW_ENGINE_BINDING } from "../shared/workflows";
27
import { internalEnv } from "./env";
38
import type { InstanceModifier } from "@cloudflare/workflows-shared/src/modifier";
@@ -80,16 +85,31 @@ class WorkflowInstanceIntrospectorHandle
8085
return stepResult;
8186
}
8287

83-
async waitForStatus(status: string): Promise<void> {
88+
async waitForStatus(status: InstanceStatus["status"]): Promise<void> {
8489
console.log("[Vitest-Workflows] waiting for status");
90+
91+
if (
92+
status === instanceStatusName(InstanceStatusNumber.Terminated) ||
93+
status === instanceStatusName(InstanceStatusNumber.Paused)
94+
) {
95+
throw new WorkflowIntrospectorError(
96+
`InstanceStatus '${status}' is not implemented yet and cannot be waited.`
97+
);
98+
}
99+
100+
if (status === instanceStatusName(InstanceStatusNumber.Queued)) {
101+
// we currently don't have a queue mechanism, but it would happen before it
102+
// starts running, so waiting for it to be queued should always return
103+
return;
104+
}
85105
// @ts-expect-error waitForStatus not exposed
86106
await this.engineStub.waitForStatus(status);
87107

88108
console.log("[Vitest-Workflows] status awaited");
89109
}
90110

91111
async cleanUp(): Promise<void> {
92-
// works with isolatedStorage = false
112+
// this cleans state with isolatedStorage = false
93113
try {
94114
// @ts-expect-error DO binding created in runner worker start
95115
await this.engineStub.unsafeAbort("user called delete");

packages/workflows-shared/src/engine.ts

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
startGracePeriod,
1717
} from "./lib/gracePeriodSemaphore";
1818
import { TimePriorityQueue } from "./lib/timePriorityQueue";
19-
import { InstanceModifier } from "./modifier";
19+
import { InstanceModifier, WorkflowIntrospectorError } from "./modifier";
2020
import type { Event } from "./context";
2121
import type { InstanceMetadata, RawInstanceLog } from "./instance";
2222
import type { StepSelector } from "./modifier";
@@ -226,14 +226,16 @@ export class Engine extends DurableObject<Env> {
226226
): Promise<void> {
227227
await this.ctx.storage.put(ENGINE_STATUS_KEY, status);
228228

229-
// Check if anyone is waiting for this new status
229+
// check if anyone is waiting for this status
230230
this.handleStatusWaiter(status);
231231

232232
console.log("[Engine] Changed my status to", instanceStatusName(status));
233233
}
234234

235-
private statusWaiters: Map<InstanceStatus, { resolve: () => void }> =
236-
new Map();
235+
private statusWaiters: Map<
236+
InstanceStatus,
237+
{ resolve: () => void; reject: (e: unknown) => void }
238+
> = new Map();
237239
async waitForStatus(status: string): Promise<void> {
238240
const targetStatus = toInstanceStatus(status);
239241
const currentStatus =
@@ -245,16 +247,74 @@ export class Engine extends DurableObject<Env> {
245247
}
246248

247249
// if it hasn't reached the desired state, create a new promise and add its resolver to the waiters map
248-
return new Promise((resolve) => {
249-
this.statusWaiters.set(targetStatus, { resolve });
250+
return new Promise((resolve, reject) => {
251+
this.statusWaiters.set(targetStatus, { resolve, reject });
250252
});
251253
}
252254

253-
handleStatusWaiter(status: InstanceStatus) {
255+
handleStatusWaiter(status: InstanceStatus): void {
254256
const waiter = this.statusWaiters.get(status);
257+
258+
// resolve if it reached the desired status
255259
if (waiter) {
256260
waiter.resolve();
257261
this.statusWaiters.delete(status);
262+
return;
263+
}
264+
265+
switch (status) {
266+
case InstanceStatus.Errored: {
267+
// if it reaches final status "errored", then it can't be waiting for it to complete or terminate
268+
const unreachableStatuses = [
269+
InstanceStatus.Complete,
270+
InstanceStatus.Terminated,
271+
];
272+
273+
this.rejectUnreachableStatus(status, unreachableStatuses);
274+
break;
275+
}
276+
case InstanceStatus.Terminated: {
277+
// if it reaches final status "terminated", then it can't be waiting for it to complete or error
278+
const unreachableStatuses = [
279+
InstanceStatus.Complete,
280+
InstanceStatus.Errored,
281+
];
282+
283+
this.rejectUnreachableStatus(status, unreachableStatuses);
284+
break;
285+
}
286+
case InstanceStatus.Complete: {
287+
// if it reaches final status "complete", then it can't be waiting for it to terminate or error
288+
const unreachableStatuses = [
289+
InstanceStatus.Terminated,
290+
InstanceStatus.Errored,
291+
];
292+
293+
this.rejectUnreachableStatus(status, unreachableStatuses);
294+
break;
295+
}
296+
default:
297+
break;
298+
}
299+
}
300+
301+
rejectUnreachableStatus(
302+
reachedStatus: number,
303+
unreachableStatuses: number[]
304+
): void {
305+
if (unreachableStatuses) {
306+
for (const unreachableStatus of unreachableStatuses) {
307+
const waiter = this.statusWaiters.get(unreachableStatus);
308+
if (waiter) {
309+
waiter.reject(
310+
new WorkflowIntrospectorError(
311+
`The Wokflow instance ${this.instanceId} has reached status '${instanceStatusName(reachedStatus)}'. This is a final state that prevents it from ever reaching the expected status of '${instanceStatusName(unreachableStatus)}'.`
312+
)
313+
);
314+
this.statusWaiters.delete(unreachableStatus);
315+
return;
316+
}
317+
}
258318
}
259319
}
260320

@@ -287,7 +347,7 @@ export class Engine extends DurableObject<Env> {
287347
return parsed?.result;
288348
}
289349
if (event === InstanceEvent.STEP_FAILURE) {
290-
throw parsed?.error ?? parsed ?? new Error("Step failed");
350+
throw parsed?.error ?? parsed;
291351
}
292352
}
293353

@@ -311,8 +371,8 @@ export class Engine extends DurableObject<Env> {
311371
waiter.resolve(result);
312372
this.stepResultWaiters.delete(group);
313373
} else if (event === InstanceEvent.STEP_FAILURE) {
314-
const errorLike = metadata?.error ?? metadata;
315-
waiter.reject(errorLike);
374+
const error = metadata?.error ?? new Error("Step failed");
375+
waiter.reject(error);
316376
this.stepResultWaiters.delete(group);
317377
}
318378
}

packages/workflows-shared/src/instance.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,9 @@ export function toInstanceStatus(status: string): InstanceStatus {
8383
case "unknown":
8484
throw new Error("unknown cannot be parsed into a InstanceStatus");
8585
default:
86-
throw new Error(`${status} was not handled`);
86+
throw new Error(
87+
`${status} was not handled because it's not a valid InstanceStatus`
88+
);
8789
}
8890
}
8991

0 commit comments

Comments
 (0)