Skip to content

Commit 877b6d6

Browse files
authored
Make maxConnecting configurable (#822)
Note that `maxConnecting` is duplicated in `MongoClientOptions` similarly to other fields. https://jira.mongodb.org/browse/JAVA-4404 was filed to refactor the whole class. JAVA-4390
1 parent 6f49b5c commit 877b6d6

File tree

10 files changed

+200
-44
lines changed

10 files changed

+200
-44
lines changed

driver-core/src/main/com/mongodb/ConnectionString.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb;
1818

19+
import com.mongodb.connection.ConnectionPoolSettings;
1920
import com.mongodb.diagnostics.logging.Logger;
2021
import com.mongodb.diagnostics.logging.Loggers;
2122
import com.mongodb.internal.dns.DefaultDnsResolver;
@@ -126,6 +127,7 @@
126127
* <li>{@code maxPoolSize=n}: The maximum number of connections in the connection pool.</li>
127128
* <li>{@code waitQueueTimeoutMS=ms}: The maximum wait time in milliseconds that a thread may wait for a connection to
128129
* become available.</li>
130+
* <li>{@code maxConnecting=n}: The maximum number of connections a pool may be establishing concurrently.</li>
129131
* </ul>
130132
* <p>Write concern configuration:</p>
131133
* <ul>
@@ -265,6 +267,7 @@ public class ConnectionString {
265267
private Integer maxWaitTime;
266268
private Integer maxConnectionIdleTime;
267269
private Integer maxConnectionLifeTime;
270+
private Integer maxConnecting;
268271
private Integer connectTimeout;
269272
private Integer socketTimeout;
270273
private Boolean sslEnabled;
@@ -446,6 +449,7 @@ public ConnectionString(final String connectionString) {
446449
GENERAL_OPTIONS_KEYS.add("connecttimeoutms");
447450
GENERAL_OPTIONS_KEYS.add("maxidletimems");
448451
GENERAL_OPTIONS_KEYS.add("maxlifetimems");
452+
GENERAL_OPTIONS_KEYS.add("maxconnecting");
449453
GENERAL_OPTIONS_KEYS.add("sockettimeoutms");
450454

451455
// Order matters here: Having tls after ssl means than the tls option will supersede the ssl option when both are set
@@ -542,6 +546,8 @@ private void translateOptions(final Map<String, List<String>> optionsMap) {
542546
maxConnectionIdleTime = parseInteger(value, "maxidletimems");
543547
} else if (key.equals("maxlifetimems")) {
544548
maxConnectionLifeTime = parseInteger(value, "maxlifetimems");
549+
} else if (key.equals("maxconnecting")) {
550+
maxConnecting = parseInteger(value, "maxConnecting");
545551
} else if (key.equals("waitqueuetimeoutms")) {
546552
maxWaitTime = parseInteger(value, "waitqueuetimeoutms");
547553
} else if (key.equals("connecttimeoutms")) {
@@ -1300,6 +1306,18 @@ public Integer getMaxConnectionLifeTime() {
13001306
return maxConnectionLifeTime;
13011307
}
13021308

1309+
/**
1310+
* Gets the maximum number of connections a pool may be establishing concurrently specified in the connection string.
1311+
* @return The maximum number of connections a pool may be establishing concurrently
1312+
* if the {@code maxConnecting} option is specified in the connection string, or {@code null} otherwise.
1313+
* @see ConnectionPoolSettings#getMaxConnecting()
1314+
* @since 4.4
1315+
*/
1316+
@Nullable
1317+
public Integer getMaxConnecting() {
1318+
return maxConnecting;
1319+
}
1320+
13031321
/**
13041322
* Gets the socket connect timeout specified in the connection string.
13051323
* @return the socket connect timeout
@@ -1445,6 +1463,7 @@ public boolean equals(final Object o) {
14451463
&& Objects.equals(maxWaitTime, that.maxWaitTime)
14461464
&& Objects.equals(maxConnectionIdleTime, that.maxConnectionIdleTime)
14471465
&& Objects.equals(maxConnectionLifeTime, that.maxConnectionLifeTime)
1466+
&& Objects.equals(maxConnecting, that.maxConnecting)
14481467
&& Objects.equals(connectTimeout, that.connectTimeout)
14491468
&& Objects.equals(socketTimeout, that.socketTimeout)
14501469
&& Objects.equals(sslEnabled, that.sslEnabled)
@@ -1462,8 +1481,8 @@ public boolean equals(final Object o) {
14621481
public int hashCode() {
14631482
return Objects.hash(credential, isSrvProtocol, hosts, database, collection, directConnection, readPreference,
14641483
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
1465-
maxConnectionIdleTime, maxConnectionLifeTime, connectTimeout, socketTimeout, sslEnabled, sslInvalidHostnameAllowed,
1466-
requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency, applicationName, compressorList,
1467-
uuidRepresentation);
1484+
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
1485+
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
1486+
applicationName, compressorList, uuidRepresentation);
14681487
}
14691488
}

driver-core/src/main/com/mongodb/connection/ConnectionPoolSettings.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import com.mongodb.ConnectionString;
2020
import com.mongodb.annotations.Immutable;
2121
import com.mongodb.annotations.NotThreadSafe;
22+
import com.mongodb.event.ConnectionCreatedEvent;
2223
import com.mongodb.event.ConnectionPoolListener;
24+
import com.mongodb.event.ConnectionReadyEvent;
2325

2426
import java.util.ArrayList;
2527
import java.util.List;
@@ -46,6 +48,7 @@ public class ConnectionPoolSettings {
4648
private final long maxConnectionIdleTimeMS;
4749
private final long maintenanceInitialDelayMS;
4850
private final long maintenanceFrequencyMS;
51+
private final int maxConnecting;
4952

5053
/**
5154
* Gets a Builder for creating a new ConnectionPoolSettings instance.
@@ -80,6 +83,7 @@ public static final class Builder {
8083
private long maxConnectionIdleTimeMS;
8184
private long maintenanceInitialDelayMS;
8285
private long maintenanceFrequencyMS = MILLISECONDS.convert(1, MINUTES);
86+
private int maxConnecting = 2;
8387

8488
Builder() {
8589
}
@@ -103,6 +107,7 @@ public Builder applySettings(final ConnectionPoolSettings connectionPoolSettings
103107
maxConnectionIdleTimeMS = connectionPoolSettings.maxConnectionIdleTimeMS;
104108
maintenanceInitialDelayMS = connectionPoolSettings.maintenanceInitialDelayMS;
105109
maintenanceFrequencyMS = connectionPoolSettings.maintenanceFrequencyMS;
110+
maxConnecting = connectionPoolSettings.maxConnecting;
106111
return this;
107112
}
108113

@@ -210,6 +215,19 @@ public Builder addConnectionPoolListener(final ConnectionPoolListener connection
210215
return this;
211216
}
212217

218+
/**
219+
* The maximum number of connections a pool may be establishing concurrently.
220+
*
221+
* @param maxConnecting The maximum number of connections a pool may be establishing concurrently. Must be positive.
222+
* @return {@code this}.
223+
* @see ConnectionPoolSettings#getMaxConnecting()
224+
* @since 4.4
225+
*/
226+
public Builder maxConnecting(final int maxConnecting) {
227+
this.maxConnecting = maxConnecting;
228+
return this;
229+
}
230+
213231
/**
214232
* Creates a new ConnectionPoolSettings object with the settings initialised on this builder.
215233
*
@@ -251,6 +269,11 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
251269
maxConnectionLifeTime(maxConnectionLifeTime, MILLISECONDS);
252270
}
253271

272+
Integer maxConnecting = connectionString.getMaxConnecting();
273+
if (maxConnecting != null) {
274+
maxConnecting(maxConnecting);
275+
}
276+
254277
return this;
255278
}
256279
}
@@ -343,6 +366,21 @@ public List<ConnectionPoolListener> getConnectionPoolListeners() {
343366
return connectionPoolListeners;
344367
}
345368

369+
/**
370+
* The maximum number of connections a pool may be establishing concurrently.
371+
* Establishment of a connection is a part of its life cycle
372+
* starting after a {@link ConnectionCreatedEvent} and ending before a {@link ConnectionReadyEvent}.
373+
* <p>
374+
* Default is 2.</p>
375+
*
376+
* @return The maximum number of connections a pool may be establishing concurrently.
377+
* @see Builder#maxConnecting(int)
378+
* @since 4.4
379+
*/
380+
public int getMaxConnecting() {
381+
return maxConnecting;
382+
}
383+
346384
@Override
347385
public boolean equals(final Object o) {
348386
if (this == o) {
@@ -378,7 +416,9 @@ public boolean equals(final Object o) {
378416
if (!connectionPoolListeners.equals(that.connectionPoolListeners)) {
379417
return false;
380418
}
381-
419+
if (maxConnecting != that.maxConnecting) {
420+
return false;
421+
}
382422
return true;
383423
}
384424

@@ -392,6 +432,7 @@ public int hashCode() {
392432
result = 31 * result + (int) (maintenanceInitialDelayMS ^ (maintenanceInitialDelayMS >>> 32));
393433
result = 31 * result + (int) (maintenanceFrequencyMS ^ (maintenanceFrequencyMS >>> 32));
394434
result = 31 * result + connectionPoolListeners.hashCode();
435+
result = 31 * result + maxConnecting;
395436
return result;
396437
}
397438

@@ -406,6 +447,7 @@ public String toString() {
406447
+ ", maintenanceInitialDelayMS=" + maintenanceInitialDelayMS
407448
+ ", maintenanceFrequencyMS=" + maintenanceFrequencyMS
408449
+ ", connectionPoolListeners=" + connectionPoolListeners
450+
+ ", maxConnecting=" + maxConnecting
409451
+ '}';
410452
}
411453

@@ -417,6 +459,7 @@ public String toString() {
417459
isTrue("maxConnectionIdleTime >= 0", builder.maxConnectionIdleTimeMS >= 0);
418460
isTrue("sizeMaintenanceFrequency > 0", builder.maintenanceFrequencyMS > 0);
419461
isTrue("maxSize >= minSize", builder.maxSize >= builder.minSize);
462+
isTrue("maxConnecting > 0", builder.maxConnecting > 0);
420463

421464
maxSize = builder.maxSize;
422465
minSize = builder.minSize;
@@ -426,5 +469,6 @@ public String toString() {
426469
maintenanceInitialDelayMS = builder.maintenanceInitialDelayMS;
427470
maintenanceFrequencyMS = builder.maintenanceFrequencyMS;
428471
connectionPoolListeners = unmodifiableList(builder.connectionPoolListeners);
472+
maxConnecting = builder.maxConnecting;
429473
}
430474
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,6 @@
100100
@SuppressWarnings("deprecation")
101101
class DefaultConnectionPool implements ConnectionPool {
102102
private static final Logger LOGGER = Loggers.getLogger("connection");
103-
/**
104-
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
105-
*/
106-
static final int MAX_CONNECTING = 2;
107103

108104
private final ConcurrentPool<UsageTrackingInternalConnection> pool;
109105
private final ConnectionPoolSettings settings;
@@ -145,7 +141,7 @@ class DefaultConnectionPool implements ConnectionPool {
145141
this.connectionPoolListener = getConnectionPoolListener(settings);
146142
backgroundMaintenance = new BackgroundMaintenanceManager();
147143
connectionPoolCreated(connectionPoolListener, serverId, settings);
148-
openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING);
144+
openConcurrencyLimiter = new OpenConcurrencyLimiter(settings.getMaxConnecting());
149145
asyncWorkManager = new AsyncWorkManager(internalSettings.isPrestartAsyncWorkManager());
150146
stateAndGeneration = new StateAndGeneration();
151147
connectionGenerationSupplier = new ConnectionGenerationSupplier() {

driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@
5858
import java.util.concurrent.locks.ReentrantLock;
5959
import java.util.stream.Stream;
6060

61-
import static com.mongodb.internal.connection.DefaultConnectionPool.MAX_CONNECTING;
6261
import static java.lang.Long.MAX_VALUE;
6362
import static java.util.concurrent.TimeUnit.MILLISECONDS;
6463
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -81,6 +80,7 @@
8180
public class DefaultConnectionPoolTest {
8281
private static final ServerId SERVER_ID = new ServerId(new ClusterId(), new ServerAddress());
8382
private static final long TEST_WAIT_TIMEOUT_MILLIS = SECONDS.toMillis(5);
83+
private static final int DEFAULT_MAX_CONNECTING = ConnectionPoolSettings.builder().build().getMaxConnecting();
8484

8585
private TestInternalConnectionFactory connectionFactory;
8686

@@ -331,11 +331,15 @@ private static Stream<Arguments> concurrentUsageArguments() {
331331
return Stream.of(// variants marked with (*) have proved their usefulness by detecting bugs
332332
Arguments.of(0, 1, true, 8, true, false, 0.02f, 0, 0),
333333
Arguments.of(0, 1, false, 8, false, true, 0.02f, 0, 0), // (*)
334-
Arguments.of(MAX_CONNECTING, MAX_CONNECTING, true, 8, true, true, 0, 0, 0),
335-
Arguments.of(MAX_CONNECTING + 1, MAX_CONNECTING + 5, true, 2 * (MAX_CONNECTING + 5), true, true, 0.02f, 0, 0),
336-
Arguments.of(MAX_CONNECTING + 5, MAX_CONNECTING + 5, false, 2 * (MAX_CONNECTING + 5), true, true, 0.02f, 0, 0), // (*)
337-
Arguments.of(MAX_CONNECTING + 1, MAX_CONNECTING + 5, false, 2 * (MAX_CONNECTING + 5), true, true, 0.3f, 0.1f, 0.1f),
338-
Arguments.of(MAX_CONNECTING + 1, MAX_CONNECTING + 5, true, 2 * (MAX_CONNECTING + 5), true, true, 0, 0.5f, 0.05f));
334+
Arguments.of(DEFAULT_MAX_CONNECTING, DEFAULT_MAX_CONNECTING, true, 8, true, true, 0, 0, 0),
335+
Arguments.of(DEFAULT_MAX_CONNECTING + 1, DEFAULT_MAX_CONNECTING + 5, true, 2 * (DEFAULT_MAX_CONNECTING + 5),
336+
true, true, 0.02f, 0, 0),
337+
Arguments.of(DEFAULT_MAX_CONNECTING + 5, DEFAULT_MAX_CONNECTING + 5, false, 2 * (DEFAULT_MAX_CONNECTING + 5),
338+
true, true, 0.02f, 0, 0), // (*)
339+
Arguments.of(DEFAULT_MAX_CONNECTING + 1, DEFAULT_MAX_CONNECTING + 5, false, 2 * (DEFAULT_MAX_CONNECTING + 5),
340+
true, true, 0.3f, 0.1f, 0.1f),
341+
Arguments.of(DEFAULT_MAX_CONNECTING + 1, DEFAULT_MAX_CONNECTING + 5, true, 2 * (DEFAULT_MAX_CONNECTING + 5),
342+
true, true, 0, 0.5f, 0.05f));
339343
}
340344

341345
@Test
@@ -346,14 +350,15 @@ public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() t
346350
TestConnectionPoolListener listener = new TestConnectionPoolListener();
347351
provider = new DefaultConnectionPool(SERVER_ID, controllableConnFactory.factory,
348352
ConnectionPoolSettings.builder()
349-
.maxSize(MAX_CONNECTING + maxAvailableConnections)
353+
.maxSize(DEFAULT_MAX_CONNECTING + maxAvailableConnections)
350354
.addConnectionPoolListener(listener)
351355
.maxWaitTime(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS)
352356
.maintenanceInitialDelay(MAX_VALUE, NANOSECONDS)
353357
.build(),
354358
mockSdamProvider());
355359
provider.ready();
356-
acquireOpenPermits(provider, MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_CALLBACK, controllableConnFactory, listener);
360+
acquireOpenPermits(provider, DEFAULT_MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_CALLBACK,
361+
controllableConnFactory, listener);
357362
assertUseConcurrently(provider, 2 * maxAvailableConnections,
358363
true, true,
359364
0.02f, 0, 0,
@@ -364,7 +369,7 @@ public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() t
364369
* The idea of this test is as follows:
365370
* <ol>
366371
* <li>Check out some connections from the pool
367-
* ({@link DefaultConnectionPool#MAX_CONNECTING} connections must not be checked out to make the next step possible).</li>
372+
* ({@link #DEFAULT_MAX_CONNECTING} connections must not be checked out to make the next step possible).</li>
368373
* <li>Acquire all permits to open a connection and leave them acquired.</li>
369374
* <li>Check in the checked out connections and concurrently check them out again.</li>
370375
* </ol>
@@ -380,7 +385,7 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce
380385
TestConnectionPoolListener listener = new TestConnectionPoolListener();
381386
provider = new DefaultConnectionPool(SERVER_ID, controllableConnFactory.factory,
382387
ConnectionPoolSettings.builder()
383-
.maxSize(MAX_CONNECTING
388+
.maxSize(DEFAULT_MAX_CONNECTING
384389
+ openConnectionsCount
385390
/* This wiggle room is needed to open opportunities to create new connections from the standpoint of
386391
* the max pool size, and then check that no connections were created nonetheless. */
@@ -395,7 +400,7 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce
395400
for (int i = 0; i < openConnectionsCount; i++) {
396401
connections.add(provider.get(0, NANOSECONDS));
397402
}
398-
acquireOpenPermits(provider, MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_OPEN, controllableConnFactory, listener);
403+
acquireOpenPermits(provider, DEFAULT_MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_OPEN, controllableConnFactory, listener);
399404
int previousIdx = 0;
400405
// concurrently check in / check out and assert the hand-over mechanism works
401406
for (int idx = 0; idx < connections.size(); idx += maxConcurrentlyHandedOver) {
@@ -542,7 +547,7 @@ private static void acquireOpenPermits(final DefaultConnectionPool pool, final i
542547
final InfiniteCheckoutEmulation infiniteEmulation,
543548
final ControllableConnectionFactory controllableConnFactory,
544549
final TestConnectionPoolListener listener) throws TimeoutException, InterruptedException {
545-
assertTrue(openPermitsCount <= MAX_CONNECTING);
550+
assertTrue(openPermitsCount <= DEFAULT_MAX_CONNECTING);
546551
int initialCreatedEventCount = listener.countEvents(ConnectionCreatedEvent.class);
547552
switch (infiniteEmulation) {
548553
case INFINITE_CALLBACK: {

0 commit comments

Comments
 (0)