Skip to content

Commit 2585162

Browse files
committed
Add rate limiting and throttling capabilities. Fixes #1.
1 parent 5365a30 commit 2585162

File tree

7 files changed

+461
-7
lines changed

7 files changed

+461
-7
lines changed

package.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@ngnjs/queue",
3-
"version": "1.0.0-alpha.10",
3+
"version": "1.0.0-alpha.11",
44
"description": "A lightweight NGN queue (taskrunner).",
55
"type": "module",
66
"main": "src/index.js",
@@ -35,6 +35,12 @@
3535
"test:deno": "dev test -rt deno tests/*.js",
3636
"test:browser": "dev test -rt browser tests/*.js",
3737
"manually": "dev test -rt manual tests/*.js",
38+
"test:browser:sanity": "dev test -rt browser tests/*-sanity.js",
39+
"test:deno:sanity": "dev test -rt deno tests/*-sanity.js",
40+
"test:node:sanity": "dev test -rt node tests/*-sanity.js",
41+
"test:browser:throttle": "dev test -rt browser tests/*-throttling.js",
42+
"test:deno:throttle": "dev test -rt deno tests/*-throttling.js",
43+
"test:node:throttle": "dev test -rt node tests/*-throttling.js",
3844
"report:syntax": "dev report syntax --pretty",
3945
"report:size": "dev report size ./.dist/**/*.js ./.dist/**/*.js.map",
4046
"report:compat": "dev report compatibility ./src/**/*.js",

src/queue.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ export default class Queue extends NGN.EventEmitter {
4848
}
4949

