Skip to content

Commit e3341b0

Browse files
committed
wip
1 parent 8458c6f commit e3341b0

File tree

24 files changed

+202
-127
lines changed

24 files changed

+202
-127
lines changed

.evergreen/run-tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,5 @@ export MONGODB_URI=${MONGODB_URI}
6565
export LOAD_BALANCER=${LOAD_BALANCER}
6666
export TEST_CSFLE=${TEST_CSFLE}
6767
export COMPRESSOR=${COMPRESSOR}
68+
export NODE_OPTIONS="${NODE_OPTIONS} --trace-uncaught"
6869
npm run "${TEST_NPM_SCRIPT}"

src/change_stream.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,8 @@ export class ChangeStream<
664664
this.isClosed = false;
665665
this.mode = false;
666666

667+
this.on('error', () => null);
668+
667669
// Listen for any `change` listeners being added to ChangeStream
668670
this.on('newListener', eventName => {
669671
if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) {

src/client-side-encryption/state_machine.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,8 @@ export class StateMachine {
351351
let socket: tls.TLSSocket;
352352

353353
function destroySockets() {
354-
for (const sock of [socket, netSocket]) {
355-
if (sock) {
356-
sock.removeAllListeners();
357-
sock.destroy();
358-
}
359-
}
354+
socket?.destroy();
355+
netSocket?.destroy();
360356
}
361357

362358
function onerror(cause: Error) {

src/cmap/connect.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ export async function makeSocket(
389389

390390
addAbortSignalToStream(closeSignal, socket);
391391

392+
socket.unref();
392393
socket.setKeepAlive(true, 300000);
393394
socket.setTimeout(connectTimeoutMS);
394395
socket.setNoDelay(noDelay);

src/cmap/connection.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,14 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
298298
);
299299
}
300300

301+
unref() {
302+
this.socket.unref();
303+
}
304+
305+
ref() {
306+
this.socket.ref();
307+
}
308+
301309
public markAvailable(): void {
302310
this.lastUseTime = now();
303311
}
@@ -353,7 +361,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
353361
return;
354362
}
355363

356-
this.socket.destroy();
364+
if (!this.socket.destroyed) this.socket.destroy();
357365
this.error = error;
358366

359367
this.dataEvents?.throw(error).then(undefined, squashError);

src/cmap/connection_pool.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,8 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
413413
if (!this.checkedOut.has(connection)) {
414414
return;
415415
}
416+
417+
connection.unref();
416418
const poolClosed = this.closed;
417419
const stale = this.connectionIsStale(connection);
418420
const willDestroy = !!(poolClosed || stale || connection.closed);
@@ -788,6 +790,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
788790
);
789791

790792
this.waitQueue.shift();
793+
connection.ref();
791794
waitQueueMember.resolve(connection);
792795
}
793796
}

src/mongo_client.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -691,14 +691,15 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
691691

692692
/* @internal */
693693
private async _close(force = false): Promise<void> {
694-
this.closeController.abort();
695-
// There's no way to set hasBeenClosed back to false
696-
Object.defineProperty(this.s, 'hasBeenClosed', {
697-
value: true,
698-
enumerable: true,
699-
configurable: false,
700-
writable: false
701-
});
694+
try {
695+
this.closeController.abort();
696+
// There's no way to set hasBeenClosed back to false
697+
Object.defineProperty(this.s, 'hasBeenClosed', {
698+
value: true,
699+
enumerable: true,
700+
configurable: false,
701+
writable: false
702+
});
702703

703704
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
704705
this.s.activeCursors.clear();

src/sdam/monitor.ts

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -384,20 +384,13 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
384384
}
385385

386386
// connecting does an implicit `hello`
387-
(async () => {
387+
const makeMonitoringConnection = async () => {
388388
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
389389
const connection = makeConnection(monitor.connectOptions, socket);
390390
// The start time is after socket creation but before the handshake
391391
start = now();
392392
try {
393393
await performInitialHandshake(connection, monitor.connectOptions, monitor.closeSignal);
394-
return connection;
395-
} catch (error) {
396-
connection.destroy();
397-
throw error;
398-
}
399-
})().then(
400-
connection => {
401394
if (isInCloseState(monitor)) {
402395
connection.destroy();
403396
return;
@@ -417,15 +410,16 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
417410
useStreamingProtocol(monitor, connection.hello?.topologyVersion)
418411
)
419412
);
420-
421-
callback(undefined, connection.hello);
422-
},
423-
error => {
413+
return connection.hello;
414+
} catch (error) {
415+
connection.destroy();
424416
monitor.connection = null;
425417
awaited = false;
426-
onHeartbeatFailed(error);
418+
throw error;
427419
}
428-
);
420+
};
421+
422+
makeMonitoringConnection().then(callback.bind(undefined, undefined), onHeartbeatFailed);
429423
}
430424

