Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@
{
"selector": "BinaryExpression[operator=/[=!]==?/] Literal[value='undefined']",
"message": "Do not strictly check typeof undefined (NOTE: currently this rule only detects the usage of 'undefined' string literal so this could be a misfire)"
},
{
"selector": "CallExpression[callee.property.name='removeAllListeners'][arguments.length=0]",
"message": "removeAllListeners can remove error listeners leading to uncaught errors"
}
],
"@typescript-eslint/no-unused-vars": "error",
Expand Down
2 changes: 1 addition & 1 deletion .mocharc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"source-map-support/register",
"ts-node/register",
"test/tools/runner/chai_addons.ts",
"test/tools/runner/hooks/unhandled_checker.ts"
"test/tools/runner/ee_checker.ts"
],
"extension": [
"js",
Expand Down
10 changes: 4 additions & 6 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -951,12 +951,10 @@ export class ChangeStream<

/** @internal */
private _endStream(): void {
const cursorStream = this.cursorStream;
if (cursorStream) {
['data', 'close', 'end', 'error'].forEach(event => cursorStream.removeAllListeners(event));
cursorStream.destroy();
}

this.cursorStream?.removeAllListeners('data');
this.cursorStream?.removeAllListeners('close');
this.cursorStream?.removeAllListeners('end');
this.cursorStream?.destroy();
this.cursorStream = undefined;
}

Expand Down
1 change: 0 additions & 1 deletion src/client-side-encryption/state_machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,6 @@ export class StateMachine {
function destroySockets() {
for (const sock of [socket, netSocket]) {
if (sock) {
sock.removeAllListeners();
sock.destroy();
}
}
Expand Down
1 change: 0 additions & 1 deletion src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,6 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
throw error;
} finally {
socket.setTimeout(0);
socket.removeAllListeners();
if (cancellationHandler != null) {
options.cancellationToken?.removeListener('cancel', cancellationHandler);
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
HostAddress,
maxWireVersion,
type MongoDBNamespace,
noop,
now,
once,
squashError,
Expand Down Expand Up @@ -229,6 +230,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {

constructor(stream: Stream, options: ConnectionOptions) {
super();
this.on('error', noop);

this.socket = stream;
this.id = options.id;
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
kDispose,
List,
makeCounter,
noop,
now,
promiseWithResolvers
} from '../utils';
Expand Down Expand Up @@ -200,6 +201,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

constructor(server: Server, options: ConnectionPoolOptions) {
super();
this.on('error', noop);

this.options = Object.freeze({
connectionType: Connection,
Expand Down
2 changes: 2 additions & 0 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
type Disposable,
kDispose,
type MongoDBNamespace,
noop,
squashError
} from '../utils';

Expand Down Expand Up @@ -267,6 +268,7 @@ export abstract class AbstractCursor<
options: AbstractCursorOptions & Abortable = {}
) {
super();
this.on('error', noop);

if (!client.s.isMongoClient) {
throw new MongoRuntimeError('Cursor must be constructed with MongoClient');
Expand Down
3 changes: 2 additions & 1 deletion src/gridfs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { type Filter, TypedEventEmitter } from '../mongo_types';
import type { ReadPreference } from '../read_preference';
import type { Sort } from '../sort';
import { CSOTTimeoutContext } from '../timeout';
import { resolveOptions } from '../utils';
import { noop, resolveOptions } from '../utils';
import { WriteConcern, type WriteConcernOptions } from '../write_concern';
import type { FindOptions } from './../operations/find';
import {
Expand Down Expand Up @@ -87,6 +87,7 @@ export class GridFSBucket extends TypedEventEmitter<GridFSBucketEvents> {

constructor(db: Db, options?: GridFSBucketOptions) {
super();
this.on('error', noop);
this.setMaxListeners(0);
const privateOptions = resolveOptions(db, {
...DEFAULT_GRIDFS_BUCKET_OPTIONS,
Expand Down
2 changes: 2 additions & 0 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
hostMatchesWildcards,
isHostMatch,
type MongoDBNamespace,
noop,
ns,
resolveOptions,
squashError
Expand Down Expand Up @@ -386,6 +387,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

constructor(url: string, options?: MongoClientOptions) {
super();
this.on('error', noop);

this.options = parseOptions(url, this, options);

Expand Down
8 changes: 7 additions & 1 deletion src/mongo_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
type MongoLogger
} from './mongo_logger';
import type { Sort } from './sort';
import { noop } from './utils';

/** @internal */
export type TODO_NODE_3286 = any;
Expand Down Expand Up @@ -472,7 +473,12 @@ export class TypedEventEmitter<Events extends EventsDescription> extends EventEm
}

/** @public */
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {
constructor(...args: any[]) {
super(...args);
this.on('error', noop);
}
}

/** @public */
export type Abortable = {
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
type Callback,
type EventEmitterWithState,
makeStateMachine,
noop,
now,
ns
} from '../utils';
Expand Down Expand Up @@ -102,6 +103,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {

constructor(server: Server, options: MonitorOptions) {
super();
this.on('error', noop);

this.server = server;
this.connection = null;
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import {
makeStateMachine,
maxWireVersion,
type MongoDBNamespace,
noop,
supportsRetryableWrites
} from '../utils';
import { throwIfWriteConcernError } from '../write_concern';
Expand Down Expand Up @@ -142,6 +143,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
constructor(topology: Topology, description: ServerDescription, options: ServerOptions) {
super();
this.on('error', noop);

this.serverApi = options.serverApi;

Expand Down
3 changes: 2 additions & 1 deletion src/sdam/srv_polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';

import { MongoRuntimeError } from '../error';
import { TypedEventEmitter } from '../mongo_types';
import { checkParentDomainMatch, HostAddress, squashError } from '../utils';
import { checkParentDomainMatch, HostAddress, noop, squashError } from '../utils';

/**
* @internal
Expand Down Expand Up @@ -49,6 +49,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {

constructor(options: SrvPollerOptions) {
super();
this.on('error', noop);

if (!options || !options.srvHost) {
throw new MongoRuntimeError('Options for SrvPoller must exist and include srvHost');
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import {
kDispose,
List,
makeStateMachine,
noop,
now,
ns,
promiseWithResolvers,
Expand Down Expand Up @@ -248,6 +249,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
options: TopologyOptions
) {
super();
this.on('error', noop);

this.client = client;
// Options should only be undefined in tests, MongoClient will always have defined options
Expand Down
2 changes: 2 additions & 0 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import {
isPromiseLike,
List,
maxWireVersion,
noop,
now,
squashError,
uuidV4
Expand Down Expand Up @@ -161,6 +162,7 @@ export class ClientSession
clientOptions: MongoOptions
) {
super();
this.on('error', noop);

if (client == null) {
// TODO(NODE-3483)
Expand Down
3 changes: 2 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,7 @@ describe('Change Streams', function () {
// ChangeStream detects emitter usage via 'newListener' event
// so this covers all emitter methods
});
changeStream.on('error', () => null); // one must listen for errors if they use EE mode.

await once(changeStream.cursor, 'init');
expect(changeStream).to.have.property('mode', 'emitter');
Expand Down Expand Up @@ -971,7 +972,7 @@ describe('Change Streams', function () {
{ requires: { topology: '!single' } },
async function () {
changeStream = collection.watch([]);
changeStream.on('change', sinon.stub());
changeStream.on('change', sinon.stub()).on('error', () => null);

try {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ describe('Change Stream prose tests', function () {
expect(err).to.not.exist;
coll = client.db('integration_tests').collection('setupAfterTest');
const changeStream = coll.watch();
changeStream.on('error', done);
waitForStarted(changeStream, () => {
coll.insertOne({ x: 1 }, { writeConcern: { w: 'majority', j: true } }, err => {
expect(err).to.not.exist;
Expand Down Expand Up @@ -932,6 +933,7 @@ describe('Change Stream prose tests', function () {
let events = [];
client.on('commandStarted', e => recordEvent(events, e));
const changeStream = coll.watch([], { startAfter });
changeStream.on('error', done);
this.defer(() => changeStream.close());

changeStream.on('change', change => {
Expand Down
17 changes: 9 additions & 8 deletions test/integration/crud/crud_api.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { expect } from 'chai';
import { on } from 'events';
import * as semver from 'semver';
import * as sinon from 'sinon';
import { finished } from 'stream/promises';

import {
Collection,
Expand Down Expand Up @@ -238,7 +238,7 @@ describe('CRUD API', function () {
});

context('when creating a cursor with find', () => {
let collection;
let collection: Collection;

beforeEach(async () => {
collection = client.db().collection('t');
Expand Down Expand Up @@ -307,13 +307,14 @@ describe('CRUD API', function () {

describe('#stream()', () => {
it('creates a node stream that emits data events', async () => {
const count = 0;
const cursor = makeCursor();
const stream = cursor.stream();
on(stream, 'data');
cursor.once('close', function () {
expect(count).to.equal(2);
let count = 0;
const stream = makeCursor().stream();
const willFinish = finished(stream, { cleanup: true });
stream.on('data', () => {
count++;
});
await willFinish;
expect(count).to.equal(2);
});
});

Expand Down
6 changes: 3 additions & 3 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1993,15 +1993,15 @@ describe('Cursor', function () {
expect(res).property('insertedId').to.exist;
}, 300);

const start = new Date();
const start = performance.now();
const doc1 = await cursor.next();
expect(doc1).to.have.property('b', 2);
const end = new Date();
const end = performance.now();

await later; // make sure this finished, without a failure

// We should see here that cursor.next blocked for at least 300ms
expect(end.getTime() - start.getTime()).to.be.at.least(300);
expect(end - start).to.be.at.least(290);
}
}
);
Expand Down
Loading