Skip to content

Commit bc3d020

Browse files
authored
fix(NODE-5751): RTTPinger always sends legacy hello (#3923)
1 parent c698918 commit bc3d020

File tree

4 files changed

+210
-38
lines changed

4 files changed

+210
-38
lines changed

src/sdam/monitor.ts

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
2626
/** @internal */
2727
const kCancellationToken = Symbol('cancellationToken');
2828
/** @internal */
29-
const kRTTPinger = Symbol('rttPinger');
30-
/** @internal */
3129
const kRoundTripTime = Symbol('roundTripTime');
3230

3331
const STATE_IDLE = 'idle';
@@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
8179
[kCancellationToken]: CancellationToken;
8280
/** @internal */
8381
[kMonitorId]?: MonitorInterval;
84-
[kRTTPinger]?: RTTPinger;
82+
rttPinger?: RTTPinger;
8583

8684
get connection(): Connection | undefined {
8785
return this[kConnection];
@@ -197,8 +195,8 @@ function resetMonitorState(monitor: Monitor) {
197195
monitor[kMonitorId]?.stop();
198196
monitor[kMonitorId] = undefined;
199197

200-
monitor[kRTTPinger]?.close();
201-
monitor[kRTTPinger] = undefined;
198+
monitor.rttPinger?.close();
199+
monitor.rttPinger = undefined;
202200

203201
monitor[kCancellationToken].emit('cancel');
204202

@@ -251,8 +249,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
251249
}
252250
: { socketTimeoutMS: connectTimeoutMS };
253251

254-
if (isAwaitable && monitor[kRTTPinger] == null) {
255-
monitor[kRTTPinger] = new RTTPinger(
252+
if (isAwaitable && monitor.rttPinger == null) {
253+
monitor.rttPinger = new RTTPinger(
256254
monitor[kCancellationToken],
257255
Object.assign(
258256
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
@@ -271,9 +269,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
271269
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
272270
}
273271

274-
const rttPinger = monitor[kRTTPinger];
275272
const duration =
276-
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
273+
isAwaitable && monitor.rttPinger
274+
? monitor.rttPinger.roundTripTime
275+
: calculateDurationInMs(start);
277276

278277
monitor.emit(
279278
Server.SERVER_HEARTBEAT_SUCCEEDED,
@@ -289,8 +288,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
289288
);
290289
start = now();
291290
} else {
292-
monitor[kRTTPinger]?.close();
293-
monitor[kRTTPinger] = undefined;
291+
monitor.rttPinger?.close();
292+
monitor.rttPinger = undefined;
294293

295294
callback(undefined, hello);
296295
}
@@ -383,7 +382,7 @@ export interface RTTPingerOptions extends ConnectionOptions {
383382
/** @internal */
384383
export class RTTPinger {
385384
/** @internal */
386-
[kConnection]?: Connection;
385+
connection?: Connection;
387386
/** @internal */
388387
[kCancellationToken]: CancellationToken;
389388
/** @internal */
@@ -393,7 +392,7 @@ export class RTTPinger {
393392
closed: boolean;
394393

395394
constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
396-
this[kConnection] = undefined;
395+
this.connection = undefined;
397396
this[kCancellationToken] = cancellationToken;
398397
this[kRoundTripTime] = 0;
399398
this.closed = false;
@@ -410,8 +409,8 @@ export class RTTPinger {
410409
this.closed = true;
411410
clearTimeout(this[kMonitorId]);
412411

413-
this[kConnection]?.destroy({ force: true });
414-
this[kConnection] = undefined;
412+
this.connection?.destroy({ force: true });
413+
this.connection = undefined;
415414
}
416415
}
417416

@@ -430,8 +429,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
430429
return;
431430
}
432431

433-
if (rttPinger[kConnection] == null) {
434-
rttPinger[kConnection] = conn;
432+
if (rttPinger.connection == null) {
433+
rttPinger.connection = conn;
435434
}
436435

437436
rttPinger[kRoundTripTime] = calculateDurationInMs(start);
@@ -441,11 +440,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
441440
);
442441
}
443442

444-
const connection = rttPinger[kConnection];
443+
const connection = rttPinger.connection;
445444
if (connection == null) {
446445
connect(options, (err, conn) => {
447446
if (err) {
448-
rttPinger[kConnection] = undefined;
447+
rttPinger.connection = undefined;
449448
rttPinger[kRoundTripTime] = 0;
450449
return;
451450
}
@@ -456,9 +455,12 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
456455
return;
457456
}
458457

459-
connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
458+
const commandName =
459+
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
460+
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined, err => {
460461
if (err) {
461-
rttPinger[kConnection] = undefined;
462+
rttPinger.connection?.destroy({ force: true });
463+
rttPinger.connection = undefined;
462464
rttPinger[kRoundTripTime] = 0;
463465
return;
464466
}

src/sdam/server.ts

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,6 @@ const stateTransition = makeStateMachine({
7272
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
7373
});
7474

75-
/** @internal */
76-
const kMonitor = Symbol('monitor');
77-
7875
/** @public */
7976
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
8077
MonitorOptions;
@@ -118,7 +115,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
118115
s: ServerPrivate;
119116
serverApi?: ServerApi;
120117
hello?: Document;
121-
[kMonitor]: Monitor | null;
118+
monitor: Monitor | null;
122119

123120
/** @event */
124121
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
@@ -164,22 +161,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {
164161
});
165162

166163
if (this.loadBalanced) {
167-
this[kMonitor] = null;
164+
this.monitor = null;
168165
// monitoring is disabled in load balancing mode
169166
return;
170167
}
171168

172169
// create the monitor
173170
// TODO(NODE-4144): Remove new variable for type narrowing
174-
const monitor = new Monitor(this, this.s.options);
175-
this[kMonitor] = monitor;
171+
this.monitor = new Monitor(this, this.s.options);
176172

177173
for (const event of HEARTBEAT_EVENTS) {
178-
monitor.on(event, (e: any) => this.emit(event, e));
174+
this.monitor.on(event, (e: any) => this.emit(event, e));
179175
}
180176

181-
monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
182-
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
177+
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
178+
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
183179
this.emit(
184180
Server.DESCRIPTION_RECEIVED,
185181
new ServerDescription(this.description.hostAddress, event.reply, {
@@ -235,7 +231,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
235231
// a load balancer. It never transitions out of this state and
236232
// has no monitor.
237233
if (!this.loadBalanced) {
238-
this[kMonitor]?.connect();
234+
this.monitor?.connect();
239235
} else {
240236
stateTransition(this, STATE_CONNECTED);
241237
this.emit(Server.CONNECT, this);
@@ -258,7 +254,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
258254
stateTransition(this, STATE_CLOSING);
259255

260256
if (!this.loadBalanced) {
261-
this[kMonitor]?.close();
257+
this.monitor?.close();
262258
}
263259

264260
this.s.pool.close(options, err => {
@@ -276,7 +272,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
276272
*/
277273
requestCheck(): void {
278274
if (!this.loadBalanced) {
279-
this[kMonitor]?.requestCheck();
275+
this.monitor?.requestCheck();
280276
}
281277
}
282278

@@ -437,7 +433,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
437433
}
438434

439435
if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
440-
server[kMonitor]?.reset();
436+
server.monitor?.reset();
441437
}
442438

443439
server.emit(
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import { expect } from 'chai';
2+
import * as semver from 'semver';
3+
import * as sinon from 'sinon';
4+
5+
import { type Connection, type MongoClient, type RTTPinger } from '../../../src';
6+
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
7+
import { sleep } from '../../tools/utils';
8+
9+
/**
10+
* RTTPingers are only created after getting a hello from the server that defines topologyVersion
11+
* Each monitor is reaching out to a different node and rttPinger's are created async as a result.
12+
*
13+
* This function checks for rttPingers and sleeps if none are found.
14+
*/
15+
async function getRTTPingers(client: MongoClient) {
16+
type RTTPingerConnection = Omit<RTTPinger, 'connection'> & { connection: Connection };
17+
const pingers = (rtt => rtt?.connection != null) as (r?: RTTPinger) => r is RTTPingerConnection;
18+
19+
if (!client.topology) expect.fail('Must provide a connected client');
20+
21+
// eslint-disable-next-line no-constant-condition
22+
while (true) {
23+
const servers = client.topology.s.servers.values();
24+
const rttPingers = Array.from(servers, s => s.monitor?.rttPinger).filter(pingers);
25+
26+
if (rttPingers.length !== 0) {
27+
return rttPingers;
28+
}
29+
30+
await sleep(5);
31+
}
32+
}
33+
34+
describe('class RTTPinger', () => {
35+
afterEach(() => sinon.restore());
36+
37+
beforeEach(async function () {
38+
if (!this.currentTest) return;
39+
if (this.configuration.isLoadBalanced) {
40+
this.currentTest.skipReason = 'No monitoring in LB mode, test not relevant';
41+
return this.skip();
42+
}
43+
if (semver.gte('4.4.0', this.configuration.version)) {
44+
this.currentTest.skipReason =
45+
'Test requires streaming monitoring, needs to be on MongoDB 4.4+';
46+
return this.skip();
47+
}
48+
});
49+
50+
context('when serverApi is enabled', () => {
51+
let serverApiClient: MongoClient;
52+
53+
beforeEach(async function () {
54+
if (!this.currentTest) return;
55+
56+
if (semver.gte('5.0.0', this.configuration.version)) {
57+
this.currentTest.skipReason = 'Test requires serverApi, needs to be on MongoDB 5.0+';
58+
return this.skip();
59+
}
60+
61+
serverApiClient = this.configuration.newClient(
62+
{},
63+
{ serverApi: { version: '1', strict: true }, heartbeatFrequencyMS: 10 }
64+
);
65+
});
66+
67+
afterEach(async () => {
68+
await serverApiClient?.close();
69+
});
70+
71+
it('measures rtt with a hello command', async function () {
72+
await serverApiClient.connect();
73+
const rttPingers = await getRTTPingers(serverApiClient);
74+
75+
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));
76+
77+
await sleep(11); // allow for another ping after spies have been made
78+
79+
expect(spies).to.have.lengthOf.at.least(1);
80+
for (const spy of spies) {
81+
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
82+
}
83+
});
84+
});
85+
86+
context('when serverApi is disabled', () => {
87+
let client: MongoClient;
88+
89+
beforeEach(async function () {
90+
if (!this.currentTest) return;
91+
if (this.configuration.serverApi) {
92+
this.currentTest.skipReason = 'Test requires serverApi to NOT be enabled';
93+
return this.skip();
94+
}
95+
96+
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
97+
});
98+
99+
afterEach(async () => {
100+
await client?.close();
101+
});
102+
103+
context('connected to a pre-hello server', () => {
104+
it('measures rtt with a LEGACY_HELLO_COMMAND command', async function () {
105+
await client.connect();
106+
const rttPingers = await getRTTPingers(client);
107+
108+
// Fake pre-hello server.
109+
// Hello was back-ported to feature versions of the server so we would need to pin
110+
// versions prior to 4.4.2, 4.2.10, 4.0.21, and 3.6.21 to integration test
111+
for (const rtt of rttPingers) rtt.connection.helloOk = false;
112+
113+
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));
114+
115+
await sleep(11); // allow for another ping after spies have been made
116+
117+
expect(spies).to.have.lengthOf.at.least(1);
118+
for (const spy of spies) {
119+
expect(spy).to.have.been.calledWith(
120+
sinon.match.any,
121+
{ [LEGACY_HELLO_COMMAND]: 1 },
122+
sinon.match.any
123+
);
124+
}
125+
});
126+
});
127+
128+
context('connected to a helloOk server', () => {
129+
it('measures rtt with a hello command', async function () {
130+
await client.connect();
131+
const rttPingers = await getRTTPingers(client);
132+
133+
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));
134+
135+
// We should always be connected to helloOk servers
136+
for (const rtt of rttPingers) expect(rtt.connection).to.have.property('helloOk', true);
137+
138+
await sleep(11); // allow for another ping after spies have been made
139+
140+
expect(spies).to.have.lengthOf.at.least(1);
141+
for (const spy of spies) {
142+
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
143+
}
144+
});
145+
});
146+
});
147+
148+
context(`when the RTTPinger's hello command receives any error`, () => {
149+
let client: MongoClient;
150+
beforeEach(async function () {
151+
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
152+
});
153+
154+
afterEach(async () => {
155+
await client?.close();
156+
});
157+
158+
it('destroys the connection with force=true', async function () {
159+
await client.connect();
160+
const rttPingers = await getRTTPingers(client);
161+
162+
for (const rtt of rttPingers) {
163+
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
164+
}
165+
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));
166+
167+
await sleep(11); // allow for another ping after spies have been made
168+
169+
expect(spies).to.have.lengthOf.at.least(1);
170+
for (const spy of spies) {
171+
expect(spy).to.have.been.calledWithExactly({ force: true });
172+
}
173+
});
174+
});
175+
});

0 commit comments

Comments
 (0)