Skip to content

Commit 79458c6

Browse files
committed
added defer in throttle
1 parent a1bfe52 commit 79458c6

File tree

3 files changed

+37
-12
lines changed

3 files changed

+37
-12
lines changed

src/workflows/Workflow.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { IClassOf } from "../decorators/IClassOf.js";
2-
import Inject from "../di/di.js";
32
import DateTime from "../types/DateTime.js";
43
import TimeSpan from "../types/TimeSpan.js";
54
import { ActivitySuspendedError } from "./ActivitySuspendedError.js";
6-
import WorkflowContext, { IWorkflowThrottleGroup } from "./WorkflowContext.js";
5+
import type WorkflowContext from "./WorkflowContext.js";
76
import { WorkflowRegistry } from "./WorkflowRegistry.js";
7+
import type { IWorkflowThrottleGroup } from "./WorkflowStorage.js";
88

99

1010
export function Activity(target, key) {

src/workflows/WorkflowContext.ts

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import TimeSpan from "../types/TimeSpan.js";
1212
import sleep from "../common/sleep.js";
1313
import Waiter from "./Waiter.js";
1414
import { WorkflowItem } from "./WorkflowDbContext.js";
15-
import WorkflowStorage from "./WorkflowStorage.js";
15+
import WorkflowStorage, { IWorkflowThrottleGroup } from "./WorkflowStorage.js";
1616

1717
export const runChildSymbol = Symbol("runChild");
1818

@@ -142,11 +142,6 @@ export interface IWorkflowResult<T> {
142142
queued?: DateTime;
143143
}
144144

145-
export interface IWorkflowThrottleGroup {
146-
group: string;
147-
maxPerSecond: number;
148-
}
149-
150145
export interface IWorkflowQueueParameter {
151146
id?: string;
152147
throwIfExists?: boolean;
@@ -232,9 +227,17 @@ export default class WorkflowContext {
232227
let throttleGroup = null;
233228

234229
if (throttle) {
235-
eta = await this.storage.getNextEta(throttle);
236-
queued = eta;
237-
throttleGroup = throttle.group;
230+
const { deferSeconds } = throttle;
231+
if (deferSeconds) {
232+
const lw = await this.storage.getLastEta(throttle);
233+
if (lw) {
234+
return lw.id;
235+
}
236+
} else {
237+
eta = await this.storage.getNextEta(throttle);
238+
queued = eta;
239+
throttleGroup = throttle.group;
240+
}
238241
}
239242

240243
// this will ensure even empty workflow !!

src/workflows/WorkflowStorage.ts

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@ import { loadedFromDb, WorkflowDbContext, WorkflowItem } from "./WorkflowDbConte
1010
import WorkflowTask from "./WorkflowTask.js";
1111
import Sql from "../sql/Sql.js";
1212

13+
export type IWorkflowThrottleGroup = {
14+
group: string;
15+
deferSeconds?: number;
16+
maxPerSecond?: never;
17+
} | {
18+
group: string;
19+
deferSeconds?: never;
20+
maxPerSecond?: number;
21+
};
1322

1423
@RegisterSingleton
1524
export default class WorkflowStorage {
@@ -36,9 +45,22 @@ export default class WorkflowStorage {
3645
return q.count();
3746
}
3847

39-
async getNextEta(throttle: { group: string, maxPerSecond: number }) {
48+
async getLastEta(throttle: IWorkflowThrottleGroup) {
49+
const db = new WorkflowDbContext(this.driver);
50+
const w = await db.workflows.where(throttle, (p) => (x) => x.throttleGroup === p.group
51+
&& x.state !== "failed"
52+
&& x.state !== "done"
53+
)
54+
.orderByDescending(void 0, (p) => (x) => x.queued)
55+
.first();
56+
return w;
57+
58+
}
59+
60+
async getNextEta(throttle: IWorkflowThrottleGroup) {
4061

4162
const db = new WorkflowDbContext(this.driver);
63+
4264
const last = await db.workflows.where(throttle, (p) => (x) => x.throttleGroup === p.group
4365
&& x.isWorkflow === true)
4466
.orderByDescending(void 0, (p) => (x) => x.queued)

0 commit comments

Comments
 (0)