Skip to content

Commit 8d661c0

Browse files
committed
JAVA-2666: Allow an application to register a server selector that is applied after the suitable servers are selected and before servers within the latency window are selected
1 parent 3e38ac7 commit 8d661c0

File tree

6 files changed

+139
-14
lines changed

6 files changed

+139
-14
lines changed

docs/reference/content/whats-new.md

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,27 +11,32 @@ title = "What's New"
1111

1212
Key new features of the 3.6 Java driver release:
1313

14-
### Compression
15-
16-
The 3.6 release adds support for compression of messages to and from appropriately configured MongoDB servers:
17-
18-
* [Compression Tutorial]({{<ref "driver/tutorials/compression.md">}})
19-
* [Compression Tutorial (Async)]({{<ref "driver-async/tutorials/compression.md">}})
20-
2114
### Change Stream support
2215

2316
The 3.6 release adds support for [change streams](https://docs.mongodb.com/manual/operator/aggregation/changeStream).
2417

2518
* [Change Stream Quick Start]({{<ref "driver/tutorials/change-streams.md">}})
2619
* [Change Stream Quick Start (Async)]({{<ref "driver-async/tutorials/change-streams.md">}})
2720

21+
### Retryable writes
22+
23+
The 3.6 release adds support for retryable writes using the `retryWrites` option in
24+
[`MongoClientOptions`]({{<apiref "com/mongodb/MongoClientOptions">}}).
25+
26+
### Compression
27+
28+
The 3.6 release adds support for compression of messages to and from appropriately configured MongoDB servers:
29+
30+
* [Compression Tutorial]({{<ref "driver/tutorials/compression.md">}})
31+
* [Compression Tutorial (Async)]({{<ref "driver-async/tutorials/compression.md">}})
32+
2833
### Causal consistency
2934

3035
The 3.6 release adds support for causally consistency.
3136

32-
### Retryable writes
37+
### Application-configured server selection
3338

34-
The 3.6 release adds support for retryable writes using the `retryWrites` option in
39+
The 3.6 release adds support for application-configured control over server selection, using the `serverSelector` option in
3540
[`MongoClientOptions`]({{<apiref "com/mongodb/MongoClientOptions">}}).
3641

3742
## What's New in 3.5

driver-core/src/main/com/mongodb/selector/ServerSelector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import java.util.List;
2424

2525
/**
26-
* <p>An interface for selecting a server from a cluster according some preference.</p>
26+
* <p>An interface for selecting a server from a cluster according to some preference.</p>
2727
*
2828
* <p>Implementations of this interface should ensure that their equals and hashCode methods compare equal preferences as equal, as users of
2929
* this interface may rely on that behavior to efficiently consolidate handling of multiple requests waiting on a server that can satisfy

driver/src/main/com/mongodb/Mongo.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@
4444
import com.mongodb.operation.ListDatabasesOperation;
4545
import com.mongodb.operation.ReadOperation;
4646
import com.mongodb.operation.WriteOperation;
47+
import com.mongodb.selector.CompositeServerSelector;
4748
import com.mongodb.selector.LatencyMinimizingServerSelector;
4849
import com.mongodb.selector.ServerSelector;
4950
import com.mongodb.session.ClientSession;
5051
import org.bson.BsonBoolean;
5152

5253
import java.util.ArrayList;
54+
import java.util.Arrays;
5355
import java.util.Collection;
5456
import java.util.Collections;
5557
import java.util.List;
@@ -757,7 +759,7 @@ private static ClusterSettings getClusterSettings(final List<ServerAddress> seed
757759
.mode(clusterConnectionMode)
758760
.requiredReplicaSetName(options.getRequiredReplicaSetName())
759761
.serverSelectionTimeout(options.getServerSelectionTimeout(), MILLISECONDS)
760-
.serverSelector(new LatencyMinimizingServerSelector(options.getLocalThreshold(), MILLISECONDS))
762+
.serverSelector(getServerSelector(options))
761763
.description(options.getDescription())
762764
.maxWaitQueueSize(options.getConnectionPoolSettings().getMaxWaitQueueSize());
763765
for (ClusterListener clusterListener: options.getClusterListeners()) {
@@ -766,6 +768,17 @@ private static ClusterSettings getClusterSettings(final List<ServerAddress> seed
766768
return builder.build();
767769
}
768770

771+
private static ServerSelector getServerSelector(final MongoClientOptions options) {
772+
LatencyMinimizingServerSelector latencyMinimizingServerSelector =
773+
new LatencyMinimizingServerSelector(options.getLocalThreshold(), MILLISECONDS);
774+
775+
if (options.getServerSelector() == null) {
776+
return latencyMinimizingServerSelector;
777+
}
778+
779+
return new CompositeServerSelector(Arrays.asList(options.getServerSelector(), latencyMinimizingServerSelector));
780+
}
781+
769782
Cluster getCluster() {
770783
return cluster;
771784
}

driver/src/main/com/mongodb/MongoClientOptions.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.mongodb.event.ConnectionPoolListener;
2828
import com.mongodb.event.ServerListener;
2929
import com.mongodb.event.ServerMonitorListener;
30+
import com.mongodb.selector.ServerSelector;
3031
import org.bson.codecs.configuration.CodecRegistry;
3132

3233
import javax.net.SocketFactory;
@@ -65,6 +66,7 @@ public class MongoClientOptions {
6566
private final boolean retryWrites;
6667
private final ReadConcern readConcern;
6768
private final CodecRegistry codecRegistry;
69+
private final ServerSelector serverSelector;
6870

6971
private final int minConnectionsPerHost;
7072
private final int maxConnectionsPerHost;
@@ -121,6 +123,7 @@ private MongoClientOptions(final Builder builder) {
121123
retryWrites = builder.retryWrites;
122124
readConcern = builder.readConcern;
123125
codecRegistry = builder.codecRegistry;
126+
serverSelector = builder.serverSelector;
124127
sslEnabled = builder.sslEnabled;
125128
sslInvalidHostNameAllowed = builder.sslInvalidHostNameAllowed;
126129
sslContext = builder.sslContext;
@@ -564,6 +567,34 @@ public CodecRegistry getCodecRegistry() {
564567
return codecRegistry;
565568
}
566569

570+
/**
571+
* Gets the server selector.
572+
*
573+
* <p>The server selector augments the normal server selection rules applied by the driver when determining
574+
* which server to send an operation to. At the point that it's called by the driver, the
575+
* {@link com.mongodb.connection.ClusterDescription} which is passed to it contains a list of
576+
* {@link com.mongodb.connection.ServerDescription} instances which satisfy either the configured {@link ReadPreference} for any
577+
* read operation or ones that can take writes (e.g. a standalone, mongos, or replica set primary).</p>
578+
* <p>The server selector can then filter the {@code ServerDescription} list using whatever criteria that is required by the
579+
* application.</p>
580+
* <p>After this selector executes, two additional selectors are applied by the driver:</p>
581+
* <ul>
582+
* <li>select from within the latency window</li>
583+
* <li>select a random server from those remaining</li>
584+
* </ul>
585+
* <p>To skip the latency window selector, an application can:</p>
586+
* <ul>
587+
* <li>configure the local threshold to a sufficiently high value so that it doesn't exclude any servers</li>
588+
* <li>return a list containing a single server from this selector (which will also make the random member selector a no-op)</li>
589+
* </ul>
590+
*
591+
* @return the server selector, which may be null
592+
* @since 3.6
593+
*/
594+
public ServerSelector getServerSelector() {
595+
return serverSelector;
596+
}
597+
567598
/**
568599
* Gets the list of added {@code ClusterListener}. The default is an empty list.
569600
*
@@ -799,6 +830,9 @@ public boolean equals(final Object o) {
799830
if (!codecRegistry.equals(that.codecRegistry)) {
800831
return false;
801832
}
833+
if (serverSelector != null ? !serverSelector.equals(that.serverSelector) : that.serverSelector != null) {
834+
return false;
835+
}
802836
if (!clusterListeners.equals(that.clusterListeners)) {
803837
return false;
804838
}
@@ -828,6 +862,7 @@ public int hashCode() {
828862
result = 31 * result + (retryWrites ? 1 : 0);
829863
result = 31 * result + (readConcern != null ? readConcern.hashCode() : 0);
830864
result = 31 * result + codecRegistry.hashCode();
865+
result = 31 * result + (serverSelector != null ? serverSelector.hashCode() : 0);
831866
result = 31 * result + clusterListeners.hashCode();
832867
result = 31 * result + commandListeners.hashCode();
833868
result = 31 * result + minConnectionsPerHost;
@@ -869,6 +904,7 @@ public String toString() {
869904
+ ", retryWrites=" + retryWrites
870905
+ ", readConcern=" + readConcern
871906
+ ", codecRegistry=" + codecRegistry
907+
+ ", serverSelector=" + serverSelector
872908
+ ", clusterListeners=" + clusterListeners
873909
+ ", commandListeners=" + commandListeners
874910
+ ", minConnectionsPerHost=" + minConnectionsPerHost
@@ -923,7 +959,7 @@ public static class Builder {
923959
private boolean retryWrites = false;
924960
private ReadConcern readConcern = ReadConcern.DEFAULT;
925961
private CodecRegistry codecRegistry = MongoClient.getDefaultCodecRegistry();
926-
962+
private ServerSelector serverSelector;
927963
private int minConnectionsPerHost;
928964
private int maxConnectionsPerHost = 100;
929965
private int threadsAllowedToBlockForConnectionMultiplier = 5;
@@ -987,6 +1023,7 @@ public Builder(final MongoClientOptions options) {
9871023
retryWrites = options.getRetryWrites();
9881024
readConcern = options.getReadConcern();
9891025
codecRegistry = options.getCodecRegistry();
1026+
serverSelector = options.getServerSelector();
9901027
sslEnabled = options.isSslEnabled();
9911028
sslInvalidHostNameAllowed = options.isSslInvalidHostNameAllowed();
9921029
sslContext = options.getSslContext();
@@ -1313,6 +1350,19 @@ public Builder codecRegistry(final CodecRegistry codecRegistry) {
13131350
return this;
13141351
}
13151352

1353+
/**
1354+
* Sets a server selector that augments the normal server selection rules applied by the driver when determining
1355+
* which server to send an operation to. See {@link #getServerSelector()} for further details.
1356+
*
1357+
* @param serverSelector the server selector
1358+
* @return this
1359+
* @since 3.6
1360+
* @see #getServerSelector()
1361+
*/
1362+
public Builder serverSelector(final ServerSelector serverSelector) {
1363+
this.serverSelector = serverSelector;
1364+
return this;
1365+
}
13161366
/**
13171367
* Adds the given command listener.
13181368
*

driver/src/test/functional/com/mongodb/MongoClientsSpecification.groovy

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,19 @@
1818
package com.mongodb
1919

2020
import com.mongodb.client.MongoDriverInformation
21+
import com.mongodb.connection.ClusterDescription
22+
import com.mongodb.connection.ServerDescription
23+
import com.mongodb.event.CommandFailedEvent
24+
import com.mongodb.event.CommandListener
25+
import com.mongodb.event.CommandStartedEvent
26+
import com.mongodb.event.CommandSucceededEvent
2127
import org.bson.Document
2228
import spock.lang.IgnoreIf
2329

30+
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
2431
import static com.mongodb.ClusterFixture.isStandalone
2532
import static com.mongodb.ClusterFixture.serverVersionAtLeast
33+
import static com.mongodb.Fixture.getDefaultDatabaseName
2634
import static com.mongodb.Fixture.getMongoClientURI
2735

2836
class MongoClientsSpecification extends FunctionalSpecification {
@@ -52,4 +60,48 @@ class MongoClientsSpecification extends FunctionalSpecification {
5260
profileCollection?.drop()
5361
client?.close()
5462
}
63+
64+
@IgnoreIf({ !isDiscoverableReplicaSet() })
65+
def 'should use server selector from MongoClientOptions'() {
66+
given:
67+
def expectedWinner
68+
def actualWinningAddresses = [] as Set
69+
def optionsBuilder = MongoClientOptions.builder()
70+
// select the suitable server with the highest port number
71+
.serverSelector { ClusterDescription clusterDescription ->
72+
for (ServerDescription cur : clusterDescription.getServerDescriptions()) {
73+
if (expectedWinner == null || cur.address.port > expectedWinner.address.port) {
74+
expectedWinner = cur
75+
}
76+
}
77+
expectedWinner == null ? [] : [expectedWinner]
78+
}.addCommandListener(new CommandListener() {
79+
// record each address actually used
80+
@Override
81+
void commandStarted(final CommandStartedEvent event) {
82+
actualWinningAddresses.add(event.connectionDescription.connectionId.serverId.address)
83+
}
84+
85+
@Override
86+
void commandSucceeded(final CommandSucceededEvent event) {
87+
}
88+
89+
@Override
90+
void commandFailed(final CommandFailedEvent event) {
91+
}
92+
})
93+
94+
def client = new MongoClient(getMongoClientURI(optionsBuilder))
95+
def collection = client.getDatabase(getDefaultDatabaseName()).getCollection(getCollectionName())
96+
.withReadPreference(ReadPreference.nearest())
97+
98+
when:
99+
for (int i = 0; i < 10; i++) {
100+
collection.count()
101+
}
102+
103+
then:
104+
actualWinningAddresses.size() == 1
105+
actualWinningAddresses.contains(expectedWinner.address)
106+
}
55107
}

driver/src/test/unit/com/mongodb/MongoClientOptionsSpecification.groovy

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.mongodb.event.CommandListener
2525
import com.mongodb.event.ConnectionPoolListener
2626
import com.mongodb.event.ServerListener
2727
import com.mongodb.event.ServerMonitorListener
28+
import com.mongodb.selector.ServerSelector
2829
import spock.lang.IgnoreIf
2930
import spock.lang.Specification
3031

@@ -56,6 +57,7 @@ class MongoClientOptionsSpecification extends Specification {
5657
options.getConnectionsPerHost() == 100
5758
options.getConnectTimeout() == 10000
5859
options.getReadPreference() == ReadPreference.primary()
60+
options.getServerSelector() == null;
5961
options.getThreadsAllowedToBlockForConnectionMultiplier() == 5
6062
options.isSocketKeepAlive()
6163
!options.isSslEnabled()
@@ -157,6 +159,7 @@ class MongoClientOptionsSpecification extends Specification {
157159
given:
158160
def encoderFactory = new MyDBEncoderFactory()
159161
def socketFactory = SSLSocketFactory.getDefault()
162+
def serverSelector = Mock(ServerSelector)
160163
def options = MongoClientOptions.builder()
161164
.description('test')
162165
.applicationName('appName')
@@ -167,6 +170,7 @@ class MongoClientOptionsSpecification extends Specification {
167170
.connectionsPerHost(500)
168171
.connectTimeout(100)
169172
.socketTimeout(700)
173+
.serverSelector(serverSelector)
170174
.serverSelectionTimeout(150)
171175
.maxWaitTime(200)
172176
.maxConnectionIdleTime(300)
@@ -194,6 +198,7 @@ class MongoClientOptionsSpecification extends Specification {
194198
options.getApplicationName() == 'appName'
195199
options.getReadPreference() == ReadPreference.secondary()
196200
options.getWriteConcern() == WriteConcern.JOURNALED
201+
options.getServerSelector() == serverSelector
197202
options.getRetryWrites()
198203
options.getServerSelectionTimeout() == 150
199204
options.getMaxWaitTime() == 200
@@ -656,8 +661,8 @@ class MongoClientOptionsSpecification extends Specification {
656661
'description', 'heartbeatConnectTimeout', 'heartbeatFrequency', 'heartbeatSocketTimeout', 'localThreshold',
657662
'maxConnectionIdleTime', 'maxConnectionLifeTime', 'maxConnectionsPerHost', 'maxWaitTime', 'minConnectionsPerHost',
658663
'minHeartbeatFrequency', 'readConcern', 'readPreference', 'requiredReplicaSetName', 'retryWrites',
659-
'serverListeners', 'serverMonitorListeners', 'serverSelectionTimeout', 'socketFactory', 'socketKeepAlive',
660-
'socketTimeout', 'sslContext', 'sslEnabled', 'sslInvalidHostNameAllowed',
664+
'serverListeners', 'serverMonitorListeners', 'serverSelectionTimeout', 'serverSelector', 'socketFactory',
665+
'socketKeepAlive', 'socketTimeout', 'sslContext', 'sslEnabled', 'sslInvalidHostNameAllowed',
661666
'threadsAllowedToBlockForConnectionMultiplier', 'writeConcern']
662667

663668
then:

0 commit comments

Comments
 (0)