Skip to content

Commit 758a2ab

Browse files
authored
CCR should check historyUUID in every read request (#65841)
Today, CCR only checks the historyUUID of the leader shard when it has operations to replicate. If the follower shard is already in-sync with the leader shard, then CCR won't detect if the historyUUID of the leader shard has been changed. While this is not an issue, it can annoy users in the following situation: 1. The follower index is in-sync with the leader index 2. Users restore the leader index from snapshots 3. CCR won't detect the issue and report ok in its stats API 4. CCR suddenly stops working when users start indexing to the leader index 5. This commit makes sure that we always check historyUUID in every read-request so we can detect and report the issue as soon as possible. Backport of #65841
1 parent e6276e8 commit 758a2ab

File tree

2 files changed

+57
-6
lines changed

2 files changed

+57
-6
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ private void globalCheckpointAdvancementFailure(
441441
listener.onFailure(new IndexNotFoundException(shardId.getIndex()));
442442
return;
443443
}
444-
444+
checkHistoryUUID(indexShard, request.expectedHistoryUUID);
445445
final long mappingVersion = indexMetaData.getMappingVersion();
446446
final long settingsVersion = indexMetaData.getSettingsVersion();
447447
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
@@ -485,6 +485,14 @@ protected Response newResponse() {
485485

486486
static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
487487

488+
private static void checkHistoryUUID(IndexShard indexShard, String expectedHistoryUUID) {
489+
final String historyUUID = indexShard.getHistoryUUID();
490+
if (historyUUID.equals(expectedHistoryUUID) == false) {
491+
throw new IllegalStateException(
492+
"unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + historyUUID + "]");
493+
}
494+
}
495+
488496
/**
489497
* Returns at most the specified maximum number of operations from the specified from sequence number. This method will never return
490498
* operations above the specified global checkpoint.
@@ -511,11 +519,7 @@ static Translog.Operation[] getOperations(
511519
if (indexShard.state() != IndexShardState.STARTED) {
512520
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
513521
}
514-
final String historyUUID = indexShard.getHistoryUUID();
515-
if (historyUUID.equals(expectedHistoryUUID) == false) {
516-
throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" +
517-
historyUUID + "]");
518-
}
522+
checkHistoryUUID(indexShard, expectedHistoryUUID);
519523
if (fromSeqNo > globalCheckpoint) {
520524
throw new IllegalStateException(
521525
"not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]");

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.ccr;
88

9+
import org.elasticsearch.ExceptionsHelper;
910
import org.elasticsearch.cluster.metadata.IndexMetaData;
1011
import org.elasticsearch.common.bytes.BytesReference;
1112
import org.elasticsearch.common.settings.Settings;
@@ -28,7 +29,9 @@
2829
import static java.util.Collections.singletonMap;
2930
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
3031
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
32+
import static org.hamcrest.Matchers.containsString;
3133
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.hasSize;
3235
import static org.hamcrest.Matchers.nullValue;
3336

3437
public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
@@ -141,6 +144,50 @@ public void testRemoveRemoteConnection() throws Exception {
141144
});
142145
}
143146

147+
public void testChangeLeaderIndex() throws Exception {
148+
final String settings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
149+
150+
// First, let index-1 is writable and index-2 follows index-1
151+
assertAcked(client().admin().indices().prepareCreate("index-1").setSource(settings, XContentType.JSON));
152+
ensureGreen("index-1");
153+
int numDocs = between(1, 100);
154+
for (int i = 0; i < numDocs; i++) {
155+
client().prepareIndex("index-1", "doc").setSource("{}", XContentType.JSON).get();
156+
}
157+
client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-1", "index-2")).get();
158+
assertBusy(() -> assertThat(client().prepareSearch("index-2").get().getHits().totalHits, equalTo((long) numDocs)));
159+
160+
// Then switch index-1 to be a follower of index-0
161+
assertAcked(client().admin().indices().prepareCreate("index-0").setSource(settings, XContentType.JSON));
162+
final int newDocs;
163+
if (randomBoolean()) {
164+
newDocs = randomIntBetween(0, numDocs);
165+
} else {
166+
newDocs = numDocs + randomIntBetween(1, 100);
167+
}
168+
for (int i = 0; i < newDocs; i++) {
169+
client().prepareIndex("index-0", "doc").setSource("{}", XContentType.JSON).get();
170+
}
171+
if (randomBoolean()) {
172+
client().admin().indices().prepareFlush("index-0").get();
173+
}
174+
assertAcked(client().admin().indices().prepareClose("index-1"));
175+
client().execute(PutFollowAction.INSTANCE, getPutFollowRequest("index-0", "index-1")).get();
176+
177+
// index-2 should detect that the leader index has changed
178+
assertBusy(() -> {
179+
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
180+
statsRequest.setIndices(new String[]{"index-2"});
181+
FollowStatsAction.StatsResponses resp = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
182+
assertThat(resp.getStatsResponses(), hasSize(1));
183+
FollowStatsAction.StatsResponse stats = resp.getStatsResponses().get(0);
184+
assertNotNull(stats.status().getFatalException());
185+
Throwable unwrapped = ExceptionsHelper.unwrap(stats.status().getFatalException(), IllegalStateException.class);
186+
assertNotNull(unwrapped);
187+
assertThat(unwrapped.getMessage(), containsString("unexpected history uuid"));
188+
});
189+
}
190+
144191
public static String getIndexSettings(final int numberOfShards,
145192
final int numberOfReplicas,
146193
final Map<String, String> additionalIndexSettings) throws IOException {

0 commit comments

Comments
 (0)