Skip to content

Commit d30c0e8

Browse files
committed
Wait for connect on remote settings update (#48497)
This is related to #47718. It introduces a 10-second wait for a connection to complete when remote cluster settings introduce a new remote cluster connection.
1 parent 876fcc9 commit d30c0e8

File tree

4 files changed

+40
-13
lines changed

4 files changed

+40
-13
lines changed

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Map;
5050
import java.util.Objects;
5151
import java.util.Set;
52+
import java.util.concurrent.CountDownLatch;
5253
import java.util.concurrent.TimeUnit;
5354
import java.util.concurrent.TimeoutException;
5455
import java.util.function.BiFunction;
@@ -396,7 +397,24 @@ protected void updateRemoteCluster(String clusterAlias, List<String> addresses,
396397
builder.setPingInterval(pingSchedule);
397398
newProfile = builder.build();
398399
}
399-
updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener);
400+
401+
if (remoteClusters.containsKey(clusterAlias) == false) {
402+
CountDownLatch latch = new CountDownLatch(1);
403+
updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, ActionListener.wrap(latch::countDown));
404+
405+
try {
406+
// Wait 10 seconds for a new cluster. We must use a latch instead of a future because we
407+
// are on the cluster state thread and our custom future implementation will throw an
408+
// assertion.
409+
if (latch.await(10, TimeUnit.SECONDS) == false) {
410+
logger.warn("failed to connect to new remote cluster {} within {}", clusterAlias, TimeValue.timeValueSeconds(10));
411+
}
412+
} catch (InterruptedException e) {
413+
Thread.currentThread().interrupt();
414+
}
415+
} else {
416+
updateRemoteCluster(clusterAlias, addresses, proxyAddress, newProfile, noopListener);
417+
}
400418
}
401419

402420
void updateRemoteCluster(final String clusterAlias, final List<String> addresses, final String proxyAddress,

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.OriginalIndices;
2424
import org.elasticsearch.action.support.IndicesOptions;
25+
import org.elasticsearch.action.support.PlainActionFuture;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
2728
import org.elasticsearch.common.Strings;
@@ -322,7 +323,19 @@ public void testIncrementallyAddClusters() throws IOException {
322323
assertFalse(service.isCrossClusterSearchEnabled());
323324
service.initializeRemoteClusters();
324325
assertFalse(service.isCrossClusterSearchEnabled());
325-
service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null);
326+
327+
PlainActionFuture<Void> clusterAdded = PlainActionFuture.newFuture();
328+
// Add the cluster on a different thread to test that we wait for a new cluster to
329+
// connect before returning.
330+
new Thread(() -> {
331+
try {
332+
service.updateRemoteCluster("cluster_1", Collections.singletonList(cluster1Seed.getAddress().toString()), null);
333+
clusterAdded.onResponse(null);
334+
} catch (Exception e) {
335+
clusterAdded.onFailure(e);
336+
}
337+
}).start();
338+
clusterAdded.actionGet();
326339
assertTrue(service.isCrossClusterSearchEnabled());
327340
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
328341
service.updateRemoteCluster("cluster_2", Collections.singletonList(cluster2Seed.getAddress().toString()), null);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,9 @@ public void setupLocalRemote() throws Exception {
6767
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address));
6868
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
6969

70-
assertBusy(() -> {
71-
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
72-
assertThat(infos.size(), equalTo(1));
73-
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
74-
});
70+
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
71+
assertThat(infos.size(), equalTo(1));
72+
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
7573
}
7674

7775
@Before

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,10 @@ private void setupRemoteCluster() throws Exception {
9494
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
9595
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
9696

97-
assertBusy(() -> {
98-
List<RemoteConnectionInfo> infos =
99-
followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
100-
assertThat(infos.size(), equalTo(1));
101-
assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1));
102-
});
97+
List<RemoteConnectionInfo> infos =
98+
followerClient().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
99+
assertThat(infos.size(), equalTo(1));
100+
assertThat(infos.get(0).getNumNodesConnected(), greaterThanOrEqualTo(1));
103101
}
104102

105103
private void cleanRemoteCluster() throws Exception {

0 commit comments

Comments
 (0)