Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client-side-encryption/auto_encrypter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ export class AutoEncrypter {
/**
* Cleans up the `_mongocryptdClient`, if present.
*/
async teardown(force: boolean): Promise<void> {
await this._mongocryptdClient?.close(force);
async close(): Promise<void> {
await this._mongocryptdClient?.close();
}

/**
Expand Down
9 changes: 8 additions & 1 deletion src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from '../constants';
import {
type AnyError,
MongoClientClosedError,
type MongoError,
MongoInvalidArgumentError,
MongoMissingCredentialsError,
Expand Down Expand Up @@ -484,11 +485,17 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
for (const connection of this.checkedOut) {
if (connection.generation <= minGeneration) {
connection.onError(new PoolClearedOnNetworkError(this));
this.checkIn(connection);
}
}
}

/** For MongoClient.close() procedures */
public closeCheckedOutConnections() {
for (const conn of this.checkedOut) {
conn.onError(new MongoClientClosedError());
}
}

/** Close the pool */
close(): void {
if (this.closed) {
Expand Down
13 changes: 3 additions & 10 deletions src/encrypter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import { callbackify } from 'util';

import { AutoEncrypter, type AutoEncryptionOptions } from './client-side-encryption/auto_encrypter';
import { MONGO_CLIENT_EVENTS } from './constants';
import { getMongoDBClientEncryption } from './deps';
import { MongoInvalidArgumentError, MongoMissingDependencyError } from './error';
import { MongoClient, type MongoClientOptions } from './mongo_client';
import { type Callback } from './utils';

/** @internal */
export interface EncrypterOptions {
Expand Down Expand Up @@ -98,20 +95,16 @@ export class Encrypter {
}
}

closeCallback(client: MongoClient, force: boolean, callback: Callback<void>) {
callbackify(this.close.bind(this))(client, force, callback);
}

async close(client: MongoClient, force: boolean): Promise<void> {
async close(client: MongoClient): Promise<void> {
let error;
try {
await this.autoEncrypter.teardown(force);
await this.autoEncrypter.close();
} catch (autoEncrypterError) {
error = autoEncrypterError;
}
const internalClient = this.internalClient;
if (internalClient != null && client !== internalClient) {
return await internalClient.close(force);
return await internalClient.close();
}
if (error != null) {
throw error;
Expand Down
28 changes: 28 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,34 @@ export class MongoTopologyClosedError extends MongoAPIError {
}
}

/**
* An error generated when the MongoClient is closed and async
* operations are interrupted.
*
* @public
* @category Error
*/
export class MongoClientClosedError extends MongoAPIError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor() {
super('Operation interrupted because client was closed');
}

override get name(): string {
return 'MongoClientClosedError';
}
}

/** @public */
export interface MongoNetworkErrorOptions {
/** Indicates the timeout happened before a connection handshake completed */
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export {
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoClientClosedError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
85 changes: 65 additions & 20 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,22 +347,35 @@ export type MongoClientEvents = Pick<TopologyEvents, (typeof MONGO_CLIENT_EVENTS
};

/**
* The **MongoClient** class is a class that allows for making Connections to MongoDB.
* @public
*
* The **MongoClient** class is a class that allows for making Connections to MongoDB.
*
* **NOTE:** The programmatically provided options take precedence over the URI options.
*
* @remarks
* The programmatically provided options take precedence over the URI options.
*
* A MongoClient is the entry point to connecting to a MongoDB server.
*
* It handles a multitude of features on your application's behalf:
* - **Server Host Connection Configuration**: A MongoClient is responsible for reading TLS cert, ca, and crl files if provided.
* - **SRV Record Polling**: A "`mongodb+srv`" style connection string is used to have the MongoClient resolve DNS SRV records of all server hostnames which the driver periodically monitors for changes and adjusts its current view of hosts correspondingly.
* - **Server Monitoring**: Automatically the MongoClient keeps monitoring the health of server nodes in your cluster to reach out to the correct and lowest latency one available.
* - **Connection Pooling**: To avoid paying the cost of rebuilding a connection to the server on every operations the MongoClient keeps idle connections preserved for reuse.
* - **Session Pooling**: The MongoClient creates logical sessions which enable retryable writes, causal consistency, and transactions. It handles pooling these sessions for reuse in subsequent operations.
* - **Cursor Operations**: A MongoClient's cursors use the health monitoring system to send the request for more documents to the same server the query began on.
* - **Mongocryptd process**: A MongoClient will launch a `mongocryptd` instance for handling encryption if the mongocrypt shared library isn't in use.
*
* There are many more features of a MongoClient that are not listed above.
*
* As suggested by the list above there are a number of asynchronous Node.js resources that are established by the driver.
* For details on clean up, please refer to the MongoClient `close()` documentation.
*
* @example
* ```ts
* import { MongoClient } from 'mongodb';
*
* // Enable command monitoring for debugging
* const client = new MongoClient('mongodb://localhost:27017', { monitorCommands: true });
*
* client.on('commandStarted', started => console.log(started));
* client.db().collection('pets');
* await client.insertOne({ name: 'spot', kind: 'dog' });
* const client = new MongoClient('mongodb://localhost:27017?appName=mflix', { monitorCommands: true });
* ```
*/
export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements AsyncDisposable {
Expand Down Expand Up @@ -641,25 +654,55 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
}

/**
* Cleans up client-side resources used by the MongoCLient and . This includes:
* Cleans up resources managed by the MongoClient.
*
* The close method clears and closes all resources that the client is responsible for.
* Please refer to the `MongoClient` class documentation for a breakdown of resource responsibilities.
*
* **However,** the close method is not a replacement for clean up of explicitly created resources.
* This method is written as a "best effort" attempt to leave behind the least amount of resources server-side when possible.
*
* The following list defines ideal preconditions and consequent pitfalls if they are not met.
*
* - Closes all open, unused connections (see note).
* - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}.
* - Ends all unused sessions server-side.
* - Cleans up any resources being used for auto encryption if auto encryption is enabled.
* The close method performs the following in the order listed:
* - Client-side:
* - **Close in-use connections**: Any connections that are currently waiting on a response from the server will be closed.
* This is performed _first_ to avoid reaching the next step (server-side clean up) and having no available connections to check out.
* - _Ideal_: All operations have been awaited or cancelled, and the outcomes, regardless of success or failure, have been processed before closing the client servicing the operation.
* - _Pitfall_: When `client.close()` is called and all connections are in use, after closing them, the client must create new connections for cleanup operations, which comes at the cost of new TLS/TCP handshakes and authentication steps.
* - Server-side:
* - **Close active cursors**: All cursors that haven't been completed will have a `killCursor` operation sent to the server they were initialized on, freeing the server-side resource.
* - _Ideal_: Cursors are explicitly closed or completed before `client.close()` is called.
* - _Pitfall_: `killCursors` may have to build a new connection if the in-use closure ended all pooled connections.
* - **End active sessions**: In-use sessions created with `client.startSession()` or `client.withSession()` or implicitly by the driver will have their `.endSession()` method called.
* Contrary to the name of the method, `endSession()` returns the session to the client's pool of sessions rather than end them on the server.
* - _Ideal_: Transaction outcomes are awaited and their corresponding explicit sessions are ended before `client.close()` is called.
* - _Pitfall_: **This step aborts in-progress transactions**. It is advisable to observe the outcome of a transaction before closing your client.
* - **End all pooled sessions**: The `endSessions` command with all session IDs the client has pooled is sent to the server to inform the cluster it can clean them up.
* - _Ideal_: No user intervention is expected.
* - _Pitfall_: None.
*
* @remarks Any in-progress operations are not killed and any connections used by in progress operations
* will be cleaned up lazily as operations finish.
* The remaining shutdown is of the MongoClient resources that are intended to be entirely internal but is documented here as their existence relates to the JS event loop.
*
* @param force - Force close, emitting no events
* - Client-side (again):
* - **Stop all server monitoring**: Connections kept live for detecting cluster changes and roundtrip time measurements are shutdown.
* - **Close all pooled connections**: Each server node in the cluster has a corresponding connection pool and all connections in the pool are closed. Any operations waiting to check out a connection will have an error thrown instead of a connection returned.
* - **Clear out server selection queue**: Any operations that are in the process of waiting for a server to be selected will have an error thrown instead of a server returned.
* - **Close encryption-related resources**: An internal MongoClient created for communicating with `mongocryptd` or other encryption purposes is closed. (Using this same method of course!)
*
* After the close method completes there should be no MongoClient related resources [ref-ed in Node.js' event loop](https://docs.libuv.org/en/v1.x/handle.html#reference-counting).
* This should allow Node.js to exit gracefully if MongoClient resources were the only active handles in the event loop.
*
* @param _force - currently an unused flag that has no effect. Defaults to `false`.
*/
async close(force = false): Promise<void> {

async close(_force = false): Promise<void> {
if (this.closeLock) {
return await this.closeLock;
}

try {
this.closeLock = this._close(force);
this.closeLock = this._close();
await this.closeLock;
} finally {
// release
Expand All @@ -668,7 +711,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
}

/* @internal */
private async _close(force = false): Promise<void> {
private async _close(): Promise<void> {
// There's no way to set hasBeenClosed back to false
Object.defineProperty(this.s, 'hasBeenClosed', {
value: true,
Expand All @@ -677,6 +720,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

this.topology?.closeCheckedOutConnections();

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

Expand Down Expand Up @@ -722,7 +767,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements

const { encrypter } = this.options;
if (encrypter) {
await encrypter.close(this, force);
await encrypter.close(this);
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
}

closeCheckedOutConnections() {
return this.pool.closeCheckedOutConnections();
}

/** Destroy the server connection */
destroy(): void {
close(): void {
if (this.s.state === STATE_CLOSED) {
return;
}
Expand Down
14 changes: 10 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

closeCheckedOutConnections() {
for (const server of this.s.servers.values()) {
return server.closeCheckedOutConnections();
}
}

/** Close this topology */
close(): void {
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return;
}

for (const server of this.s.servers.values()) {
destroyServer(server, this);
closeServer(server, this);
}

this.s.servers.clear();
Expand Down Expand Up @@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Destroys a server, and removes all event listeners from the instance */
function destroyServer(server: Server, topology: Topology) {
function closeServer(server: Server, topology: Topology) {
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}

server.destroy();
server.close();
topology.emitAndLog(
Topology.SERVER_CLOSED,
new ServerClosedEvent(topology.s.id, server.description.address)
Expand Down Expand Up @@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes

// prepare server for garbage collection
if (server) {
destroyServer(server, topology);
closeServer(server, topology);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe('Change Streams', function () {
await csDb.createCollection('test').catch(() => null);
collection = csDb.collection('test');
changeStream = collection.watch();
changeStream.on('error', () => null);
});

afterEach(async () => {
Expand Down Expand Up @@ -702,15 +703,19 @@ describe('Change Streams', function () {

const outStream = new PassThrough({ objectMode: true });

// @ts-expect-error: transform requires a Document return type
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
const transform = doc => ({ doc: JSON.stringify(doc) });
changeStream
.stream({ transform })
.on('error', () => null)
.pipe(outStream)
.on('error', () => null);

const willBeData = once(outStream, 'data');

await collection.insertMany([{ a: 1 }]);

const [data] = await willBeData;
const parsedEvent = JSON.parse(data);
const parsedEvent = JSON.parse(data.doc);
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);

outStream.destroy();
Expand Down
Loading