Skip to content
This repository was archived by the owner on May 14, 2024. It is now read-only.

Commit 92341e7

Browse files
committed
Refactor MessageTracker into testable module
1 parent 2e1ef78 commit 92341e7

File tree

11 files changed

+580
-120
lines changed

11 files changed

+580
-120
lines changed

lib/client/client.js

Lines changed: 6 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
'use strict'
22

33
const requestQueueFactory = require('./request-queue')
4+
const messageTrackerFactory = require('./message-tracker')
5+
const { MAX_MSGID } = require('./constants')
46

57
var EventEmitter = require('events').EventEmitter
68
var net = require('net')
@@ -49,7 +51,6 @@ var PresenceFilter = filters.PresenceFilter
4951
var ConnectionError = errors.ConnectionError
5052

5153
var CMP_EXPECT = [errors.LDAP_COMPARE_TRUE, errors.LDAP_COMPARE_FALSE]
52-
var MAX_MSGID = Math.pow(2, 31) - 1
5354

5455
// node 0.6 got rid of FDs, so make up a client id for logging
5556
var CLIENT_ID = 0
@@ -88,117 +89,6 @@ function ensureDN (input, strict) {
8889
}
8990
}
9091

91-
/**
92-
* Track message callback by messageID.
93-
*/
94-
function MessageTracker (opts) {
95-
assert.object(opts)
96-
assert.string(opts.id)
97-
assert.object(opts.parser)
98-
99-
this.id = opts.id
100-
this._msgid = 0
101-
this._messages = {}
102-
this._abandoned = {}
103-
this.parser = opts.parser
104-
105-
var self = this
106-
this.__defineGetter__('pending', function () {
107-
return Object.keys(self._messages)
108-
})
109-
}
110-
111-
/**
112-
* Record a messageID and callback.
113-
*/
114-
MessageTracker.prototype.track = function track (message, callback) {
115-
var msgid = this._nextID()
116-
message.messageID = msgid
117-
this._messages[msgid] = callback
118-
return msgid
119-
}
120-
121-
/**
122-
* Fetch callback based on messageID.
123-
*/
124-
MessageTracker.prototype.fetch = function fetch (msgid) {
125-
var msg = this._messages[msgid]
126-
if (msg) {
127-
this._purgeAbandoned(msgid)
128-
return msg
129-
}
130-
// It's possible that the server has not received the abandon request yet.
131-
// While waiting for evidence that the abandon has been received, incoming
132-
// messages that match the abandoned msgid will be handled as normal.
133-
msg = this._abandoned[msgid]
134-
if (msg) {
135-
return msg.cb
136-
}
137-
return null
138-
}
139-
140-
/**
141-
* Cease tracking for a given messageID.
142-
*/
143-
MessageTracker.prototype.remove = function remove (msgid) {
144-
if (this._messages[msgid]) {
145-
delete this._messages[msgid]
146-
} else if (this._abandoned[msgid]) {
147-
delete this._abandoned[msgid]
148-
}
149-
}
150-
151-
/**
152-
* Mark a messageID as abandoned.
153-
*/
154-
MessageTracker.prototype.abandon = function abandonMsg (msgid) {
155-
if (this._messages[msgid]) {
156-
// Keep track of "when" the message was abandoned
157-
this._abandoned[msgid] = {
158-
age: this._msgid,
159-
cb: this._messages[msgid]
160-
}
161-
delete this._messages[msgid]
162-
}
163-
}
164-
165-
/**
166-
* Purge old items from abandoned list.
167-
*/
168-
MessageTracker.prototype._purgeAbandoned = function _purgeAbandoned (msgid) {
169-
var self = this
170-
// Is (comp >= ref) according to sliding window
171-
function geWindow (ref, comp) {
172-
var max = ref + (MAX_MSGID / 2)
173-
var min = ref
174-
if (max >= MAX_MSGID) {
175-
// Handle roll-over
176-
max = max - MAX_MSGID - 1
177-
return ((comp <= max) || (comp >= min))
178-
} else {
179-
return ((comp <= max) && (comp >= min))
180-
}
181-
}
182-
183-
Object.keys(this._abandoned).forEach(function (id) {
184-
// Abandoned messageIDs can be forgotten if a received messageID is "newer"
185-
if (geWindow(self._abandoned[id].age, msgid)) {
186-
self._abandoned[id].cb(new errors.AbandonedError(
187-
'client request abandoned'))
188-
delete self._abandoned[id]
189-
}
190-
})
191-
}
192-
193-
/**
194-
* Allocate the next messageID according to a sliding window.
195-
*/
196-
MessageTracker.prototype._nextID = function _nextID () {
197-
if (++this._msgid >= MAX_MSGID) { this._msgid = 1 }
198-
199-
return this._msgid
200-
}
201-
20292
/// --- API
20393

