Skip to content
15 changes: 15 additions & 0 deletions src/cmap/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import {
/** @public */
export type Stream = Socket | TLSSocket;

function applyBackpressureLabels(error: MongoError) {
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
error.addErrorLabel(MongoErrorLabel.RetryableError);
}

export async function connect(options: ConnectionOptions): Promise<Connection> {
let connection: Connection | null = null;
try {
Expand Down Expand Up @@ -103,6 +108,8 @@ export async function performInitialHandshake(
const authContext = new AuthContext(conn, credentials, options);
conn.authContext = authContext;

// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
const handshakeDoc = await prepareHandshakeDocument(authContext);

// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
Expand Down Expand Up @@ -163,12 +170,15 @@ export async function performInitialHandshake(
try {
await provider.auth(authContext);
} catch (error) {
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.

if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
}

throw error;
}
}
Expand All @@ -189,6 +199,9 @@ export async function performInitialHandshake(
if (error instanceof MongoError) {
error.addErrorLabel(MongoErrorLabel.HandshakeError);
}
// If we encounter an error executing the initial handshake, apply backpressure labels.
applyBackpressureLabels(error);

throw error;
}
}
Expand Down Expand Up @@ -424,6 +437,8 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
socket = await connectedSocket;
return socket;
} catch (error) {
// If we encounter a SystemOverloaded error while establishing a socket, apply the backpressure labels to it.
applyBackpressureLabels(error);
socket.destroy();
throw error;
} finally {
Expand Down
12 changes: 4 additions & 8 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
ResetPool: 'ResetPool',
PoolRequestedRetry: 'PoolRequestedRetry',
InterruptInUseConnections: 'InterruptInUseConnections',
NoWritesPerformed: 'NoWritesPerformed'
NoWritesPerformed: 'NoWritesPerformed',
RetryableError: 'RetryableError',
SystemOverloadedError: 'SystemOverloadedError'
} as const);

/** @public */
Expand Down Expand Up @@ -1528,13 +1530,7 @@ export function isNodeShuttingDownError(err: MongoError): boolean {
*
* @see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.md#not-writable-primary-and-node-is-recovering
*/
export function isSDAMUnrecoverableError(error: MongoError): boolean {
// NOTE: null check is here for a strictly pre-CMAP world, a timeout or
// close event are considered unrecoverable
if (error instanceof MongoParseError || error == null) {
return true;
}

export function isStateChangeError(error: MongoError): boolean {
return isRecoveringError(error) || isNotWritablePrimaryError(error);
}

Expand Down
86 changes: 55 additions & 31 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import {
import {
type AnyError,
isNodeShuttingDownError,
isSDAMUnrecoverableError,
isStateChangeError,
MONGODB_ERROR_CODES,
MongoError,
MongoErrorLabel,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoParseError,
MongoRuntimeError,
MongoServerClosedError,
type MongoServerError,
Expand Down Expand Up @@ -391,43 +392,60 @@ export class Server extends TypedEventEmitter<ServerEvents> {
return;
}

const isStaleError =
error.connectionGeneration && error.connectionGeneration < this.pool.generation;
if (isStaleError) {
if (isStaleError(this, error)) {
return;
}

const isNetworkNonTimeoutError =
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
const isNetworkTimeoutBeforeHandshakeError =
error instanceof MongoNetworkError && error.beforeHandshake;
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
if (isNetworkNonTimeoutError || isNetworkTimeoutBeforeHandshakeError || isAuthHandshakeError) {
// In load balanced mode we never mark the server as unknown and always
// clear for the specific service id.
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);

// TODO: considering parse errors as SDAM unrecoverable errors seem
// questionable. What if the parse error only comes from an application connection,
// indicating some bytes were lost in transmission? It seems overkill to completely
// kill the server.
// Parse errors from monitoring connections are already handled because the
// error would be wrapped in a ServerHeartbeatFailedEvent, which would mark the
// server unknown and clear the pool. Can we remove this?
if (isStateChangeError(error) || error instanceof MongoParseError) {
const shouldClearPool = isNodeShuttingDownError(error);

// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.
if (!this.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(this, error);
process.nextTick(() => this.requestCheck());
return;
}

if (connection && shouldClearPool) {
this.pool.clear({ serviceId: connection.serviceId });
}
} else if (
isNetworkNonTimeoutError ||
isNetworkTimeoutBeforeHandshakeError ||
isAuthOrEstablishmentHandshakeError
) {
// Do NOT clear the pool if we encounter a system overloaded error.
if (isSystemOverloadError) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only this line is relevant for the actual backpressure change. The rest is a nice-to-have refactor that makes Node's SDAM error handling more closely match the spec's pseudocode (easier for me to develop and review). When the time comes, I'll probably break the refactor into a standalone commit to ease review.

return;
}
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.
if (!this.loadBalanced) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.pool.clear({ serviceId: connection.serviceId });
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.pool.clear({ serviceId: connection.serviceId });
}

if (!this.loadBalanced) {
if (shouldClearPool) {
error.addErrorLabel(MongoErrorLabel.ResetPool);
}
markServerUnknown(this, error);
process.nextTick(() => this.requestCheck());
}
}
}
}
}

