Skip to content

Commit cee127b

Browse files
committed
add: use denque as Deque backend to implement a rate limiter for reloadDb
1 parent 7f53609 commit cee127b

File tree

5 files changed

+119
-23
lines changed

5 files changed

+119
-23
lines changed

package-lock.json

Lines changed: 11 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
},
3636
"dependencies": {
3737
"chai": "^4.3.10",
38+
"denque": "^2.1.0",
3839
"html-react-parser": "^4.2.7",
3940
"react-dom": "^18.2.0"
4041
},

src/data-model/OdaPmDb.ts

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import {OdaProjectTree} from "./OdaProjectTree";
2626
import {assertDatabase} from "../test_runtime";
2727
import {ModuleId_Unclassified, OdaPmModule} from "./OdaPmModule";
2828
import OdaPmToolPlugin from "../main";
29-
import {TimeChecker} from "../utils/TimeChecker";
29+
30+
import {RateLimiter} from "../utils/RateLimiter";
3031

3132
const dv = getAPI(); // We can use dv just like the examples in the docs
3233

@@ -234,11 +235,7 @@ export class OdaPmDb implements I_EvtListener {
234235
private plugin: OdaPmToolPlugin;
235236
// region rate limit
236237
// refresh db at least once 3 seconds
237-
timer: TimeChecker = new TimeChecker(3000);
238-
// if the db is queuedReload to reload during the rate limitation.
239-
// True only when reloadDb is requested but rate is still limited.
240-
queuedReload = false;
241-
timerId: NodeJS.Timeout;
238+
rateLimiter: RateLimiter = new RateLimiter(3, 1 / 3, 3)
242239

243240
// endregion
244241

@@ -254,16 +251,11 @@ export class OdaPmDb implements I_EvtListener {
254251
this.emitter.on(DataviewMetadataChangeEvent, this.boundReloadWorkflows)
255252
this.emitter.on(Evt_ReqDbReload, this.boundReloadWorkflows)
256253

257-
this.timerId = setTimeout(() => {
258-
if (this.queuedReload)
259-
this.reloadDb();
260-
}, 2000)
261254
}
262255

263256
rmListener(): void {
264257
this.emitter.off(DataviewMetadataChangeEvent, this.boundReloadWorkflows)
265258
this.emitter.off(Evt_ReqDbReload, this.boundReloadWorkflows)
266-
clearTimeout(this.timerId)
267259
}
268260

269261
// region Init
@@ -272,16 +264,13 @@ export class OdaPmDb implements I_EvtListener {
272264
devLog("[Event] dataviewReady is false. reloadDb canceled.")
273265
return;
274266
}
275-
this.queuedReload = true;
276-
if (!this.timer.isOver()) {
277-
devLog(`[Event] reloadDb canceled because timer is not ready: ${this.timer.elapsed()} < ${this.timer.thresMs}`);
267+
if (!this.rateLimiter.addRequest("rate")) {
268+
269+
devLog(`[Event] reloadDb canceled because rateLimiter is not ready`);
278270
return;
279271
}
280272

281-
//#ifdef DEVELOPMENT_BUILD
282-
this.timer.reset();
283273
devTime("[Event] [Timed] reloadDb")
284-
//#endif
285274
// cache for faster calc
286275
devTime("[Event] [Timed] getAllFiles")
287276
const allFiles = getAllFiles();
@@ -323,10 +312,7 @@ export class OdaPmDb implements I_EvtListener {
323312
this.emit(Evt_DbReloaded)
324313
assertDatabase(this);
325314
devLog("Database Reloaded.")
326-
//#ifdef DEVELOPMENT_BUILD
327315
devTimeEnd("[Event] [Timed] reloadDb")
328-
//#endif
329-
this.queuedReload = false;
330316
}
331317

332318

src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ export default class OdaPmToolPlugin extends Plugin {
379379
}));
380380
// @ts-ignore
381381
this.registerEvent(this.app.metadataCache.on(DataviewMetadataChangeEvent, (...args) => {
382-
devLog(`[Event] DataviewMetadataChangeEvent: ${DataviewMetadataChangeEvent} Triggered. args:`, args)
382+
// devLog(`[Event] DataviewMetadataChangeEvent: ${DataviewMetadataChangeEvent} Triggered. args:`, args)
383383
this.emitter.emit(DataviewMetadataChangeEvent, ...args);
384384
}));
385385
// Don't use DataviewIndexReadyEvent, because it is fired when the full index is processed.