20494
/**
@@ -947,7 +837,7 @@ Client.prototype.connect = function connect () {
947837

948838
// Initialize socket events and LDAP parser.
949839
function initSocket () {
950-
tracker = new MessageTracker({
840+
tracker = messageTrackerFactory({
951841
id: self.url ? self.url.href : self.socketPath,
952842
parser: new Parser({ log: log })
953843
})
@@ -1157,11 +1047,8 @@ Client.prototype._onClose = function _onClose (closeError) {
11571047
this.emit('close', closeError)
11581048
// On close we have to walk the outstanding messages and go invoke their
11591049
// callback with an error.
1160-
tracker.pending.forEach(function (msgid) {
1161-
var cb = tracker.fetch(msgid)
1162-
tracker.remove(msgid)
1163-
1164-
if (socket.unbindMessageID !== parseInt(msgid, 10)) {
1050+
tracker.purge(function (msgid, cb) {
1051+
if (socket.unbindMessageID !== msgid) {
11651052
return cb(new ConnectionError(tracker.id + ' closed'))
11661053
} else {
11671054
// Unbinds will be communicated as a success since we're closed
@@ -1201,7 +1088,7 @@ Client.prototype._updateIdle = function _updateIdle (override) {
12011088
function isIdle (disable) {
12021089
return ((disable !== true) &&
12031090
(self._socket && self.connected) &&
1204-
(self._tracker.pending.length === 0))
1091+
(self._tracker.pending === 0))
12051092
}
12061093
if (isIdle(override)) {
12071094
if (!this._idleTimer) {

lib/client/constants.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
'use strict'
2+
3+
module.exports = {
4+
// https://tools.ietf.org/html/rfc4511#section-4.1.1
5+
// Message identifiers are an integer between (0, maxint).
6+
MAX_MSGID: Math.pow(2, 31) - 1
7+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
'use strict'
2+
3+
const { MAX_MSGID } = require('../constants')
4+
5+
/**
6+
* Compare a reference id with another id to determine "greater than or equal"
7+
* between the two values according to a sliding window.
8+
*
9+
* @param {integer} ref
10+
* @param {integer} comp
11+
*
12+
* @returns {boolean} `true` if the `comp` value is >= to the `ref` value
13+
* within the computed window, otherwise `false`.
14+
*/
15+
module.exports = function geWindow (ref, comp) {
16+
let max = ref + Math.floor(MAX_MSGID / 2)
17+
const min = ref
18+
if (max >= MAX_MSGID) {
19+
// Handle roll-over
20+
max = max - MAX_MSGID - 1
21+
return ((comp <= max) || (comp >= min))
22+
} else {
23+
return ((comp <= max) && (comp >= min))
24+
}
25+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
'use strict'
2+
3+
const { MAX_MSGID } = require('../constants')
4+
5+
/**
6+
* Returns a function that generates message identifiers. According to RFC 4511
7+
* the identifers should be `(0, MAX_MSGID)`. The returned function handles
8+
* this and wraps around when the maximum has been reached.
9+
*
10+
* @param {integer} [start=0] Starting number in the identifier sequence.
11+
*
12+
* @returns {function} This function accepts no parameters and returns an
13+
* increasing sequence identifier each invocation until it reaches the maximum
14+
* identifier. At this point the sequence starts over.
15+
*/
16+
module.exports = function idGeneratorFactory (start = 0) {
17+
let currentID = start
18+
return function nextID () {
19+
const nextID = currentID + 1
20+
currentID = (nextID >= MAX_MSGID) ? 1 : nextID
21+
return currentID
22+
}
23+
}

lib/client/message-tracker/index.js

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
'use strict'
2+
3+
const idGeneratorFactory = require('./id-generator')
4+
const purgeAbandoned = require('./purge-abandoned')
5+
6+
/**
7+
* Returns a message tracker object that keeps track of which message
8+
* identifiers correspond to which message handlers. Also handles keeping track
9+
* of abandoned messages.
10+
*
11+
* @param {object} options
12+
* @param {string} options.id An identifier for the tracker.
13+
* @param {object} options.parser An object that will be used to parse messages.
14+
*
15+
* @returns {MessageTracker}
16+
*/
17+
module.exports = function messageTrackerFactory (options) {
18+
if (Object.prototype.toString.call(options) !== '[object Object]') {
19+
throw Error('options object is required')
20+
}
21+
if (!options.id || typeof options.id !== 'string') {
22+
throw Error('options.id string is required')
23+
}
24+
if (!options.parser || Object.prototype.toString.call(options.parser) !== '[object Object]') {
25+
throw Error('options.parser object is required')
26+
}
27+
28+
let currentID = 0
29+
const nextID = idGeneratorFactory()
30+
const messages = new Map()
31+
const abandoned = new Map()
32+
33+
/**
34+
* @typedef {object} MessageTracker
35+
* @property {string} id The identifier of the tracker as supplied via the options.
36+
* @property {object} parser The parser object given by the the options.
37+
*/
38+
const tracker = {
39+
id: options.id,
40+
parser: options.parser
41+
}
42+
43+
/**
44+
* Count of messages awaiting response.
45+
*
46+
* @alias pending
47+
* @memberof! MessageTracker#
48+
*/
49+
Object.defineProperty(tracker, 'pending', {
50+
get () {
51+
return messages.size
52+
}
53+
})
54+
55+
/**
56+
* Move a specific message to the abanded track.
57+
*
58+
* @param {integer} msgID The identifier for the message to move.
59+
*
60+
* @memberof MessageTracker
61+
* @method abandon
62+
*/
63+
tracker.abandon = function abandonMessage (msgID) {
64+
if (messages.has(msgID) === false) return false
65+
abandoned.set(msgID, {
66+
age: currentID,
67+
cb: messages.get(msgID)
68+
})
69+
return messages.delete(msgID)
70+
}
71+
72+
/**
73+
* Retrieves the message handler for a message. Removes abandoned messages
74+
* that have been given time to be resolved.
75+
*
76+
* @param {integer} msgID The identifier for the message to get the handler for.
77+
*
78+
* @memberof MessageTracker
79+
* @method fetch
80+
*/
81+
tracker.fetch = function fetchMessage (msgID) {
82+
const messageCB = messages.get(msgID)
83+
if (messageCB) {
84+
purgeAbandoned(msgID, abandoned)
85+
return messageCB
86+
}
87+
88+
// We sent an abandon request but the server either wasn't able to process
89+
// it or has not received it yet. Therefore, we received a response for the
90+
// abandoned message. So we must return the abandoned message's callback
91+
// to be processed normally.
92+
const abandonedMsg = abandoned.get(msgID)
93+
if (abandonedMsg) {
94+
return abandonedMsg.cb
95+
}
96+
97+
return null
98+
}
99+
100+
/**
101+
* Removes all message tracks, cleans up the abandoned track, and invokes
102+
* a callback for each message purged.
103+
*
104+
* @param {function} cb A function with the signature `(msgID, handler)`.
105+
*
106+
* @memberof MessageTracker
107+
* @method purge
108+
*/
109+
tracker.purge = function purgeMessages (cb) {
110+
messages.forEach((val, key) => {
111+
purgeAbandoned(key, abandoned)
112+
tracker.remove(key)
113+
cb(key, val)
114+
})
115+
}
116+
117+
/**
118+
* Removes a message from all tracking.
119+
*
120+
* @param {integer} msgID The identifier for the message to remove from tracking.
121+
*
122+
* @memberof MessageTracker
123+
* @method remove
124+
*/
125+
tracker.remove = function removeMessage (msgID) {
126+
if (messages.delete(msgID) === false) {
127+
abandoned.delete(msgID)
128+
}
129+
}
130+
131+
/**
132+
* Add a message handler to be tracked.
133+
*
134+
* @param {object} message The message object to be tracked. This object will
135+
* have a new property added to it: `messageID`.
136+
* @param {function} callback The handler for the message.
137+
*
138+
* @memberof MessageTracker
139+
* @method track
140+
*/
141+
tracker.track = function trackMessage (message, callback) {
142+
currentID = nextID()
143+
// This side effect is not ideal but the client doesn't attach the tracker
144+
// to itself until after the `.connect` method has fired. If this can be
145+
// refactored later, then we can possibly get rid of this side effect.
146+
message.messageID = currentID
147+
messages.set(currentID, callback)
148+
}
149+
150+
return tracker
151+
}

0 commit comments

Comments
 (0)