Skip to content

Commit 777d890

Browse files
await heartbeats part 1
1 parent 628d53f commit 777d890

File tree

3 files changed

+181
-14
lines changed

3 files changed

+181
-14
lines changed
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
'use strict';
2+
3+
/* eslint-disable no-undef */
4+
/* eslint-disable no-unused-vars */
5+
const driverPath = "/Users/aditi.khare/Desktop/node-mongodb-native/lib";
6+
const func = (async ({ MongoClient, uri, expect, getSockets }) => {
7+
const heartbeatFrequencyMS = 100;
8+
const client = new MongoClient(uri, {
9+
serverMonitoringMode: 'stream',
10+
heartbeatFrequencyMS
11+
});
12+
await client.connect();
13+
const socketsAddressesBeforeHeartbeat = getSockets().map(r => r.address);
14+
const activeSocketsAfterHeartbeat = () => getSockets()
15+
.filter(r => !socketsAddressesBeforeHeartbeat.includes(r.address))
16+
.map(r => r.remoteEndpoint?.host + ':' + r.remoteEndpoint?.port);
17+
// set of servers whose hearbeats have occurred
18+
const heartbeatOccurredSet = new Set();
19+
const servers = client.topology.s.servers;
20+
while (heartbeatOccurredSet.size < servers.size) {
21+
await client.once('serverHeartbeatSucceeded', async (ev) => heartbeatOccurredSet.add(ev.connectionId));
22+
}
23+
// all servers should have had a heartbeat event and had a new socket created for rtt pinger
24+
for (const [server,] of servers) {
25+
expect(activeSocketsAfterHeartbeat()).to.deep.contain(server[0]);
26+
}
27+
// close the client
28+
await client.close();
29+
// upon close, assert rttPinger sockets are cleaned up
30+
const activeSocketsAfterClose = activeSocketsAfterHeartbeat();
31+
expect(activeSocketsAfterClose).to.have.lengthOf(0);
32+
});
33+
const scriptName = "socket-connection-rtt-monitoring";
34+
const uri = "mongodb://bob:pwd123@localhost:31000/integration_tests?replicaSet=rs&authSource=admin";
35+
36+
const mongodb = require(driverPath);
37+
const { MongoClient } = mongodb;
38+
const process = require('node:process');
39+
const util = require('node:util');
40+
const fs = require('node:fs');
41+
const { expect } = require('chai');
42+
const timers = require('node:timers');
43+
const { setTimeout } = timers;
44+
45+
let originalReport;
46+
const logFile = scriptName + '.logs.txt';
47+
const sleep = util.promisify(setTimeout);
48+
49+
const run = func;
50+
51+
/**
52+
*
53+
* Returns an array containing the new libuv resources created after script started.
54+
* A new resource is something that will keep the event loop running.
55+
*
56+
* In order to be counted as a new resource, a resource MUST:
57+
* - Must NOT share an address with a libuv resource that existed at the start of script
58+
* - Must be referenced. See [here](https://nodejs.org/api/timers.html#timeoutref) for more context.
59+
*
60+
* We're using the following tool to track resources: `process.report.getReport().libuv`
61+
* For more context, see documentation for [process.report.getReport()](https://nodejs.org/api/report.html), and [libuv](https://docs.libuv.org/en/v1.x/handle.html).
62+
*
63+
*/
64+
function getNewLibuvResourceArray() {
65+
let currReport = process.report.getReport().libuv;
66+
const originalReportAddresses = originalReport.map(resource => resource.address);
67+
68+
/**
69+
* @typedef {Object} LibuvResource
70+
* @property {string} type What is the resource type? For example, 'tcp' | 'timer' | 'udp' | 'tty'... (See more in [docs](https://docs.libuv.org/en/v1.x/handle.html)).
71+
* @property {boolean} is_referenced Is the resource keeping the JS event loop active?
72+
*
73+
* @param {LibuvResource} resource
74+
*/
75+
function isNewLibuvResource(resource) {
76+
const serverType = ['tcp', 'udp'];
77+
return (
78+
!originalReportAddresses.includes(resource.address) && resource.is_referenced // if a resource is unreferenced, it's not keeping the event loop open
79+
);
80+
}
81+
82+
currReport = currReport.filter(resource => isNewLibuvResource(resource));
83+
return currReport;
84+
}
85+
86+
/**
87+
* Returns an object of the new resources created after script started.
88+
*
89+
*
90+
* In order to be counted as a new resource, a resource MUST either:
91+
* - Meet the criteria to be returned by our helper utility `getNewLibuvResourceArray()`
92+
* OR
93+
* - Be returned by `process.getActiveResourcesInfo() and is not 'TTYWrap'
94+
*
95+
* The reason we are using both methods to detect active resources is:
96+
* - `process.report.getReport().libuv` does not detect active requests (such as timers or file reads) accurately
97+
* - `process.getActiveResourcesInfo()` does not contain enough server information we need for our assertions
98+
*
99+
*/
100+
function getNewResources() {
101+
return {
102+
libuvResources: getNewLibuvResourceArray(),
103+
activeResources: process.getActiveResourcesInfo().filter(r => r !== 'TTYWrap')
104+
};
105+
}
106+
107+
/**
108+
* @returns Number of active timers in event loop
109+
*/
110+
const getTimerCount = () => process.getActiveResourcesInfo().filter(r => r === 'Timeout').length;
111+
112+
/**
113+
* @returns Array of socket resources in the event loop
114+
*/
115+
const getSockets = () => process.report.getReport().libuv.filter(r => r.type === 'tcp');
116+
117+
/**
118+
* @returns Array of remote endpoints of socket resources in the event loop
119+
* @example [{ host: 'localhost', port: 27020 }, { host: 'localhost', port: 27107 }]
120+
*/
121+
const getSocketEndpoints = () =>
122+
process.report
123+
.getReport()
124+
.libuv.filter(r => r.type === 'tcp')
125+
.map(r => r.remoteEndpoint);
126+
127+
// A log function for debugging
128+
function log(message) {
129+
// remove outer parentheses for easier parsing
130+
const messageToLog = JSON.stringify(message) + ' \n';
131+
fs.writeFileSync(logFile, messageToLog, { flag: 'a' });
132+
}
133+
134+
async function main() {
135+
originalReport = process.report.getReport().libuv;
136+
process.on('beforeExit', () => {
137+
log({ beforeExitHappened: true });
138+
});
139+
await run({ MongoClient, uri, log, expect, mongodb, sleep, getTimerCount, getSockets, getSocketEndpoints });
140+
log({ newResources: getNewResources() });
141+
}
142+
143+
main()
144+
.then(() => {})
145+
.catch(e => {
146+
log({ error: { message: e.message, stack: e.stack, resources: getNewResources() } });
147+
process.exit(1);
148+
});
149+
150+
setTimeout(() => {
151+
// this means something was in the event loop such that it hung for more than 10 seconds
152+
// so we kill the process
153+
log({
154+
error: {
155+
message: 'Process timed out: resources remain in the event loop',
156+
resources: getNewResources()
157+
}
158+
});
159+
process.exit(99);
160+
// using `unref` will ensure this setTimeout call is not a resource / does not keep the event loop running
161+
}, 10000).unref();

