Skip to content

Commit b17f417

Browse files
committed
feat(realtime): add queue for connect and disconnect queue
Signed-off-by: BoHong Li <[email protected]>
1 parent ff72d99 commit b17f417

File tree

3 files changed

+209
-0
lines changed

3 files changed

+209
-0
lines changed

lib/connectionQueue.js

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
'use strict'
2+
3+
const EventEmitter = require('events').EventEmitter
4+
5+
/**
6+
* Queuing Class for connection queuing
7+
*/
8+
9+
const ConnectionQueueEvent = {
10+
Tick: 'Tick'
11+
}
12+
13+
class ConnectionQueue extends EventEmitter {
14+
constructor (maximumLength, triggerTimeInterval = 10) {
15+
super()
16+
this.max = maximumLength
17+
this.triggerTime = triggerTimeInterval
18+
this.queue = []
19+
this.lock = false
20+
21+
this.on(ConnectionQueueEvent.Tick, () => {
22+
if (this.lock) return
23+
setImmediate(() => {
24+
this.process()
25+
})
26+
})
27+
}
28+
29+
start () {
30+
if (this.eventTrigger) return
31+
this.eventTrigger = setInterval(() => {
32+
this.emit(ConnectionQueueEvent.Tick)
33+
}, this.triggerTime)
34+
}
35+
36+
stop () {
37+
if (this.eventTrigger) {
38+
clearInterval(this.eventTrigger)
39+
this.eventTrigger = null
40+
}
41+
}
42+
43+
/**
44+
* push a promisify-task to queue
45+
* @param task {Promise}
46+
* @returns {boolean} if success return true, otherwise flase
47+
*/
48+
push (task) {
49+
if (this.queue.length >= this.max) return false
50+
this.queue.push(task)
51+
this.start()
52+
return true
53+
}
54+
55+
process () {
56+
if (this.lock) return
57+
this.lock = true
58+
if (this.queue.length <= 0) {
59+
this.stop()
60+
this.lock = false
61+
return
62+
}
63+
const task = this.queue.shift()
64+
65+
const finishTask = () => {
66+
this.lock = false
67+
setImmediate(() => {
68+
this.process()
69+
})
70+
}
71+
task().then(finishTask).catch(finishTask)
72+
}
73+
}
74+
75+
exports.ConnectionQueue = ConnectionQueue

test/connectionQueue.test.js

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/* eslint-env node, mocha */
2+
'use strict'
3+
4+
const assert = require('assert')
5+
const sinon = require('sinon')
6+
7+
const ConnectionQueuing = require('../lib/connectionQueue').ConnectionQueue
8+
9+
describe('ConnectionQueue', function () {
10+
let clock
11+
12+
beforeEach(() => {
13+
clock = sinon.useFakeTimers({
14+
toFake: ['setInterval']
15+
})
16+
})
17+
18+
afterEach(() => {
19+
clock.restore()
20+
sinon.restore()
21+
})
22+
23+
it('should not accept more than maximum task', () => {
24+
const queue = new ConnectionQueuing(2)
25+
const task = async () => {
26+
}
27+
28+
queue.start()
29+
assert(queue.push(task))
30+
assert(queue.push(task))
31+
assert(queue.push(task) === false)
32+
})
33+
34+
it('should run task every interval', (done) => {
35+
const runningClock = []
36+
const queue = new ConnectionQueuing(2)
37+
const task = async () => {
38+
runningClock.push(clock.now)
39+
}
40+
queue.start()
41+
assert(queue.push(task))
42+
assert(queue.push(task))
43+
clock.tick(5)
44+
setTimeout(() => {
45+
clock.tick(5)
46+
}, 1)
47+
setTimeout(() => {
48+
clock.tick(5)
49+
}, 2)
50+
setTimeout(() => {
51+
clock.tick(5)
52+
}, 3)
53+
queue.stop()
54+
55+
setTimeout(() => {
56+
assert(runningClock.length === 2)
57+
done()
58+
}, 10)
59+
})
60+
61+
it('should not crash when repeat stop queue', () => {
62+
const queue = new ConnectionQueuing(2, 10)
63+
try {
64+
queue.stop()
65+
queue.stop()
66+
queue.stop()
67+
assert.ok(true)
68+
} catch (e) {
69+
assert.fail(e)
70+
}
71+
})
72+
73+
it('should run process when queue is empty', (done) => {
74+
const queue = new ConnectionQueuing(2, 100)
75+
const processSpy = sinon.spy(queue, 'process')
76+
queue.start()
77+
clock.tick(100)
78+
setTimeout(() => {
79+
assert(processSpy.called)
80+
done()
81+
}, 1)
82+
})
83+
84+
it('should run process although error occurred', (done) => {
85+
const queue = new ConnectionQueuing(2, 100)
86+
const failedTask = sinon.spy(async () => {
87+
throw new Error('error')
88+
})
89+
const normalTask = sinon.spy(async () => {
90+
})
91+
queue.start()
92+
assert(queue.push(failedTask))
93+
assert(queue.push(normalTask))
94+
clock.tick(100)
95+
setTimeout(() => {
96+
clock.tick(100)
97+
}, 1)
98+
setTimeout(() => {
99+
// assert(queue.queue.length === 0)
100+
assert(failedTask.called)
101+
assert(normalTask.called)
102+
done()
103+
}, 5)
104+
})
105+
106+
it('should ignore trigger when event not complete', (done) => {
107+
const queue = new ConnectionQueuing(2, 10)
108+
const processSpy = sinon.spy(queue, 'process')
109+
const longTask = async () => {
110+
return new Promise((resolve) => {
111+
setInterval(() => {
112+
resolve()
113+
}, 50)
114+
})
115+
}
116+
queue.start()
117+
queue.push(longTask)
118+
clock.tick(10)
119+
setTimeout(() => {
120+
clock.tick(10)
121+
}, 0)
122+
setTimeout(() => {
123+
clock.tick(10)
124+
}, 1)
125+
setTimeout(() => {
126+
assert(processSpy.calledOnce)
127+
done()
128+
}, 2)
129+
})
130+
})

test/realtime.test.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -701,5 +701,9 @@ describe('realtime', function () {
701701
})
702702
})
703703

704+
describe('permission', function () {
705+
706+
})
707+
704708
})
705709
})

0 commit comments

Comments
 (0)