Skip to content

Commit d8b18ee

Browse files
committed
refactor(realtime): disconnect flow
1. use queue for queueing disconnect request Signed-off-by: BoHong Li <[email protected]>
1 parent f892c68 commit d8b18ee

File tree

7 files changed

+203
-108
lines changed

7 files changed

+203
-108
lines changed

app.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ process.on('uncaughtException', function (err) {
278278
function handleTermSignals () {
279279
logger.info('CodiMD has been killed by signal, try to exit gracefully...')
280280
realtime.maintenance = true
281+
realtime.terminate()
281282
// disconnect all socket.io clients
282283
Object.keys(io.sockets.sockets).forEach(function (key) {
283284
var socket = io.sockets.sockets[key]

lib/connectionQueue.js renamed to lib/processQueue.js

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,22 @@ const EventEmitter = require('events').EventEmitter
66
* Queuing Class for connection queuing
77
*/
88

9-
const ConnectionQueueEvent = {
9+
const QueueEvent = {
1010
Tick: 'Tick'
1111
}
1212

13-
class ConnectionQueue extends EventEmitter {
13+
class ProcessQueue extends EventEmitter {
1414
constructor (maximumLength, triggerTimeInterval = 10) {
1515
super()
1616
this.max = maximumLength
1717
this.triggerTime = triggerTimeInterval
18+
this.taskMap = new Map()
1819
this.queue = []
1920
this.lock = false
2021

21-
this.on(ConnectionQueueEvent.Tick, () => {
22+
this.on(QueueEvent.Tick, () => {
2223
if (this.lock) return
24+
this.lock = true
2325
setImmediate(() => {
2426
this.process()
2527
})
@@ -29,7 +31,7 @@ class ConnectionQueue extends EventEmitter {
2931
start () {
3032
if (this.eventTrigger) return
3133
this.eventTrigger = setInterval(() => {
32-
this.emit(ConnectionQueueEvent.Tick)
34+
this.emit(QueueEvent.Tick)
3335
}, this.triggerTime)
3436
}
3537

@@ -40,36 +42,48 @@ class ConnectionQueue extends EventEmitter {
4042
}
4143
}
4244

45+
checkTaskIsInQueue (id) {
46+
return this.taskMap.has(id)
47+
}
48+
4349
/**
44-
* push a promisify-task to queue
45-
* @param task {Promise}
46-
* @returns {boolean} if success return true, otherwise flase
50+
* pushWithKey a promisify-task to queue
51+
* @param id {string}
52+
* @param processingFunc {Function<Promise>}
53+
* @returns {boolean} if success return true, otherwise false
4754
*/
48-
push (task) {
55+
push (id, processingFunc) {
4956
if (this.queue.length >= this.max) return false
57+
if (this.checkTaskIsInQueue(id)) return false
58+
const task = {
59+
id: id,
60+
processingFunc: processingFunc
61+
}
62+
this.taskMap.set(id, true)
5063
this.queue.push(task)
5164
this.start()
65+
this.emit(QueueEvent.Tick)
5266
return true
5367
}
5468

5569
process () {
56-
if (this.lock) return
57-
this.lock = true
5870
if (this.queue.length <= 0) {
5971
this.stop()
6072
this.lock = false
6173
return
6274
}
75+
6376
const task = this.queue.shift()
77+
this.taskMap.delete(task.id)
6478

6579
const finishTask = () => {
6680
this.lock = false
6781
setImmediate(() => {
68-
this.process()
82+
this.emit(QueueEvent.Tick)
6983
})
7084
}
71-
task().then(finishTask).catch(finishTask)
85+
task.processingFunc().then(finishTask).catch(finishTask)
7286
}
7387
}
7488

75-
exports.ConnectionQueue = ConnectionQueue
89+
exports.ProcessQueue = ProcessQueue

lib/realtime.js

Lines changed: 53 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const models = require('./models')
2121
// ot
2222
const ot = require('./ot')
2323

24+
const { ProcessQueue } = require('./processQueue')
2425
const { RealtimeClientConnection } = require('./realtimeClientConnection')
2526
const { UpdateDirtyNoteJob } = require('./realtimeUpdateDirtyNoteJob')
2627

@@ -69,6 +70,7 @@ function secure (socket, next) {
6970
}
7071
}
7172

73+
// TODO: only use in `updateDirtyNote`
7274
function emitCheck (note) {
7375
var out = {
7476
title: note.title,
@@ -85,6 +87,9 @@ function emitCheck (note) {
8587
var users = {}
8688
var notes = {}
8789

90+
const disconnectProcessQueue = new ProcessQueue(2000, 500)
91+
disconnectProcessQueue.start()
92+
8893
const updateDirtyNoteJob = new UpdateDirtyNoteJob(realtime)
8994
updateDirtyNoteJob.start(realtime)
9095

@@ -173,8 +178,9 @@ setInterval(function () {
173178
id: key
174179
}
175180
}
176-
disconnectSocketQueue.push(socket)
177-
disconnect(socket)
181+
if (!disconnectProcessQueue.checkTaskIsInQueue(socket.id)) {
182+
exports.queueForDisconnect(socket)
183+
}
178184
}
179185
return callback(null, null)
180186
}, function (err) {
@@ -238,8 +244,8 @@ function getStatus (callback) {
238244
distinctOnlineRegisteredUsers: distinctregaddresses.length,
239245
isConnectionBusy: isConnectionBusy,
240246
connectionSocketQueueLength: connectionSocketQueue.length,
241-
isDisconnectBusy: isDisconnectBusy,
242-
disconnectSocketQueueLength: disconnectSocketQueue.length
247+
isDisconnectBusy: disconnectProcessQueue.lock,
248+
disconnectSocketQueueLength: disconnectProcessQueue.queue.length
243249
}) : null
244250
}).catch(function (err) {
245251
return logger.error('count user failed: ' + err)
@@ -253,7 +259,7 @@ function isReady () {
253259
return realtime.io &&
254260
Object.keys(notes).length === 0 && Object.keys(users).length === 0 &&
255261
connectionSocketQueue.length === 0 && !isConnectionBusy &&
256-
disconnectSocketQueue.length === 0 && !isDisconnectBusy
262+
disconnectProcessQueue.queue.length === 0 && !disconnectProcessQueue.lock
257263
}
258264

259265
function parseUrl (data) {
@@ -416,8 +422,6 @@ function checkViewPermission (req, note) {
416422

417423
var isConnectionBusy = false
418424
var connectionSocketQueue = []
419-
var isDisconnectBusy = false
420-
var disconnectSocketQueue = []
421425

422426
function finishConnection (socket, noteId, socketId) {
423427
// if no valid info provided will drop the client
@@ -562,71 +566,45 @@ function failConnection (code, err, socket) {
562566
return socket.disconnect(true)
563567
}
564568

565-
function disconnect (socket) {
566-
if (isDisconnectBusy) return
567-
isDisconnectBusy = true
568-
569-
if (config.debug) {
570-
logger.info('SERVER disconnected a client')
571-
logger.info(JSON.stringify(users[socket.id]))
572-
}
573-
574-
if (users[socket.id]) {
575-
delete users[socket.id]
576-
}
577-
var noteId = socket.noteId
578-
var note = notes[noteId]
579-
if (note) {
580-
// delete user in users
581-
if (note.users[socket.id]) {
582-
delete note.users[socket.id]
569+
function queueForDisconnect (socket) {
570+
disconnectProcessQueue.push(socket.id, async function () {
571+
if (users[socket.id]) {
572+
delete users[socket.id]
583573
}
584-
// remove sockets in the note socks
585-
do {
586-
var index = note.socks.indexOf(socket)
587-
if (index !== -1) {
588-
note.socks.splice(index, 1)
574+
const noteId = socket.noteId
575+
const note = notes[noteId]
576+
if (note) {
577+
// delete user in users
578+
if (note.users[socket.id]) {
579+
delete note.users[socket.id]
589580
}
590-
} while (index !== -1)
591-
// remove note in notes if no user inside
592-
if (Object.keys(note.users).length <= 0) {
593-
if (note.server.isDirty) {
594-
updateNote(note, function (err, _note) {
595-
if (err) return logger.error('disconnect note failed: ' + err)
596-
// clear server before delete to avoid memory leaks
597-
note.server.document = ''
598-
note.server.operations = []
581+
// remove sockets in the note socks
582+
let index
583+
do {
584+
index = note.socks.indexOf(socket)
585+
if (index !== -1) {
586+
note.socks.splice(index, 1)
587+
}
588+
} while (index !== -1)
589+
// remove note in notes if no user inside
590+
if (Object.keys(note.users).length === 0) {
591+
if (note.server.isDirty) {
592+
exports.updateNote(note, function (err, _note) {
593+
if (err) return logger.error('disconnect note failed: ' + err)
594+
// clear server before delete to avoid memory leaks
595+
note.server.document = ''
596+
note.server.operations = []
597+
delete note.server
598+
delete notes[noteId]
599+
})
600+
} else {
599601
delete note.server
600602
delete notes[noteId]
601-
if (config.debug) {
602-
// logger.info(notes);
603-
getStatus(function (data) {
604-
logger.info(JSON.stringify(data))
605-
})
606-
}
607-
})
608-
} else {
609-
delete note.server
610-
delete notes[noteId]
603+
}
611604
}
612605
}
613-
}
614-
emitOnlineUsers(socket)
615-
616-
// clear finished socket in queue
617-
clearSocketQueue(disconnectSocketQueue, socket)
618-
// seek for next socket
619-
isDisconnectBusy = false
620-
if (disconnectSocketQueue.length > 0) {
621-
disconnect(disconnectSocketQueue[0])
622-
}
623-
624-
if (config.debug) {
625-
// logger.info(notes);
626-
getStatus(function (data) {
627-
logger.info(JSON.stringify(data))
628-
})
629-
}
606+
exports.emitOnlineUsers(socket)
607+
})
630608
}
631609

632610
function buildUserOutData (user) {
@@ -818,6 +796,11 @@ function connection (socket) {
818796
socketClient.registerEventHandler()
819797
}
820798

799+
function terminate () {
800+
disconnectProcessQueue.stop()
801+
updateDirtyNoteJob.stop()
802+
}
803+
821804
exports = module.exports = realtime
822805
exports.extractNoteIdFromSocket = extractNoteIdFromSocket
823806
exports.parseNoteIdFromSocket = parseNoteIdFromSocket
@@ -829,7 +812,6 @@ exports.updateUserData = updateUserData
829812
exports.startConnection = startConnection
830813
exports.emitRefresh = emitRefresh
831814
exports.emitUserStatus = emitUserStatus
832-
exports.disconnect = disconnect
833815
exports.emitOnlineUsers = emitOnlineUsers
834816
exports.checkViewPermission = checkViewPermission
835817
exports.getNoteFromNotePool = getNoteFromNotePool
@@ -838,6 +820,9 @@ exports.buildUserOutData = buildUserOutData
838820
exports.getNotePool = getNotePool
839821
exports.emitCheck = emitCheck
840822
exports.disconnectSocketOnNote = disconnectSocketOnNote
823+
exports.queueForDisconnect = queueForDisconnect
824+
exports.terminate = terminate
825+
exports.getUserPool = getUserPool
826+
exports.disconnectProcessQueue = disconnectProcessQueue
841827
exports.notes = notes
842828
exports.users = users
843-
exports.disconnectSocketQueue = disconnectSocketQueue

lib/realtimeClientConnection.js

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,10 @@ class RealtimeClientConnection {
222222
}
223223

224224
disconnectEventHandler () {
225-
if (this.realtime.isDuplicatedInSocketQueue(this.realtime.disconnectSocketQueue, this.socket)) return
226-
this.realtime.disconnectSocketQueue.push(this.socket)
227-
this.realtime.disconnect(this.socket)
225+
if (this.realtime.disconnectProcessQueue.checkTaskIsInQueue(this.socket.id)) {
226+
return
227+
}
228+
this.realtime.queueForDisconnect(this.socket)
228229
}
229230
}
230231

0 commit comments

Comments
 (0)