Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
21abfe2
Fixes JAVA-5949 prevent connection churn on backpressure errors when …
nhachicha Dec 11, 2025
339297b
Remove handshake and update submodule including new tests
nhachicha Dec 15, 2025
2e44072
Update spec test; fix test runner
nhachicha Jan 20, 2026
4cff1b2
Add prose test
nhachicha Jan 26, 2026
0a93dd7
Increasing the timeout termination
nhachicha Jan 27, 2026
b1f4ba0
- Revert spec timeout
nhachicha Jan 27, 2026
1b1558d
Update exception checks
nhachicha Jan 27, 2026
fcdfa23
Increase timeout for operations to complete
nhachicha Jan 27, 2026
d423093
Simplifying conditions check
nhachicha Jan 28, 2026
bce97d4
Apply backpressure labels in CMAP layer; align SDAM with spec
nhachicha Apr 22, 2026
41b2773
Align Prose Test with Spec
nhachicha Apr 23, 2026
508d741
Test runner needs to add the SystemOverloadedError label for the pre-…
nhachicha Apr 23, 2026
c5e68fc
- Re-enable JAVA-5949-skipped SDAM/backpressure tests
nhachicha Apr 23, 2026
71c91f0
Deferred fixes to https://jira.mongodb.org/browse/JAVA-5664 to anothe…
nhachicha Apr 23, 2026
6afa1a8
Add DNS-lookup regression test for backpressure pool invalidation
nhachicha Apr 23, 2026
4938439
Addressing Claude-Code review feedback
nhachicha Apr 27, 2026
98add54
Update driver-sync/src/test/functional/com/mongodb/client/ServerDisco…
nhachicha Apr 27, 2026
9ce8dde
Add `maxAdaptiveRetries` API (#1944)
stIncMale Apr 20, 2026
a2b0e3c
Add support for server selection's deprioritized servers (#1860)
vbabanin Apr 21, 2026
135749d
Implement prose backpressure tests (#1946)
stIncMale Apr 22, 2026
dd8d662
Add `enableOverloadRetargeting` API (#1943)
vbabanin Apr 23, 2026
4923f03
Merge branch 'backpressure' into nh/backpressure/preserve_connection_…
nhachicha Apr 27, 2026
328e95f
Update driver-sync/src/test/functional/com/mongodb/client/unified/Eve…
nhachicha Apr 27, 2026
5a62b5c
Fixing DNS test
nhachicha Apr 27, 2026
4d3eaea
Adding more unit tests for the backpressure labels
nhachicha Apr 27, 2026
b24f7ec
Update driver-core/src/main/com/mongodb/internal/connection/Backpress…
nhachicha Apr 30, 2026
819810e
Update driver-core/src/main/com/mongodb/internal/connection/DefaultSd…
nhachicha Apr 30, 2026
d42bab5
PR - feedback
nhachicha Apr 30, 2026
a5720b8
Potential fix for pull request finding
nhachicha May 1, 2026
75a5ea4
Update driver-core/src/main/com/mongodb/internal/connection/Backpress…
nhachicha May 1, 2026
7f74b9b
- Replace the dedicated DNS test with a generic one.
nhachicha May 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.connection;

import com.mongodb.MongoException;
import com.mongodb.MongoSocketException;

import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLProtocolException;
import java.net.UnknownHostException;
import java.security.cert.CertPathBuilderException;
import java.security.cert.CertPathValidatorException;
import java.security.cert.CertificateException;
import java.util.Locale;

/**
* Attaches {@link MongoException#SYSTEM_OVERLOADED_ERROR_LABEL} and
* {@link MongoException#RETRYABLE_ERROR_LABEL} to network errors encountered during connection
* establishment or the hello message, per the CMAP specification.
*
* <p>This is topology-agnostic: it must be invoked from the connection-establishment path so that
* both default SDAM and load-balanced modes are covered.
*/
final class BackpressureErrorLabeler {

private BackpressureErrorLabeler() {
}

static void applyLabelsIfEligible(final Throwable t) {
if (!(t instanceof MongoSocketException)) {
return;
}
if (isDnsLookupFailure(t)) {
return;
}
if (isTlsConfigurationError(t)) {
return;
}
// TODO-BACKPRESSURE Nabil - SOCKS5 Revisit alongside JAVA-5205 (SOCKS5 in async) so both sync and
// async proxy error surfaces can be handled together — likely via a dedicated internal
// exception thrown from the proxy code path.
MongoException mongoException = (MongoException) t;
mongoException.addLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
mongoException.addLabel(MongoException.RETRYABLE_ERROR_LABEL);
}

private static boolean isDnsLookupFailure(final Throwable t) {
Throwable cause = t.getCause();
while (cause != null) {
if (cause instanceof UnknownHostException) {
return true;
}
cause = cause.getCause();
}
return false;
}

private static boolean isTlsConfigurationError(final Throwable t) {
Throwable cause = t.getCause();
while (cause != null) {
if (cause instanceof CertificateException
|| cause instanceof CertPathBuilderException
|| cause instanceof CertPathValidatorException
|| cause instanceof SSLPeerUnverifiedException
|| cause instanceof SSLProtocolException) {
return true;
}
if (cause instanceof SSLHandshakeException) {
String message = cause.getMessage();
if (message != null) {
String lowerMessage = message.toLowerCase(Locale.ROOT);
if (lowerMessage.contains("certificate")
|| lowerMessage.contains("verify")
|| lowerMessage.contains("trust")
|| lowerMessage.contains("hostname")
|| lowerMessage.contains("protocol")
|| lowerMessage.contains("cipher")
|| lowerMessage.contains("handshake_failure")) {
return true;
}
}
}
cause = cause.getCause();
}
return false;
Comment thread
nhachicha marked this conversation as resolved.
Outdated
Comment on lines +74 to +106
Copy link
Copy Markdown
Member

@vbabanin vbabanin Apr 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dug into isTlsConfigurationError and the TLS exception landscape across JDK versions and providers. Sharing what I found below and what we discussed over the call.


Current TLS config error gaps

The keyword list (certificate, verify, trust, hostname, protocol, cipher, handshake_failure) catches 12 of the 25 RFC-defined TLS alert descriptions but misses these config errors.

Alert JDK exception message Labeled?
unknown_ca(48) "Received fatal alert: unknown_ca" Yes -- false postive
access_denied(49) "Received fatal alert: access_denied" Yes -- *false positive *
insufficient_security(71) "Received fatal alert: insufficient_security" Yes -- false positive
unrecognized_name(112) "Received fatal alert: unrecognized_name" Yes -- false positive

BouncyCastle's JSSE provider is currently unhandled: it produces TlsFatalAlertReceived (extends IOException, not SSLException), however, there is no good way to catch those.

How FIN, RST, and TLS alerts surface across providers and transports

Full exception chains (42 test cases)

FIN (graceful close during TLS handshake):

Transport Provider Exception chain
SocketStream JDK MongoSocketWriteException -> SSLHandshakeException("Remote host terminated the handshake") -> EOFException("SSL peer shut down incorrectly")
NettyStream JDK MongoSocketWriteException -> ClosedChannelException
TlsChannelStream JDK MongoSocketWriteException -> ClosedChannelException
SocketStream BC MongoSocketWriteException -> TlsFatalAlert("handshake_failure(40)")
NettyStream BC MongoSocketWriteException -> ClosedChannelException
TlsChannelStream BC MongoSocketWriteException -> ClosedChannelException

RST (abrupt reset during TLS handshake):

Transport Provider Exception chain
SocketStream JDK MongoSocketWriteException -> SocketException("Connection reset")
NettyStream JDK MongoSocketWriteException -> SslClosedEngineException("SSLEngine closed already")
TlsChannelStream JDK MongoSocketWriteException -> SocketException("Connection reset")
SocketStream BC MongoSocketWriteException -> SocketException("Connection reset")
NettyStream BC MongoSocketWriteException -> SslClosedEngineException("SSLEngine closed already")
TlsChannelStream BC MongoSocketWriteException -> SocketException("Connection reset")

TLS alert example -- unknown_ca(48):

Transport Provider Exception chain
SocketStream JDK MongoSocketWriteException -> SSLHandshakeException("Received fatal alert: unknown_ca")
NettyStream JDK MongoSocketWriteException -> SSLHandshakeException("Received fatal alert: unknown_ca")
TlsChannelStream JDK MongoSocketWriteException -> SSLHandshakeException("Received fatal alert: unknown_ca")
SocketStream BC MongoSocketWriteException -> TlsFatalAlertReceived("unknown_ca(48)")
NettyStream BC MongoSocketWriteException -> SSLException -> TlsFatalAlertReceived("unknown_ca(48)")
TlsChannelStream BC MongoSocketWriteException -> SSLException -> TlsFatalAlertReceived("unknown_ca(48)")

Key observation: FIN/RST get labeled correctly today because they don't match any exclusion check . A TLS alert from the server means it actively responded, so it wasn't shedding load.
Assumption: A rate limiter drops the TCP connection (FIN/RST) before or during a TLS exchange.

Two approaches

Both are heuristic-based -- perfect classification isn't possible since exception types and messages are provider implementation details.

A. Improved blocklist (current approach): Label by default, exclude what we can identify as not-overload.
Failure mode: unrecognized config errors stay labeled (pool preserved, unnecessary retries).

it should be better for unknown providers - if we can't classify an error, pool is preserved, which is the safer default under load.

B. Allowlist: Only label recognized I/O patterns (EOFException, SocketException("Connection reset"), ClosedChannelException).

Failure mode: unrecognized I/O errors are not labeled (pool cleared unnecessarily). This is the pre-backpressure behavior. For example, BC represents FIN as TlsFatalAlert("handshake_failure(40)") - the allowlist misses it, pool gets cleared.

TL;DR / Suggestion:

As we discussed over the call, the blocklist (Approach A) matches the CMAP spec's direction ("the pool MUST add the error labels" as default, with exclusions carved out).

However, we could check for all RFC alert description strings, so we can improve the heuristic.

All 25 handshake-only TLS alert descriptions from OpenJDK's Alert.java (RFC 8446/5246)

These are the alert codes with handshakeOnly=true in OpenJDK. When received from a peer, they produce SSLHandshakeException("Received fatal alert: <description>"). All are definitively config/protocol errors, not I/O:

Code Description string Currently caught by keywords?
40 handshake_failure Yes (handshake_failure)
41 no_certificate Yes (certificate)
42 bad_certificate Yes (certificate)
43 unsupported_certificate Yes (certificate)
44 certificate_revoked Yes (certificate)
45 certificate_expired Yes (certificate)
46 certificate_unknown Yes (certificate)
47 illegal_parameter No
48 unknown_ca No
49 access_denied No
50 decode_error No
51 decrypt_error No
60 export_restriction No
70 protocol_version Yes (protocol)
71 insufficient_security No
100 no_renegotiation No
109 missing_extension No
110 unsupported_extension No
111 certificate_unobtainable Yes (certificate)
112 unrecognized_name No
113 bad_certificate_status_response Yes (certificate)
114 bad_certificate_hash_value Yes (certificate)
115 unknown_psk_identity No
116 certificate_required Yes (certificate)
120 no_application_protocol No

We should keep the existing type checks (CertificateException, CertPathBuilderException, CertPathValidatorException, SSLPeerUnverifiedException, SSLProtocolException) - they handle local validation failures correctly on both providers (and should on others).

P.S Claude was used to examine JSSE providers and create a table of RFC messages used in OpenJDK's provider.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored for option A

}
}
Copy link
Copy Markdown
Member

@stIncMale stIncMale Mar 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[this comment is left on a file, but has nothing to do with the file; this is done so that we could reply to the comment; commenting on a PR does not allow replies - horrendous GitHub functionality]

The current PR seemingly depends on #1856 (see the description of the current PR). We need to decide what to do with that.

P.S. I originally left my thoughts in the description of this PR, but I don't know if that's what we are going to do.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer depend on this PR see #1900 (comment)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[this comment is left on a file, but has nothing to do with the file; this is done so that we could reply to the comment; commenting on a PR does not allow replies - horrendous GitHub functionality]

@nhachicha

  • Could you please confirm that all the specification changes listed in the "Specification changes" part of the description of the current PR have been addressed in the PR?
  • If they have been addressed, let's update the description correspondingly.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Description updated. All specs are implemented

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[this comment is left on a file, but has nothing to do with the file; this is done so that we could reply to the comment; commenting on a PR does not allow replies - horrendous GitHub functionality]

JAVA-5949 has an old comment, which instructs to re-enable some tests that were previously disabled when we updated the testing/resources/specifications submodule in main. Those tests were not enabled. Let's enable them and make sure they pass.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR no longer depends on #1856. Spec PR mongodb/specifications#1880 rewrote the backpressure-network-*-fail.yml tests to wait on serverDescriptionChangedEvent instead of serverHeartbeatSucceededEvent.

Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ private void handleException(final SdamIssue sdamIssue, final boolean beforeHand
serverMonitor.connect();
} else if (sdamIssue.relatedToNetworkNotTimeout()
|| (beforeHandshake && (sdamIssue.relatedToNetworkTimeout() || sdamIssue.relatedToAuth()))) {
if (sdamIssue.hasBackpressureLabel()) {
Comment thread
nhachicha marked this conversation as resolved.
Outdated
return;
}
updateDescription(sdamIssue.serverDescription());
connectionPool.invalidate(sdamIssue.exception().orElse(null));
serverMonitor.cancelCurrentCheck();
Comment thread
nhachicha marked this conversation as resolved.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,11 @@ public void open(final OperationContext originalOperationContext) {
isTrue("Open already called", stream == null);
stream = streamFactory.create(serverId.getAddress());
OperationContext operationContext = originalOperationContext;
boolean beforeHandshake = true;
try {
stream.open(operationContext);
InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this, operationContext);
beforeHandshake = false;

operationContext = operationContext.withOverride(TimeoutContext::withNewlyStartedMaintenanceTimeout);
initAfterHandshakeStart(initializationDescription);
Expand All @@ -241,6 +243,9 @@ public void open(final OperationContext originalOperationContext) {
initAfterHandshakeFinish(initializationDescription);
} catch (Throwable t) {
close();
if (beforeHandshake) {
BackpressureErrorLabeler.applyLabelsIfEligible(t);
}
if (t instanceof MongoException) {
throw (MongoException) t;
} else {
Expand All @@ -263,6 +268,7 @@ public void completed(@Nullable final Void aVoid) {
(initialResult, initialException) -> {
if (initialException != null) {
close();
BackpressureErrorLabeler.applyLabelsIfEligible(initialException);
callback.onResult(null, initialException);
} else {
assertNotNull(initialResult);
Expand All @@ -278,11 +284,13 @@ public void completed(@Nullable final Void aVoid) {
@Override
public void failed(final Throwable t) {
close();
BackpressureErrorLabeler.applyLabelsIfEligible(t);
callback.onResult(null, t);
}
});
} catch (Throwable t) {
close();
BackpressureErrorLabeler.applyLabelsIfEligible(t);
callback.onResult(null, t);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoNodeIsRecoveringException;
import com.mongodb.MongoNotPrimaryException;
import com.mongodb.MongoSecurityException;
Expand Down Expand Up @@ -162,6 +163,11 @@ boolean relatedToWriteConcern() {
return exception instanceof MongoWriteConcernWithResponseException;
}

boolean hasBackpressureLabel() {
return exception instanceof MongoException
&& ((MongoException) exception).hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL);
}

private static boolean stale(@Nullable final Throwable t, final ServerDescription currentServerDescription) {
return TopologyVersionHelper.topologyVersion(t)
.map(candidateTopologyVersion -> TopologyVersionHelper.newerOrEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected void applyApplicationError(final BsonDocument applicationError) {

switch (when) {
case "beforeHandshakeCompletes":
BackpressureErrorLabeler.applyLabelsIfEligible(exception);
server.sdamServerDescriptionManager().handleExceptionBeforeHandshake(
SdamIssue.of(exception, new SdamIssue.Context(server.serverId(), errorGeneration, maxWireVersion)));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,28 @@ class DefaultServerSpecification extends Specification {
]
}

def 'DNS lookup failure should not be labeled as backpressure and should invalidate the pool'() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure this should be DNS-specific at the DefaultServer level.

Other unlabeled connection-establishment failures (e.g., TLS configuration errors) follow the same path and also shouldn’t receive backpressure labels.

If the intent of this test is “unlabeled errors still invalidate the pool”, we have two options:

  1. Expand this test to cover other unlabeled exceptions (TLS config, etc.), or
  2. Make it a generic behavior test and rename accordingly (e.g., from “DNS lookup failure …” to “should invalidate the pool when the exception without system overloaded label”). It could be also parametrized on different exception types.

I’d prefer (2) to keep the test focused on DefaultServer behavior rather than a specific error type (as DNS in this case).

Also, I suggest adding the complementary test for the opposite branch: should not invalidate the pool when exception with SystemOverloadedError. This would complete coverage of both paths.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed (option 2 + additional test)

given:
def exceptionToThrow = new MongoSocketException('DNS lookup failed', new ServerAddress(),
new UnknownHostException('no such host'))
BackpressureErrorLabeler.applyLabelsIfEligible(exceptionToThrow)
Comment thread
vbabanin marked this conversation as resolved.
Outdated
assert !exceptionToThrow.hasErrorLabel(MongoException.SYSTEM_OVERLOADED_ERROR_LABEL)
Copy link
Copy Markdown
Member

@vbabanin vbabanin Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part seems better suited for the unit tests where the SUT is BackpressureLabeler (e.g., BackpressureErrorLabelerTest). In this test the SUT is server.getConnection(), so it would be preferable to avoid asserting labeling internals here.

Could we instead assert the “label not applied” outcome at the behavior level in the then: section (e.g., on def e = thrown(MongoException)) and keep the unit tests in BackpressureLabeler responsible for the labeling implementation details?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


def connectionPool = Mock(ConnectionPool)
connectionPool.get(_) >> { throw exceptionToThrow }
def serverMonitor = Mock(ServerMonitor)
def server = defaultServer(connectionPool, serverMonitor)

when:
server.getConnection(OPERATION_CONTEXT)

then:
def e = thrown(MongoException)
e.is(exceptionToThrow)
1 * connectionPool.invalidate(exceptionToThrow)
1 * serverMonitor.cancelCurrentCheck()
}

def 'failed authentication should invalidate the connection pool'() {
given:
def connectionPool = Mock(ConnectionPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,6 @@ public class ServerDiscoveryAndMonitoringTest extends AbstractServerDiscoveryAnd

public ServerDiscoveryAndMonitoringTest(final String description, final BsonDocument definition) {
super(definition);
assumeFalse("https://jira.mongodb.org/browse/JAVA-5949",
description.equals("error_handling_handshake.json: Network timeouts before and after the handshake completes"));

this.description = description;
init(serverAddress -> NO_OP_SERVER_LISTENER, NO_OP_CLUSTER_LISTENER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.ClusterFixture;
import com.mongodb.MongoClientSettings;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolReadyEvent;
Expand All @@ -26,6 +27,7 @@
import com.mongodb.event.ServerHeartbeatSucceededEvent;
import com.mongodb.event.ServerListener;
import com.mongodb.event.ServerMonitorListener;
import com.mongodb.internal.connection.TestConnectionPoolListener;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.time.TimePointTest;
Expand All @@ -47,6 +49,8 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import static com.mongodb.ClusterFixture.configureFailPoint;
Expand Down Expand Up @@ -268,6 +272,72 @@ public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() {
// As it requires mocking and package access to `com.mongodb.internal.connection`
}

/**
* See
* <a href="https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring-tests.md#connection-pool-backpressure">Connection Pool Backpressure</a>.
*/
@Test
public void testConnectionPoolBackpressure() throws InterruptedException {
assumeTrue(serverVersionAtLeast(7, 0));

TestConnectionPoolListener connectionPoolListener = new TestConnectionPoolListener();

MongoClientSettings clientSettings = getMongoClientSettingsBuilder()
.applyToConnectionPoolSettings(builder -> builder
.maxConnecting(100)
.addConnectionPoolListener(connectionPoolListener))
.build();
Comment thread
vbabanin marked this conversation as resolved.

try (MongoClient adminClient = MongoClients.create(getMongoClientSettingsBuilder().build());
MongoClient client = MongoClients.create(clientSettings)) {

Comment thread
nhachicha marked this conversation as resolved.
MongoDatabase adminDatabase = adminClient.getDatabase("admin");
MongoDatabase database = client.getDatabase(getDefaultDatabaseName());
MongoCollection<Document> collection = database.getCollection("testCollection");

try {
adminDatabase.runCommand(new Document("setParameter", 1)
.append("ingressConnectionEstablishmentRateLimiterEnabled", true));
adminDatabase.runCommand(new Document("setParameter", 1)
.append("ingressConnectionEstablishmentRatePerSec", 20));
adminDatabase.runCommand(new Document("setParameter", 1)
.append("ingressConnectionEstablishmentBurstCapacitySecs", 1));
adminDatabase.runCommand(new Document("setParameter", 1)
.append("ingressConnectionEstablishmentMaxQueueDepth", 1));
Comment thread
nhachicha marked this conversation as resolved.

collection.insertOne(Document.parse("{}"));

ExecutorService executor = Executors.newFixedThreadPool(100);
try {
for (int i = 0; i < 100; i++) {
executor.submit(() ->
collection.find(new Document("$where", "function() { sleep(2000); return true; }")).first());
}
executor.shutdown();
assertTrue("Executor did not terminate within timeout",
executor.awaitTermination(20, SECONDS));
} finally {
if (!executor.isTerminated()) {
executor.shutdownNow();
Comment thread
nhachicha marked this conversation as resolved.
}
}

int failedCheckOutCount = connectionPoolListener.countEvents(ConnectionCheckOutFailedEvent.class);
assertTrue("Expected at least 10 ConnectionCheckOutFailedEvents, but got " + failedCheckOutCount,
failedCheckOutCount >= 10);
assertEquals(0, connectionPoolListener.countEvents(ConnectionPoolClearedEvent.class));
} finally {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
adminDatabase.runCommand(new Document("setParameter", 1)
.append("ingressConnectionEstablishmentRateLimiterEnabled", false));
}
}
}

private static void assertPoll(final BlockingQueue<?> queue, @Nullable final Class<?> allowed, final Set<Class<?>> required)
throws InterruptedException {
assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.mongodb.event.CommandStartedEvent;
import com.mongodb.event.CommandSucceededEvent;
import com.mongodb.event.ConnectionCheckOutFailedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
import com.mongodb.event.ConnectionClosedEvent;
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.event.ConnectionPoolClearedEvent;
Expand Down Expand Up @@ -208,6 +209,12 @@ public void waitForConnectionPoolEvents(final String client, final BsonDocument
case "connectionReadyEvent":
eventClass = ConnectionReadyEvent.class;
break;
case "connectionClosedEvent":
eventClass = ConnectionClosedEvent.class;
break;
case "connectionCheckedInEvent":
eventClass = ConnectionCheckedInEvent.class;
break;
Comment thread
nhachicha marked this conversation as resolved.
default:
throw new UnsupportedOperationException("Unsupported event: " + event.getFirstKey());
}
Expand Down Expand Up @@ -436,9 +443,16 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
switch (newType) {
case "Unknown":
return event.getNewDescription().getType() == ServerType.UNKNOWN;
case "LoadBalancer": {
case "LoadBalancer":
return event.getNewDescription().getType() == ServerType.LOAD_BALANCER;
}
case "Mongos":
return event.getNewDescription().getType() == ServerType.SHARD_ROUTER;
case "Standalone":
return event.getNewDescription().getType() == ServerType.STANDALONE;
case "RSPrimary":
return event.getNewDescription().getType() == ServerType.REPLICA_SET_PRIMARY;
case "RSSecondary":
return event.getNewDescription().getType() == ServerType.REPLICA_SET_SECONDARY;
default:
throw new UnsupportedOperationException();
Comment thread
nhachicha marked this conversation as resolved.
Outdated
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,8 @@ private OperationResult executeWaitForEvent(final UnifiedTestContext context, fi
case "poolReadyEvent":
case "connectionCreatedEvent":
case "connectionReadyEvent":
case "connectionClosedEvent":
case "connectionCheckedInEvent":
context.getEventMatcher().waitForConnectionPoolEvents(clientId, event, count, entities.getConnectionPoolListener(clientId));
Comment thread
nhachicha marked this conversation as resolved.
break;
case "serverHeartbeatStartedEvent":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,15 +439,7 @@ public static void applyCustomizations(final TestDef def) {
.file("server-discovery-and-monitoring", "pool-clear-on-error-checkout");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5664")
.file("server-discovery-and-monitoring", "pool-cleared-on-min-pool-size-population-error");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
.file("server-discovery-and-monitoring", "backpressure-network-error-fail-single");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
.file("server-discovery-and-monitoring", "backpressure-network-timeout-error-single");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
.file("server-discovery-and-monitoring", "backpressure-network-error-fail-replicaset");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
.file("server-discovery-and-monitoring", "backpressure-network-timeout-error-replicaset");
def.skipJira("https://jira.mongodb.org/browse/JAVA-5949")
def.skipJira("https://jira.mongodb.org/browse/JAVA-6174")
Comment thread
vbabanin marked this conversation as resolved.
.file("server-discovery-and-monitoring", "backpressure-server-description-unchanged-on-min-pool-size-population-error");

// session tests
Expand Down