Expand Down Expand Up @@ -560,12 +578,6 @@ function connectionIsStale(pool: ConnectionPool, connection: Connection) {
return connection.generation !== pool.generation;
}

function shouldHandleStateChangeError(server: Server, err: MongoError) {
const etv = err.topologyVersion;
const stv = server.description.topologyVersion;
return compareTopologyVersion(stv, etv) < 0;
}

function inActiveTransaction(session: ClientSession | undefined, cmd: Document) {
return session && session.inTransaction() && !isTransactionCommand(cmd);
}
Expand All @@ -575,3 +587,15 @@ function inActiveTransaction(session: ClientSession | undefined, cmd: Document)
function isRetryableWritesEnabled(topology: Topology) {
return topology.s.options.retryWrites !== false;
}

function isStaleError(server: Server, error: MongoError): boolean {
const currentGeneration = server.pool.generation;
const generation = error.connectionGeneration;

if (generation && generation < currentGeneration) {
return true;
}

const currentTopologyVersion = server.description.topologyVersion;
return compareTopologyVersion(currentTopologyVersion, error.topologyVersion) >= 0;
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import { expect } from 'chai';
import { once } from 'events';

import { type MongoClient } from '../../../src';
import {
type ConnectionCheckOutFailedEvent,
type ConnectionPoolClearedEvent,
type MongoClient
} from '../../../src';
import {
CONNECTION_POOL_CLEARED,
CONNECTION_POOL_READY,
SERVER_HEARTBEAT_FAILED,
SERVER_HEARTBEAT_SUCCEEDED
} from '../../../src/constants';
import { sleep } from '../../tools/utils';

describe('Server Discovery and Monitoring Prose Tests', function () {
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
Expand Down Expand Up @@ -187,4 +192,74 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
}
});
});

context('Connection Pool Backpressure', function () {
let client: MongoClient;
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];

beforeEach(async function () {
client = this.configuration.newClient({}, { maxConnecting: 100 });

client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));

await client.connect();

const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: true
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRatePerSec: 20
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentBurstCapacitySecs: 1
});
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentMaxQueueDepth: 1
});

await client.db('test').collection('test').insertOne({});
});

afterEach(async function () {
// give the time to recover from the connection storm before cleaning up.
await sleep(1000);

const admin = client.db('admin').admin();
await admin.command({
setParameter: 1,
ingressConnectionEstablishmentRateLimiterEnabled: false
});

await client.close();
});

it(
'does not clear the pool when connections are closed due to connection storms',
{
requires: {
mongodb: '>=7.0' // rate limiting added in 7.0
}
},
async function () {
await Promise.allSettled(
Array.from({ length: 100 }).map(() =>
client
.db('test')
.collection('test')
.findOne({ $where: 'function() { sleep(2000); return true; }' })
)
);

expect(poolClearedEvents).to.be.empty;
expect(checkoutFailedEvents.length).to.be.greaterThan(10);
}
);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 91,
"appName": "poolCreateMinSizeErrorTest"
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ failPoint:
mode: { times: 50 }
data:
failCommands: ["isMaster","hello"]
closeConnection: true
errorCode: 91
appName: "poolCreateMinSizeErrorTest"
poolOptions:
minPoolSize: 1
Expand Down
8 changes: 4 additions & 4 deletions test/spec/load-balancers/sdam-error-handling.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"description": "state change errors are correctly handled",
"schemaVersion": "1.3",
"schemaVersion": "1.4",
"runOnRequirements": [
{
"topologies": [
Expand Down Expand Up @@ -263,7 +263,7 @@
"description": "errors during the initial connection hello are ignored",
"runOnRequirements": [
{
"minServerVersion": "4.9"
"minServerVersion": "4.4.7"
}
],
"operations": [
Expand All @@ -282,7 +282,7 @@
"isMaster",
"hello"
],
"closeConnection": true,
"errorCode": 11600,
"appName": "lbSDAMErrorTestClient"
}
}
Expand All @@ -297,7 +297,7 @@
}
},
"expectError": {
"isClientError": true
"isError": true
}
}
],
Expand Down
11 changes: 5 additions & 6 deletions test/spec/load-balancers/sdam-error-handling.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
description: state change errors are correctly handled

schemaVersion: '1.3'
schemaVersion: '1.4'

runOnRequirements:
- topologies: [ load-balanced ]
Expand Down Expand Up @@ -141,9 +141,8 @@ tests:
# to the same mongos on which the failpoint is set.
- description: errors during the initial connection hello are ignored
runOnRequirements:
# Server version 4.9+ is needed to set a fail point on the initial
# connection handshake with the appName filter due to SERVER-49336.
- minServerVersion: '4.9'
# Require SERVER-49336 for failCommand + appName on the initial handshake.
- minServerVersion: '4.4.7'
operations:
- name: failPoint
object: testRunner
Expand All @@ -154,14 +153,14 @@ tests:
mode: { times: 1 }
data:
failCommands: [isMaster, hello]
closeConnection: true
errorCode: 11600
appName: *singleClientAppName
- name: insertOne
object: *singleColl
arguments:
document: { x: 1 }
expectError:
isClientError: true
isError: true
expectEvents:
- client: *singleClient
eventType: cmap
Expand Down
Loading