Skip to content

Commit 44dc7cf

Browse files
committed
Allow step errors and timeouts to be set and be performed in the correct order
1 parent 1bf6eab commit 44dc7cf

File tree

1 file changed

+46
-39
lines changed

1 file changed

+46
-39
lines changed

packages/workflows-shared/src/modifier.ts

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,7 @@ type UserEvent = {
1313
payload: unknown;
1414
};
1515

16-
export class WorkflowIntrospectorError extends Error {
17-
name = "WorkflowIntrospectorError";
18-
}
19-
20-
export class InstanceModifier extends RpcTarget {
16+
export class WorkflowInstanceModifier extends RpcTarget {
2117
#engine: Engine;
2218
#state: DurableObjectState;
2319

@@ -52,6 +48,13 @@ export class InstanceModifier extends RpcTarget {
5248
return valueKey;
5349
}
5450

51+
#getAndIncrementCounter = async (valueKey: string, by: number) => {
52+
const counterKey = `failure-index-${valueKey}`;
53+
const next = (await this.#state.storage.get<number>(counterKey)) ?? 1;
54+
await this.#state.storage.put(counterKey, next + by);
55+
return next;
56+
};
57+
5558
async #getSleepStepDisableKey(step: StepSelector): Promise<string> {
5659
let count = 1;
5760
if (step.index) {
@@ -64,76 +67,77 @@ export class InstanceModifier extends RpcTarget {
6467

6568
async disableSleeps(steps?: StepSelector[]): Promise<void> {
6669
if (!steps) {
67-
console.log("[Modifier.disableSleeps()] Disabling all sleeps");
6870
await this.#state.storage.put("disableAllSleeps", true);
6971
} else {
7072
for (const step of steps) {
71-
console.log(
72-
"[Modifier.disableSleeps()] Disabling sleep of step:",
73-
step.name
74-
);
7573
const sleepDisableKey = await this.#getSleepStepDisableKey(step);
7674
await this.#state.storage.put(sleepDisableKey, true);
7775
}
7876
}
7977
}
8078

8179
async mockStepResult(step: StepSelector, stepResult: unknown): Promise<void> {
82-
console.log(
83-
"[Modifier.mockStepResult()] Mocking step result of step:",
84-
step.name
85-
);
8680
const valueKey = await this.#getStepCacheKey(step);
8781

8882
if (await this.#state.storage.get(`replace-result-${valueKey}`)) {
89-
throw new WorkflowIntrospectorError(
90-
"You're trying to mock the same step multiple times!"
83+
throw new Error(
84+
`[WorkflowIntrospector] Trying to mock step '${step.name}' multiple times!`
9185
);
9286
}
9387

9488
await this.#state.storage.put(`replace-result-${valueKey}`, stepResult);
9589
}
9690

97-
async mockStepImplementation(
98-
_step: StepSelector,
99-
_implementation: () => Promise<unknown>
100-
): Promise<void> {
101-
// TODO
102-
// somehow pass the implementation to context so it can race with timeout
103-
}
104-
10591
async mockStepError(
10692
step: StepSelector,
10793
error: Error,
10894
times?: number
10995
): Promise<void> {
110-
console.log(
111-
"[Modifier.mockStepError()] Mocking step error of step",
112-
step.name
113-
);
11496
const valueKey = await this.#getStepCacheKey(step);
11597
const serializableError = {
11698
name: error.name,
11799
message: error.message,
118100
};
101+
102+
if (await this.#state.storage.get(`replace-result-${valueKey}`)) {
103+
throw new Error(
104+
`[WorkflowIntrospector] Trying to mock error on step '${step.name}' after mocking its result!`
105+
);
106+
}
107+
119108
if (times) {
120-
for (let time = 1; time <= times; time++) {
121-
const mockErrorKey = `mock-error-${valueKey}-${time}`;
122-
await this.#state.storage.put(mockErrorKey, serializableError);
123-
}
109+
const start = await this.#getAndIncrementCounter(valueKey, times);
110+
const mockErrorsPuts = Array.from({ length: times }, (_, i) => {
111+
const attempt = start + i;
112+
const mockErrorKey = `mock-step-error-${valueKey}-${attempt}`;
113+
return this.#state.storage.put(mockErrorKey, serializableError);
114+
});
115+
116+
await Promise.all(mockErrorsPuts);
124117
} else {
125-
const mockErrorKey = `mock-error-${valueKey}`;
118+
const mockErrorKey = `mock-step-error-${valueKey}`;
126119
await this.#state.storage.put(mockErrorKey, serializableError);
127120
}
128121
}
129122

130123
async forceStepTimeout(step: StepSelector, times?: number) {
131124
const valueKey = await this.#getStepCacheKey(step);
125+
126+
if (await this.#state.storage.get(`replace-result-${valueKey}`)) {
127+
throw new Error(
128+
`[WorkflowIntrospector] Trying to force timeout on step '${step.name}' after mocking its result!`
129+
);
130+
}
131+
132132
if (times) {
133-
for (let time = 1; time <= times; time++) {
134-
const forceStepTimeoutKey = `force-step-timeout-${valueKey}-${time}`;
135-
await this.#state.storage.put(forceStepTimeoutKey, true);
136-
}
133+
const start = await this.#getAndIncrementCounter(valueKey, times);
134+
const forceTimeouts = Array.from({ length: times }, (_, i) => {
135+
const attempt = start + i;
136+
const forceStepTimeoutKey = `force-step-timeout-${valueKey}-${attempt}`;
137+
return this.#state.storage.put(forceStepTimeoutKey, true);
138+
});
139+
140+
await Promise.all(forceTimeouts);
137141
} else {
138142
const forceStepTimeoutKey = `force-step-timeout-${valueKey}`;
139143
await this.#state.storage.put(forceStepTimeoutKey, true);
@@ -147,12 +151,15 @@ export class InstanceModifier extends RpcTarget {
147151
type: event.type,
148152
};
149153

150-
await this.#state.storage.put(`mockEvent-${event.type}`, true);
154+
await this.#state.storage.put(`mock-event-${event.type}`, true);
151155
await this.#engine.receiveEvent(myEvent);
152156
}
153157

154158
async forceEventTimeout(step: StepSelector): Promise<void> {
155159
const waitForEventKey = await this.#getWaitForEventCacheKey(step);
156-
await this.#state.storage.put(`forceEventTimeout-${waitForEventKey}`, true);
160+
await this.#state.storage.put(
161+
`force-event-timeout-${waitForEventKey}`,
162+
true
163+
);
157164
}
158165
}

0 commit comments

Comments
 (0)