Skip to content

Commit 3b215b5

Browse files
committed
Poll for DNS SRV record updates
JAVA-2927
1 parent 7de6b2d commit 3b215b5

23 files changed

+964
-215
lines changed

driver-async/src/test/functional/com/mongodb/async/client/InitialDnsSeedlistDiscoveryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import com.mongodb.ConnectionString;
2121
import com.mongodb.MongoClientException;
2222
import com.mongodb.MongoClientSettings;
23-
import com.mongodb.MongoConfigurationException;
23+
import com.mongodb.MongoException;
2424
import com.mongodb.ServerAddress;
2525
import com.mongodb.async.SingleResultCallback;
2626
import com.mongodb.connection.ClusterSettings;
@@ -86,7 +86,7 @@ public void shouldResolve() throws InterruptedException {
8686
if (isError) {
8787
MongoClient client = null;
8888
try {
89-
final AtomicReference<MongoConfigurationException> exceptionReference = new AtomicReference<MongoConfigurationException>();
89+
final AtomicReference<MongoException> exceptionReference = new AtomicReference<MongoException>();
9090
final CountDownLatch latch = new CountDownLatch(1);
9191

9292
ConnectionString connectionString = new ConnectionString(uri);

driver-core/src/main/com/mongodb/ConnectionString.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.connection.StreamFactoryFactory;
2020
import com.mongodb.diagnostics.logging.Logger;
2121
import com.mongodb.diagnostics.logging.Loggers;
22+
import com.mongodb.internal.dns.DefaultDnsResolver;
2223
import com.mongodb.lang.Nullable;
2324

2425
import java.io.UnsupportedEncodingException;
@@ -32,7 +33,6 @@
3233
import java.util.Set;
3334
import java.util.concurrent.TimeUnit;
3435

35-
import static com.mongodb.internal.dns.DnsResolver.resolveAdditionalQueryParametersFromTxtRecords;
3636
import static java.lang.String.format;
3737
import static java.util.Arrays.asList;
3838
import static java.util.Collections.singletonList;
@@ -361,7 +361,8 @@ public ConnectionString(final String connectionString) {
361361
collection = null;
362362
}
363363

364-
String txtRecordsQueryParameters = isSrvProtocol ? resolveAdditionalQueryParametersFromTxtRecords(unresolvedHosts.get(0)) : "";
364+
String txtRecordsQueryParameters = isSrvProtocol
365+
? new DefaultDnsResolver().resolveAdditionalQueryParametersFromTxtRecords(unresolvedHosts.get(0)) : "";
365366
String connectionStringQueryParamenters = unprocessedConnectionString;
366367

367368
Map<String, List<String>> connectionStringOptionsMap = parseOptions(connectionStringQueryParamenters);

driver-core/src/main/com/mongodb/connection/ClusterDescription.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.mongodb.connection;
1818

19-
import com.mongodb.MongoConfigurationException;
19+
import com.mongodb.MongoException;
2020
import com.mongodb.ReadPreference;
2121
import com.mongodb.ServerAddress;
2222
import com.mongodb.TagSet;
@@ -46,7 +46,7 @@ public class ClusterDescription {
4646
private final List<ServerDescription> serverDescriptions;
4747
private final ClusterSettings clusterSettings;
4848
private final ServerSettings serverSettings;
49-
private final MongoConfigurationException srvResolutionException;
49+
private final MongoException srvResolutionException;
5050

5151
/**
5252
* Creates a new ClusterDescription.
@@ -89,7 +89,7 @@ public ClusterDescription(final ClusterConnectionMode connectionMode, final Clus
8989
* @since 3.10
9090
*/
9191
public ClusterDescription(final ClusterConnectionMode connectionMode, final ClusterType type,
92-
final MongoConfigurationException srvResolutionException,
92+
final MongoException srvResolutionException,
9393
final List<ServerDescription> serverDescriptions,
9494
final ClusterSettings clusterSettings,
9595
final ServerSettings serverSettings) {
@@ -214,7 +214,7 @@ public ClusterType getType() {
214214
* @return any exception encountered while resolving the SRV record for the initial host, or null if none
215215
* @since 3.10
216216
*/
217-
public MongoConfigurationException getSrvResolutionException() {
217+
public MongoException getSrvResolutionException() {
218218
return srvResolutionException;
219219
}
220220

driver-core/src/main/com/mongodb/connection/DefaultClusterFactory.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import com.mongodb.event.ConnectionPoolListener;
2525
import com.mongodb.internal.connection.ClusterableServerFactory;
2626
import com.mongodb.internal.connection.DefaultClusterableServerFactory;
27-
import com.mongodb.internal.connection.MultiServerCluster;
27+
import com.mongodb.internal.connection.DefaultDnsSrvRecordMonitorFactory;
28+
import com.mongodb.internal.connection.DnsMultiServerCluster;
29+
import com.mongodb.internal.connection.DnsSrvRecordMonitorFactory;
30+
import com.mongodb.internal.connection.StableMultiServerCluster;
2831
import com.mongodb.internal.connection.SingleServerCluster;
2932

3033
import java.util.Collections;
@@ -183,10 +186,16 @@ public Cluster createCluster(final ClusterSettings clusterSettings, final Server
183186
connectionPoolSettings, streamFactory, heartbeatStreamFactory, credentialList, commandListener, applicationName,
184187
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList);
185188

189+
DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory = new DefaultDnsSrvRecordMonitorFactory(clusterId, serverSettings);
190+
186191
if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
187192
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
188193
} else if (clusterSettings.getMode() == ClusterConnectionMode.MULTIPLE) {
189-
return new MultiServerCluster(clusterId, clusterSettings, serverFactory);
194+
if (clusterSettings.getSrvHost() == null) {
195+
return new StableMultiServerCluster(clusterId, clusterSettings, serverFactory);
196+
} else {
197+
return new DnsMultiServerCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
198+
}
190199
} else {
191200
throw new UnsupportedOperationException("Unsupported cluster mode: " + clusterSettings.getMode());
192201
}

driver-core/src/main/com/mongodb/internal/connection/MultiServerCluster.java renamed to driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 47 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.mongodb.internal.connection;
1818

19-
import com.mongodb.MongoConfigurationException;
19+
import com.mongodb.MongoException;
2020
import com.mongodb.ServerAddress;
2121
import com.mongodb.connection.ClusterConnectionMode;
2222
import com.mongodb.connection.ClusterDescription;
@@ -32,6 +32,7 @@
3232
import org.bson.types.ObjectId;
3333

3434
import java.util.ArrayList;
35+
import java.util.Collection;
3536
import java.util.Collections;
3637
import java.util.HashSet;
3738
import java.util.Iterator;
@@ -47,14 +48,9 @@
4748
import static com.mongodb.connection.ServerType.REPLICA_SET_GHOST;
4849
import static com.mongodb.connection.ServerType.SHARD_ROUTER;
4950
import static com.mongodb.connection.ServerType.STANDALONE;
50-
import static com.mongodb.internal.connection.ServerAddressHelper.createServerAddress;
51-
import static com.mongodb.internal.dns.DnsResolver.resolveHostFromSrvRecords;
5251
import static java.lang.String.format;
5352

54-
/**
55-
* This class needs to be final because we are leaking a reference to "this" from the constructor
56-
*/
57-
public final class MultiServerCluster extends BaseCluster {
53+
public abstract class AbstractMultiServerCluster extends BaseCluster {
5854
private static final Logger LOGGER = Loggers.getLogger("cluster");
5955

6056
private ClusterType clusterType;
@@ -64,7 +60,6 @@ public final class MultiServerCluster extends BaseCluster {
6460

6561
private final ConcurrentMap<ServerAddress, ServerTuple> addressToServerTupleMap =
6662
new ConcurrentHashMap<ServerAddress, ServerTuple>();
67-
private volatile MongoConfigurationException srvResolutionException;
6863

6964
private static final class ServerTuple {
7065
private final ClusterableServer server;
@@ -76,7 +71,7 @@ private ServerTuple(final ClusterableServer server, final ServerDescription desc
7671
}
7772
}
7873

79-
public MultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
74+
AbstractMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
8075
super(clusterId, settings, serverFactory);
8176
isTrue("connection mode is multiple", settings.getMode() == ClusterConnectionMode.MULTIPLE);
8277
clusterType = settings.getRequiredClusterType();
@@ -85,33 +80,17 @@ public MultiServerCluster(final ClusterId clusterId, final ClusterSettings setti
8580
if (LOGGER.isInfoEnabled()) {
8681
LOGGER.info(format("Cluster created with settings %s", settings.getShortDescription()));
8782
}
83+
}
8884

89-
if (settings.getSrvHost() != null) {
90-
new Thread(new Runnable() {
91-
@Override
92-
public void run() {
93-
List<ServerAddress> hosts = new ArrayList<ServerAddress>();
94-
try {
95-
List<String> resolvedHostNames = resolveHostFromSrvRecords(settings.getSrvHost());
96-
for (String host : resolvedHostNames) {
97-
hosts.add(createServerAddress(host));
98-
}
99-
} catch (MongoConfigurationException e) {
100-
srvResolutionException = e;
101-
} catch (RuntimeException e) {
102-
LOGGER.warn("Unexpected runtime exception while resolving SRV record", e);
103-
return;
104-
}
105-
initialize(clusterId, settings, hosts, serverFactory);
106-
}
107-
}).start();
108-
} else {
109-
initialize(clusterId, settings, settings.getHosts(), serverFactory);
110-
}
85+
ClusterType getClusterType() {
86+
return clusterType;
87+
}
88+
89+
MongoException getSrvResolutionException() {
90+
return null;
11191
}
11292

113-
private void initialize(final ClusterId clusterId, final ClusterSettings settings, final List<ServerAddress> serverAddresses,
114-
final ClusterableServerFactory serverFactory) {
93+
protected void initialize(final Collection<ServerAddress> serverAddresses) {
11594
ClusterDescription newDescription;
11695

11796
// synchronizing this code because addServer registers a callback which is re-entrant to this instance.
@@ -122,9 +101,12 @@ private void initialize(final ClusterId clusterId, final ClusterSettings setting
122101
}
123102
newDescription = updateDescription();
124103
}
125-
fireChangeEvent(new ClusterDescriptionChangedEvent(clusterId, newDescription,
126-
new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.<ServerDescription>emptyList(),
127-
settings, serverFactory.getSettings())));
104+
fireChangeEvent(new ClusterDescriptionChangedEvent(getClusterId(), newDescription, createInitialDescription()));
105+
}
106+
107+
private ClusterDescription createInitialDescription() {
108+
return new ClusterDescription(getSettings().getMode(), ClusterType.UNKNOWN, Collections.<ServerDescription>emptyList(),
109+
getSettings(), getServerFactory().getSettings());
128110
}
129111

130112
@Override
@@ -165,6 +147,34 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event)
165147
}
166148
}
167149

150+
void onChange(final Collection<ServerAddress> newHosts) {
151+
synchronized (this) {
152+
if (isClosed()) {
153+
return;
154+
}
155+
156+
for (ServerAddress cur : newHosts) {
157+
addServer(cur);
158+
}
159+
160+
for (Iterator<ServerTuple> iterator = addressToServerTupleMap.values().iterator(); iterator.hasNext();) {
161+
ServerTuple cur = iterator.next();
162+
if (!newHosts.contains(cur.description.getAddress())) {
163+
if (LOGGER.isInfoEnabled()) {
164+
LOGGER.info(format("Removing %s from client view of cluster.", cur.description.getAddress()));
165+
}
166+
iterator.remove();
167+
cur.server.close();
168+
}
169+
}
170+
171+
ClusterDescription oldClusterDescription = getCurrentDescription();
172+
ClusterDescription newClusterDescription = updateDescription();
173+
174+
fireChangeEvent(new ClusterDescriptionChangedEvent(getClusterId(), newClusterDescription, oldClusterDescription));
175+
}
176+
}
177+
168178
private void onChange(final ServerDescriptionChangedEvent event) {
169179
ClusterDescription oldClusterDescription = null;
170180
ClusterDescription newClusterDescription = null;
@@ -369,7 +379,7 @@ private ServerDescription getConnectingServerDescription(final ServerAddress ser
369379
}
370380

371381
private ClusterDescription updateDescription() {
372-
ClusterDescription newDescription = new ClusterDescription(MULTIPLE, clusterType, srvResolutionException,
382+
ClusterDescription newDescription = new ClusterDescription(MULTIPLE, clusterType, getSrvResolutionException(),
373383
getNewServerDescriptionList(), getSettings(), getServerFactory().getSettings());
374384
updateDescription(newDescription);
375385
return newDescription;
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.connection;
18+
19+
import com.mongodb.MongoException;
20+
import com.mongodb.MongoInternalException;
21+
import com.mongodb.ServerAddress;
22+
import com.mongodb.connection.ClusterId;
23+
import com.mongodb.connection.ClusterType;
24+
import com.mongodb.diagnostics.logging.Logger;
25+
import com.mongodb.diagnostics.logging.Loggers;
26+
import com.mongodb.internal.dns.DnsResolver;
27+
28+
import java.util.Collections;
29+
import java.util.HashSet;
30+
import java.util.List;
31+
import java.util.Set;
32+
33+
import static com.mongodb.internal.connection.ServerAddressHelper.createServerAddress;
34+
35+
class DefaultDnsSrvRecordMonitor implements DnsSrvRecordMonitor {
36+
private static final Logger LOGGER = Loggers.getLogger("cluster");
37+
38+
private final String hostName;
39+
private final long rescanFrequencyMillis;
40+
private final long noRecordsRescanFrequencyMillis;
41+
private final DnsSrvRecordInitializer dnsSrvRecordInitializer;
42+
private final DnsResolver dnsResolver;
43+
private final Thread monitorThread;
44+
private volatile boolean isClosed;
45+
46+
DefaultDnsSrvRecordMonitor(final String hostName, final long rescanFrequencyMillis, final long noRecordsRescanFrequencyMillis,
47+
final DnsSrvRecordInitializer dnsSrvRecordInitializer, final ClusterId clusterId,
48+
final DnsResolver dnsResolver) {
49+
this.hostName = hostName;
50+
this.rescanFrequencyMillis = rescanFrequencyMillis;
51+
this.noRecordsRescanFrequencyMillis = noRecordsRescanFrequencyMillis;
52+
this.dnsSrvRecordInitializer = dnsSrvRecordInitializer;
53+
this.dnsResolver = dnsResolver;
54+
monitorThread = new Thread(new DnsSrvRecordMonitorRunnable(), "cluster-" + clusterId + "-srv-" + hostName);
55+
monitorThread.setDaemon(true);
56+
}
57+
58+
@Override
59+
public void start() {
60+
monitorThread.start();
61+
}
62+
63+
@Override
64+
public void close() {
65+
isClosed = true;
66+
monitorThread.interrupt();
67+
}
68+
69+
private class DnsSrvRecordMonitorRunnable implements Runnable {
70+
private Set<ServerAddress> currentHosts = Collections.emptySet();
71+
private ClusterType clusterType = ClusterType.UNKNOWN;
72+
73+
@Override
74+
public void run() {
75+
while (!isClosed && shouldContinueMonitoring()) {
76+
try {
77+
List<String> resolvedHostNames = dnsResolver.resolveHostFromSrvRecords(hostName);
78+
Set<ServerAddress> hosts = createServerAddressSet(resolvedHostNames);
79+
80+
if (isClosed) {
81+
return;
82+
}
83+
84+
if (!hosts.equals(currentHosts)) {
85+
try {
86+
dnsSrvRecordInitializer.initialize(hosts);
87+
currentHosts = hosts;
88+
} catch (RuntimeException e) {
89+
LOGGER.warn("Exception in monitor thread during notification of DNS resolution state change", e);
90+
}
91+
}
92+
} catch (MongoException e) {
93+
if (currentHosts.isEmpty()) {
94+
dnsSrvRecordInitializer.initialize(e);
95+
}
96+
LOGGER.info("Exception while resolving SRV records", e);
97+
} catch (RuntimeException e) {
98+
if (currentHosts.isEmpty()) {
99+
dnsSrvRecordInitializer.initialize(new MongoInternalException("Unexpected runtime exception", e));
100+
}
101+
LOGGER.info("Unexpected runtime exception while resolving SRV record", e);
102+
}
103+
104+
try {
105+
Thread.sleep(getRescanFrequencyMillis());
106+
} catch (InterruptedException e) {
107+
// fall through
108+
}
109+
clusterType = dnsSrvRecordInitializer.getClusterType();
110+
}
111+
}
112+
113+
private boolean shouldContinueMonitoring() {
114+
return clusterType == ClusterType.UNKNOWN || clusterType == ClusterType.SHARDED;
115+
}
116+
117+
private long getRescanFrequencyMillis() {
118+
return currentHosts.isEmpty() ? noRecordsRescanFrequencyMillis : rescanFrequencyMillis;
119+
}
120+
121+
private Set<ServerAddress> createServerAddressSet(final List<String> resolvedHostNames) {
122+
Set<ServerAddress> hosts = new HashSet<ServerAddress>(resolvedHostNames.size());
123+
for (String host : resolvedHostNames) {
124+
hosts.add(createServerAddress(host));
125+
}
126+
return hosts;
127+
}
128+
}
129+
}
130+

0 commit comments

Comments
 (0)