Skip to content

Commit 8097c3e

Browse files
committed
wip
1 parent fe50fe1 commit 8097c3e

File tree

14 files changed

+61
-33
lines changed

14 files changed

+61
-33
lines changed

.eslintrc.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@
127127
{
128128
"selector": "CallExpression[callee.name='clearTimeout']",
129129
"message": "clearTimeout must remove abort listener"
130+
},
131+
{
132+
"selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]",
133+
"message": "removeAllListeners can remove error listeners leading to uncaught errors"
130134
}
131135
],
132136
"@typescript-eslint/no-unused-vars": "error",

src/change_stream.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -956,7 +956,9 @@ export class ChangeStream<
956956
private _endStream(): void {
957957
const cursorStream = this.cursorStream;
958958
if (cursorStream) {
959-
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
959+
cursorStream.removeAllListeners('data');
960+
cursorStream.removeAllListeners('close');
961+
cursorStream.removeAllListeners('end');
960962
cursorStream.destroy();
961963
}
962964

src/cmap/connect.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,6 @@ export async function makeSocket(
441441
throw error;
442442
} finally {
443443
socket.setTimeout(0);
444-
socket.removeAllListeners();
445444
if (cancellationHandler != null) {
446445
options.cancellationToken?.removeListener('cancel', cancellationHandler);
447446
}

src/cmap/connection_pool.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
374374

375375
try {
376376
timeout?.throwIfExpired();
377+
timeout?.ref();
377378
return await (timeout ? Promise.race([promise, timeout]) : promise);
378379
} catch (error) {
379380
if (TimeoutError.is(error)) {
@@ -399,6 +400,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
399400
}
400401
throw error;
401402
} finally {
403+
timeout?.unref();
402404
abortListener?.[kDispose]();
403405
timeout?.clear();
404406
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ export {
5353
MongoClientBulkWriteCursorError,
5454
MongoClientBulkWriteError,
5555
MongoClientBulkWriteExecutionError,
56+
MongoClientClosedError,
5657
MongoCompatibilityError,
5758
MongoCursorExhaustedError,
5859
MongoCursorInUseError,

src/mongo_client.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import { MONGO_CLIENT_EVENTS } from './constants';
2222
import { type AbstractCursor } from './cursor/abstract_cursor';
2323
import { Db, type DbOptions } from './db';
2424
import type { Encrypter } from './encrypter';
25-
import { MongoInvalidArgumentError } from './error';
25+
import { MongoClientClosedError, MongoInvalidArgumentError } from './error';
2626
import { MongoClientAuthProviders } from './mongo_client_auth_providers';
2727
import {
2828
type LogComponentSeveritiesClientOptions,
@@ -692,7 +692,6 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
692692
/* @internal */
693693
private async _close(force = false): Promise<void> {
694694
try {
695-
this.closeController.abort();
696695
// There's no way to set hasBeenClosed back to false
697696
Object.defineProperty(this.s, 'hasBeenClosed', {
698697
value: true,
@@ -701,6 +700,12 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
701700
writable: false
702701
});
703702

703+
if (this.options.maxPoolSize === 1) {
704+
// If maxPoolSize is 1 we won't be able to run anything
705+
// unless we interrupt whatever is using the one connection.
706+
this.closeController.abort(new MongoClientClosedError());
707+
}
708+
704709
const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
705710
this.s.activeCursors.clear();
706711

@@ -749,7 +754,9 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
749754
await encrypter.close(this, force);
750755
}
751756
} finally {
752-
// ignore
757+
if (!this.closeController.signal.aborted) {
758+
this.closeController.abort(new MongoClientClosedError());
759+
}
753760
}
754761
}
755762

src/sdam/monitor.ts

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,7 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
387387
const makeMonitoringConnection = async () => {
388388
const socket = await makeSocket(monitor.connectOptions, monitor.closeSignal);
389389
const connection = makeConnection(monitor.connectOptions, socket);
390+
connection.unref();
390391
// The start time is after socket creation but before the handshake
391392
start = now();
392393
try {
@@ -447,15 +448,11 @@ function monitorServer(monitor: Monitor) {
447448

448449
// if the check indicates streaming is supported, immediately reschedule monitoring
449450
if (useStreamingProtocol(monitor, hello?.topologyVersion)) {
450-
clearOnAbortTimeout(
451-
() => {
452-
if (!isInCloseState(monitor)) {
453-
monitor.monitorId?.wake();
454-
}
455-
},
456-
0,
457-
monitor.closeSignal
458-
);
451+
queueMicrotask(() => {
452+
if (!isInCloseState(monitor)) {
453+
monitor.monitorId?.wake();
454+
}
455+
});
459456
}
460457

461458
done();
@@ -554,6 +551,7 @@ export class RTTPinger {
554551
if (connection == null) {
555552
connect(this.monitor.connectOptions, this.closeSignal).then(
556553
connection => {
554+
connection.unref();
557555
this.measureAndReschedule(start, connection);
558556
},
559557
() => {

src/sdam/topology.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
621621

622622
try {
623623
timeout?.throwIfExpired();
624+
timeout?.ref();
624625
const server = await (timeout ? Promise.race([serverPromise, timeout]) : serverPromise);
625626
if (options.timeoutContext?.csotEnabled() && server.description.minRoundTripTime !== 0) {
626627
options.timeoutContext.minRoundTripTime = server.description.minRoundTripTime;
@@ -661,6 +662,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
661662
// Other server selection error
662663
throw error;
663664
} finally {
665+
timeout?.unref();
664666
abortListener?.[kDispose]();
665667
if (options.timeoutContext?.clearServerSelectionTimeout) timeout?.clear();
666668
}

src/timeout.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export function clearOnAbortTimeout(
3939
}, ms);
4040

4141
if ('unref' in id && typeof id.unref === 'function') {
42-
id.unref();
42+
// id.unref();
4343
}
4444

4545
const abortListener = addAbortListener(closeSignal, function clearId() {
@@ -116,7 +116,9 @@ export class Timeout extends Promise<never> {
116116
this.start = Math.trunc(performance.now());
117117

118118
if (rejection == null && this.duration > 0) {
119-
if (options.closeSignal == null) throw new Error('incorrect timer use detected!');
119+
if (options.closeSignal == null) {
120+
throw new Error('You must provide a close signal to timeoutContext');
121+
}
120122

121123
this.id = clearOnAbortTimeout(
122124
() => {

src/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1571,7 +1571,7 @@ export function addAbortSignalToStream(
15711571

15721572
const abortListener = addAbortListener(signal, function () {
15731573
stream.off('close', abortListener[kDispose]).off('error', abortListener[kDispose]);
1574-
stream.destroy(this.reason);
1574+
if (!stream.destroyed) stream.destroy(this.reason);
15751575
});
15761576
// not nearly as complex as node's eos() but... do we need all that?? sobbing emoji.
15771577
stream.once('close', abortListener[kDispose]).once('error', abortListener[kDispose]);

0 commit comments

Comments
 (0)