Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
{
"selector": "BinaryExpression[operator=/[=!]==?/] Literal[value='undefined']",
"message": "Do not strictly check typeof undefined (NOTE: currently this rule only detects the usage of 'undefined' string literal so this could be a misfire)"
},
{
"selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]",
"message": "removeAllListeners can remove error listeners leading to uncaught errors"
}
],
"@typescript-eslint/no-unused-vars": "error",
Expand Down
2 changes: 1 addition & 1 deletion .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai_addons.ts",
"test/tools/runner/hooks/unhandled_checker.ts"
"test/tools/runner/ee_checker.ts"
],
"extension": [
"js",
Expand Down
10 changes: 4 additions & 6 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -951,12 +951,10 @@ export class ChangeStream<

/** @internal */
private _endStream(): void {
const cursorStream = this.cursorStream;
if (cursorStream) {
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

this.cursorStream?.removeAllListeners('data');
this.cursorStream?.removeAllListeners('close');
this.cursorStream?.removeAllListeners('end');
this.cursorStream?.destroy();
this.cursorStream = undefined;
}

Expand Down
1 change: 0 additions & 1 deletion src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ export class StateMachine {
function destroySockets() {
for (const sock of [socket, netSocket]) {
if (sock) {
sock.removeAllListeners();
sock.destroy();
}
}
Expand Down
1 change: 0 additions & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
throw error;
} finally {
socket.setTimeout(0);
socket.removeAllListeners();
if (cancellationHandler != null) {
options.cancellationToken?.removeListener('cancel', cancellationHandler);
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
HostAddress,
maxWireVersion,
type MongoDBNamespace,
noop,
now,
once,
squashError,
Expand Down Expand Up @@ -229,6 +230,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

constructor(stream: Stream, options: ConnectionOptions) {
super();
this.on('error', noop);

this.socket = stream;
this.id = options.id;
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
kDispose,
List,
makeCounter,
noop,
now,
promiseWithResolvers
} from '../utils';
Expand Down Expand Up @@ -200,6 +201,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

constructor(server: Server, options: ConnectionPoolOptions) {
super();
this.on('error', noop);

this.options = Object.freeze({
connectionType: Connection,
Expand Down
2 changes: 2 additions & 0 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
type Disposable,
kDispose,
type MongoDBNamespace,
noop,
squashError
} from '../utils';

Expand Down Expand Up @@ -267,6 +268,7 @@ export abstract class AbstractCursor<
options: AbstractCursorOptions & Abortable = {}
) {
super();
this.on('error', noop);

if (!client.s.isMongoClient) {
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
Expand Down
3 changes: 2 additions & 1 deletion src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { type Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import { resolveOptions } from '../utils';
import { noop, resolveOptions } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -87,6 +87,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {

constructor(db: Db, options?: GridFSBucketOptions) {
super();
this.on('error', noop);
this.setMaxListeners(0);
const privateOptions = resolveOptions(db, {
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
Expand Down
2 changes: 2 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import {
hostMatchesWildcards,
isHostMatch,
type MongoDBNamespace,
noop,
ns,
resolveOptions,
squashError
Expand Down Expand Up @@ -381,6 +382,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

constructor(url: string, options?: MongoClientOptions) {
super();
this.on('error', noop);

this.options = parseOptions(url, this, options);

Expand Down
8 changes: 7 additions & 1 deletion src/mongo_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type MongoLogger
} from './mongo_logger';
import type { Sort } from './sort';
import { noop } from './utils';

/** @internal */
export type TODO_NODE_3286 = any;
Expand Down Expand Up @@ -472,7 +473,12 @@ export class TypedEventEmitter<Events extends EventsDescription> extends EventEm
}

/** @public */
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {
constructor(...args: any[]) {
super(...args);
this.on('error', noop);
}
}

/** @public */
export type Abortable = {
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
noop,
now,
ns
} from '../utils';
Expand Down Expand Up @@ -102,6 +103,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {

constructor(server: Server, options: MonitorOptions) {
super();
this.on('error', noop);

this.server = server;
this.connection = null;
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
makeStateMachine,
maxWireVersion,
type MongoDBNamespace,
noop,
supportsRetryableWrites
} from '../utils';
import { throwIfWriteConcernError } from '../write_concern';
Expand Down Expand Up @@ -142,6 +143,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
constructor(topology: Topology, description: ServerDescription, options: ServerOptions) {
super();
this.on('error', noop);

this.serverApi = options.serverApi;

Expand Down
3 changes: 2 additions & 1 deletion src/sdam/srv_polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';

import { MongoRuntimeError } from '../error';
import { TypedEventEmitter } from '../mongo_types';
import { checkParentDomainMatch, HostAddress, squashError } from '../utils';
import { checkParentDomainMatch, HostAddress, noop, squashError } from '../utils';

/**
* @internal
Expand Down Expand Up @@ -49,6 +49,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {

constructor(options: SrvPollerOptions) {
super();
this.on('error', noop);

if (!options || !options.srvHost) {
throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost');
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
kDispose,
List,
makeStateMachine,
noop,
now,
ns,
promiseWithResolvers,
Expand Down Expand Up @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
options: TopologyOptions
) {
super();
this.on('error', noop);

this.client = client;
// Options should only be undefined in tests, MongoClient will always have defined options
Expand Down
2 changes: 2 additions & 0 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
isPromiseLike,
List,
maxWireVersion,
noop,
now,
squashError,
uuidV4
Expand Down Expand Up @@ -161,6 +162,7 @@ export class ClientSession
clientOptions: MongoOptions
) {
super();
this.on('error', noop);

if (client == null) {
// TODO(NODE-3483)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ describe('Change Streams', function () {
// ChangeStream detects emitter usage via 'newListener' event
// so this covers all emitter methods
});
changeStream.on('error', () => null); // one must listen for errors if they use EE mode.

await once(changeStream.cursor, 'init');
expect(changeStream).to.have.property('mode', 'emitter');
Expand Down Expand Up @@ -971,7 +972,7 @@ describe('Change Streams', function () {
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
changeStream.on('change', sinon.stub());
changeStream.on('change', sinon.stub()).on('error', () => null);

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ describe('Change Stream prose tests', function () {
expect(err).to.not.exist;
coll = client.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();
changeStream.on('error', done);
waitForStarted(changeStream, () => {
coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => {
expect(err).to.not.exist;
Expand Down Expand Up @@ -932,6 +933,7 @@ describe('Change Stream prose tests', function () {
let events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
changeStream.on('error', done);
this.defer(() => changeStream.close());

changeStream.on('change', change => {
Expand Down
17 changes: 9 additions & 8 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { on } from 'events';
import * as semver from 'semver';
import * as sinon from 'sinon';
import { finished } from 'stream/promises';

import {
Collection,
Expand Down Expand Up @@ -238,7 +238,7 @@ describe('CRUD API', function () {
});

context('when creating a cursor with find', () => {
let collection;
let collection: Collection;

beforeEach(async () => {
collection = client.db().collection('t');
Expand Down Expand Up @@ -307,13 +307,14 @@ describe('CRUD API', function () {

describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
on(stream, 'data');
cursor.once('close', function () {
expect(count).to.equal(2);
let count = 0;
const stream = makeCursor().stream();
const willFinish = finished(stream, { cleanup: true });
stream.on('data', () => {
count++;
});
await willFinish;
expect(count).to.equal(2);
});
});

Expand Down
6 changes: 3 additions & 3 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1993,15 +1993,15 @@ describe('Cursor', function () {
expect(res).property('insertedId').to.exist;
}, 300);

const start = new Date();
const start = performance.now();
const doc1 = await cursor.next();
expect(doc1).to.have.property('b', 2);
const end = new Date();
const end = performance.now();

await later; // make sure this finished, without a failure

// We should see here that cursor.next blocked for at least 300ms
expect(end.getTime() - start.getTime()).to.be.at.least(300);
expect(end - start).to.be.at.least(290);
}
}
);
Expand Down
Loading