Skip to content

Commit cd9c4d3

Browse files
committed
Reconnect remote cluster when seeds are changed (#43379)
The RemoteClusterService should close the current RemoteClusterConnection and should build it again if the seeds are changed, similarly to what is done when the ping interval or the compression settings are changed. Closes #37799
1 parent 88557f4 commit cd9c4d3

File tree

2 files changed

+88
-1
lines changed

2 files changed

+88
-1
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ private synchronized void updateRemoteClusters(Map<String, Tuple<String, List<Tu
233233
remote = new RemoteClusterConnection(settings, clusterAlias, seedList, transportService, numRemoteConnections,
234234
getNodePredicate(settings), proxyAddress, connectionProfile);
235235
remoteClusters.put(clusterAlias, remote);
236-
} else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)) {
236+
} else if (connectionProfileChanged(remote.getConnectionManager().getConnectionProfile(), connectionProfile)
237+
|| seedsChanged(remote.getSeedNodes(), seedList)) {
237238
// New ConnectionProfile. Must tear down existing connection
238239
try {
239240
IOUtils.close(remote);
@@ -472,6 +473,16 @@ private boolean connectionProfileChanged(ConnectionProfile oldProfile, Connectio
472473
|| Objects.equals(oldProfile.getPingInterval(), newProfile.getPingInterval()) == false;
473474
}
474475

476+
private boolean seedsChanged(final List<Tuple<String, Supplier<DiscoveryNode>>> oldSeedNodes,
477+
final List<Tuple<String, Supplier<DiscoveryNode>>> newSeedNodes) {
478+
if (oldSeedNodes.size() != newSeedNodes.size()) {
479+
return true;
480+
}
481+
Set<String> oldSeeds = oldSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
482+
Set<String> newSeeds = newSeedNodes.stream().map(Tuple::v1).collect(Collectors.toSet());
483+
return oldSeeds.equals(newSeeds) == false;
484+
}
485+
475486
/**
476487
* Collects all nodes of the given clusters and returns / passes a (clusterAlias, nodeId) to {@link DiscoveryNode}
477488
* function on success.

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.io.IOException;
4141
import java.net.InetAddress;
4242
import java.net.InetSocketAddress;
43+
import java.util.ArrayList;
4344
import java.util.Arrays;
4445
import java.util.Collections;
4546
import java.util.EnumSet;
@@ -877,6 +878,81 @@ public void testGetNodePredicatesCombination() {
877878
}
878879
}
879880

881+
public void testReconnectWhenSeedsNodesAreUpdated() throws Exception {
882+
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
883+
try (MockTransportService cluster_node_0 = startTransport("cluster_node_0", knownNodes, Version.CURRENT);
884+
MockTransportService cluster_node_1 = startTransport("cluster_node_1", knownNodes, Version.CURRENT)) {
885+
886+
final DiscoveryNode node0 = cluster_node_0.getLocalDiscoNode();
887+
final DiscoveryNode node1 = cluster_node_1.getLocalDiscoNode();
888+
knownNodes.add(node0);
889+
knownNodes.add(node1);
890+
Collections.shuffle(knownNodes, random());
891+
892+
try (MockTransportService transportService =
893+
MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
894+
transportService.start();
895+
transportService.acceptIncomingRequests();
896+
897+
final Settings.Builder builder = Settings.builder();
898+
builder.putList("cluster.remote.cluster_test.seeds", Collections.singletonList(node0.getAddress().toString()));
899+
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
900+
assertFalse(service.isCrossClusterSearchEnabled());
901+
service.initializeRemoteClusters();
902+
assertTrue(service.isCrossClusterSearchEnabled());
903+
904+
final RemoteClusterConnection firstRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
905+
assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
906+
assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
907+
assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
908+
assertFalse(firstRemoteClusterConnection.isClosed());
909+
910+
final CountDownLatch firstLatch = new CountDownLatch(1);
911+
service.updateRemoteCluster(
912+
"cluster_test",
913+
Collections.singletonList(node0.getAddress().toString()), null,
914+
genericProfile("cluster_test"), connectionListener(firstLatch));
915+
firstLatch.await();
916+
917+
assertTrue(service.isCrossClusterSearchEnabled());
918+
assertTrue(firstRemoteClusterConnection.isNodeConnected(node0));
919+
assertTrue(firstRemoteClusterConnection.isNodeConnected(node1));
920+
assertEquals(2, firstRemoteClusterConnection.getNumNodesConnected());
921+
assertFalse(firstRemoteClusterConnection.isClosed());
922+
assertSame(firstRemoteClusterConnection, service.getRemoteClusterConnection("cluster_test"));
923+
924+
final List<String> newSeeds = new ArrayList<>();
925+
newSeeds.add(node1.getAddress().toString());
926+
if (randomBoolean()) {
927+
newSeeds.add(node0.getAddress().toString());
928+
Collections.shuffle(newSeeds, random());
929+
}
930+
931+
final CountDownLatch secondLatch = new CountDownLatch(1);
932+
service.updateRemoteCluster(
933+
"cluster_test",
934+
newSeeds, null,
935+
genericProfile("cluster_test"), connectionListener(secondLatch));
936+
secondLatch.await();
937+
938+
assertTrue(service.isCrossClusterSearchEnabled());
939+
assertBusy(() -> {
940+
assertFalse(firstRemoteClusterConnection.isNodeConnected(node0));
941+
assertFalse(firstRemoteClusterConnection.isNodeConnected(node1));
942+
assertEquals(0, firstRemoteClusterConnection.getNumNodesConnected());
943+
assertTrue(firstRemoteClusterConnection.isClosed());
944+
});
945+
946+
final RemoteClusterConnection secondRemoteClusterConnection = service.getRemoteClusterConnection("cluster_test");
947+
assertTrue(secondRemoteClusterConnection.isNodeConnected(node0));
948+
assertTrue(secondRemoteClusterConnection.isNodeConnected(node1));
949+
assertEquals(2, secondRemoteClusterConnection.getNumNodesConnected());
950+
assertFalse(secondRemoteClusterConnection.isClosed());
951+
}
952+
}
953+
}
954+
}
955+
880956
public void testRemoteClusterWithProxy() throws Exception {
881957
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
882958
try (MockTransportService cluster_1_node0 = startTransport("cluster_1_node0", knownNodes, Version.CURRENT);

0 commit comments

Comments
 (0)