diff --git a/test/integration/node-specific/client_close.test.ts b/test/integration/node-specific/client_close.test.ts index d9b7683cb02..c894f8d2614 100644 --- a/test/integration/node-specific/client_close.test.ts +++ b/test/integration/node-specific/client_close.test.ts @@ -1,23 +1,20 @@ -/* eslint-disable @typescript-eslint/no-empty-function */ +import * as events from 'node:events'; + +import { expect } from 'chai'; + import { getCSFLEKMSProviders } from '../../csfle-kms-providers'; -import { type TestConfiguration } from '../../tools/runner/config'; +import { type Collection, type FindCursor, type MongoClient } from '../../mongodb'; import { runScriptAndGetProcessInfo } from './resource_tracking_script_builder'; -describe.skip('MongoClient.close() Integration', () => { +describe('MongoClient.close() Integration', () => { // note: these tests are set-up in accordance of the resource ownership tree - let config: TestConfiguration; - - beforeEach(function () { - config = this.configuration; - }); - describe('Node.js resource: TLS File read', () => { describe('when client is connecting and reads an infinite TLS file', () => { - it('the file read is interrupted by client.close()', async function () { + it.skip('the file read is interrupted by client.close()', async function () { await runScriptAndGetProcessInfo( 'tls-file-read', - config, + this.configuration, async function run({ MongoClient, uri, expect }) { const infiniteFile = '/dev/zero'; const client = new MongoClient(uri, { tls: true, tlsCertificateKeyFile: infiniteFile }); @@ -50,10 +47,10 @@ describe.skip('MongoClient.close() Integration', () => { }); describe('when MongoClientAuthProviders is instantiated and token file read hangs', () => { - it('the file read is interrupted by client.close()', async () => { + it.skip('the file read is interrupted by client.close()', async function () { await runScriptAndGetProcessInfo( 'token-file-read', - config, + this.configuration, async function run({ MongoClient, uri, expect }) { const infiniteFile = '/dev/zero'; process.env.OIDC_TOKEN_FILE = infiniteFile; @@ -62,10 +59,11 @@ describe.skip('MongoClient.close() Integration', () => { authMechanism: 'MONGODB-OIDC' }; const client = new MongoClient(uri, options); - client.connect(); + const connectPromise = client.connect(); expect(process.getActiveResourcesInfo()).to.include('FSReqPromise'); await client.close(); expect(process.getActiveResourcesInfo()).to.not.include('FSReqPromise'); + await connectPromise; } ); }); @@ -78,30 +76,41 @@ describe.skip('MongoClient.close() Integration', () => { describe('after a Topology is created through client.connect()', () => { const metadata: MongoDBMetadataUI = { requires: { topology: 'replicaset' } }; - it('server selection timers are cleaned up by client.close()', metadata, async () => { - const run = async function ({ MongoClient, uri, expect, sleep, mongodb, getTimerCount }) { - const serverSelectionTimeoutMS = 2222; - const client = new MongoClient(uri, { - minPoolSize: 1, - serverSelectionTimeoutMS, - readPreference: new mongodb.ReadPreference('secondary', [ - { something: 'that does not exist' } - ]) - }); - const insertPromise = client.db('db').collection('collection').insertOne({ x: 1 }); + it.skip( + 'server selection timers are cleaned up by client.close()', + metadata, + async function () { + const run = async function ({ + MongoClient, + uri, + expect, + sleep, + mongodb, + getTimerCount + }) { + const serverSelectionTimeoutMS = 2222; + const client = new MongoClient(uri, { + minPoolSize: 1, + serverSelectionTimeoutMS, + readPreference: new mongodb.ReadPreference('secondary', [ + { something: 'that does not exist' } + ]) + }); + const insertPromise = client.db('db').collection('collection').insertOne({ x: 1 }); - // don't allow entire server selection timer to elapse to ensure close is called mid-timeout - await sleep(serverSelectionTimeoutMS / 2); + // don't allow entire server selection timer to elapse to ensure close is called mid-timeout + await sleep(serverSelectionTimeoutMS / 2); - expect(getTimerCount()).to.not.equal(0); - await client.close(); - expect(getTimerCount()).to.equal(0); + expect(getTimerCount()).to.not.equal(0); + await client.close(); + expect(getTimerCount()).to.equal(0); - const err = await insertPromise.catch(e => e); - expect(err).to.be.instanceOf(mongodb.MongoTopologyClosedError); - }; - await runScriptAndGetProcessInfo('timer-server-selection', config, run); - }); + const err = await insertPromise.catch(e => e); + expect(err).to.be.instanceOf(mongodb.MongoTopologyClosedError); + }; + await runScriptAndGetProcessInfo('timer-server-selection', this.configuration, run); + } + ); }); }); @@ -117,7 +126,7 @@ describe.skip('MongoClient.close() Integration', () => { describe('MonitorInterval', () => { describe('Node.js resource: Timer', () => { describe('after a new monitor is made', () => { - it( + it.skip( 'monitor interval timer is cleaned up by client.close()', metadata, async function () { @@ -144,23 +153,26 @@ describe.skip('MongoClient.close() Integration', () => { expect(getTimerCount()).to.equal(0); }; - await runScriptAndGetProcessInfo('timer-monitor-interval', config, run); + await runScriptAndGetProcessInfo( + 'timer-monitor-interval', + this.configuration, + run + ); } ); }); describe('after a heartbeat fails', () => { - it( + it.skip( 'the new monitor interval timer is cleaned up by client.close()', metadata, - async () => { + async function () { const run = async function ({ MongoClient, expect, getTimerCount, once }) { const heartbeatFrequencyMS = 2000; const client = new MongoClient('mongodb://fakeUri', { heartbeatFrequencyMS }); const willBeHeartbeatFailed = once(client, 'serverHeartbeatFailed'); - client.connect(); + const connectPromise = client.connect(); await willBeHeartbeatFailed; - function getMonitorTimer(servers) { for (const [, server] of servers) { return server?.monitor.monitorId.timerId; @@ -173,8 +185,14 @@ describe.skip('MongoClient.close() Integration', () => { expect(getMonitorTimer(servers)).to.not.exist; expect(getTimerCount()).to.equal(0); + + await connectPromise; }; - await runScriptAndGetProcessInfo('timer-heartbeat-failed-monitor', config, run); + await runScriptAndGetProcessInfo( + 'timer-heartbeat-failed-monitor', + this.configuration, + run + ); } ); }); @@ -183,7 +201,7 @@ describe.skip('MongoClient.close() Integration', () => { describe('Monitoring Connection', () => { describe('Node.js resource: Socket', () => { - it('no sockets remain after client.close()', metadata, async function () { + it.skip('no sockets remain after client.close()', metadata, async function () { const run = async function ({ MongoClient, uri, expect, getSocketEndpoints }) { const client = new MongoClient(uri); await client.connect(); @@ -203,7 +221,11 @@ describe.skip('MongoClient.close() Integration', () => { expect(getSocketEndpoints()).to.not.deep.include({ host, port }); } }; - await runScriptAndGetProcessInfo('socket-connection-monitoring', config, run); + await runScriptAndGetProcessInfo( + 'socket-connection-monitoring', + this.configuration, + run + ); }); }); }); @@ -211,7 +233,7 @@ describe.skip('MongoClient.close() Integration', () => { describe('RTT Pinger', () => { describe('Node.js resource: Timer', () => { describe('after entering monitor streaming mode ', () => { - it( + it.skip( 'the rtt pinger timer is cleaned up by client.close()', metadata, async function () { @@ -238,7 +260,7 @@ describe.skip('MongoClient.close() Integration', () => { expect(getTimerCount()).to.equal(0); }; - await runScriptAndGetProcessInfo('timer-rtt-monitor', config, run); + await runScriptAndGetProcessInfo('timer-rtt-monitor', this.configuration, run); } ); }); @@ -247,8 +269,8 @@ describe.skip('MongoClient.close() Integration', () => { describe('Connection', () => { describe('Node.js resource: Socket', () => { describe('when rtt monitoring is turned on', () => { - it('no sockets remain after client.close()', metadata, async () => { - const run = async ({ MongoClient, uri, expect, getSockets, once, log }) => { + it.skip('no sockets remain after client.close()', metadata, async function () { + const run = async ({ MongoClient, uri, expect, getSockets, once }) => { const heartbeatFrequencyMS = 500; const client = new MongoClient(uri, { serverMonitoringMode: 'stream', @@ -265,7 +287,6 @@ describe.skip('MongoClient.close() Integration', () => { while (heartbeatOccurredSet.size < servers.size) { const ev = await once(client, 'serverHeartbeatSucceeded'); - log({ ev: ev[0] }); heartbeatOccurredSet.add(ev[0].connectionId); } @@ -281,14 +302,16 @@ describe.skip('MongoClient.close() Integration', () => { // close the client await client.close(); - - log({ socketsAfterClose: getSockets() }); // upon close, assert rttPinger sockets are cleaned up const activeSocketsAfterClose = activeSocketsAfterHeartbeat(); expect(activeSocketsAfterClose).to.have.lengthOf(0); }; - await runScriptAndGetProcessInfo('socket-connection-rtt-monitoring', config, run); + await runScriptAndGetProcessInfo( + 'socket-connection-rtt-monitoring', + this.configuration, + run + ); }); }); }); @@ -299,7 +322,7 @@ describe.skip('MongoClient.close() Integration', () => { describe('ConnectionPool', () => { describe('Node.js resource: minPoolSize timer', () => { describe('after new connection pool is created', () => { - it('the minPoolSize timer is cleaned up by client.close()', async function () { + it.skip('the minPoolSize timer is cleaned up by client.close()', async function () { const run = async function ({ MongoClient, uri, expect, getTimerCount }) { const client = new MongoClient(uri, { minPoolSize: 1 }); let minPoolSizeTimerCreated = false; @@ -322,7 +345,7 @@ describe.skip('MongoClient.close() Integration', () => { expect(getMinPoolSizeTimer(servers)).to.not.exist; expect(getTimerCount()).to.equal(0); }; - await runScriptAndGetProcessInfo('timer-min-pool-size', config, run); + await runScriptAndGetProcessInfo('timer-min-pool-size', this.configuration, run); }); }); }); @@ -333,7 +356,6 @@ describe.skip('MongoClient.close() Integration', () => { const waitQueueTimeoutMS = 1515; beforeEach(async function () { - // configure failPoint utilClient = this.configuration.newClient(); await utilClient.connect(); const failPoint = { @@ -357,7 +379,7 @@ describe.skip('MongoClient.close() Integration', () => { await utilClient.close(); }); - it('the wait queue timer is cleaned up by client.close()', async function () { + it.skip('the wait queue timer is cleaned up by client.close()', async function () { const run = async function ({ MongoClient, uri, expect, getTimerCount, once }) { const waitQueueTimeoutMS = 1515; @@ -391,7 +413,7 @@ describe.skip('MongoClient.close() Integration', () => { 'Timed out while checking out a connection from connection pool' ); }; - await runScriptAndGetProcessInfo('timer-check-out', config, run); + await runScriptAndGetProcessInfo('timer-check-out', this.configuration, run); }); }); }); @@ -399,7 +421,7 @@ describe.skip('MongoClient.close() Integration', () => { describe('Connection', () => { describe('Node.js resource: Socket', () => { describe('after a minPoolSize has been set on the ConnectionPool', () => { - it('no sockets remain after client.close()', async function () { + it.skip('no sockets remain after client.close()', async function () { const run = async function ({ MongoClient, uri, expect, getSockets }) { // assert no sockets to start with expect(getSockets()).to.have.lengthOf(0); @@ -417,7 +439,7 @@ describe.skip('MongoClient.close() Integration', () => { expect(getSockets()).to.have.lengthOf(0); }; - await runScriptAndGetProcessInfo('socket-minPoolSize', config, run); + await runScriptAndGetProcessInfo('socket-minPoolSize', this.configuration, run); }); }); }); @@ -431,13 +453,17 @@ describe.skip('MongoClient.close() Integration', () => { const metadata: MongoDBMetadataUI = { requires: { topology: 'sharded' } }; describe('after SRVPoller is created', () => { - it('timers are cleaned up by client.close()', metadata, async () => { + it.skip('timers are cleaned up by client.close()', metadata, async function () { const run = async function ({ MongoClient, expect, getTimerCount }) { const SRV_CONNECTION_STRING = `mongodb+srv://test1.test.build.10gen.cc`; + // 27018 localhost.test.build.10gen.cc. // 27017 localhost.test.build.10gen.cc. - const client = new MongoClient(SRV_CONNECTION_STRING); + const client = new MongoClient(SRV_CONNECTION_STRING, { + serverSelectionTimeoutMS: 2000, // if something changes make this test fail faster than 30s (connect() will reject) + tls: false // srv automatically sets tls to true, so we have to set it to false here. + }); await client.connect(); // the current expected behavior is that _timeout is set to undefined until SRV polling starts // then _timeout is set to undefined again when SRV polling stops @@ -445,7 +471,7 @@ describe.skip('MongoClient.close() Integration', () => { await client.close(); expect(getTimerCount()).to.equal(0); }; - await runScriptAndGetProcessInfo('timer-srv-poller', config, run); + await runScriptAndGetProcessInfo('timer-srv-poller', this.configuration, run); }); }); }); @@ -453,29 +479,101 @@ describe.skip('MongoClient.close() Integration', () => { }); describe('ClientSession (Implicit)', () => { - describe('Server resource: LSID/ServerSession', () => { - describe('after a clientSession is implicitly created and used', () => { - it.skip('the server-side ServerSession is cleaned up by client.close()', async function () {}); - }); + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient({}, { monitorCommands: true }); }); - describe('Server resource: Transactions', () => { - describe('after a clientSession is implicitly created and used', () => { - it.skip('the server-side transaction is cleaned up by client.close()', async function () {}); + afterEach(async function () { + await client.close(); + }); + + describe('when MongoClient.close is called', function () { + it('sends an endSessions command', async function () { + await client.db('a').collection('a').insertOne({ a: 1 }); + await client.db('a').collection('a').insertOne({ a: 1 }); + await client.db('a').collection('a').insertOne({ a: 1 }); + const endSessionsStarted = events.once(client, 'commandStarted'); + const willEndSessions = events.once(client, 'commandSucceeded'); + + await client.close(); + + const [startedEv] = await endSessionsStarted; + expect(startedEv).to.have.nested.property('command.endSessions').that.has.lengthOf(1); + + const [commandEv] = await willEndSessions; + expect(commandEv).to.have.property('commandName', 'endSessions'); }); }); }); describe('ClientSession (Explicit)', () => { + let idleSessionsBeforeClose; + let idleSessionsAfterClose; + let client; + let utilClient; + let session; + + const metadata: MongoDBMetadataUI = { + requires: { + topology: ['replicaset', 'sharded'], + mongodb: '>=4.2' + } + }; + + beforeEach(async function () { + client = this.configuration.newClient(); + utilClient = this.configuration.newClient(); + await client.connect(); + await client + .db('db') + .collection('collection') + .drop() + .catch(() => null); + const collection = await client.db('db').createCollection('collection'); + session = client.startSession(); + session.startTransaction(); + await collection.insertOne({ x: 1 }, { session }); + + const opBefore = await utilClient.db().admin().command({ currentOp: 1 }); + idleSessionsBeforeClose = opBefore.inprog.filter(s => s.type === 'idleSession'); + + await client.close(); + + const opAfter = await utilClient.db().admin().command({ currentOp: 1 }); + idleSessionsAfterClose = opAfter.inprog.filter(s => s.type === 'idleSession'); + }); + + afterEach(async function () { + await utilClient?.close(); + await session?.endSession(); + await client?.close(); + }); + describe('Server resource: LSID/ServerSession', () => { describe('after a clientSession is created and used', () => { - it.skip('the server-side ServerSession is cleaned up by client.close()', async function () {}); + it( + 'the server-side ServerSession is cleaned up by client.close()', + metadata, + async function () { + expect(idleSessionsBeforeClose).to.not.be.empty; + expect(idleSessionsAfterClose).to.be.empty; + } + ); }); }); describe('Server resource: Transactions', () => { describe('after a clientSession is created and used', () => { - it.skip('the server-side transaction is cleaned up by client.close()', async function () {}); + it( + 'the server-side transaction is cleaned up by client.close()', + metadata, + async function () { + expect(idleSessionsBeforeClose[0].transaction.txnNumber).to.not.null; + expect(idleSessionsAfterClose).to.be.empty; + } + ); }); }); }); @@ -491,10 +589,10 @@ describe.skip('MongoClient.close() Integration', () => { describe('KMS Request', () => { describe('Node.js resource: TLS file read', () => { describe('when KMSRequest reads an infinite TLS file', () => { - it('the file read is interrupted by client.close()', metadata, async () => { + it.skip('the file read is interrupted by client.close()', metadata, async function () { await runScriptAndGetProcessInfo( 'tls-file-read-auto-encryption', - config, + this.configuration, async function run({ MongoClient, uri, expect, mongodb }) { const infiniteFile = '/dev/zero'; @@ -578,14 +676,68 @@ describe.skip('MongoClient.close() Integration', () => { }); describe('Node.js resource: Socket', () => { - it.skip('no sockets remain after client.close()', metadata, async () => {}); + it.skip('no sockets remain after client.close()', metadata, async () => null); }); }); }); describe('Server resource: Cursor', () => { - describe('after cursors are created', () => { - it.skip('all active server-side cursors are closed by client.close()', async function () {}); + const metadata: MongoDBMetadataUI = { + requires: { + mongodb: '>=4.2.0' // MongoServerError: Unrecognized option 'idleCursors' in $currentOp stage. on 4.0 + } + }; + + describe('after cursors are created', metadata, () => { + let client: MongoClient; + let coll: Collection; + let cursor: FindCursor; + let utilClient: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient(); + utilClient = this.configuration.newClient(); + await client.connect(); + await client + .db('close_db') + .collection('close_coll') + .drop() + .catch(() => null); + coll = await client.db('close_db').createCollection('close_coll'); + await coll.insertMany([{ a: 1 }, { b: 2 }, { c: 3 }]); + }); + + afterEach(async function () { + await utilClient?.close(); + await client?.close(); + await cursor?.close(); + }); + + it( + 'all active server-side cursors are closed by client.close()', + metadata, + async function () { + const getCursors = async function () { + const cursors = await utilClient + .db('admin') + .aggregate([{ $currentOp: { idleCursors: true } }]) + .toArray(); + + return cursors.filter(c => c.ns === 'close_db.close_coll'); + }; + + cursor = coll.find({}, { batchSize: 1 }); + await cursor.next(); + + // assert creation + expect(await getCursors()).to.not.be.empty; + + await client.close(); + + // assert clean-up + expect(await getCursors()).to.be.empty; + } + ); }); }); }); diff --git a/test/integration/node-specific/resource_tracking_script_builder.ts b/test/integration/node-specific/resource_tracking_script_builder.ts index 066fb9fad1e..375613d6157 100644 --- a/test/integration/node-specific/resource_tracking_script_builder.ts +++ b/test/integration/node-specific/resource_tracking_script_builder.ts @@ -174,10 +174,16 @@ export async function runScriptAndGetProcessInfo( REPORT_RESOURCE_SCRIPT_PATH, func ); - await writeFile(scriptName, scriptContent, { encoding: 'utf8' }); - const logFile = name + '.logs.txt'; + const logFile = name + '.logs.txt'; const stdErrFile = 'err.out'; + + await unlink(scriptName).catch(() => null); + await unlink(logFile).catch(() => null); + await unlink(stdErrFile).catch(() => null); + + await writeFile(scriptName, scriptContent, { encoding: 'utf8' }); + const script = spawn(process.execPath, [scriptName], { stdio: ['ignore', 'ignore', openSync(stdErrFile, 'w')] }); @@ -185,16 +191,15 @@ export async function runScriptAndGetProcessInfo( const willClose = once(script, 'close'); // make sure the process ended - const [exitCode] = await willClose; + const [exitCode] = (await willClose) as [number]; // format messages from child process as an object const messages = (await readFile(logFile, 'utf-8')) .trim() .split('\n') - .map(line => JSON.parse(line)) - .reduce((acc, curr) => ({ ...acc, ...curr }), {}); + .map(line => JSON.parse(line)); - const stdErrSize = await readFile(stdErrFile, { encoding: 'utf8' }); + const stdErr = await readFile(stdErrFile, { encoding: 'utf8' }); // delete temporary files await unlink(scriptName); @@ -203,18 +208,24 @@ export async function runScriptAndGetProcessInfo( // assertions about exit status if (exitCode) { + const { error } = messages.find(m => m.error != null); + expect(error, 'test script exited with non-zero exit code but did not report an error.').to + .exist; const assertionError = new AssertionError( - messages.error?.message + '\n\t' + JSON.stringify(messages.error?.resources, undefined, 2) + error.message + '\n\t' + JSON.stringify(error.resources, undefined, 2) ); - assertionError.stack = messages.error?.stack + new Error().stack.slice('Error'.length); + assertionError.stack = error.stack + new Error().stack.slice('Error'.length); throw assertionError; } // assertions about resource status - expect(messages.beforeExitHappened).to.be.true; - expect(messages.newResources.libuvResources).to.be.empty; - expect(messages.newResources.activeResources).to.be.empty; + const { beforeExitHappened } = messages.find(m => 'beforeExitHappened' in m); + const { newResources } = messages.find(m => 'newResources' in m); + + expect(beforeExitHappened).to.be.true; + expect(newResources.libuvResources).to.be.empty; + expect(newResources.activeResources).to.be.empty; // assertion about error output - expect(stdErrSize).to.be.empty; + expect(stdErr).to.be.empty; } diff --git a/test/tools/fixtures/process_resource_script.in.js b/test/tools/fixtures/process_resource_script.in.js index c2886af9a77..31def062cb2 100644 --- a/test/tools/fixtures/process_resource_script.in.js +++ b/test/tools/fixtures/process_resource_script.in.js @@ -1,7 +1,7 @@ 'use strict'; /* eslint-disable no-undef */ -/* eslint-disable no-unused-vars */ + const driverPath = DRIVER_SOURCE_PATH; const func = FUNCTION_STRING; const scriptName = SCRIPT_NAME_STRING; @@ -48,7 +48,6 @@ function getNewLibuvResourceArray() { * @param {LibuvResource} resource */ function isNewLibuvResource(resource) { - const serverType = ['tcp', 'udp']; return ( !originalReportAddresses.includes(resource.address) && resource.is_referenced // if a resource is unreferenced, it's not keeping the event loop open ); @@ -128,10 +127,10 @@ async function main() { } main() - .then(() => {}) + .then(() => null) .catch(e => { log({ error: { message: e.message, stack: e.stack, resources: getNewResources() } }); - process.exit(1); + process.exit(2); }); setTimeout(() => {