Skip to content

Commit 16bca55

Browse files
committed
refactor: group all server selection related code into a module
1 parent d45d33c commit 16bca55

File tree

4 files changed

+151
-135
lines changed

4 files changed

+151
-135
lines changed

lib/core/sdam/common.js

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,32 @@ const ServerType = {
2828
Unknown: 'Unknown'
2929
};
3030

31+
const TOPOLOGY_DEFAULTS = {
32+
useUnifiedTopology: true,
33+
localThresholdMS: 15,
34+
serverSelectionTimeoutMS: 30000,
35+
heartbeatFrequencyMS: 10000,
36+
minHeartbeatFrequencyMS: 500
37+
};
38+
39+
function drainTimerQueue(queue) {
40+
queue.forEach(clearTimeout);
41+
queue.clear();
42+
}
43+
44+
function clearAndRemoveTimerFrom(timer, timers) {
45+
clearTimeout(timer);
46+
return timers.delete(timer);
47+
}
48+
3149
module.exports = {
3250
STATE_CLOSING,
3351
STATE_CLOSED,
3452
STATE_CONNECTING,
3553
STATE_CONNECTED,
54+
TOPOLOGY_DEFAULTS,
3655
TopologyType,
37-
ServerType
56+
ServerType,
57+
drainTimerQueue,
58+
clearAndRemoveTimerFrom
3859
};

lib/core/sdam/server_selectors.js renamed to lib/core/sdam/server_selection.js

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@ const ServerType = require('./common').ServerType;
33
const TopologyType = require('./common').TopologyType;
44
const ReadPreference = require('../topologies/read_preference');
55
const MongoError = require('../error').MongoError;
6+
const calculateDurationInMs = require('../utils').calculateDurationInMs;
7+
const MongoTimeoutError = require('../error').MongoTimeoutError;
8+
9+
const common = require('./common');
10+
const STATE_CONNECTED = common.STATE_CONNECTED;
11+
const STATE_CONNECTING = common.STATE_CONNECTING;
12+
const TOPOLOGY_DEFAULTS = common.TOPOLOGY_DEFAULTS;
13+
const drainTimerQueue = common.drainTimerQueue;
14+
const clearAndRemoveTimerFrom = common.clearAndRemoveTimerFrom;
615

716
// max staleness constants
817
const IDLE_WRITE_PERIOD = 10000;
@@ -238,7 +247,117 @@ function readPreferenceServerSelector(readPreference) {
238247
};
239248
}
240249

250+
/**
251+
* Selects servers using the provided selector
252+
*
253+
* @private
254+
* @param {Topology} topology The topology to select servers from
255+
* @param {function} selector The predicate used for selecting servers
256+
* @param {Number} timeout The max time we are willing wait for selection
257+
* @param {Number} start A high precision timestamp for the start of the selection process
258+
* @param {function} callback The callback used to convey errors or the resultant servers
259+
*/
260+
function selectServers(topology, selector, timeout, start, callback) {
261+
const duration = calculateDurationInMs(start);
262+
if (duration >= timeout) {
263+
return callback(
264+
new MongoTimeoutError(`Server selection timed out after ${timeout} ms`),
265+
topology.description.error
266+
);
267+
}
268+
269+
// ensure we are connected
270+
if (topology.s.state !== STATE_CONNECTED && topology.s.state !== STATE_CONNECTING) {
271+
topology.connect();
272+
273+
// we want to make sure we're still within the requested timeout window
274+
const failToConnectTimer = setTimeout(() => {
275+
topology.removeListener('connect', connectHandler);
276+
callback(
277+
new MongoTimeoutError('Server selection timed out waiting to connect'),
278+
topology.description.error
279+
);
280+
}, timeout - duration);
281+
282+
const connectHandler = () => {
283+
clearAndRemoveTimerFrom(failToConnectTimer, topology.s.connectionTimers);
284+
selectServers(topology, selector, timeout, process.hrtime(), callback);
285+
};
286+
287+
topology.s.connectionTimers.add(failToConnectTimer);
288+
topology.once('connect', connectHandler);
289+
return;
290+
}
291+
292+
// otherwise, attempt server selection
293+
const serverDescriptions = Array.from(topology.description.servers.values());
294+
let descriptions;
295+
296+
// support server selection by options with readPreference
297+
if (typeof selector === 'object') {
298+
const readPreference = selector.readPreference
299+
? selector.readPreference
300+
: ReadPreference.primary;
301+
302+
selector = readPreferenceServerSelector(readPreference);
303+
}
304+
305+
try {
306+
descriptions = selector
307+
? selector(topology.description, serverDescriptions)
308+
: serverDescriptions;
309+
} catch (e) {
310+
return callback(e, null);
311+
}
312+
313+
if (descriptions.length) {
314+
const servers = descriptions.map(description => topology.s.servers.get(description.address));
315+
return callback(null, servers);
316+
}
317+
318+
const retrySelection = () => {
319+
// clear all existing monitor timers
320+
drainTimerQueue(topology.s.monitorTimers);
321+
322+
// ensure all server monitors attempt monitoring soon
323+
topology.s.servers.forEach(server => {
324+
const timer = setTimeout(
325+
() => server.monitor({ heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS }),
326+
TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS
327+
);
328+
329+
topology.s.monitorTimers.add(timer);
330+
});
331+
332+
const iterationTimer = setTimeout(() => {
333+
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
334+
callback(
335+
new MongoTimeoutError(
336+
`Server selection timed out after ${timeout} ms`,
337+
topology.description.error
338+
)
339+
);
340+
}, timeout - duration);
341+
342+
const descriptionChangedHandler = () => {
343+
// successful iteration, clear the check timer
344+
clearAndRemoveTimerFrom(iterationTimer, topology.s.iterationTimers);
345+
346+
// topology description has changed due to monitoring, reattempt server selection
347+
selectServers(topology, selector, timeout, start, callback);
348+
};
349+
350+
// track this timer in case we need to clean it up outside this loop
351+
topology.s.iterationTimers.add(iterationTimer);
352+
353+
topology.once('topologyDescriptionChanged', descriptionChangedHandler);
354+
};
355+
356+
retrySelection();
357+
}
358+
241359
module.exports = {
360+
selectServers,
242361
writableServerSelector,
243362
readPreferenceServerSelector
244363
};

