
Commit 1de71c0

tlrx and DaveCTurner authored
Fix RestartIndexFollowingIT.testFollowIndex in case of fatal exception (#92522) (#92645)
This test failed several times after the leader cluster was fully restarted for the second time. The logs indicate that one or more ShardFollowNodeTask instances (the persistent task in charge of replicating operations for a shard) were stopped because a fatal exception occurred. The fatal exception is an IllegalStateException with the message "Unable to open any connections to remote cluster". I think this is due to the leader cluster being slow to restart and the remote cluster sniff strategy giving up after trying to connect to the leader cluster nodes. Since this exception is fatal, the ShardFollowNodeTask stopped replicating operations and the test failed while waiting for the doc counts on the leader and follower clusters to match.

The documented way to resolve a fatal exception on a CCR follower is to recreate the follower, or to pause and then resume it. The test has been adjusted accordingly.

Closes #90666

Co-authored-by: David Turner <[email protected]>
1 parent 88ede8c commit 1de71c0
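
For readers unfamiliar with CCR, the two documented recovery paths the test now exercises correspond to public REST endpoints. Below is a minimal sketch using the low-level Java REST client; the host/port and the remote cluster alias leader_cluster are assumptions, while the index names match the test.

// Sketch of the two documented recoveries for a follower stopped by a fatal
// exception. Host, remote-cluster alias, and index names are assumptions.
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

public class RecoverFollower {
    // Option 1: pause and then resume the follower index.
    static void pauseAndResume(RestClient client) throws Exception {
        client.performRequest(new Request("POST", "/index2/_ccr/pause_follow"));
        client.performRequest(new Request("POST", "/index2/_ccr/resume_follow"));
    }

    // Option 2: delete the follower and recreate it from scratch.
    static void recreate(RestClient client) throws Exception {
        client.performRequest(new Request("DELETE", "/index2"));
        Request follow = new Request("PUT", "/index2/_ccr/follow");
        follow.setJsonEntity("{\"remote_cluster\":\"leader_cluster\",\"leader_index\":\"index1\"}");
        client.performRequest(follow);
    }

    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            pauseAndResume(client); // or: recreate(client)
        }
    }
}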

File tree

1 file changed (+37, -6 lines)

x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/RestartIndexFollowingIT.java

Lines changed: 37 additions & 6 deletions
@@ -12,6 +12,7 @@
 import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
 import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
 import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
+import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
@@ -20,11 +21,16 @@
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xpack.CcrIntegTestCase;
+import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
 import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
+import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
 import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
 
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
@@ -79,6 +85,7 @@ public void testFollowIndex() throws Exception {
         ensureFollowerGreen("index2");
 
         final long secondBatchNumDocs = randomIntBetween(10, 200);
+        logger.info("Indexing [{}] docs as second batch", secondBatchNumDocs);
         for (int i = 0; i < secondBatchNumDocs; i++) {
             leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get();
         }
@@ -87,16 +94,28 @@ public void testFollowIndex() throws Exception {
         ensureLeaderGreen("index1");
 
         final long thirdBatchNumDocs = randomIntBetween(10, 200);
+        logger.info("Indexing [{}] docs as third batch", thirdBatchNumDocs);
         for (int i = 0; i < thirdBatchNumDocs; i++) {
             leaderClient().prepareIndex("index1").setSource("{}", XContentType.JSON).get();
         }
 
-        assertBusy(
-            () -> assertThat(
-                followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
-                equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)
-            )
-        );
+        var totalDocs = firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs;
+        final AtomicBoolean resumeAfterDisconnectionOnce = new AtomicBoolean(false);
+        assertBusy(() -> {
+            if (resumeAfterDisconnectionOnce.get() == false && isFollowerStoppedBecauseOfRemoteClusterDisconnection("index2")) {
+                assertTrue(resumeAfterDisconnectionOnce.compareAndSet(false, true));
+                if (randomBoolean()) {
+                    logger.info("shard follow task has been stopped because of remote cluster disconnection, resuming");
+                    pauseFollow("index2");
+                    assertAcked(followerClient().execute(ResumeFollowAction.INSTANCE, resumeFollow("index2")).actionGet());
+                } else {
+                    logger.info("shard follow task has been stopped because of remote cluster disconnection, recreating");
+                    assertAcked(followerClient().admin().indices().prepareDelete("index2"));
+                    followerClient().execute(PutFollowAction.INSTANCE, putFollow("index1", "index2", ActiveShardCount.ALL)).actionGet();
+                }
+            }
+            assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, equalTo(totalDocs));
+        }, 30L, TimeUnit.SECONDS);
 
         cleanRemoteCluster();
         assertAcked(followerClient().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("index2")).actionGet());
@@ -151,4 +170,16 @@ private void cleanRemoteCluster() throws Exception {
             assertThat(infos.size(), equalTo(0));
         });
     }
+
+    private boolean isFollowerStoppedBecauseOfRemoteClusterDisconnection(String indexName) {
+        var request = new FollowStatsAction.StatsRequest();
+        request.setIndices(new String[] { indexName });
+        var response = followerClient().execute(FollowStatsAction.INSTANCE, request).actionGet();
+        return response.getStatsResponses().stream().map(r -> r.status().getFatalException()).filter(Objects::nonNull).anyMatch(e -> {
+            if (e.getCause() instanceof IllegalStateException ise) {
+                return ise.getMessage().contains("Unable to open any connections to remote cluster");
+            }
+            return false;
+        });
+    }
 }
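
As a side note, the fatal exception that the new isFollowerStoppedBecauseOfRemoteClusterDisconnection helper reads through FollowStatsAction is also surfaced by the public follow-stats endpoint. A minimal sketch (same assumed host/port as above) that prints the raw stats, in which a stopped follower reports a fatal_exception entry:

// Sketch: fetch follower shard stats, where a fatal exception (if any) is
// reported per shard. Host/port and index name are assumptions.
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class FollowStatsCheck {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            Response response = client.performRequest(new Request("GET", "/index2/_ccr/stats"));
            // The JSON response lists shard-follow stats; a follower stopped by a
            // fatal exception carries a "fatal_exception" object with type and reason.
            System.out.println(EntityUtils.toString(response.getEntity()));
        }
    }
}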
