Skip to content

Commit a82ce71

Browse files
jyeminstIncMale
andauthored
Add load balancer support to the reactive streams driver (#710)
Specification (version 1.0.0): https://github.com/mongodb/specifications/blob/master/source/load-balancers/load-balancers.rst Co-authored-by: Valentin Kovalenko <[email protected]>
1 parent 1db10dc commit a82ce71

File tree

109 files changed

+2658
-1472
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+2658
-1472
lines changed

.evergreen/run-load-balancer-tests.sh

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,44 @@ echo "Running $AUTH tests over $SSL and connecting to $SINGLE_MONGOS_LB_URI"
4141
echo "Running tests with ${JDK}"
4242
./gradlew -version
4343

44+
# Disabling errexit so that both gradle command will run.
45+
# Then we exit with non-zero if either of them exited with non-zero
46+
47+
set +o errexit
48+
49+
./gradlew -PjdkHome=/opt/java/${JDK} \
50+
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
51+
-Dorg.mongodb.test.transaction.uri=${MULTI_MONGOS_LB_URI} \
52+
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-sync:test \
53+
--tests LoadBalancerTest \
54+
--tests RetryableReadsTest \
55+
--tests RetryableWritesTest \
56+
--tests VersionedApiTest \
57+
--tests ChangeStreamsTest \
58+
--tests UnifiedCrudTest \
59+
--tests UnifiedTransactionsTest \
60+
--tests InitialDnsSeedlistDiscoveryTest
61+
first=$?
62+
echo $first
63+
4464
./gradlew -PjdkHome=/opt/java/${JDK} \
4565
-Dorg.mongodb.test.uri=${SINGLE_MONGOS_LB_URI} \
4666
-Dorg.mongodb.test.transaction.uri=${MULTI_MONGOS_LB_URI} \
47-
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-sync:test --tests LoadBalancerTest
67+
${GRADLE_EXTRA_VARS} --stacktrace --info --continue driver-reactive-stream:test \
68+
--tests LoadBalancerTest \
69+
--tests RetryableReadsTest \
70+
--tests RetryableWritesTest \
71+
--tests VersionedApiTest \
72+
--tests ChangeStreamsTest \
73+
--tests UnifiedCrudTest \
74+
--tests UnifiedTransactionsTest \
75+
--tests InitialDnsSeedlistDiscoveryTest
76+
second=$?
77+
78+
if [ $first -ne 0 ]; then
79+
exit $first
80+
elif [ $second -ne 0 ]; then
81+
exit $second
82+
else
83+
exit 0
84+
fi

driver-core/src/main/com/mongodb/internal/async/client/ClientSessionBinding.java

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@
2525
import com.mongodb.internal.binding.AsyncClusterAwareReadWriteBinding;
2626
import com.mongodb.internal.binding.AsyncConnectionSource;
2727
import com.mongodb.internal.binding.AsyncReadWriteBinding;
28+
import com.mongodb.internal.binding.TransactionContext;
2829
import com.mongodb.internal.connection.AsyncConnection;
29-
import com.mongodb.internal.connection.ServerTuple;
30-
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
3130
import com.mongodb.internal.session.ClientSessionContext;
3231
import com.mongodb.internal.session.SessionContext;
3332
import com.mongodb.lang.Nullable;
3433

3534
import static com.mongodb.assertions.Assertions.notNull;
35+
import static com.mongodb.connection.ClusterType.LOAD_BALANCED;
3636

3737
public class ClientSessionBinding implements AsyncReadWriteBinding {
3838
private final AsyncClusterAwareReadWriteBinding wrapped;
@@ -55,16 +55,16 @@ public ReadPreference getReadPreference() {
5555

5656
@Override
5757
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
58-
if (isActiveShardedTxn()) {
59-
getPinnedConnectionSource(callback);
58+
if (isConnectionSourcePinningRequired()) {
59+
getPinnedConnectionSource(true, callback);
6060
} else {
6161
wrapped.getReadConnectionSource(new WrappingCallback(callback));
6262
}
6363
}
6464

6565
public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
66-
if (isActiveShardedTxn()) {
67-
getPinnedConnectionSource(callback);
66+
if (isConnectionSourcePinningRequired()) {
67+
getPinnedConnectionSource(false, callback);
6868
} else {
6969
wrapped.getWriteConnectionSource(new WrappingCallback(callback));
7070
}
@@ -81,20 +81,26 @@ public ServerApi getServerApi() {
8181
return wrapped.getServerApi();
8282
}
8383

84-
private void getPinnedConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
85-
if (session.getPinnedServerAddress() == null) {
86-
wrapped.getCluster().selectServerAsync(
87-
new ReadPreferenceServerSelector(wrapped.getReadPreference()), new SingleResultCallback<ServerTuple>() {
88-
@Override
89-
public void onResult(final ServerTuple serverTuple, final Throwable t) {
90-
if (t != null) {
91-
callback.onResult(null, t);
92-
} else {
93-
session.setPinnedServerAddress(serverTuple.getServerDescription().getAddress());
94-
wrapped.getConnectionSource(session.getPinnedServerAddress(), new WrappingCallback(callback));
95-
}
96-
}
97-
});
84+
private void getPinnedConnectionSource(final boolean isRead, final SingleResultCallback<AsyncConnectionSource> callback) {
85+
WrappingCallback wrappingCallback = new WrappingCallback(callback);
86+
TransactionContext<AsyncConnection> transactionContext = TransactionContext.get(session);
87+
if (transactionContext == null) {
88+
SingleResultCallback<AsyncConnectionSource> connectionSourceCallback = (result, t) -> {
89+
if (t != null) {
90+
wrappingCallback.onResult(null, t);
91+
} else {
92+
TransactionContext<AsyncConnection> newTransactionContext = new TransactionContext<>(
93+
wrapped.getCluster().getDescription().getType());
94+
session.setTransactionContext(result.getServerDescription().getAddress(), newTransactionContext);
95+
newTransactionContext.release(); // The session is responsible for retaining a reference to the context
96+
wrappingCallback.onResult(result, null);
97+
}
98+
};
99+
if (isRead) {
100+
wrapped.getReadConnectionSource(connectionSourceCallback);
101+
} else {
102+
wrapped.getWriteConnectionSource(connectionSourceCallback);
103+
}
98104
} else {
99105
wrapped.getConnectionSource(session.getPinnedServerAddress(), new WrappingCallback(callback));
100106
}
@@ -123,8 +129,9 @@ private void closeSessionIfCountIsZero() {
123129
}
124130
}
125131

126-
private boolean isActiveShardedTxn() {
127-
return session.hasActiveTransaction() && wrapped.getCluster().getDescription().getType() == ClusterType.SHARDED;
132+
private boolean isConnectionSourcePinningRequired() {
133+
ClusterType clusterType = wrapped.getCluster().getDescription().getType();
134+
return session.hasActiveTransaction() && (clusterType == ClusterType.SHARDED || clusterType == LOAD_BALANCED);
128135
}
129136

130137
private class SessionBindingAsyncConnectionSource implements AsyncConnectionSource {
@@ -152,7 +159,24 @@ public ServerApi getServerApi() {
152159

153160
@Override
154161
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
155-
wrapped.getConnection(callback);
162+
TransactionContext<AsyncConnection> transactionContext = TransactionContext.get(session);
163+
if (transactionContext != null && transactionContext.isConnectionPinningRequired()) {
164+
AsyncConnection pinnedConnection = transactionContext.getPinnedConnection();
165+
if (pinnedConnection == null) {
166+
wrapped.getConnection((connection, t) -> {
167+
if (t != null) {
168+
callback.onResult(null, t);
169+
} else {
170+
transactionContext.pinConnection(connection, AsyncConnection::markAsPinned);
171+
callback.onResult(connection, null);
172+
}
173+
});
174+
} else {
175+
callback.onResult(pinnedConnection.retain(), null);
176+
}
177+
} else {
178+
wrapped.getConnection(callback);
179+
}
156180
}
157181

158182
@Override
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.binding;
17+
18+
import com.mongodb.connection.ClusterType;
19+
import com.mongodb.internal.connection.Connection;
20+
import com.mongodb.lang.Nullable;
21+
import com.mongodb.session.ClientSession;
22+
23+
import java.util.function.BiConsumer;
24+
25+
import static com.mongodb.connection.ClusterType.LOAD_BALANCED;
26+
27+
public final class TransactionContext<C extends ReferenceCounted> extends AbstractReferenceCounted {
28+
private final ClusterType clusterType;
29+
private C pinnedConnection;
30+
31+
public TransactionContext(final ClusterType clusterType) {
32+
this.clusterType = clusterType;
33+
}
34+
35+
@Nullable
36+
public C getPinnedConnection() {
37+
return pinnedConnection;
38+
}
39+
40+
@SuppressWarnings("unchecked")
41+
public void pinConnection(final C connection, final BiConsumer<C, Connection.PinningMode> markAsPinnedOperation) {
42+
this.pinnedConnection = (C) connection.retain(); // safe because of the `retain` method contract
43+
markAsPinnedOperation.accept(connection, Connection.PinningMode.TRANSACTION);
44+
}
45+
46+
public boolean isConnectionPinningRequired() {
47+
return clusterType == LOAD_BALANCED;
48+
}
49+
50+
@Override
51+
public void release() {
52+
super.release();
53+
if (getCount() == 0) {
54+
if (pinnedConnection != null) {
55+
pinnedConnection.release();
56+
}
57+
}
58+
}
59+
60+
@SuppressWarnings("unchecked")
61+
public static <C extends TransactionContext<? extends ReferenceCounted>> C get(final ClientSession session) {
62+
return (C) session.getTransactionContext();
63+
}
64+
}

driver-core/src/main/com/mongodb/internal/connection/AsyncConnection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,4 +172,6 @@ <T> void getMoreAsync(MongoNamespace namespace, long cursorId, int numberToRetur
172172
* @param callback the callback that is called once the cursors have been killed
173173
*/
174174
void killCursorAsync(MongoNamespace namespace, List<Long> cursors, SingleResultCallback<Void> callback);
175+
176+
void markAsPinned(Connection.PinningMode pinningMode);
175177
}

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ public void open() {
517517
@Override
518518
public void openAsync(final SingleResultCallback<Void> callback) {
519519
assertFalse(isClosed.get());
520+
connectionCreated(connectionPoolListener, wrapped.getDescription().getConnectionId());
520521
wrapped.openAsync((nullResult, failure) -> {
521522
if (failure != null) {
522523
closeAndHandleOpenFailure();
@@ -898,8 +899,6 @@ void openAsyncOrGetAvailable(
898899
connection.closeSilently();
899900
callback.onResult(availableConnection, null);
900901
} else {//acquired a permit, phase two
901-
connectionCreated(//a connection is considered created only when it is ready to be open
902-
connectionPoolListener, getId(connection));
903902
connection.openAsync((nullResult, failure) -> {
904903
releasePermit();
905904
if (failure != null) {

driver-core/src/main/com/mongodb/internal/connection/InternalConnectionInitializer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ interface InternalConnectionInitializer {
2525
InternalConnectionInitializationDescription finishHandshake(InternalConnection internalConnection,
2626
InternalConnectionInitializationDescription description);
2727

28-
void initializeAsync(InternalConnection internalConnection, SingleResultCallback<InternalConnectionInitializationDescription> callback);
28+
void startHandshakeAsync(InternalConnection internalConnection,
29+
SingleResultCallback<InternalConnectionInitializationDescription> callback);
2930

31+
void finishHandshakeAsync(InternalConnection internalConnection, InternalConnectionInitializationDescription description,
32+
SingleResultCallback<InternalConnectionInitializationDescription> callback);
3033
}

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -159,21 +159,10 @@ public void open() {
159159
stream.open();
160160

161161
InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this);
162-
description = initializationDescription.getConnectionDescription();
163-
initialServerDescription = initializationDescription.getServerDescription();
164-
165-
if (clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) {
166-
generation = connectionGenerationSupplier.getGeneration(assertNotNull(description.getServiceId()));
167-
}
162+
initAfterHandshakeStart(initializationDescription);
168163

169164
initializationDescription = connectionInitializer.finishHandshake(this, initializationDescription);
170-
description = initializationDescription.getConnectionDescription();
171-
initialServerDescription = initializationDescription.getServerDescription();
172-
opened.set(true);
173-
sendCompressor = findSendCompressor(description);
174-
if (LOGGER.isInfoEnabled()) {
175-
LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
176-
}
165+
initAfterHandshakeFinish(initializationDescription);
177166
} catch (Throwable t) {
178167
close();
179168
if (t instanceof MongoException) {
@@ -192,24 +181,24 @@ public void openAsync(final SingleResultCallback<Void> callback) {
192181
stream.openAsync(new AsyncCompletionHandler<Void>() {
193182
@Override
194183
public void completed(final Void aVoid) {
195-
connectionInitializer.initializeAsync(InternalStreamConnection.this,
196-
new SingleResultCallback<InternalConnectionInitializationDescription>() {
197-
@Override
198-
public void onResult(final InternalConnectionInitializationDescription result, final Throwable t) {
199-
if (t != null) {
184+
connectionInitializer.startHandshakeAsync(InternalStreamConnection.this,
185+
(initialResult, initialException) -> {
186+
if (initialException != null) {
200187
close();
201-
callback.onResult(null, t);
188+
callback.onResult(null, initialException);
202189
} else {
203-
description = result.getConnectionDescription();
204-
initialServerDescription = result.getServerDescription();
205-
opened.set(true);
206-
sendCompressor = findSendCompressor(description);
207-
if (LOGGER.isInfoEnabled()) {
208-
LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
209-
}
210-
callback.onResult(null, null);
190+
initAfterHandshakeStart(initialResult);
191+
connectionInitializer.finishHandshakeAsync(InternalStreamConnection.this,
192+
initialResult, (completedResult, completedException) -> {
193+
if (completedException != null) {
194+
close();
195+
callback.onResult(null, completedException);
196+
} else {
197+
initAfterHandshakeFinish(completedResult);
198+
callback.onResult(null, null);
199+
}
200+
});
211201
}
212-
}
213202
});
214203
}
215204

@@ -223,6 +212,26 @@ public void failed(final Throwable t) {
223212
}
224213
}
225214

215+
216+
private void initAfterHandshakeStart(final InternalConnectionInitializationDescription initializationDescription) {
217+
description = initializationDescription.getConnectionDescription();
218+
initialServerDescription = initializationDescription.getServerDescription();
219+
220+
if (clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED) {
221+
generation = connectionGenerationSupplier.getGeneration(assertNotNull(description.getServiceId()));
222+
}
223+
}
224+
225+
private void initAfterHandshakeFinish(final InternalConnectionInitializationDescription initializationDescription) {
226+
description = initializationDescription.getConnectionDescription();
227+
initialServerDescription = initializationDescription.getServerDescription();
228+
opened.set(true);
229+
sendCompressor = findSendCompressor(description);
230+
if (LOGGER.isInfoEnabled()) {
231+
LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
232+
}
233+
}
234+
226235
private Map<Byte, Compressor> createCompressorMap(final List<MongoCompressor> compressorList) {
227236
Map<Byte, Compressor> compressorMap = new HashMap<Byte, Compressor>(this.compressorList.size());
228237

0 commit comments

Comments
 (0)