Skip to content

Commit 06e567c

Browse files
committed
JAVA-1285: When server selection fails to find a match, await only for connectRetryFrequency, instead of maxWaitTime, so that attempts to re-connect to each server happen more frequently.
1 parent e10b6d7 commit 06e567c

File tree

6 files changed

+52
-29
lines changed

6 files changed

+52
-29
lines changed

src/main/com/mongodb/BaseCluster.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,34 +59,40 @@ public BaseCluster(final String clusterId, final ClusterSettings settings, final
5959
}
6060

6161
@Override
62-
public Server getServer(final ServerSelector serverSelector, final long maxWaitTime, final TimeUnit timeUnit) {
62+
public Server getServer(final ServerSelector serverSelector, final long maxWaitTime, final TimeUnit maxWaitTimeTimeUnit,
63+
final long connectRetryFrequency, final TimeUnit connectRetryFrequencyTimeUnit) {
6364
isTrue("open", !isClosed());
6465

6566
try {
6667
CountDownLatch currentPhase = phase.get();
6768
ClusterDescription curDescription = description;
6869
List<ServerDescription> serverDescriptions = serverSelector.choose(curDescription);
69-
final long endTime = System.nanoTime() + NANOSECONDS.convert(maxWaitTime, timeUnit);
70+
final long endTime = System.nanoTime() + NANOSECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit);
71+
boolean selectionFailureLogged = false;
7072
while (true) {
7173
throwIfIncompatible(curDescription);
74+
7275
if (!serverDescriptions.isEmpty()) {
7376
ClusterableServer server = getRandomServer(new ArrayList<ServerDescription>(serverDescriptions));
7477
if (server != null) {
7578
return new WrappedServer(server);
7679
}
7780
}
7881

79-
final long timeout = endTime - System.nanoTime();
80-
81-
LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out",
82-
serverSelector, curDescription, MILLISECONDS.convert(timeout, NANOSECONDS)));
82+
if (System.nanoTime() > endTime) {
83+
throw new MongoTimeoutException(format("Timed out while waiting for a server that matches %s after %d ms",
84+
serverSelector, MILLISECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit)));
85+
}
8386

87+
if (!selectionFailureLogged) {
88+
LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out",
89+
serverSelector, curDescription, MILLISECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit)));
90+
selectionFailureLogged = true;
91+
}
8492
connect();
8593

86-
if (!currentPhase.await(timeout, NANOSECONDS)) {
87-
throw new MongoTimeoutException(format("Timed out while waiting for a server that matches %s after %d ms",
88-
serverSelector, MILLISECONDS.convert(maxWaitTime, timeUnit)));
89-
}
94+
currentPhase.await(connectRetryFrequency, connectRetryFrequencyTimeUnit);
95+
9096
currentPhase = phase.get();
9197
curDescription = description;
9298
serverDescriptions = serverSelector.choose(curDescription);
@@ -97,25 +103,32 @@ public Server getServer(final ServerSelector serverSelector, final long maxWaitT
97103
}
98104

99105
@Override
100-
public ClusterDescription getDescription(final long maxWaitTime, final TimeUnit timeUnit) {
106+
public ClusterDescription getDescription(final long maxWaitTime, final TimeUnit maxWaitTimeTimeUnit,
107+
final long connectRetryFrequency, final TimeUnit connectRetryFrequencyTimeUnit) {
101108
isTrue("open", !isClosed());
102109

103110
try {
104111
CountDownLatch currentPhase = phase.get();
105112
ClusterDescription curDescription = description;
106-
final long endTime = System.nanoTime() + NANOSECONDS.convert(maxWaitTime, timeUnit);
113+
final long endTime = System.nanoTime() + NANOSECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit);
114+
boolean selectionFailureLogged = false;
107115
while (curDescription.getType() == ClusterType.Unknown) {
108-
final long timeout = endTime - System.nanoTime();
109116

110-
LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
111-
MILLISECONDS.convert(timeout, NANOSECONDS)));
117+
if (System.nanoTime() > endTime) {
118+
throw new MongoTimeoutException(format("Timed out while waiting to connect after %d ms",
119+
MILLISECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit)));
120+
}
121+
122+
if (!selectionFailureLogged) {
123+
LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
124+
MILLISECONDS.convert(maxWaitTime, maxWaitTimeTimeUnit)));
125+
selectionFailureLogged = true;
126+
}
112127

113128
connect();
114129

115-
if (!currentPhase.await(timeout, NANOSECONDS)) {
116-
throw new MongoTimeoutException(format("Timed out while waiting to connect after %d ms",
117-
MILLISECONDS.convert(maxWaitTime, timeUnit)));
118-
}
130+
currentPhase.await(connectRetryFrequency, connectRetryFrequencyTimeUnit);
131+
119132
currentPhase = phase.get();
120133
curDescription = description;
121134
}

