Skip to content

Commit 17e82c1

Browse files
committed
refactor(realtime): connection flow to queue
Signed-off-by: BoHong Li <[email protected]>
1 parent 0b03b8e commit 17e82c1

File tree

9 files changed

+550
-365
lines changed

9 files changed

+550
-365
lines changed

app.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,9 @@ function handleTermSignals () {
299299
})
300300
}
301301
}, 100)
302+
setTimeout(() => {
303+
process.exit(1)
304+
}, 5000)
302305
}
303306
process.on('SIGINT', handleTermSignals)
304307
process.on('SIGTERM', handleTermSignals)

lib/processQueue.js

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,41 @@ const EventEmitter = require('events').EventEmitter
77
*/
88

99
const QueueEvent = {
10-
Tick: 'Tick'
10+
Tick: 'Tick',
11+
Push: 'Push',
12+
Finish: 'Finish'
1113
}
1214

1315
class ProcessQueue extends EventEmitter {
14-
constructor (maximumLength, triggerTimeInterval = 10) {
16+
constructor ({
17+
maximumLength = 500,
18+
triggerTimeInterval = 5000,
19+
// execute on push
20+
proactiveMode = true,
21+
// execute next work on finish
22+
continuousMode = true
23+
}) {
1524
super()
1625
this.max = maximumLength
1726
this.triggerTime = triggerTimeInterval
1827
this.taskMap = new Map()
1928
this.queue = []
2029
this.lock = false
2130

22-
this.on(QueueEvent.Tick, () => {
23-
if (this.lock) return
24-
this.lock = true
25-
setImmediate(() => {
26-
this.process()
27-
})
31+
this.on(QueueEvent.Tick, this.onEventProcessFunc.bind(this))
32+
if (proactiveMode) {
33+
this.on(QueueEvent.Push, this.onEventProcessFunc.bind(this))
34+
}
35+
if (continuousMode) {
36+
this.on(QueueEvent.Finish, this.onEventProcessFunc.bind(this))
37+
}
38+
}
39+
40+
onEventProcessFunc () {
41+
if (this.lock) return
42+
this.lock = true
43+
setImmediate(() => {
44+
this.process()
2845
})
2946
}
3047

@@ -62,7 +79,7 @@ class ProcessQueue extends EventEmitter {
6279
this.taskMap.set(id, true)
6380
this.queue.push(task)
6481
this.start()
65-
this.emit(QueueEvent.Tick)
82+
this.emit(QueueEvent.Push)
6683
return true
6784
}
6885

@@ -79,7 +96,7 @@ class ProcessQueue extends EventEmitter {
7996
const finishTask = () => {
8097
this.lock = false
8198
setImmediate(() => {
82-
this.emit(QueueEvent.Tick)
99+
this.emit(QueueEvent.Finish)
83100
})
84101
}
85102
task.processingFunc().then(finishTask).catch(finishTask)

0 commit comments

Comments
 (0)