Skip to content

Commit ff97447

Browse files
committed
Allow list of sleep steps in disableSleeps()
1 parent 8bb1860 commit ff97447

File tree

2 files changed

+39
-7
lines changed

2 files changed

+39
-7
lines changed

packages/workflows-shared/src/context.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,13 @@ export class Context extends RpcTarget {
464464
const sleepLogWrittenKey = `${cacheKey}-log-written`;
465465
const maybeResult = await this.#state.storage.get(sleepKey);
466466

467-
const disableSleeps = await this.#state.storage.get("disableSleeps");
467+
const sleepNameCountHash = await computeHash(
468+
name + this.#getCount("sleep-" + name)
469+
);
470+
const disableThisSleep = await this.#state.storage.get(sleepNameCountHash);
471+
const disableAllSleeps = await this.#state.storage.get("disableAllSleeps");
472+
473+
const disableSleep = disableAllSleeps || disableThisSleep;
468474

469475
if (maybeResult != undefined) {
470476
// @ts-expect-error priorityQueue is initiated in init
@@ -474,7 +480,7 @@ export class Context extends RpcTarget {
474480
// in case the engine dies while sleeping and wakes up before the retry period
475481
if (entryPQ !== undefined) {
476482
await scheduler.wait(
477-
disableSleeps ? 0 : entryPQ.targetTimestamp - Date.now()
483+
disableSleep ? 0 : entryPQ.targetTimestamp - Date.now()
478484
);
479485
// @ts-expect-error priorityQueue is initiated in init
480486
this.#engine.priorityQueue.remove({ hash: cacheKey, type: "sleep" });
@@ -513,13 +519,12 @@ export class Context extends RpcTarget {
513519
// @ts-expect-error priorityQueue is initiated in init
514520
await this.#engine.priorityQueue.add({
515521
hash: cacheKey,
516-
targetTimestamp: Date.now() + (disableSleeps ? 0 : duration),
522+
targetTimestamp: Date.now() + (disableSleep ? 0 : duration),
517523
type: "sleep",
518524
});
519525

520-
console.log("disable sleeps:", disableSleeps);
521526
// this probably will never finish except if sleep is less than the grace period
522-
await scheduler.wait(disableSleeps ? 0 : duration);
527+
await scheduler.wait(disableSleep ? 0 : duration);
523528

524529
this.#engine.writeLog(
525530
InstanceEvent.SLEEP_COMPLETE,

packages/workflows-shared/src/modifier.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,42 @@ export class InstanceModifier extends RpcTarget {
4848
return valueKey;
4949
}
5050

51-
async disableSleeps(): Promise<void> {
52-
await this.#state.storage.put("disableSleeps", true);
51+
async #getSleepStepDisableKey(step: StepSelector): Promise<string> {
52+
let count = 1;
53+
if (step.index) {
54+
count = step.index;
55+
}
56+
const sleepNameCountHash = await computeHash(step.name + count);
57+
58+
return sleepNameCountHash;
59+
}
60+
61+
async disableSleeps(steps?: StepSelector[]): Promise<void> {
62+
if (!steps) {
63+
await this.#state.storage.put("disableAllSleeps", true);
64+
} else {
65+
for (const step of steps) {
66+
const sleepDisableKey = await this.#getSleepStepDisableKey(step);
67+
await this.#state.storage.put(sleepDisableKey, true);
68+
}
69+
}
5370
}
5471

5572
async mockStepResult(step: StepSelector, stepResult: unknown): Promise<void> {
73+
console.log("mocking step result");
5674
const valueKey = await this.#getStepCacheKey(step);
5775
await this.#state.storage.put(`replace-result-${valueKey}`, stepResult);
5876
}
5977

78+
async mockStepImplementation(
79+
_step: StepSelector,
80+
_implementation: () => Promise<unknown>
81+
): Promise<void> {
82+
// TODO
83+
// can call the new implementation here and replace the step result - meh
84+
// ideally pass the implementation to context so it can race with timeout
85+
}
86+
6087
async mockEvent(event: UserEvent): Promise<void> {
6188
// could maybe:
6289
// WorkflowInstance.sendEvent()

0 commit comments

Comments
 (0)