src/main/com/mongodb/Cluster.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020

2121
interface Cluster {
2222

23-
ClusterDescription getDescription(final long maxWaitTime, final TimeUnit timeUnit);
23+
ClusterDescription getDescription(final long maxWaitTime, final TimeUnit maxWaitTimeTimeUnit,
24+
long connectRetryFrequency, TimeUnit connectRetryFrequencyTimeUnit);
2425

25-
Server getServer(ServerSelector serverSelector, final long maxWaitTime, final TimeUnit timeUnit);
26+
Server getServer(ServerSelector serverSelector, final long maxWaitTime, final TimeUnit maxWaitTimeTimeUnit,
27+
long connectRetryFrequency, TimeUnit connectRetryFrequencyTimeUnit);
2628

2729
void close();
2830

src/main/com/mongodb/DBTCPConnector.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,8 @@ private boolean shouldRetryQuery(ReadPreference readPreference, final DBCollecti
393393
}
394394

395395
private ClusterDescription getClusterDescription() {
396-
return cluster.getDescription(getClusterWaitTimeMS(), MILLISECONDS);
396+
return cluster.getDescription(getClusterWaitTimeMS(), MILLISECONDS,
397+
_mongo.getMongoOptions().heartbeatConnectRetryFrequencyMS, MILLISECONDS);
397398
}
398399

399400
private int getClusterWaitTimeMS() {
@@ -651,6 +652,7 @@ MyPort getMyPort() {
651652
}
652653

653654
private Server getServer(final ServerSelector serverSelector) {
654-
return cluster.getServer(serverSelector, getClusterWaitTimeMS(), MILLISECONDS);
655+
return cluster.getServer(serverSelector, getClusterWaitTimeMS(), MILLISECONDS,
656+
_mongo.getMongoOptions().heartbeatConnectRetryFrequencyMS, MILLISECONDS);
655657
}
656658
}

src/main/com/mongodb/ServerMonitor.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ public void run() {
115115
if (!isClosed) {
116116
try {
117117
logStateChange(currentServerDescription, throwable);
118-
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(currentServerDescription,
119-
serverDescription));
118+
sendStateChangedEvent(currentServerDescription);
120119
} catch (Throwable t) {
121120
LOGGER.log(Level.WARNING, "Exception in monitor thread during notification of server state change", t);
122121
}
@@ -130,6 +129,13 @@ public void run() {
130129
}
131130
}
132131

132+
private void sendStateChangedEvent(final ServerDescription currentServerDescription) {
133+
if (!currentServerDescription.equals(serverDescription) ||
134+
currentServerDescription.getAverageLatencyNanos() != serverDescription.getAverageLatencyNanos()) {
135+
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(currentServerDescription, serverDescription));
136+
}
137+
}
138+
133139
private void logStateChange(final ServerDescription currentServerDescription, final Throwable throwable) {
134140
// Note that the ServerDescription.equals method does not include the average ping time as part of the comparison,
135141
// so this will not spam the logs too hard.

src/test/com/mongodb/MultiServerClusterSpecification.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class MultiServerClusterSpecification extends Specification {
280280
cluster.close()
281281

282282
when:
283-
cluster.getServer(new ReadPreferenceServerSelector(ReadPreference.primary()), 1, SECONDS)
283+
cluster.getServer(new ReadPreferenceServerSelector(ReadPreference.primary()), 1, SECONDS, 1, SECONDS)
284284

285285
then:
286286
thrown(IllegalStateException)
@@ -428,7 +428,7 @@ class MultiServerClusterSpecification extends Specification {
428428
}
429429

430430
def getClusterDescription(MultiServerCluster cluster) {
431-
cluster.getDescription(1, MILLISECONDS)
431+
cluster.getDescription(1, MILLISECONDS, 1, MILLISECONDS)
432432
}
433433

434434
def getServerDescription(ServerAddress server) {

src/test/com/mongodb/SingleServerClusterSpecification.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ class SingleServerClusterSpecification extends Specification {
151151
sendNotification(firstServer, getBuilder(firstServer).minWireVersion(1000).maxWireVersion(1000).build())
152152

153153
when:
154-
cluster.getServer(new ReadPreferenceServerSelector(ReadPreference.primary()), 1, SECONDS)
154+
cluster.getServer(new ReadPreferenceServerSelector(ReadPreference.primary()), 1, SECONDS, 1, SECONDS)
155155

156156
then:
157157
thrown(MongoIncompatibleDriverException)
@@ -186,7 +186,7 @@ class SingleServerClusterSpecification extends Specification {
186186
}
187187

188188
def getClusterDescription(Cluster cluster) {
189-
cluster.getDescription(1, MILLISECONDS)
189+
cluster.getDescription(1, MILLISECONDS, 1, MILLISECONDS)
190190
}
191191

192192
def getServerDescriptions() {

0 commit comments

Comments
 (0)