5050
/**
51-
* Remove an queue item.
51+
* Remove a queue item.
5252
* @param {Item[]} Item
53-
* Any number of queue items (or queue item indexes) can be added.
53+
* Any number of queue items (or queue item indexes) can be removed.
5454
*/
5555
remove () {
5656
for (const item of arguments) {

src/ratelimiter.js

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import Reference from '@ngnjs/plugin'
2+
import Throttle from './throttle.js'
3+
4+
const NGN = new Reference()
5+
NGN.requires('EventEmitter', 'Middleware')
6+
const { EventEmitter, Middleware } = NGN
7+
8+
const PERFORMANCE = async () => {
9+
return globalThis.process !== undefined
10+
? (await import('perf_hooks')).performance
11+
: globalThis.performance
12+
}
13+
14+
export default class RateLimiter extends EventEmitter {
15+
#max
16+
#duration
17+
#queue
18+
#maxconcurrent
19+
20+
#runBatch = async (batch, sequential) => {
21+
return new Promise(resolve => {
22+
if (!sequential) {
23+
if (!isNaN(this.#maxconcurrent) && this.#maxconcurrent > 0) {
24+
const limiter = new Throttle(this.#maxconcurrent, batch)
25+
limiter.on('end', resolve)
26+
limiter.run()
27+
return
28+
}
29+
30+
this.afterOnce('task.done', batch.length, resolve)
31+
32+
// Parallel processing
33+
for (const task of batch) {
34+
task.once('done', () => this.emit('task.done', task))
35+
task.run()
36+
}
37+
} else {
38+
// Sequential processing
39+
const process = new Middleware()
40+
for (const task of batch) {
41+
process.use(next => {
42+
this.emit('activetask', task)
43+
task.once('done', () => {
44+
this.emit('task.done', task)
45+
next()
46+
})
47+
task.run()
48+
})
49+
}
50+
51+
process.run(resolve)
52+
}
53+
})
54+
}
55+
56+
constructor (max, duration, tasks) {
57+
super()
58+
this.#max = max
59+
this.#duration = duration
60+
this.#queue = tasks
61+
}
62+
63+
set maxConcurrent (value) {
64+
this.#maxconcurrent = value
65+
}
66+
67+
get maxConcurrent () {
68+
return this.#maxconcurrent
69+
}
70+
71+
async run (sequential = false) {
72+
if (this.#queue.length === 0) {
73+
return this.emit('done')
74+
}
75+
76+
const performance = await PERFORMANCE()
77+
78+
// Batch the queue tasks
79+
const batch = []
80+
while (this.#queue.length > 0) {
81+
batch.push(this.#queue.splice(0, this.#max))
82+
}
83+
84+
// Begin processing
85+
const process = new Middleware()
86+
while (batch.length > 0) {
87+
const tasks = batch.shift()
88+
89+
process.use(async next => {
90+
const start = performance.now()
91+
92+
this.emit('batch.start', {
93+
id: batch.length + 1,
94+
start,
95+
tasks: tasks.length
96+
})
97+
98+
await this.#runBatch(tasks, sequential)
99+
100+
let mark = performance.now()
101+
const data = { endProcessing: mark }
102+
103+
while ((mark - start) < this.#duration) {
104+
mark = performance.now()
105+
}
106+
107+
this.emit('batch.end', Object.assign(data, {
108+
id: batch.length + 1,
109+
start,
110+
end: mark,
111+
duration: Math.ceil(mark - start),
112+
processingTime: data.endProcessing - start,
113+
tasks: tasks.length
114+
}))
115+
116+
next()
117+
})
118+
}
119+
120+
process.run(() => this.emit('done'))
121+
}
122+
}

src/runner.js

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import Reference from '@ngnjs/plugin'
22
import Processor from './queue.js'
33
import Item from './item.js'
4+
import RateLimiter from './ratelimiter.js'
5+
import Throttle from './throttle.js'
46

57
const NGN = new Reference().requires('EventEmitter', 'Middleware', 'WARN', 'INFO', 'ERROR')
68
const { WARN, ERROR, EventEmitter, Middleware } = NGN
@@ -12,10 +14,21 @@ export default class Queue extends EventEmitter {
1214
#status = 'pending'
1315
#timer
1416
#timeout = 0
17+
#ratelimit = null
18+
#maxconcurrent
19+
#progress = { completed: 0, total: 0 }
1520

1621
constructor (cfg = {}) {
1722
super(...arguments)
1823

24+
if (cfg.rateLimit) {
25+
this.rate = cfg.rateLimit
26+
}
27+
28+
if (cfg.maxConcurrent) {
29+
this.maxConcurrent = cfg.maxConcurrent
30+
}
31+
1932
// Create a task queue
2033
this.#queue = new Processor({
2134
name: this.name,
@@ -98,6 +111,116 @@ export default class Queue extends EventEmitter {
98111
return this.#status
99112
}
100113

114+
/**
115+
* Rate limiting restricts the maximum number of tasks
116+
* which run within the specified duration. For example,
117+
* a max of 100 tasks per 60 seconds (60K milliseconds)
118+
* will will take 10 minutes to process 1000 tasks
119+
* (1000 tasks/100 tasks/minute = 10 minutes).
120+
* @param {numeric} max
121+
* @param {numeric} duration
122+
*/
123+
rateLimit (max, duration) {
124+
if (isNaN(max) || max <= 0) {
125+
this.#ratelimit = null
126+
return
127+
}
128+
129+
if (isNaN(max) || isNaN(duration)) {
130+
throw new Error('rate limiting requires a count and duration (milliseconds)')
131+
}
132+
133+
this.#ratelimit = [max, duration]
134+
}
135+
136+
/**
137+
* Remove rate limiting (if set).
138+
*/
139+
removeRateLimit () {
140+
this.#ratelimit = null
141+
}
142+
143+
/**
144+
* @param {Array} rate
145+
* A shortcut attribute for setting the rate limit.
146+
* This must be a 2 element array (ex: `[100, 60000]`)
147+
* or `null` (to remove rate).
148+
*/
149+
set rate (value) {
150+
if (!Array.isArray(value) || value.length !== 2) {
151+
throw new Error('rate attribute only accepts an array with 2 numeric elements')
152+
}
153+
154+
this.rateLimit(...value)
155+
}
156+
157+
get rate () {
158+
return this.#ratelimit
159+
}
160+
161+
/**
162+
* @param {numeric} maxConcurrent
163+
* The maximum concurrent number of tasks allowed to
164+
* operate at the same time. Set this to `null` or any
165+
* value `<= 0` to remove concurrency limits.
166+
*/
167+
set maxConcurrent (value) {
168+
if (value === null) {
169+
this.#maxconcurrent = null
170+
return
171+
}
172+
173+
if (isNaN(value)) {
174+
throw new Error('maxConcurrent must be a number or null.')
175+
}
176+
177+
if (value < 0) {
178+
this.#maxconcurrent = null
179+
return
180+
}
181+
182+
this.#maxconcurrent = value
183+
}
184+
185+
get maxConcurrent () {
186+
return this.#maxconcurrent
187+
}
188+
189+
/**
190+
* @typedef plan
191+
* The expected operation plan.
192+
* @param {numeric} tasksRemaining
193+
* The number of tasks which still need to be processed
194+
* @param {numeric} tasksCompleted
195+
* The number of tasks which have been completed
196+
* @param {numeric} minimumDuration
197+
* The minimum duration (in milliseconds) the queue will take to complete.
198+
* This is calculated using the rate limit.
199+
*/
200+
201+
/**
202+
* @param {object} plan
203+
* Returns a plan object.
204+
*/
205+
get plan () {
206+
if (this.#progress.total === 0 && this.#queue.items.length > 0) {
207+
this.#progress.total = this.#queue.items.length
208+
}
209+
210+
const plan = {
211+
tasksRemaining: this.#progress.total - this.#progress.completed,
212+
tasksCompleted: this.#progress.completed
213+
}
214+
215+
if (Array.isArray(this.#ratelimit)) {
216+
plan.minimumDuration = (this.#progress.total / this.#ratelimit[0]) * this.#ratelimit[1]
217+
} else {
218+
plan.minimumDuration = null
219+
}
220+
221+
return Object.freeze(plan)
222+
}
223+
101224
reset () {
102225
this.#queue.afterOnce('reset', this.#queue.size, () => {
103226
this.#cancelled = false
@@ -252,6 +375,8 @@ export default class Queue extends EventEmitter {
252375
return
253376
}
254377

378+
this.#progress = { completed: 0, total: 0 }
379+
255380
// Immediately "complete" when the queue is empty.
256381
if (this.#queue.size === 0) {
257382
this._status = 'pending'
@@ -261,6 +386,8 @@ export default class Queue extends EventEmitter {
261386

262387
// Update the status
263388
this.#processing = true
389+
this.#progress.total = this.#queue.items.length
390+
this.#progress.completed = 0
264391
this._status = 'running'
265392

266393
// Add a timer
@@ -269,12 +396,44 @@ export default class Queue extends EventEmitter {
269396
this.#timer = setTimeout(() => this.abort(true, activeItem), this.#timeout)
270397
}
271398

399+
const isConcurrencyLimited = !isNaN(this.#maxconcurrent) && this.#maxconcurrent > 0
400+
const isRateLimited = this.#ratelimit !== null
401+
const isLimited = isConcurrencyLimited || isRateLimited
402+
403+
if (isLimited) {
404+
if (isRateLimited) {
405+
const limiter = new RateLimiter(...this.#ratelimit, this.#queue.items)
406+
407+
if (isConcurrencyLimited) {
408+
limiter.maxConcurrent = this.#maxconcurrent
409+
}
410+
411+
limiter.on('activetask', item => { activeItem = item })
412+
limiter.on('task.done', task => { this.#progress.completed += 1 })
413+
limiter.relay('batch.*', this, 'limited')
414+
limiter.on('done', () => this.emit('end'))
415+
416+
;(async () => {
417+
await limiter.run(sequential)
418+
})()
419+
420+
return
421+
} else if (!sequential) {
422+
const limiter = new Throttle(this.#maxconcurrent, this.#queue.items)
423+
limiter.on('task.done', task => { this.#progress.completed += 1 })
424+
limiter.on('done', () => this.emit('end'))
425+
limiter.run()
426+
return
427+
}
428+
}
429+
272430
if (!sequential) {
273431
this.afterOnce('task.done', this.size, 'end')
274432

275433
// Run in parallel
276434
// const TOKEN = Symbol('queue runner')
277435
for (const task of this.#queue.items) {
436+
task.once('done', () => { this.#progress.completed += 1 })
278437
task.run()
279438
}
280439
} else {
@@ -283,7 +442,10 @@ export default class Queue extends EventEmitter {
283442
for (const task of this.#queue.items) {
284443
process.use(next => {
285444
activeItem = task
286-
task.once('done', next)
445+
task.once('done', () => {
446+
this.#progress.completed += 1
447+
next()
448+
})
287449
task.run()
288450
})
289451
}

0 commit comments

Comments
 (0)