lib/core/sdam/topology.js

Lines changed: 9 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,9 @@ const ServerType = require('./common').ServerType;
55
const TopologyDescription = require('./topology_description');
66
const TopologyType = require('./common').TopologyType;
77
const monitoring = require('./monitoring');
8-
const calculateDurationInMs = require('../utils').calculateDurationInMs;
9-
const MongoTimeoutError = require('../error').MongoTimeoutError;
108
const Server = require('./server');
119
const relayEvents = require('../utils').relayEvents;
1210
const ReadPreference = require('../topologies/read_preference');
13-
const readPreferenceServerSelector = require('./server_selectors').readPreferenceServerSelector;
14-
const writableServerSelector = require('./server_selectors').writableServerSelector;
1511
const isRetryableWritesSupported = require('../topologies/shared').isRetryableWritesSupported;
1612
const CoreCursor = require('../cursor').CoreCursor;
1713
const deprecate = require('util').deprecate;
@@ -27,20 +23,19 @@ const SrvPoller = require('./srv_polling').SrvPoller;
2723
const getMMAPError = require('../topologies/shared').getMMAPError;
2824
const makeStateMachine = require('../utils').makeStateMachine;
2925
const eachAsync = require('../utils').eachAsync;
26+
3027
const common = require('./common');
28+
const drainTimerQueue = common.drainTimerQueue;
29+
const clearAndRemoveTimerFrom = common.clearAndRemoveTimerFrom;
30+
31+
const serverSelection = require('./server_selection');
32+
const readPreferenceServerSelector = serverSelection.readPreferenceServerSelector;
33+
const writableServerSelector = serverSelection.writableServerSelector;
34+
const selectServers = serverSelection.selectServers;
3135

3236
// Global state
3337
let globalTopologyCounter = 0;
3438

