Skip to content

Commit ba98b59

Browse files
committed
wip
1 parent 4c603c2 commit ba98b59

File tree

3 files changed

+115
-12
lines changed

3 files changed

+115
-12
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import assert from "assert";
2+
import Inject, { Register, RegisterSingleton, ServiceProvider } from "../../../di/di.js";
3+
import WorkflowContext from "../../../workflows/WorkflowContext.js";
4+
import Workflow, { Activity } from "../../../workflows/Workflow.js";
5+
import WorkflowClock from "../../../workflows/WorkflowClock.js";
6+
import DateTime from "../../../types/DateTime.js";
7+
import { TestConfig } from "../../TestConfig.js";
8+
import { BaseDriver } from "../../../drivers/base/BaseDriver.js";
9+
import WorkflowStorage from "../../../workflows/WorkflowStorage.js";
10+
import TimeSpan from "../../../types/TimeSpan.js";
11+
import sleep from "../../../common/sleep.js";
12+
13+
class MockClock extends WorkflowClock {
14+
15+
public get utcNow(): DateTime {
16+
return this.time;
17+
}
18+
19+
public set utcNow(v: DateTime) {
20+
this.time = v;
21+
}
22+
23+
private time: DateTime = DateTime.now;
24+
25+
public add(ts: TimeSpan) {
26+
this.time = this.time.add(ts);
27+
return this;
28+
}
29+
}
30+
31+
@RegisterSingleton
32+
class Mailer {
33+
34+
public items: any[] = [];
35+
}
36+
37+
class SendWorkflow extends Workflow<string, string> {
38+
39+
public async run(): Promise<any> {
40+
41+
await this.delay(TimeSpan.fromHours(1));
42+
43+
await this.sendMail("a", "b", "c");
44+
return "1";
45+
}
46+
47+
@Activity
48+
public async sendMail(
49+
from: string,
50+
to: string,
51+
message: string,
52+
@Inject logger?: Mailer) {
53+
await sleep(10);
54+
logger.items.push({ from, to, message });
55+
}
56+
57+
}
58+
59+
export default async function (this: TestConfig) {
60+
61+
const mockClock = new MockClock();
62+
const mailer = new Mailer();
63+
64+
const scope = new ServiceProvider();
65+
scope.add(WorkflowClock, mockClock);
66+
scope.add(BaseDriver, this.driver);
67+
const storage = new WorkflowStorage(this.driver, mockClock);
68+
scope.add(Mailer, mailer);
69+
scope.add(WorkflowStorage, storage);
70+
71+
const c = new WorkflowContext(storage);
72+
scope.add(WorkflowContext, c);
73+
74+
// this is an important step
75+
c.register(SendWorkflow);
76+
77+
await storage.seed();
78+
79+
const id = await c.queue(SendWorkflow, "a", {
80+
throttle: {
81+
group: "a",
82+
deferSeconds: 15
83+
}
84+
});
85+
86+
const result = await storage.getWorkflow(id);
87+
88+
const next = await c.queue(SendWorkflow, "b", {
89+
throttle: {
90+
group: "a",
91+
deferSeconds: 15
92+
}
93+
});
94+
95+
const resultNext = await storage.getWorkflow(next);
96+
97+
assert.equal(result.id, next);
98+
99+
// throw new Error("Preserve");
100+
}

src/tests/eternity/throttle-tests.ts renamed to src/tests/eternity/throttle/throttle.test.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import assert from "assert";
2-
import Inject, { Register, RegisterSingleton, ServiceProvider } from "../../di/di.js";
3-
import WorkflowContext from "../../workflows/WorkflowContext.js";
4-
import Workflow, { Activity } from "../../workflows/Workflow.js";
5-
import WorkflowClock from "../../workflows/WorkflowClock.js";
6-
import DateTime from "../../types/DateTime.js";
7-
import { TestConfig } from "../TestConfig.js";
8-
import { BaseDriver } from "../../drivers/base/BaseDriver.js";
9-
import WorkflowStorage from "../../workflows/WorkflowStorage.js";
10-
import TimeSpan from "../../types/TimeSpan.js";
11-
import sleep from "../../common/sleep.js";
2+
import Inject, { Register, RegisterSingleton, ServiceProvider } from "../../../di/di.js";
3+
import WorkflowContext from "../../../workflows/WorkflowContext.js";
4+
import Workflow, { Activity } from "../../../workflows/Workflow.js";
5+
import WorkflowClock from "../../../workflows/WorkflowClock.js";
6+
import DateTime from "../../../types/DateTime.js";
7+
import { TestConfig } from "../../TestConfig.js";
8+
import { BaseDriver } from "../../../drivers/base/BaseDriver.js";
9+
import WorkflowStorage from "../../../workflows/WorkflowStorage.js";
10+
import TimeSpan from "../../../types/TimeSpan.js";
11+
import sleep from "../../../common/sleep.js";
1212

1313
class MockClock extends WorkflowClock {
1414

src/workflows/WorkflowStorage.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,18 @@ export default class WorkflowStorage {
5353
}
5454

5555
async getLastEta(throttle: IWorkflowThrottleGroup) {
56+
const now = this.clock.utcNow;
5657
const db = new WorkflowDbContext(this.driver);
57-
const w = await db.workflows.where(throttle, (p) => (x) => x.throttleGroup === p.group
58+
const { group, deferSeconds } = throttle;
59+
const w = await db.workflows.where({ group, now }, (p) => (x) => x.throttleGroup === p.group
5860
&& x.state !== "failed"
5961
&& x.state !== "done"
62+
&& x.eta >= Sql.cast.asDateTime(p.now)
6063
)
6164
.orderByDescending(void 0, (p) => (x) => x.queued)
6265
.first();
6366
if (w) {
64-
w.eta = DateTime.from(w.eta).addSeconds(throttle.deferSeconds);
67+
w.eta = DateTime.from(w.eta).addSeconds(deferSeconds);
6568
await db.workflows.statements.update({ eta: w.eta }, { id: w.id });
6669
}
6770
return w;

0 commit comments

Comments
 (0)