Skip to content

Commit 7d59cc8

Browse files
committed
JAVA-1202: Dedicate a single thread scheduled executor to each server being monitored, and , when changing the heartbeat frequency, ensure that the ServerStateNotifier is scheduled at most once by cancelling the existing schedule before creating the new one.
1 parent b718ddc commit 7d59cc8

File tree

7 files changed

+119
-106
lines changed

7 files changed

+119
-106
lines changed

src/main/com/mongodb/BaseCluster.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ public ClusterSettings getSettings() {
138138
public void close() {
139139
if (!isClosed()) {
140140
isClosed = true;
141-
serverFactory.close();
142141
phase.get().countDown();
143142
clusterListener.clusterClosed(new ClusterEvent(clusterId));
144143
}

src/main/com/mongodb/ClusterableServerFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,4 @@
1818

1919
interface ClusterableServerFactory {
2020
ClusterableServer create(ServerAddress serverAddress);
21-
22-
void close();
2321
}

src/main/com/mongodb/Clusters.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.mongodb;
1818

19-
import java.util.concurrent.ScheduledExecutorService;
20-
2119
/**
2220
* A factory for cluster implementations.
2321
*
@@ -27,11 +25,8 @@ private Clusters() {
2725
}
2826

2927
public static Cluster create(final String clusterId, final ClusterSettings settings, final ServerSettings serverSettings,
30-
final ScheduledExecutorService scheduledExecutorService,
31-
final ClusterListener clusterListener,
32-
final Mongo mongo) {
33-
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(clusterId, serverSettings, scheduledExecutorService,
34-
mongo);
28+
final ClusterListener clusterListener, final Mongo mongo) {
29+
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(clusterId, serverSettings, mongo);
3530

3631
if (settings.getMode() == ClusterConnectionMode.Single) {
3732
return new SingleServerCluster(clusterId, settings, serverFactory,

src/main/com/mongodb/DBTCPConnector.java

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,6 @@
2121
import java.net.SocketTimeoutException;
2222
import java.util.ArrayList;
2323
import java.util.List;
24-
import java.util.concurrent.Executors;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.ThreadFactory;
2724
import java.util.concurrent.atomic.AtomicInteger;
2825

2926
import static com.mongodb.ClusterConnectionMode.Multiple;
@@ -47,7 +44,6 @@ public class DBTCPConnector implements DBConnector {
4744

4845
private final Mongo _mongo;
4946

50-
private ScheduledExecutorService scheduledExecutorService;
5147
private Cluster cluster;
5248

5349
private final MyPort _myPort = new MyPort();
@@ -73,10 +69,6 @@ public void start() {
7369
MongoOptions options = _mongo.getMongoOptions();
7470

7571
String clusterId = Integer.toString(NEXT_CLUSTER_ID.getAndIncrement());
76-
scheduledExecutorService = Executors.newScheduledThreadPool(options.heartbeatThreadCount > 0 ?
77-
options.heartbeatThreadCount :
78-
_mongo.getAuthority().getServerAddresses().size(),
79-
new DefaultThreadFactory(clusterId));
8072
cluster =
8173
Clusters.create(clusterId,
8274
ClusterSettings.builder()
@@ -94,7 +86,7 @@ public void start() {
9486
.socketFactory(_mongo.getMongoOptions().getSocketFactory())
9587
.build())
9688
.build(),
97-
scheduledExecutorService, null, _mongo);
89+
null, _mongo);
9890
}
9991

10092
/**
@@ -603,10 +595,6 @@ public void close(){
603595
cluster.close();
604596
cluster = null;
605597
}
606-
if (scheduledExecutorService != null) {
607-
scheduledExecutorService.shutdownNow();
608-
scheduledExecutorService = null;
609-
}
610598
}
611599

612600
/**
@@ -665,21 +653,4 @@ MyPort getMyPort() {
665653
private Server getServer(final ServerSelector serverSelector) {
666654
return cluster.getServer(serverSelector, getClusterWaitTimeMS(), MILLISECONDS);
667655
}
668-
669-
// Custom thread factory for scheduled executor service that creates daemon threads. Otherwise,
670-
// applications that neglect to close the MongoClient will not exit.
671-
static class DefaultThreadFactory implements ThreadFactory {
672-
private final AtomicInteger threadNumber = new AtomicInteger(1);
673-
private final String clusterId;
674-
675-
DefaultThreadFactory(final String clusterId) {
676-
this.clusterId = clusterId;
677-
}
678-
679-
public Thread newThread(Runnable runnable) {
680-
Thread t = new Thread(runnable, "cluster-" + clusterId + "-thread-" + threadNumber.getAndIncrement());
681-
t.setDaemon(true);
682-
return t;
683-
}
684-
}
685656
}

src/main/com/mongodb/DefaultClusterableServerFactory.java

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,16 @@
1616

1717
package com.mongodb;
1818

19-
import java.util.concurrent.ScheduledExecutorService;
20-
2119
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2220

2321
class DefaultClusterableServerFactory implements ClusterableServerFactory {
2422
private final String clusterId;
2523
private ServerSettings settings;
26-
private final ScheduledExecutorService scheduledExecutorService;
2724
private final Mongo mongo;
2825

29-
public DefaultClusterableServerFactory(final String clusterId, final ServerSettings settings,
30-
final ScheduledExecutorService scheduledExecutorService,
31-
final Mongo mongo) {
26+
public DefaultClusterableServerFactory(final String clusterId, final ServerSettings settings, final Mongo mongo) {
3227
this.clusterId = clusterId;
3328
this.settings = settings;
34-
this.scheduledExecutorService = scheduledExecutorService;
3529
this.mongo = mongo;
3630
}
3731

@@ -47,14 +41,9 @@ public ClusterableServer create(final ServerAddress serverAddress) {
4741
.maxWaitQueueSize(options.getConnectionsPerHost() * options.getThreadsAllowedToBlockForConnectionMultiplier())
4842
.maxWaitTime(options.getMaxWaitTime(), MILLISECONDS)
4943
.build();
50-
return new DefaultServer(serverAddress, settings,
44+
return new DefaultServer(serverAddress, settings, clusterId,
5145
new PooledConnectionProvider(clusterId, serverAddress, new DBPortFactory(options), connectionPoolSettings,
5246
new JMXConnectionPoolListener(mongo.getMongoOptions().getDescription())),
53-
scheduledExecutorService, mongo);
54-
}
55-
56-
@Override
57-
public void close() {
58-
scheduledExecutorService.shutdownNow();
47+
mongo);
5948
}
6049
}

src/main/com/mongodb/DefaultServer.java

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.Map;
2020
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.Executors;
2122
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.ThreadFactory;
2325
import java.util.concurrent.TimeUnit;
2426

2527
import static com.mongodb.ServerConnectionState.Connecting;
@@ -29,10 +31,30 @@
2931
import static org.bson.util.Assertions.notNull;
3032

3133
class DefaultServer implements ClusterableServer {
34+
35+
private final String clusterId;
36+
37+
private enum HeartbeatFrequency {
38+
NORMAL {
39+
@Override
40+
long getFrequencyMS(final ServerSettings settings) {
41+
return settings.getHeartbeatFrequency(MILLISECONDS);
42+
}
43+
},
44+
45+
RETRY {
46+
@Override
47+
long getFrequencyMS(final ServerSettings settings) {
48+
return settings.getHeartbeatConnectRetryFrequency(MILLISECONDS);
49+
}
50+
};
51+
52+
abstract long getFrequencyMS(final ServerSettings settings);
53+
}
54+
3255
private final ScheduledExecutorService scheduledExecutorService;
3356
private final ServerAddress serverAddress;
3457
private final ServerStateNotifier stateNotifier;
35-
private final ScheduledFuture<?> scheduledFuture;
3658
private final PooledConnectionProvider connectionProvider;
3759
private final Map<ChangeListener<ServerDescription>, Boolean> changeListeners =
3860
new ConcurrentHashMap<ChangeListener<ServerDescription>, Boolean>();
@@ -41,22 +63,22 @@ class DefaultServer implements ClusterableServer {
4163
private volatile ServerDescription description;
4264
private volatile boolean isClosed;
4365

66+
private ScheduledFuture<?> scheduledFuture;
67+
private HeartbeatFrequency currentFrequency;
68+
4469
public DefaultServer(final ServerAddress serverAddress,
4570
final ServerSettings settings,
46-
final PooledConnectionProvider connectionProvider,
47-
final ScheduledExecutorService scheduledExecutorService,
48-
Mongo mongo) {
71+
final String clusterId, final PooledConnectionProvider connectionProvider,
72+
final Mongo mongo) {
73+
this.clusterId = notNull("clusterId", clusterId);
4974
this.settings = notNull("settings", settings);
50-
51-
this.scheduledExecutorService = notNull("scheduledExecutorService", scheduledExecutorService);
5275
this.serverAddress = notNull("serverAddress", serverAddress);
5376
this.description = ServerDescription.builder().state(Connecting).address(serverAddress).build();
5477
serverStateListener = new DefaultServerStateListener();
5578
this.stateNotifier = new ServerStateNotifier(serverAddress, serverStateListener,
5679
settings.getHeartbeatSocketSettings(), mongo);
57-
this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(stateNotifier, 0,
58-
settings.getHeartbeatFrequency(MILLISECONDS),
59-
MILLISECONDS);
80+
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory());
81+
setHeartbeat(0, HeartbeatFrequency.NORMAL);
6082
this.connectionProvider = connectionProvider;
6183
}
6284

@@ -87,14 +109,15 @@ public void invalidate() {
87109
serverStateListener.stateChanged(new ChangeEvent<ServerDescription>(description, ServerDescription.builder()
88110
.state(Connecting)
89111
.address(serverAddress).build()));
90-
scheduledExecutorService.submit(stateNotifier);
112+
setHeartbeat(0, HeartbeatFrequency.RETRY);
91113
connectionProvider.invalidate();
92114
}
93115

94116
@Override
95117
public void close() {
96118
if (!isClosed()) {
97119
scheduledFuture.cancel(true);
120+
scheduledExecutorService.shutdownNow();
98121
stateNotifier.close();
99122
connectionProvider.close();
100123
isClosed = true;
@@ -106,18 +129,44 @@ public boolean isClosed() {
106129
return isClosed;
107130
}
108131

132+
private void setHeartbeat(final ChangeEvent<ServerDescription> event) {
133+
HeartbeatFrequency heartbeatFrequency = event.getNewValue().getState() == Unconnected
134+
? HeartbeatFrequency.RETRY
135+
: HeartbeatFrequency.NORMAL;
136+
long initialDelay = heartbeatFrequency.getFrequencyMS(settings);
137+
setHeartbeat(initialDelay, heartbeatFrequency);
138+
}
139+
140+
private synchronized void setHeartbeat(final long initialDelay, final HeartbeatFrequency newFrequency) {
141+
if (currentFrequency != newFrequency) {
142+
currentFrequency = newFrequency;
143+
if (scheduledFuture != null) {
144+
scheduledFuture.cancel(false);
145+
}
146+
scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(stateNotifier, initialDelay,
147+
newFrequency.getFrequencyMS(settings),
148+
MILLISECONDS);
149+
}
150+
}
151+
152+
// Custom thread factory for scheduled executor service that creates daemon threads. Otherwise,
153+
// applications that neglect to close the MongoClient will not exit.
154+
class DefaultThreadFactory implements ThreadFactory {
155+
public Thread newThread(final Runnable runnable) {
156+
Thread t = new Thread(runnable, "cluster-" + clusterId + "-" + serverAddress);
157+
t.setDaemon(true);
158+
return t;
159+
}
160+
}
161+
109162
private final class DefaultServerStateListener implements ChangeListener<ServerDescription> {
110163
@Override
111164
public void stateChanged(final ChangeEvent<ServerDescription> event) {
112165
description = event.getNewValue();
113166
for (ChangeListener<ServerDescription> listener : changeListeners.keySet()) {
114167
listener.stateChanged(event);
115168
}
116-
if (event.getNewValue().getState() == Unconnected) {
117-
scheduledExecutorService.schedule(stateNotifier, settings.getHeartbeatConnectRetryFrequency(MILLISECONDS),
118-
MILLISECONDS);
119-
}
169+
setHeartbeat(event);
120170
}
121-
122171
}
123172
}

0 commit comments

Comments
 (0)