35-
// Constants
36-
const TOPOLOGY_DEFAULTS = {
37-
useUnifiedTopology: true,
38-
localThresholdMS: 15,
39-
serverSelectionTimeoutMS: 30000,
40-
heartbeatFrequencyMS: 10000,
41-
minHeartbeatFrequencyMS: 500
42-
};
43-
4439
// events that we relay to the `Topology`
4540
const SERVER_RELAY_EVENTS = [
4641
'serverHeartbeatStarted',
@@ -114,7 +109,7 @@ class Topology extends EventEmitter {
114109
seedlist = parseStringSeedlist(seedlist);
115110
}
116111

117-
options = Object.assign({}, TOPOLOGY_DEFAULTS, options);
112+
options = Object.assign({}, common.TOPOLOGY_DEFAULTS, options);
118113

119114
const topologyType = topologyTypeFromSeedlist(seedlist, options);
120115
const topologyId = globalTopologyCounter++;
@@ -810,115 +805,6 @@ function randomSelection(array) {
810805
return array[Math.floor(Math.random() * array.length)];
811806
}
812807

813-
/**
814-
* Selects servers using the provided selector
815-
*
816-
* @private
817-
* @param {Topology} topology The topology to select servers from
818-
* @param {function} selector The actual predicate used for selecting servers
819-
* @param {Number} timeout The max time we are willing wait for selection
820-
* @param {Number} start A high precision timestamp for the start of the selection process
821-
* @param {function} callback The callback used to convey errors or the resultant servers
822-
*/
823-
function selectServers(topology, selector, timeout, start, callback) {
824-
const duration = calculateDurationInMs(start);
825-
if (duration >= timeout) {
826-
return callback(
827-
new MongoTimeoutError(`Server selection timed out after ${timeout} ms`),
828-
topology.description.error
829-
);
830-
}
831-
832-
// ensure we are connected
833-
if (topology.s.state !== STATE_CONNECTED && topology.s.state !== STATE_CONNECTING) {
834-
topology.connect();
835-
836-
// we want to make sure we're still within the requested timeout window
837-
const failToConnectTimer = setTimeout(() => {
838-
topology.removeListener('connect', connectHandler);
839-
callback(
840-
new MongoTimeoutError('Server selection timed out waiting to connect'),
841-
topology.description.error
842-
);
843-
}, timeout - duration);
844-
845-
const connectHandler = () => {
846-
clearAndRemoveTimerFrom(failToConnectTimer, topology.s.connectionTimers);
847-
selectServers(topology, selector, timeout, process.hrtime(), callback);
848-
};
849-
850-
topology.s.connectionTimers.add(failToConnectTimer);
851-
topology.once('connect', connectHandler);
852-
return;
853-
}
854-
855-
// otherwise, attempt server selection
856-
const serverDescriptions = Array.from(topology.description.servers.values());
857-
let descriptions;
858-
859-
// support server selection by options with readPreference
860-
if (typeof selector === 'object') {
861-
const readPreference = selector.readPreference
862-
? selector.readPreference
863-
: ReadPreference.primary;
864-
865-
selector = readPreferenceServerSelector(readPreference);
866-
}
867-
868-
try {
869-
descriptions = selector
870-
? selector(topology.description, serverDescriptions)
871-
: serverDescriptions;
872-
} catch (e) {
873-
return callback(e, null);
874-
}
875-
876-
if (descriptions.length) {
877-
const servers = descriptions.map(description => topology.s.servers.get(description.address));
878-
return callback(null, servers);
879-
}
880-
881-
const retrySelection = () => {
882-
// clear all existing monitor timers
883-
drainTimerQueue(topology.s.monitorTimers);
884-
885-
// ensure all server monitors attempt monitoring soon
886-
topology.s.servers.forEach(server => {
887-
const timer = setTimeout(
888-
() => server.monitor({ heartbeatFrequencyMS: topology.description.heartbeatFrequencyMS }),
889-
TOPOLOGY_DEFAULTS.minHeartbeatFrequencyMS
890-
);
891-
892-
topology.s.monitorTimers.add(timer);
893-
});
894-
895-
const iterationTimer = setTimeout(() => {
896-
topology.removeListener('topologyDescriptionChanged', descriptionChangedHandler);
897-
callback(
898-
new MongoTimeoutError(
899-
`Server selection timed out after ${timeout} ms`,
900-
topology.description.error
901-
)
902-
);
903-
}, timeout - duration);
904-
905-
const descriptionChangedHandler = () => {
906-
// successful iteration, clear the check timer
907-
clearAndRemoveTimerFrom(iterationTimer, topology.s.iterationTimers);
908-
909-
// topology description has changed due to monitoring, reattempt server selection
910-
selectServers(topology, selector, timeout, start, callback);
911-
};
912-
913-
// track this timer in case we need to clean it up outside this loop
914-
topology.s.iterationTimers.add(iterationTimer);
915-
916-
topology.once('topologyDescriptionChanged', descriptionChangedHandler);
917-
};
918-
919-
retrySelection();
920-
}
921-
922808
function createAndConnectServer(topology, serverDescription, connectDelay) {
923809
topology.emit(
924810
'serverOpening',
@@ -965,16 +851,6 @@ function resetServer(topology, serverDescription) {
965851
topology.s.servers.set(serverDescription.address, newServer);
966852
}
967853

968-
function drainTimerQueue(queue) {
969-
queue.forEach(clearTimeout);
970-
queue.clear();
971-
}
972-
973-
function clearAndRemoveTimerFrom(timer, timers) {
974-
clearTimeout(timer);
975-
return timers.delete(timer);
976-
}
977-
978854
/**
979855
* Create `Server` instances for all initially known servers, connect them, and assign
980856
* them to the passed in `Topology`.

test/unit/sdam/server_selection/spec_tests.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const ReadPreference = core.ReadPreference;
1010
const Server = require('../../../../lib/core/sdam/server');
1111
const ServerType = require('../../../../lib/core/sdam/common').ServerType;
1212
const ServerDescription = require('../../../../lib/core/sdam/server_description');
13-
const ServerSelectors = require('../../../../lib/core/sdam/server_selectors');
13+
const ServerSelectors = require('../../../../lib/core/sdam/server_selection');
1414

1515
const EJSON = require('mongodb-extjson');
1616

0 commit comments

Comments
 (0)