src/utils/RateLimiter.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import Denque from "denque";
2+
import {devLog} from "./env-util";
3+
4+
class TimedItem<T> {
5+
timestamp: number;
6+
item: T;
7+
}
8+
9+
10+
/**
11+
* Rate limiter for burst requests.
12+
* We may face the problem of too many requests in a short time, in which case we can lower the allowReqRateThres.
13+
* If req freq > allowReqRateThres, we do not respond it at all.
14+
* Instead, when a new request comes, we update the allowReqRateThres.
15+
*/
16+
export class RateLimiter {
17+
requestedQueue: Denque<TimedItem<any>>
18+
respondedQueue: Denque<TimedItem<any>>
19+
20+
maxRate = 3; // 3 times/s
21+
minRate: number = 1 / 3; //1 time/3s
22+
window = 1; // in seconds
23+
// if req freq is too much we do not send response. So we respond only when reqFreq <= allowReqRateThres.
24+
allowReqRateThres: number
25+
26+
// if req freq > adjustReqRateThres, lower the allowReqRateThres
27+
adjustReqRateThres = 3;
28+
29+
constructor(maxRate: number, minRate: number, window: number) {
30+
this.maxRate = maxRate;
31+
this.minRate = minRate;
32+
this.allowReqRateThres = maxRate;
33+
this.window = window;
34+
this.requestedQueue = new Denque<TimedItem<any>>();
35+
this.respondedQueue = new Denque<TimedItem<any>>();
36+
}
37+
38+
// returns: can this request be responded?
39+
public addRequest(item: any): boolean {
40+
const timedItem = new TimedItem<any>();
41+
timedItem.item = item;
42+
timedItem.timestamp = Date.now();
43+
this.requestedQueue.push(timedItem)
44+
devLog(`[Rate] ReqQueue push: `, item)
45+
const reqFreq = this.calcFreq(this.window, this.requestedQueue);
46+
// const respFreq = this.calcFreq(this.window, this.respondedQueue);
47+
const allow = reqFreq <= this.allowReqRateThres;
48+
this.cleanQueue();
49+
if (allow) {
50+
this.respondedQueue.push(timedItem)
51+
}
52+
this.adjustThres(reqFreq)
53+
return allow;
54+
}
55+
56+
private cleanQueue() {
57+
const now = Date.now();
58+
// front is the oldest event
59+
//#ifdef DEVELOPMENT_BUILD
60+
const oldReqQueueSize = this.requestedQueue.length;
61+
const oldRespQueueSize = this.respondedQueue.length;
62+
//#endif
63+
while (this.requestedQueue.peekFront() && this.requestedQueue.peekFront()!.timestamp < now - this.window * 1000) {
64+
this.requestedQueue.shift();
65+
}
66+
while (this.respondedQueue.peekFront() && this.respondedQueue.peekFront()!.timestamp < now - this.window * 1000) {
67+
this.respondedQueue.shift();
68+
}
69+
//#ifdef DEVELOPMENT_BUILD
70+
console.log(`[Rate] reqQueue: ${this.requestedQueue.length}, respQueue: ${this.respondedQueue.length}, oldReqQueueSize: ${oldReqQueueSize}, oldRespQueueSize: ${oldRespQueueSize}`);
71+
//#endif
72+
}
73+
74+
private calcFreq(window: number, q: Denque<TimedItem<any>>): number {
75+
// get the frequency within the window
76+
let freqCount = 0;
77+
const now = Date.now();
78+
for (let i = 0; i < q.length; i++) {
79+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
80+
const timedItem = q.peekAt(i)!;
81+
if (timedItem.timestamp > now - window * 1000) {
82+
freqCount++;
83+
}
84+
}
85+
86+
const curRate = freqCount / window;
87+
return curRate;
88+
}
89+
90+
private adjustThres(reqFreq: number) {
91+
// if too many requests, lower the allowReqRateThres
92+
const originResThres = this.allowReqRateThres;
93+
if (reqFreq > this.adjustReqRateThres) {
94+
this.allowReqRateThres = Math.max(this.allowReqRateThres / 2, this.minRate)
95+
} else {
96+
this.allowReqRateThres = Math.min(this.allowReqRateThres * 2, this.maxRate);
97+
}
98+
devLog(`[Rate] reqFreq: ${reqFreq}, respRateThres: ${originResThres} -> ${this.allowReqRateThres}`)
99+
}
100+
}

0 commit comments

Comments
 (0)