test/integration/node-specific/client_close.test.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import { type TestConfiguration } from '../../tools/runner/config';
33
import { runScriptAndGetProcessInfo } from './resource_tracking_script_builder';
44

5-
describe('MongoClient.close() Integration', () => {
5+
describe.only('MongoClient.close() Integration', () => {
66
// note: these tests are set-up in accordance of the resource ownership tree
77

88
let config: TestConfiguration;
@@ -120,10 +120,11 @@ describe('MongoClient.close() Integration', () => {
120120
'monitor interval timer is cleaned up by client.close()',
121121
metadata,
122122
async function () {
123-
const run = async function ({ MongoClient, uri, expect, getTimerCount }) {
123+
const run = async function ({ MongoClient, uri, expect, getTimerCount, mongodb }) {
124124
const heartbeatFrequencyMS = 2000;
125125
const client = new MongoClient(uri, { heartbeatFrequencyMS });
126-
const heartbeatPromise = client.once('serverHeartbeatSucceeded', v => v);
126+
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
127+
client.once('serverHeartbeatSucceeded', () => resolve());
127128
await client.connect();
128129
await heartbeatPromise;
129130

@@ -212,14 +213,16 @@ describe('MongoClient.close() Integration', () => {
212213
'the rtt pinger timer is cleaned up by client.close()',
213214
metadata,
214215
async function () {
215-
const run = async function ({ MongoClient, uri, expect, getTimerCount }) {
216+
const run = async function ({ MongoClient, uri, expect, getTimerCount, mongodb }) {
216217
const heartbeatFrequencyMS = 2000;
217218
const client = new MongoClient(uri, {
218219
serverMonitoringMode: 'stream',
219220
heartbeatFrequencyMS
220221
});
221222
await client.connect();
222-
await client.once('serverHeartbeatSucceeded', v => v);
223+
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
224+
client.once('serverHeartbeatSucceeded', () => resolve());
225+
await heartbeatPromise;
223226

224227
function getRttTimer(servers) {
225228
for (const [, server] of servers) {
@@ -245,7 +248,7 @@ describe('MongoClient.close() Integration', () => {
245248
describe('Node.js resource: Socket', () => {
246249
describe('when rtt monitoring is turned on', () => {
247250
it('no sockets remain after client.close()', metadata, async () => {
248-
const run = async ({ MongoClient, uri, expect, getSockets }) => {
251+
const run = async ({ MongoClient, uri, expect, getSockets, mongodb }) => {
249252
const heartbeatFrequencyMS = 100;
250253
const client = new MongoClient(uri, {
251254
serverMonitoringMode: 'stream',
@@ -263,15 +266,18 @@ describe('MongoClient.close() Integration', () => {
263266
// set of servers whose hearbeats have occurred
264267
const heartbeatOccurredSet = new Set();
265268

266-
while (heartbeatOccurredSet.size < client.topology.s.servers.batchSize) {
267-
await client.once('serverHeartbeatSucceeded', async ev =>
268-
heartbeatOccurredSet.add(ev.connectionId)
269-
);
269+
const servers = client.topology.s.servers;
270+
while (heartbeatOccurredSet.size < servers.size) {
271+
const { heartbeatPromise, resolve } = mongodb.promiseWithResolvers();
272+
client.once('serverHeartbeatSucceeded', (ev) => {
273+
heartbeatOccurredSet.add(ev.connectionId);
274+
resolve();
275+
});
276+
await heartbeatPromise;
270277
}
271278

272279
// all servers should have had a heartbeat event and had a new socket created for rtt pinger
273-
const servers = client.topology.s.servers;
274-
for (const server of servers) {
280+
for (const [server,] of servers) {
275281
expect(activeSocketsAfterHeartbeat()).to.deep.contain(server[0]);
276282
}
277283

@@ -428,7 +434,7 @@ describe('MongoClient.close() Integration', () => {
428434
const metadata: MongoDBMetadataUI = {
429435
requires: {
430436
predicate: () =>
431-
process.env.ATLAS_SRV_REPL ? 'Skipped: this test requires an SRV environment' : true
437+
process.env.ATLAS_SRV_REPL ? true : 'Skipped: this test requires an SRV environment'
432438
}
433439
};
434440

test/tools/fixtures/process_resource_script.in.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async function main() {
110110
process.on('beforeExit', () => {
111111
log({ beforeExitHappened: true });
112112
});
113-
await run({ MongoClient, uri, log, expect, mongodb, sleep, getTimerCount });
113+
await run({ MongoClient, uri, log, expect, mongodb, sleep, getTimerCount, getSockets, getSocketEndpoints });
114114
log({ newResources: getNewResources() });
115115
}
116116

0 commit comments

Comments
 (0)