From 81c6306550c67465b06c333337e662d48852389e Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 31 Jan 2025 11:54:43 -0500 Subject: [PATCH 1/3] test(NODE-6692): check that event emitters have error listeners --- .mocharc.json | 2 +- src/cmap/connection.ts | 2 + src/cmap/connection_pool.ts | 2 + src/cursor/abstract_cursor.ts | 2 + src/gridfs/index.ts | 3 +- src/mongo_client.ts | 2 + src/mongo_types.ts | 8 ++- src/sdam/monitor.ts | 2 + src/sdam/server.ts | 2 + src/sdam/srv_polling.ts | 3 +- src/sdam/topology.ts | 2 + src/sessions.ts | 2 + .../change-streams/change_stream.test.ts | 3 +- .../change_streams.prose.test.ts | 2 + test/integration/crud/crud_api.test.ts | 17 +++--- test/integration/crud/misc_cursors.test.js | 6 +-- .../examples/change_streams.test.js | 54 +++++++++++++------ test/integration/sessions/sessions.test.ts | 4 +- test/mocha_mongodb.json | 2 +- test/tools/cmap_spec_runner.ts | 1 + test/tools/reporter/mongodb_reporter.js | 5 +- test/tools/runner/ee_checker.ts | 27 ++++++++++ test/tools/runner/hooks/unhandled_checker.ts | 44 --------------- test/tools/unified-spec-runner/entities.ts | 2 + test/unit/sdam/srv_polling.test.ts | 4 ++ 25 files changed, 121 insertions(+), 82 deletions(-) create mode 100644 test/tools/runner/ee_checker.ts delete mode 100644 test/tools/runner/hooks/unhandled_checker.ts diff --git a/.mocharc.json b/.mocharc.json index add5e56532c..eae1c0b935e 100644 --- a/.mocharc.json +++ b/.mocharc.json @@ -4,7 +4,7 @@ "source-map-support/register", "ts-node/register", "test/tools/runner/chai_addons.ts", - "test/tools/runner/hooks/unhandled_checker.ts" + "test/tools/runner/ee_checker.ts" ], "extension": [ "js", diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 93284cc726c..b6d92f56e0c 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -46,6 +46,7 @@ import { HostAddress, maxWireVersion, type MongoDBNamespace, + noop, now, once, squashError, @@ -229,6 +230,7 @@ export class Connection extends TypedEventEmitter { constructor(stream: Stream, options: ConnectionOptions) { super(); + this.on('error', noop); this.socket = stream; this.id = options.id; diff --git a/src/cmap/connection_pool.ts b/src/cmap/connection_pool.ts index 63c4860259c..71f509481b1 100644 --- a/src/cmap/connection_pool.ts +++ b/src/cmap/connection_pool.ts @@ -34,6 +34,7 @@ import { kDispose, List, makeCounter, + noop, now, promiseWithResolvers } from '../utils'; @@ -200,6 +201,7 @@ export class ConnectionPool extends TypedEventEmitter { constructor(server: Server, options: ConnectionPoolOptions) { super(); + this.on('error', noop); this.options = Object.freeze({ connectionType: Connection, diff --git a/src/cursor/abstract_cursor.ts b/src/cursor/abstract_cursor.ts index 1758e80c246..cf9ea8293dd 100644 --- a/src/cursor/abstract_cursor.ts +++ b/src/cursor/abstract_cursor.ts @@ -27,6 +27,7 @@ import { type Disposable, kDispose, type MongoDBNamespace, + noop, squashError } from '../utils'; @@ -267,6 +268,7 @@ export abstract class AbstractCursor< options: AbstractCursorOptions & Abortable = {} ) { super(); + this.on('error', noop); if (!client.s.isMongoClient) { throw new MongoRuntimeError('Cursor must be constructed with MongoClient'); diff --git a/src/gridfs/index.ts b/src/gridfs/index.ts index 70f154431cf..f62267b0a99 100644 --- a/src/gridfs/index.ts +++ b/src/gridfs/index.ts @@ -7,7 +7,7 @@ import { type Filter, TypedEventEmitter } from '../mongo_types'; import type { ReadPreference } from '../read_preference'; import type { Sort } from '../sort'; import { CSOTTimeoutContext } from '../timeout'; -import { resolveOptions } from '../utils'; +import { noop, resolveOptions } from '../utils'; import { WriteConcern, type WriteConcernOptions } from '../write_concern'; import type { FindOptions } from './../operations/find'; import { @@ -87,6 +87,7 @@ export class GridFSBucket extends TypedEventEmitter { constructor(db: Db, options?: GridFSBucketOptions) { super(); + this.on('error', noop); this.setMaxListeners(0); const privateOptions = resolveOptions(db, { ...DEFAULT_GRIDFS_BUCKET_OPTIONS, diff --git a/src/mongo_client.ts b/src/mongo_client.ts index 579b98dea9b..78b514b6f5d 100644 --- a/src/mongo_client.ts +++ b/src/mongo_client.ts @@ -58,6 +58,7 @@ import { hostMatchesWildcards, isHostMatch, type MongoDBNamespace, + noop, ns, resolveOptions, squashError @@ -386,6 +387,7 @@ export class MongoClient extends TypedEventEmitter implements constructor(url: string, options?: MongoClientOptions) { super(); + this.on('error', noop); this.options = parseOptions(url, this, options); diff --git a/src/mongo_types.ts b/src/mongo_types.ts index 84ca67b6ed3..fda9909429b 100644 --- a/src/mongo_types.ts +++ b/src/mongo_types.ts @@ -24,6 +24,7 @@ import { type MongoLogger } from './mongo_logger'; import type { Sort } from './sort'; +import { noop } from './utils'; /** @internal */ export type TODO_NODE_3286 = any; @@ -472,7 +473,12 @@ export class TypedEventEmitter extends EventEm } /** @public */ -export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {} +export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> { + constructor(...args: any[]) { + super(...args); + this.on('error', noop); + } +} /** @public */ export type Abortable = { diff --git a/src/sdam/monitor.ts b/src/sdam/monitor.ts index 65fb0403791..326bdeeeccc 100644 --- a/src/sdam/monitor.ts +++ b/src/sdam/monitor.ts @@ -13,6 +13,7 @@ import { type Callback, type EventEmitterWithState, makeStateMachine, + noop, now, ns } from '../utils'; @@ -102,6 +103,7 @@ export class Monitor extends TypedEventEmitter { constructor(server: Server, options: MonitorOptions) { super(); + this.on('error', noop); this.server = server; this.connection = null; diff --git a/src/sdam/server.ts b/src/sdam/server.ts index e2a69e39e39..55a1765b24b 100644 --- a/src/sdam/server.ts +++ b/src/sdam/server.ts @@ -47,6 +47,7 @@ import { makeStateMachine, maxWireVersion, type MongoDBNamespace, + noop, supportsRetryableWrites } from '../utils'; import { throwIfWriteConcernError } from '../write_concern'; @@ -142,6 +143,7 @@ export class Server extends TypedEventEmitter { */ constructor(topology: Topology, description: ServerDescription, options: ServerOptions) { super(); + this.on('error', noop); this.serverApi = options.serverApi; diff --git a/src/sdam/srv_polling.ts b/src/sdam/srv_polling.ts index c95c386cfa7..583c82d1398 100644 --- a/src/sdam/srv_polling.ts +++ b/src/sdam/srv_polling.ts @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers'; import { MongoRuntimeError } from '../error'; import { TypedEventEmitter } from '../mongo_types'; -import { checkParentDomainMatch, HostAddress, squashError } from '../utils'; +import { checkParentDomainMatch, HostAddress, noop, squashError } from '../utils'; /** * @internal @@ -49,6 +49,7 @@ export class SrvPoller extends TypedEventEmitter { constructor(options: SrvPollerOptions) { super(); + this.on('error', noop); if (!options || !options.srvHost) { throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost'); diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index 6f87e922710..a67f17dd9bb 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -44,6 +44,7 @@ import { kDispose, List, makeStateMachine, + noop, now, ns, promiseWithResolvers, @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter { options: TopologyOptions ) { super(); + this.on('error', noop); this.client = client; // Options should only be undefined in tests, MongoClient will always have defined options diff --git a/src/sessions.ts b/src/sessions.ts index 33260532ef3..734ada4fbc0 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -43,6 +43,7 @@ import { isPromiseLike, List, maxWireVersion, + noop, now, squashError, uuidV4 @@ -161,6 +162,7 @@ export class ClientSession clientOptions: MongoOptions ) { super(); + this.on('error', noop); if (client == null) { // TODO(NODE-3483) diff --git a/test/integration/change-streams/change_stream.test.ts b/test/integration/change-streams/change_stream.test.ts index baabdcb3b23..a67b714ded2 100644 --- a/test/integration/change-streams/change_stream.test.ts +++ b/test/integration/change-streams/change_stream.test.ts @@ -736,6 +736,7 @@ describe('Change Streams', function () { // ChangeStream detects emitter usage via 'newListener' event // so this covers all emitter methods }); + changeStream.on('error', () => null); // one must listen for errors if they use EE mode. await once(changeStream.cursor, 'init'); expect(changeStream).to.have.property('mode', 'emitter'); @@ -971,7 +972,7 @@ describe('Change Streams', function () { { requires: { topology: '!single' } }, async function () { changeStream = collection.watch([]); - changeStream.on('change', sinon.stub()); + changeStream.on('change', sinon.stub()).on('error', () => null); try { // eslint-disable-next-line @typescript-eslint/no-unused-vars diff --git a/test/integration/change-streams/change_streams.prose.test.ts b/test/integration/change-streams/change_streams.prose.test.ts index 60492b40d31..cd2c424072a 100644 --- a/test/integration/change-streams/change_streams.prose.test.ts +++ b/test/integration/change-streams/change_streams.prose.test.ts @@ -858,6 +858,7 @@ describe('Change Stream prose tests', function () { expect(err).to.not.exist; coll = client.db('integration_tests').collection('setupAfterTest'); const changeStream = coll.watch(); + changeStream.on('error', done); waitForStarted(changeStream, () => { coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => { expect(err).to.not.exist; @@ -932,6 +933,7 @@ describe('Change Stream prose tests', function () { let events = []; client.on('commandStarted', e => recordEvent(events, e)); const changeStream = coll.watch([], { startAfter }); + changeStream.on('error', done); this.defer(() => changeStream.close()); changeStream.on('change', change => { diff --git a/test/integration/crud/crud_api.test.ts b/test/integration/crud/crud_api.test.ts index 94610462a26..ac60b2901a6 100644 --- a/test/integration/crud/crud_api.test.ts +++ b/test/integration/crud/crud_api.test.ts @@ -1,7 +1,7 @@ import { expect } from 'chai'; -import { on } from 'events'; import * as semver from 'semver'; import * as sinon from 'sinon'; +import { finished } from 'stream/promises'; import { Collection, @@ -238,7 +238,7 @@ describe('CRUD API', function () { }); context('when creating a cursor with find', () => { - let collection; + let collection: Collection; beforeEach(async () => { collection = client.db().collection('t'); @@ -307,13 +307,14 @@ describe('CRUD API', function () { describe('#stream()', () => { it('creates a node stream that emits data events', async () => { - const count = 0; - const cursor = makeCursor(); - const stream = cursor.stream(); - on(stream, 'data'); - cursor.once('close', function () { - expect(count).to.equal(2); + let count = 0; + const stream = makeCursor().stream(); + const willFinish = finished(stream, { cleanup: true }); + stream.on('data', () => { + count++; }); + await willFinish; + expect(count).to.equal(2); }); }); diff --git a/test/integration/crud/misc_cursors.test.js b/test/integration/crud/misc_cursors.test.js index efe873c2b76..2eca8a008a5 100644 --- a/test/integration/crud/misc_cursors.test.js +++ b/test/integration/crud/misc_cursors.test.js @@ -1993,15 +1993,15 @@ describe('Cursor', function () { expect(res).property('insertedId').to.exist; }, 300); - const start = new Date(); + const start = performance.now(); const doc1 = await cursor.next(); expect(doc1).to.have.property('b', 2); - const end = new Date(); + const end = performance.now(); await later; // make sure this finished, without a failure // We should see here that cursor.next blocked for at least 300ms - expect(end.getTime() - start.getTime()).to.be.at.least(300); + expect(end - start).to.be.at.least(290); } } ); diff --git a/test/integration/node-specific/examples/change_streams.test.js b/test/integration/node-specific/examples/change_streams.test.js index 9f9dad72fec..5285da5cf14 100644 --- a/test/integration/node-specific/examples/change_streams.test.js +++ b/test/integration/node-specific/examples/change_streams.test.js @@ -66,9 +66,13 @@ maybeDescribe('examples(change-stream):', function () { // Start Changestream Example 1 const collection = db.collection('inventory'); const changeStream = collection.watch(); - changeStream.on('change', next => { - // process next document - }); + changeStream + .on('change', next => { + // process next document + }) + .once('error', () => { + // handle error + }); // End Changestream Example 1 const changeStreamIterator = collection.watch(); @@ -113,9 +117,13 @@ maybeDescribe('examples(change-stream):', function () { // Start Changestream Example 2 const collection = db.collection('inventory'); const changeStream = collection.watch([], { fullDocument: 'updateLookup' }); - changeStream.on('change', next => { - // process next document - }); + changeStream + .on('change', next => { + // process next document + }) + .once('error', error => { + // handle error + }); // End Changestream Example 2 // Start Changestream Example 2 Alternative @@ -151,15 +159,23 @@ maybeDescribe('examples(change-stream):', function () { const changeStream = collection.watch(); let newChangeStream; - changeStream.once('change', next => { - const resumeToken = changeStream.resumeToken; - changeStream.close(); - - newChangeStream = collection.watch([], { resumeAfter: resumeToken }); - newChangeStream.on('change', next => { - processChange(next); + changeStream + .once('change', next => { + const resumeToken = changeStream.resumeToken; + changeStream.close(); + + newChangeStream = collection.watch([], { resumeAfter: resumeToken }); + newChangeStream + .on('change', next => { + processChange(next); + }) + .once('error', error => { + // handle error + }); + }) + .once('error', error => { + // handle error }); - }); // End Changestream Example 3 // Start Changestream Example 3 Alternative @@ -200,9 +216,13 @@ maybeDescribe('examples(change-stream):', function () { const collection = db.collection('inventory'); const changeStream = collection.watch(pipeline); - changeStream.on('change', next => { - // process next document - }); + changeStream + .on('change', next => { + // process next document + }) + .once('error', error => { + // handle error + }); // End Changestream Example 4 // Start Changestream Example 4 Alternative diff --git a/test/integration/sessions/sessions.test.ts b/test/integration/sessions/sessions.test.ts index ef734481cbc..ccedd595868 100644 --- a/test/integration/sessions/sessions.test.ts +++ b/test/integration/sessions/sessions.test.ts @@ -430,13 +430,15 @@ describe('Sessions Spec', function () { }); }); - context('when using a LegacyMongoClient', () => { + // TODO(NODE-XXXX): LegacyMongoClient uses a released version of the driver so it won't be fixed until the error listeners are published + context.skip('when using a LegacyMongoClient', () => { let legacyClient; beforeEach(async function () { const options = this.configuration.serverApi ? { serverApi: this.configuration.serverApi } : {}; legacyClient = new LegacyMongoClient(this.configuration.url(), options); + legacyClient.on('error', () => null); }); afterEach(async function () { diff --git a/test/mocha_mongodb.json b/test/mocha_mongodb.json index ba1a054f393..bab33041991 100644 --- a/test/mocha_mongodb.json +++ b/test/mocha_mongodb.json @@ -3,9 +3,9 @@ "require": [ "source-map-support/register", "ts-node/register", + "test/tools/runner/ee_checker.ts", "test/tools/runner/chai_addons.ts", "test/tools/runner/hooks/configuration.ts", - "test/tools/runner/hooks/unhandled_checker.ts", "test/tools/runner/hooks/leak_checker.ts", "test/tools/runner/hooks/legacy_crud_shims.ts" ], diff --git a/test/tools/cmap_spec_runner.ts b/test/tools/cmap_spec_runner.ts index a5350e176e0..ee00f1a0c56 100644 --- a/test/tools/cmap_spec_runner.ts +++ b/test/tools/cmap_spec_runner.ts @@ -295,6 +295,7 @@ export class ThreadContext { poolOptions: Partial = {}, contextOptions: { injectPoolStats: boolean } ) { + this.poolEventsEventEmitter.on('error', () => null); this.#poolOptions = poolOptions; this.#hostAddress = hostAddress; this.#server = server; diff --git a/test/tools/reporter/mongodb_reporter.js b/test/tools/reporter/mongodb_reporter.js index 2866fc1f394..7849faf1914 100644 --- a/test/tools/reporter/mongodb_reporter.js +++ b/test/tools/reporter/mongodb_reporter.js @@ -103,7 +103,7 @@ class MongoDBMochaReporter extends mocha.reporters.Spec { catchErr(test => this.testEnd(test)) ); - process.prependListener('SIGINT', () => this.end(true)); + process.prependOnceListener('SIGINT', () => this.end(true)); } start() {} @@ -135,7 +135,7 @@ class MongoDBMochaReporter extends mocha.reporters.Spec { let endTime = test.endTime; endTime = endTime ? endTime.toISOString() : 0; - let error = test.error; + let error = test.err; let failure = error ? { type: error.constructor.name, @@ -250,7 +250,6 @@ class MongoDBMochaReporter extends mocha.reporters.Spec { */ fail(test, error) { if (REPORT_TO_STDIO) console.log(chalk.red(`тип ${test.fullTitle()} -- ${error.message}`)); - test.error = error; } /** diff --git a/test/tools/runner/ee_checker.ts b/test/tools/runner/ee_checker.ts new file mode 100644 index 00000000000..d087a0abfb8 --- /dev/null +++ b/test/tools/runner/ee_checker.ts @@ -0,0 +1,27 @@ +// eslint-disable-next-line @typescript-eslint/no-require-imports +const events = require('events'); + +const EventEmitter = events.EventEmitter; + +events.EventEmitter = class RequireErrorListenerEventEmitter extends EventEmitter { + constructor(...args) { + super(...args); + const ctorCallSite = new Error('EventEmitter must add an error listener synchronously'); + ctorCallSite.stack; + process.nextTick(() => { + const isChangeStream = this.constructor.name + .toLowerCase() + .includes('ChangeStream'.toLowerCase()); + + if (isChangeStream) { + // TODO(NODE-6699): Include checking change streams when the API requirements for error listeners has been clarified + // Comment out the return to check for ChangeStreams in the tests that may be missing error listeners + return; + } + + if (this.listenerCount('error') === 0) { + throw ctorCallSite; + } + }); + } +}; diff --git a/test/tools/runner/hooks/unhandled_checker.ts b/test/tools/runner/hooks/unhandled_checker.ts deleted file mode 100644 index 079b749a463..00000000000 --- a/test/tools/runner/hooks/unhandled_checker.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { expect } from 'chai'; - -const unhandled: { - rejections: Error[]; - exceptions: Error[]; - unknown: unknown[]; -} = { - rejections: [], - exceptions: [], - unknown: [] -}; - -const uncaughtExceptionListener: NodeJS.UncaughtExceptionListener = (error, origin) => { - if (origin === 'uncaughtException') { - unhandled.exceptions.push(error); - } else if (origin === 'unhandledRejection') { - unhandled.rejections.push(error); - } else { - unhandled.unknown.push(error); - } -}; - -function beforeEachUnhandled() { - unhandled.rejections = []; - unhandled.exceptions = []; - unhandled.unknown = []; - process.addListener('uncaughtExceptionMonitor', uncaughtExceptionListener); -} - -function afterEachUnhandled() { - process.removeListener('uncaughtExceptionMonitor', uncaughtExceptionListener); - try { - expect(unhandled).property('rejections').to.have.lengthOf(0); - expect(unhandled).property('exceptions').to.have.lengthOf(0); - expect(unhandled).property('unknown').to.have.lengthOf(0); - } catch (error) { - this.test.error(error); - } - unhandled.rejections = []; - unhandled.exceptions = []; - unhandled.unknown = []; -} - -export const mochaHooks = { beforeEach: beforeEachUnhandled, afterEach: afterEachUnhandled }; diff --git a/test/tools/unified-spec-runner/entities.ts b/test/tools/unified-spec-runner/entities.ts index 04a6c6bc69c..6424061e03b 100644 --- a/test/tools/unified-spec-runner/entities.ts +++ b/test/tools/unified-spec-runner/entities.ts @@ -231,6 +231,8 @@ export class UnifiedMongoClient extends MongoClient { // TODO(NODE-5785): We need to increase the truncation length because signature.hash is a Buffer making hellos too long mongodbLogMaxDocumentLength: 1250 } as any); + + this.observedEventEmitter.on('error', () => null); this.logCollector = logCollector; this.observeSensitiveCommands = description.observeSensitiveCommands ?? false; diff --git a/test/unit/sdam/srv_polling.test.ts b/test/unit/sdam/srv_polling.test.ts index c719d8fb07d..908c5bf8013 100644 --- a/test/unit/sdam/srv_polling.test.ts +++ b/test/unit/sdam/srv_polling.test.ts @@ -188,6 +188,10 @@ describe('Mongos SRV Polling', function () { describe('topology', function () { class FakeSrvPoller extends EventEmitter { + constructor() { + super(); + this.on('error', () => null); + } start() { // ignore } From 0b41b6dfde431f938528ddca81e8288e7b37dbfc Mon Sep 17 00:00:00 2001 From: Neal Beeken Date: Fri, 31 Jan 2025 16:30:38 -0500 Subject: [PATCH 2/3] chore: a few more error listeners caught --- .eslintrc.json | 4 +++ src/change_stream.ts | 10 +++----- src/client-side-encryption/state_machine.ts | 28 ++++++++------------- src/cmap/connect.ts | 1 - 4 files changed, 18 insertions(+), 25 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index d9a6d9a9202..adf89319dca 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -119,6 +119,10 @@ { "selector": "BinaryExpression[operator=/[=!]==?/] Literal[value='undefined']", "message": "Do not strictly check typeof undefined (NOTE: currently this rule only detects the usage of 'undefined' string literal so this could be a misfire)" + }, + { + "selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]", + "message": "removeAllListeners can remove error listeners leading to uncaught errors" } ], "@typescript-eslint/no-unused-vars": "error", diff --git a/src/change_stream.ts b/src/change_stream.ts index b7e45c70efc..1526ca3c756 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -951,12 +951,10 @@ export class ChangeStream< /** @internal */ private _endStream(): void { - const cursorStream = this.cursorStream; - if (cursorStream) { - ['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event)); - cursorStream.destroy(); - } - + this.cursorStream?.removeAllListeners('data'); + this.cursorStream?.removeAllListeners('close'); + this.cursorStream?.removeAllListeners('end'); + this.cursorStream?.destroy(); this.cursorStream = undefined; } diff --git a/src/client-side-encryption/state_machine.ts b/src/client-side-encryption/state_machine.ts index 07dad3c578a..ae49fcb25f4 100644 --- a/src/client-side-encryption/state_machine.ts +++ b/src/client-side-encryption/state_machine.ts @@ -338,17 +338,7 @@ export class StateMachine { const message = request.message; const buffer = new BufferPool(); - const netSocket: net.Socket = new net.Socket(); - let socket: tls.TLSSocket; - - function destroySockets() { - for (const sock of [socket, netSocket]) { - if (sock) { - sock.removeAllListeners(); - sock.destroy(); - } - } - } + const socket = new net.Socket(); function onerror(cause: Error) { return new MongoCryptError('KMS request failed', { cause }); @@ -380,7 +370,8 @@ export class StateMachine { reject: rejectOnNetSocketError, resolve: resolveOnNetSocketConnect } = promiseWithResolvers(); - netSocket + + socket .once('error', err => rejectOnNetSocketError(onerror(err))) .once('close', () => rejectOnNetSocketError(onclose())) .once('connect', () => resolveOnNetSocketConnect()); @@ -394,14 +385,14 @@ export class StateMachine { host: this.options.proxyOptions.proxyHost, port: this.options.proxyOptions.proxyPort || 1080 }; - netSocket.connect(netSocketOptions); + socketOptions.socket = socket.connect(netSocketOptions); await willConnect; try { socks ??= loadSocks(); socketOptions.socket = ( await socks.SocksClient.createConnection({ - existing_socket: netSocket, + existing_socket: socket, command: 'connect', destination: { host: socketOptions.host, port: socketOptions.port }, proxy: { @@ -419,7 +410,7 @@ export class StateMachine { } } - socket = tls.connect(socketOptions, () => { + tls.connect(socketOptions, () => { socket.write(message); }); @@ -430,7 +421,7 @@ export class StateMachine { } = promiseWithResolvers(); abortListener = addAbortListener(options?.signal, function () { - destroySockets(); + socket.destroy(); rejectOnTlsSocketError(this.reason); }); @@ -455,12 +446,13 @@ export class StateMachine { ]) : willResolveKmsRequest); } catch (error) { - if (error instanceof TimeoutError) + if (TimeoutError.is(error)) { throw new MongoOperationTimeoutError('KMS request timed out'); + } throw error; } finally { // There's no need for any more activity on this socket at this point. - destroySockets(); + socket.destroy(); abortListener?.[kDispose](); } } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index 9efe2461070..394b70689ca 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -427,7 +427,6 @@ export async function makeSocket(options: MakeConnectionOptions): Promise Date: Fri, 31 Jan 2025 17:06:34 -0500 Subject: [PATCH 3/3] chore: revert socket dupe clean up --- src/client-side-encryption/state_machine.ts | 27 +++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/client-side-encryption/state_machine.ts b/src/client-side-encryption/state_machine.ts index ae49fcb25f4..096c4cfc635 100644 --- a/src/client-side-encryption/state_machine.ts +++ b/src/client-side-encryption/state_machine.ts @@ -338,7 +338,16 @@ export class StateMachine { const message = request.message; const buffer = new BufferPool(); - const socket = new net.Socket(); + const netSocket: net.Socket = new net.Socket(); + let socket: tls.TLSSocket; + + function destroySockets() { + for (const sock of [socket, netSocket]) { + if (sock) { + sock.destroy(); + } + } + } function onerror(cause: Error) { return new MongoCryptError('KMS request failed', { cause }); @@ -370,8 +379,7 @@ export class StateMachine { reject: rejectOnNetSocketError, resolve: resolveOnNetSocketConnect } = promiseWithResolvers(); - - socket + netSocket .once('error', err => rejectOnNetSocketError(onerror(err))) .once('close', () => rejectOnNetSocketError(onclose())) .once('connect', () => resolveOnNetSocketConnect()); @@ -385,14 +393,14 @@ export class StateMachine { host: this.options.proxyOptions.proxyHost, port: this.options.proxyOptions.proxyPort || 1080 }; - socketOptions.socket = socket.connect(netSocketOptions); + netSocket.connect(netSocketOptions); await willConnect; try { socks ??= loadSocks(); socketOptions.socket = ( await socks.SocksClient.createConnection({ - existing_socket: socket, + existing_socket: netSocket, command: 'connect', destination: { host: socketOptions.host, port: socketOptions.port }, proxy: { @@ -410,7 +418,7 @@ export class StateMachine { } } - tls.connect(socketOptions, () => { + socket = tls.connect(socketOptions, () => { socket.write(message); }); @@ -421,7 +429,7 @@ export class StateMachine { } = promiseWithResolvers(); abortListener = addAbortListener(options?.signal, function () { - socket.destroy(); + destroySockets(); rejectOnTlsSocketError(this.reason); }); @@ -446,13 +454,12 @@ export class StateMachine { ]) : willResolveKmsRequest); } catch (error) { - if (TimeoutError.is(error)) { + if (error instanceof TimeoutError) throw new MongoOperationTimeoutError('KMS request timed out'); - } throw error; } finally { // There's no need for any more activity on this socket at this point. - socket.destroy(); + destroySockets(); abortListener?.[kDispose](); } }