431425
function monitorServer(monitor: Monitor) {

src/utils.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-console */
12
import * as crypto from 'crypto';
23
import type { SrvRecord } from 'dns';
34
import { type EventEmitter } from 'events';
@@ -1571,7 +1572,37 @@ export function addAbortSignalToStream(
15711572

15721573
const abortListener = addAbortListener(signal, function () {
15731574
stream.off('close', abortListener[kDispose]).off('error', abortListener[kDispose]);
1574-
stream.destroy(this.reason);
1575+
const removeAll = stream.removeAllListeners.bind(stream);
1576+
1577+
//@ts-expect-error: overwrite
1578+
stream.removeAllListeners = function (...args) {
1579+
const removingError = args.length === 0 || args.includes('error');
1580+
if (removingError) {
1581+
console.log('removing all', args, new Error('why are u removing my error listener'));
1582+
}
1583+
return removeAll(...args);
1584+
};
1585+
1586+
stream.on('removeListener', (name, listener) => {
1587+
if (name === 'error') {
1588+
console.log(
1589+
'who doth remove my error listener',
1590+
new Error('error listener gone missing!!'),
1591+
listener
1592+
);
1593+
}
1594+
});
1595+
1596+
const error = new Error(
1597+
//@ts-expect-error: we added these
1598+
`sad: ${stream.___socketId}: error listeners: ${stream.listenerCount('error')} + ${stream.___stack}`,
1599+
{ cause: this.reason }
1600+
);
1601+
1602+
//@ts-expect-error: adding this for debug
1603+
error.stream = stream;
1604+
1605+
stream.destroy(error);
15751606
});
15761607
// not nearly as complex as node's eos() but... do we need all that?? sobbing emoji.
15771608
stream.once('close', abortListener[kDispose]).once('error', abortListener[kDispose]);

test/integration/change-streams/change_stream.test.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
MongoChangeStreamError,
1919
type MongoClient,
2020
MongoServerError,
21+
promiseWithResolvers,
2122
ReadPreference,
2223
type ResumeToken
2324
} from '../../mongodb';
@@ -62,6 +63,7 @@ describe('Change Streams', function () {
6263
await csDb.createCollection('test').catch(() => null);
6364
collection = csDb.collection('test');
6465
changeStream = collection.watch();
66+
changeStream.once('error', error => this.error(error));
6567
});
6668

6769
afterEach(async () => {
@@ -695,10 +697,18 @@ describe('Change Streams', function () {
695697
async test() {
696698
await initIteratorMode(changeStream);
697699

700+
const { promise, resolve, reject } = promiseWithResolvers<void>();
701+
698702
const outStream = new PassThrough({ objectMode: true });
699703

700-
// @ts-expect-error: transform requires a Document return type
701-
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
704+
const csStream = changeStream
705+
// @ts-expect-error: transform requires a Document return type
706+
.stream({ transform: JSON.stringify });
707+
708+
csStream.once('error', reject).pipe(outStream).once('error', reject);
709+
710+
outStream.on('close', resolve);
711+
csStream.on('close', resolve);
702712

703713
const willBeData = once(outStream, 'data');
704714

@@ -709,6 +719,8 @@ describe('Change Streams', function () {
709719
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);
710720

711721
outStream.destroy();
722+
csStream.destroy();
723+
await promise;
712724
}
713725
});
714726

0 commit comments

Comments
 (0)