Skip to content

Commit 0e20f72

Browse files
committed
JAVA-1217: Instead of increasing the retry frequency for an unreachable server, always use the same frequency, and force an attempt to connect only when there is a failure of the server selector to find a matching server.
1 parent 38ec3b1 commit 0e20f72

12 files changed

+197
-175
lines changed

src/main/com/mongodb/BaseCluster.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,15 +76,13 @@ public Server getServer(final ServerSelector serverSelector, final long maxWaitT
7676
}
7777
}
7878

79-
if (!curDescription.isConnecting()) {
80-
throw new MongoServerSelectionException(format("Unable to connect to any server that matches %s", serverSelector));
81-
}
82-
8379
final long timeout = endTime - System.nanoTime();
8480

8581
LOGGER.info(format("No server chosen by %s from cluster description %s. Waiting for %d ms before timing out",
8682
serverSelector, curDescription, MILLISECONDS.convert(timeout, NANOSECONDS)));
8783

84+
connect();
85+
8886
if (!currentPhase.await(timeout, NANOSECONDS)) {
8987
throw new MongoTimeoutException(format("Timed out while waiting for a server that matches %s after %d ms",
9088
serverSelector, MILLISECONDS.convert(timeout, NANOSECONDS)));
@@ -107,11 +105,6 @@ public ClusterDescription getDescription(final long maxWaitTime, final TimeUnit
107105
ClusterDescription curDescription = description;
108106
final long endTime = System.nanoTime() + NANOSECONDS.convert(maxWaitTime, timeUnit);
109107
while (curDescription.getType() == ClusterType.Unknown) {
110-
111-
if (!curDescription.isConnecting()) {
112-
throw new MongoServerSelectionException(format("Unable to connect to any servers"));
113-
}
114-
115108
final long timeout = endTime - System.nanoTime();
116109

117110
LOGGER.info(format("Cluster description not yet available. Waiting for %d ms before timing out",
@@ -156,6 +149,11 @@ public boolean isClosed() {
156149
*/
157150
protected abstract ClusterableServer getServer(final ServerAddress serverAddress);
158151

152+
/**
153+
* Try to connect to all servers
154+
*/
155+
protected abstract void connect();
156+
159157
protected synchronized void updateDescription(final ClusterDescription newDescription) {
160158
LOGGER.fine(format("Updating cluster description to %s", newDescription.getShortDescription()));
161159

src/main/com/mongodb/ClusterDescription.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Set;
2626
import java.util.TreeSet;
2727

28-
import static com.mongodb.ServerConnectionState.Connecting;
2928
import static java.lang.String.format;
3029
import static org.bson.util.Assertions.notNull;
3130

@@ -75,20 +74,6 @@ public ClusterType getType() {
7574
return type;
7675
}
7776

78-
/**
79-
* Returns true if the application has been unsuccessful in its last attempt to connect to any of the servers in the cluster.
80-
*
81-
* @return true if connecting, false otherwise
82-
*/
83-
public boolean isConnecting() {
84-
for (final ServerDescription cur : all) {
85-
if (cur.getState() == Connecting) {
86-
return true;
87-
}
88-
}
89-
return false;
90-
}
91-
9277
/**
9378
* Returns the Set of all server descriptions in this cluster, sorted by the String value of the ServerAddress of each one.
9479
*

src/main/com/mongodb/ClusterableServer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,10 @@ interface ClusterableServer extends Server {
4747
* * @return whether the server is closed
4848
*/
4949
boolean isClosed();
50+
51+
/**
52+
* Attempt to connect to the server.
53+
*/
54+
void connect();
5055
}
5156

src/main/com/mongodb/DefaultServer.java

Lines changed: 8 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -18,65 +18,32 @@
1818

1919
import java.util.Map;
2020
import java.util.concurrent.ConcurrentHashMap;
21-
import java.util.concurrent.Executors;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.ScheduledFuture;
24-
import java.util.concurrent.ThreadFactory;
2521
import java.util.concurrent.TimeUnit;
2622

2723
import static com.mongodb.ServerConnectionState.Connecting;
28-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2924
import static org.bson.util.Assertions.isTrue;
3025
import static org.bson.util.Assertions.notNull;
3126

3227
class DefaultServer implements ClusterableServer {
33-
34-
private enum HeartbeatFrequency {
35-
NORMAL {
36-
@Override
37-
long getFrequencyMS(final ServerSettings settings) {
38-
return settings.getHeartbeatFrequency(MILLISECONDS);
39-
}
40-
},
41-
42-
RETRY {
43-
@Override
44-
long getFrequencyMS(final ServerSettings settings) {
45-
return settings.getHeartbeatConnectRetryFrequency(MILLISECONDS);
46-
}
47-
};
48-
49-
abstract long getFrequencyMS(final ServerSettings settings);
50-
}
51-
52-
private final String clusterId;
53-
private final ScheduledExecutorService scheduledExecutorService;
5428
private final ServerAddress serverAddress;
55-
private final ServerStateNotifier stateNotifier;
29+
private final ServerMonitor serverMonitor;
5630
private final PooledConnectionProvider connectionProvider;
5731
private final Map<ChangeListener<ServerDescription>, Boolean> changeListeners =
5832
new ConcurrentHashMap<ChangeListener<ServerDescription>, Boolean>();
59-
private final ServerSettings settings;
6033
private final ChangeListener<ServerDescription> serverStateListener;
6134
private volatile ServerDescription description;
6235
private volatile boolean isClosed;
6336

64-
private ScheduledFuture<?> scheduledFuture;
65-
private HeartbeatFrequency currentFrequency;
66-
6737
public DefaultServer(final ServerAddress serverAddress,
6838
final ServerSettings settings,
6939
final String clusterId, final PooledConnectionProvider connectionProvider,
7040
final Mongo mongo) {
71-
this.clusterId = notNull("clusterId", clusterId);
72-
this.settings = notNull("settings", settings);
7341
this.serverAddress = notNull("serverAddress", serverAddress);
7442
this.description = ServerDescription.builder().state(Connecting).address(serverAddress).build();
7543
serverStateListener = new DefaultServerStateListener();
76-
this.stateNotifier = new ServerStateNotifier(serverAddress, serverStateListener,
77-
settings.getHeartbeatSocketSettings(), mongo);
78-
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory());
79-
setHeartbeat(0, HeartbeatFrequency.NORMAL);
44+
this.serverMonitor = new ServerMonitor(serverAddress, serverStateListener,
45+
settings.getHeartbeatSocketSettings(), settings, clusterId, mongo);
46+
this.serverMonitor.start();
8047
this.connectionProvider = connectionProvider;
8148
}
8249

@@ -107,16 +74,13 @@ public void invalidate() {
10774
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(description, ServerDescription.builder()
10875
.state(Connecting)
10976
.address(serverAddress).build()));
110-
setHeartbeat(0, HeartbeatFrequency.RETRY);
11177
connectionProvider.invalidate();
11278
}
11379

11480
@Override
11581
public void close() {
11682
if (!isClosed()) {
117-
scheduledFuture.cancel(true);
118-
scheduledExecutorService.shutdownNow();
119-
stateNotifier.close();
83+
serverMonitor.close();
12084
connectionProvider.close();
12185
isClosed = true;
12286
}
@@ -127,34 +91,9 @@ public boolean isClosed() {
12791
return isClosed;
12892
}
12993

130-
private void setHeartbeat(final ChangeEvent<ServerDescription> event) {
131-
HeartbeatFrequency heartbeatFrequency = event.getNewValue().getState() == Connecting
132-
? HeartbeatFrequency.RETRY
133-
: HeartbeatFrequency.NORMAL;
134-
long initialDelay = heartbeatFrequency.getFrequencyMS(settings);
135-
setHeartbeat(initialDelay, heartbeatFrequency);
136-
}
137-
138-
private synchronized void setHeartbeat(final long initialDelay, final HeartbeatFrequency newFrequency) {
139-
if (currentFrequency != newFrequency) {
140-
currentFrequency = newFrequency;
141-
if (scheduledFuture != null) {
142-
scheduledFuture.cancel(false);
143-
}
144-
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(stateNotifier, initialDelay,
145-
newFrequency.getFrequencyMS(settings),
146-
MILLISECONDS);
147-
}
148-
}
149-
150-
// Custom thread factory for scheduled executor service that creates daemon threads. Otherwise,
151-
// applications that neglect to close the MongoClient will not exit.
152-
class DefaultThreadFactory implements ThreadFactory {
153-
public Thread newThread(final Runnable runnable) {
154-
Thread t = new Thread(runnable, "cluster-" + clusterId + "-" + serverAddress);
155-
t.setDaemon(true);
156-
return t;
157-
}
94+
@Override
95+
public void connect() {
96+
serverMonitor.connect();
15897
}
15998

16099
private final class DefaultServerStateListener implements ChangeListener<ServerDescription> {
@@ -164,7 +103,6 @@ public void stateChanged(final ChangeEvent<ServerDescription> event) {
164103
for (ChangeListener<ServerDescription> listener : changeListeners.keySet()) {
165104
listener.stateChanged(event);
166105
}
167-
setHeartbeat(event);
168106
}
169107
}
170108
}

src/main/com/mongodb/MultiServerCluster.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,13 @@ public MultiServerCluster(final String clusterId, final ClusterSettings settings
7575
fireChangeEvent();
7676
}
7777

78+
@Override
79+
protected void connect() {
80+
for (ServerTuple cur : addressToServerTupleMap.values()) {
81+
cur.server.connect();
82+
}
83+
}
84+
7885
@Override
7986
public void close() {
8087
if (!isClosed()) {

0 commit comments

Comments
 (0)