Skip to content

Commit 3fa0237

Browse files
committed
Add team review suggestions
1 parent 4b7640b commit 3fa0237

File tree

6 files changed

+36
-45
lines changed

6 files changed

+36
-45
lines changed

fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ const STATUS_COMPLETE = "complete";
66

77
describe("Test Workflow", () => {
88
it("should be able to trigger a workflow", async () => {
9-
// With `using` to ensure cleanup:
9+
// With `using` to ensure Workflow instances cleanup:
1010
await using introspector = await introspectWorkflow(env.TEST_WORKFLOW);
1111
const res = await SELF.fetch("https://mock-worker.local");
1212

1313
expect(res.status).toBe(200);
1414
});
1515

1616
it("workflow should reach the end and be successful", async () => {
17-
// With `using` to ensure cleanup:
17+
// With `using` to ensure Workflow instances cleanup:
1818
await using introspector = await introspectWorkflow(env.TEST_WORKFLOW);
1919
const res = await SELF.fetch("https://mock-worker.local");
2020

@@ -30,7 +30,7 @@ describe("Test Workflow", () => {
3030
});
3131

3232
it("workflow should reach the end and be successful with introspector", async () => {
33-
// CONFIG with `using` to ensure cleanup:
33+
// CONFIG with `using` to ensure Workflow instances cleanup:
3434
await using introspector = await introspectWorkflow(env.TEST_WORKFLOW);
3535

3636
await SELF.fetch("https://mock-worker.local");
@@ -42,7 +42,7 @@ describe("Test Workflow", () => {
4242
const instance = instances[0];
4343
await instance.waitForStatus(STATUS_COMPLETE);
4444

45-
// CLEANUP: assured by Symbol.asyncDispose
45+
// CLEANUP: ensured by `using`
4646
});
4747
});
4848

@@ -51,7 +51,7 @@ describe("Test long Workflow", () => {
5151
const mockResult = "mocked result";
5252

5353
it("workflow should be able to introspect and reach the end and be successful", async () => {
54-
// CONFIG with `using` to ensure cleanup:
54+
// CONFIG with `using` to ensure Workflow instances cleanup:
5555
await using introspector = await introspectWorkflow(env.TEST_LONG_WORKFLOW);
5656
introspector.modifyAll(async (m) => {
5757
await m.disableSleeps();
@@ -70,7 +70,7 @@ describe("Test long Workflow", () => {
7070
);
7171
await instance.waitForStatus(STATUS_COMPLETE);
7272

73-
// CLEANUP: done by Symbol.asyncDispose
73+
// CLEANUP: ensured by `using`
7474
});
7575

7676
it("workflow batch should be able to introspect and reach the end and be successful (explicit cleanup)", async () => {

fixtures/vitest-pool-workers-examples/workflows/test/unit.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ describe("Long Workflow (single instance)", () => {
270270
await expect(
271271
instance.waitForStatus(STATUS_COMPLETE)
272272
).rejects.toMatchInlineSnapshot(
273-
`[Error: [WorkflowIntrospector] The Wokflow instance 12345678910 has reached status 'errored'. This is a finite status that prevents it from ever reaching the expected status of 'complete'.]`
273+
`[Error: [WorkflowIntrospector] The Workflow instance 12345678910 has reached status 'errored'. This is a finite status that prevents it from ever reaching the expected status of 'complete'.]`
274274
);
275275

276276
const instanceStatus = await createdInstance.status();
@@ -620,7 +620,7 @@ describe("Long Workflow (batch)", () => {
620620
await expect(
621621
instances[0].waitForStatus(STATUS_COMPLETE)
622622
).rejects.toMatchInlineSnapshot(
623-
`[Error: [WorkflowIntrospector] The Wokflow instance 12345678910 has reached status 'errored'. This is a finite status that prevents it from ever reaching the expected status of 'complete'.]`
623+
`[Error: [WorkflowIntrospector] The Workflow instance 12345678910 has reached status 'errored'. This is a finite status that prevents it from ever reaching the expected status of 'complete'.]`
624624
);
625625

626626
expect((await createdInstances[0].status()).status).toBe(STATUS_ERRORED);

packages/vitest-pool-workers/src/worker/workflows.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ export async function introspectWorkflow(
113113
const modifierCallbacks: ModifierCallback[] = [];
114114
const instanceIntrospectors: WorkflowInstanceIntrospector[] = [];
115115
// @ts-expect-error getBindingName not exposed
116-
const bindingName = await workflow.getBindingName();
116+
const bindingName = await workflow.unsafeGetBindingName();
117117
const internalOriginalWorkflow = internalEnv[bindingName] as Workflow;
118118
const externalOriginalWorkflow = env[bindingName] as Workflow;
119119

packages/workflows-shared/src/binding.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
8080
return await Promise.all(batch.map((val) => this.create(val)));
8181
}
8282

83-
public getBindingName(): string {
83+
public unsafeGetBindingName(): string {
8484
return this.env.BINDING_NAME;
8585
}
8686

@@ -109,7 +109,7 @@ export class WorkflowBinding extends WorkerEntrypoint<Env> implements Workflow {
109109
const stub = this.env.ENGINE.get(stubId);
110110

111111
try {
112-
await stub._unsafeAbort(reason);
112+
await stub.unsafeAbort(reason);
113113
} catch {
114114
// do nothing because we want to clean up this instance
115115
}

packages/workflows-shared/src/context.ts

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,6 @@ export class Context extends RpcTarget {
242242
try {
243243
const timeoutPromise = async () => {
244244
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
245-
// inserir aqui
246245
let timeout = ms(config.timeout);
247246
if (forceStepTimeout) {
248247
timeout = 0;
@@ -279,32 +278,34 @@ export class Context extends RpcTarget {
279278
const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`;
280279

281280
const mockErrorKey = `mock-step-error-${valueKey}`;
282-
const mockedErrorPayload =
283-
(await this.#state.storage.get<{
284-
name: string;
285-
message: string;
286-
}>(mockErrorKey)) ||
287-
(await this.#state.storage.get<{
288-
name: string;
289-
message: string;
290-
}>(`${mockErrorKey}-${stepState.attemptedCount}`));
281+
const persistentMockError = await this.#state.storage.get<{
282+
name: string;
283+
message: string;
284+
}>(mockErrorKey);
285+
const transientMockError = await this.#state.storage.get<{
286+
name: string;
287+
message: string;
288+
}>(`${mockErrorKey}-${stepState.attemptedCount}`);
289+
const mockErrorPayload = persistentMockError || transientMockError;
291290

292291
// if a mocked error exists, throw it immediately
293-
if (mockedErrorPayload) {
294-
const errorToThrow = new Error(mockedErrorPayload.message);
295-
errorToThrow.name = mockedErrorPayload.name;
292+
if (mockErrorPayload) {
293+
const errorToThrow = new Error(mockErrorPayload.message);
294+
errorToThrow.name = mockErrorPayload.name;
296295
throw errorToThrow;
297296
}
298297

299298
const replaceResult = await this.#state.storage.get(
300299
`replace-result-${valueKey}`
301300
);
301+
302302
const forceStepTimeoutKey = `force-step-timeout-${valueKey}`;
303-
const forceStepTimeout =
304-
(await this.#state.storage.get(forceStepTimeoutKey)) ||
305-
(await this.#state.storage.get(
306-
`${forceStepTimeoutKey}-${stepState.attemptedCount}`
307-
));
303+
const persistentStepTimeout =
304+
await this.#state.storage.get(forceStepTimeoutKey);
305+
const transientStepTimeout = await this.#state.storage.get(
306+
`${forceStepTimeoutKey}-${stepState.attemptedCount}`
307+
);
308+
const forceStepTimeout = persistentStepTimeout || transientStepTimeout;
308309

309310
if (forceStepTimeout) {
310311
result = await timeoutPromise();
@@ -656,6 +657,9 @@ export class Context extends RpcTarget {
656657
const timeoutEntryPQ = this.#engine.priorityQueue.getFirst(
657658
(a) => a.hash === cacheKey && a.type === "timeout"
658659
);
660+
const forceEventTimeout = await this.#state.storage.get(
661+
`force-event-timeout-${waitForEventKey}`
662+
);
659663
if (
660664
(timeoutEntryPQ === undefined &&
661665
this.#engine.priorityQueue !== undefined &&
@@ -665,7 +669,7 @@ export class Context extends RpcTarget {
665669
})) ||
666670
(timeoutEntryPQ !== undefined &&
667671
timeoutEntryPQ.targetTimestamp < Date.now()) ||
668-
(await this.#state.storage.get(`force-event-timeout-${waitForEventKey}`))
672+
forceEventTimeout
669673
) {
670674
this.#engine.writeLog(
671675
InstanceEvent.WAIT_TIMED_OUT,

packages/workflows-shared/src/engine.ts

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { TimePriorityQueue } from "./lib/timePriorityQueue";
1919
import { WorkflowInstanceModifier } from "./modifier";
2020
import type { Event } from "./context";
2121
import type { InstanceMetadata, RawInstanceLog } from "./instance";
22-
import type { StepSelector } from "./modifier";
2322
import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers";
2423

2524
export interface Env {
@@ -145,18 +144,6 @@ export class Engine extends DurableObject<Env> {
145144
}
146145
}
147146

148-
async readLogsFromStepSelector(
149-
step: StepSelector
150-
): Promise<RawInstanceLog[]> {
151-
const hash = await computeHash(step.name);
152-
let count = 1;
153-
if (step.index) {
154-
count = step.index;
155-
}
156-
const cacheKey = `${hash}-${count}`;
157-
return this.readLogsFromStep(cacheKey);
158-
}
159-
160147
readLogsFromStep(cacheKey: string): RawInstanceLog[] {
161148
return [
162149
...this.ctx.storage.sql.exec(
@@ -303,7 +290,7 @@ export class Engine extends DurableObject<Env> {
303290
if (waiter) {
304291
waiter.reject(
305292
new Error(
306-
`[WorkflowIntrospector] The Wokflow instance ${this.instanceId} has reached status '${instanceStatusName(reachedStatus)}'. This is a finite status that prevents it from ever reaching the expected status of '${instanceStatusName(unreachableStatus)}'.`
293+
`[WorkflowIntrospector] The Workflow instance ${this.instanceId} has reached status '${instanceStatusName(reachedStatus)}'. This is a finite status that prevents it from ever reaching the expected status of '${instanceStatusName(unreachableStatus)}'.`
307294
)
308295
);
309296
this.statusWaiters.delete(unreachableStatus);
@@ -377,7 +364,7 @@ export class Engine extends DurableObject<Env> {
377364
}
378365

379366
// Called by the cleanup function when introspecting in tests
380-
async _unsafeAbort(reason?: string) {
367+
async unsafeAbort(reason?: string) {
381368
await this.ctx.storage.sync();
382369
await this.ctx.storage.deleteAll();
383370

0 commit comments

Comments
 (0)