Skip to content

Commit 9065953

Browse files
committed
refactor(sdam): introduce a state machine for unified topology
The `Topology` type gains a state machine for tracking its current connection state. NODE-2280
1 parent e7e6641 commit 9065953

File tree

3 files changed

+97
-7
lines changed

3 files changed

+97
-7
lines changed

lib/core/sdam/topology.js

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,33 @@ const LOCAL_SERVER_EVENTS = SERVER_RELAY_EVENTS.concat([
5959
'ended'
6060
]);
6161

62+
const STATE_CLOSING = 'closing';
63+
const STATE_CLOSED = 'closed';
64+
const STATE_CONNECTING = 'connecting';
65+
const STATE_CONNECTED = 'connected';
66+
67+
function stateTransition(topology, newState) {
68+
const legalTransitions = {
69+
[STATE_CLOSED]: [STATE_CLOSED, STATE_CONNECTING],
70+
[STATE_CONNECTING]: [STATE_CONNECTING, STATE_CLOSING, STATE_CONNECTED, STATE_CLOSED],
71+
[STATE_CONNECTED]: [STATE_CONNECTED, STATE_CLOSING, STATE_CLOSED],
72+
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
73+
};
74+
75+
const legalStates = legalTransitions[topology.s.state];
76+
if (legalStates && legalStates.indexOf(newState) < 0) {
77+
console.log('throwing an error');
78+
throw new MongoError(
79+
`illegal state transition from [${
80+
topology.s.state
81+
}] => [${newState}], allowed: [${legalStates}]`
82+
);
83+
}
84+
85+
topology.emit('stateChanged', topology.s.state, newState);
86+
topology.s.state = newState;
87+
}
88+
6289
/**
6390
* A container of server instances representing a connection to a MongoDB topology.
6491
*
@@ -117,6 +144,8 @@ class Topology extends EventEmitter {
117144
options,
118145
// initial seedlist of servers to connect to
119146
seedlist: seedlist,
147+
// initial state
148+
state: STATE_CLOSED,
120149
// the topology description
121150
description: new TopologyDescription(
122151
topologyType,
@@ -229,6 +258,15 @@ class Topology extends EventEmitter {
229258
connect(options, callback) {
230259
if (typeof options === 'function') (callback = options), (options = {});
231260
options = options || {};
261+
if (this.s.state === STATE_CONNECTED) {
262+
if (typeof callback === 'function') {
263+
callback();
264+
}
265+
266+
return;
267+
}
268+
269+
stateTransition(this, STATE_CONNECTING);
232270

233271
// emit SDAM monitoring events
234272
this.emit('topologyOpening', new monitoring.TopologyOpeningEvent(this.s.id));
@@ -243,17 +281,15 @@ class Topology extends EventEmitter {
243281
)
244282
);
245283

284+
// connect all known servers, then attempt server selection to connect
246285
connectServers(this, Array.from(this.s.description.servers.values()));
247-
this.s.connected = true;
248-
249-
// otherwise, wait for a server to properly connect based on user provided read preference,
250-
// or primary.
251286

252287
translateReadPreference(options);
253288
const readPreference = options.readPreference || ReadPreference.primary;
254289

255290
this.selectServer(readPreferenceServerSelector(readPreference), options, (err, server) => {
256291
if (err) {
292+
stateTransition(this, STATE_CLOSED);
257293
if (typeof callback === 'function') {
258294
callback(err, null);
259295
} else {
@@ -264,11 +300,13 @@ class Topology extends EventEmitter {
264300
}
265301

266302
const errorHandler = err => {
303+
stateTransition(this, STATE_CLOSED);
267304
server.removeListener('connect', connectHandler);
268305
if (typeof callback === 'function') callback(err, null);
269306
};
270307

271308
const connectHandler = (_, err) => {
309+
stateTransition(this, STATE_CONNECTED);
272310
server.removeListener('error', errorHandler);
273311
this.emit('open', err, this);
274312
this.emit('connect', this);
@@ -301,6 +339,13 @@ class Topology extends EventEmitter {
301339
}
302340

303341
options = options || {};
342+
if (this.s.state === STATE_CLOSED) {
343+
if (typeof callback === 'function') {
344+
callback();
345+
}
346+
347+
return;
348+
}
304349

305350
// clear all existing monitor timers
306351
this.s.monitorTimers.map(timer => clearTimeout(timer));
@@ -327,9 +372,12 @@ class Topology extends EventEmitter {
327372
delete this.s.detectTopologyDescriptionChange;
328373
}
329374

375+
// defer state transition because we may need to send an `endSessions` command above
376+
stateTransition(this, STATE_CLOSING);
377+
330378
const servers = this.s.servers;
331379
if (servers.size === 0) {
332-
this.s.connected = false;
380+
stateTransition(this, STATE_CLOSED);
333381
if (typeof callback === 'function') {
334382
callback(null, null);
335383
}
@@ -346,7 +394,7 @@ class Topology extends EventEmitter {
346394
// emit an event for close
347395
this.emit('topologyClosed', new monitoring.TopologyClosedEvent(this.s.id));
348396

349-
this.s.connected = false;
397+
stateTransition(this, STATE_CLOSED);
350398
if (typeof callback === 'function') {
351399
callback(null, null);
352400
}
@@ -807,7 +855,7 @@ function selectServers(topology, selector, timeout, start, callback) {
807855
}
808856

809857
// ensure we are connected
810-
if (!topology.s.connected) {
858+
if (topology.s.state !== STATE_CONNECTED && topology.s.state !== STATE_CONNECTING) {
811859
topology.connect();
812860

813861
// we want to make sure we're still within the requested timeout window
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
'use strict';
2+
const expect = require('chai').expect;
3+
4+
describe('Topology', { requires: { unifiedTopology: true } }, function() {
5+
it('should correctly track states of a topology', function(done) {
6+
const topology = this.configuration.newTopology();
7+
8+
const states = [];
9+
topology.on('stateChanged', (_, newState) => states.push(newState));
10+
topology.connect(err => {
11+
expect(err).to.not.exist;
12+
topology.destroy(err => {
13+
expect(err).to.not.exist;
14+
expect(states).to.eql(['connecting', 'connected', 'closing', 'closed']);
15+
done();
16+
});
17+
});
18+
});
19+
});

test/runner/filters/unified_filter.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
'use strict';
2+
3+
/**
4+
* Filter for tests that require the unified topology
5+
*
6+
* example:
7+
* metadata: {
8+
* requires: {
9+
* unifiedTopology: <boolean>
10+
* }
11+
* }
12+
*/
13+
class UnifiedTopologyFilter {
14+
filter(test) {
15+
if (!test.metadata) return true;
16+
if (!test.metadata.requires) return true;
17+
if (!test.metadata.requires.unifiedTopology) return true;
18+
19+
return !!process.env.MONGODB_UNIFIED_TOPOLOGY;
20+
}
21+
}
22+
23+
module.exports = UnifiedTopologyFilter;

0 commit comments

Comments
 (0)