Skip to content

Commit 6071362

Browse files
authored
Suppress merge-on-recovery for older indices (#113462)
There may be many older indices in need of merging, but today we do not throttle this work across shards so an upgrade could lead to an overwhelming spike in merges. With this commit we make it so that the automatic merge-on-recovery behaviour only applies to newly-created indices.
1 parent be7a1ec commit 6071362

File tree

5 files changed

+81
-3
lines changed

5 files changed

+81
-3
lines changed

docs/changelog/113462.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 113462
2+
summary: Suppress merge-on-recovery for older indices
3+
area: CRUD
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.elasticsearch.index.IndexService;
8585
import org.elasticsearch.index.IndexSettings;
8686
import org.elasticsearch.index.IndexVersion;
87+
import org.elasticsearch.index.IndexVersions;
8788
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
8889
import org.elasticsearch.index.analysis.TokenFilterFactory;
8990
import org.elasticsearch.index.engine.Engine;
@@ -120,6 +121,7 @@
120121
import org.elasticsearch.test.ESIntegTestCase.Scope;
121122
import org.elasticsearch.test.InternalTestCluster;
122123
import org.elasticsearch.test.engine.MockEngineSupport;
124+
import org.elasticsearch.test.index.IndexVersionUtils;
123125
import org.elasticsearch.test.junit.annotations.TestLogging;
124126
import org.elasticsearch.test.transport.MockTransportService;
125127
import org.elasticsearch.transport.TestTransportChannel;
@@ -2036,6 +2038,69 @@ public Settings onNodeStopped(String nodeName) {
20362038
assertBusy(() -> assertThat(searchableSegmentCountSupplier.getAsLong(), lessThan((long) initialSegmentCount)));
20372039
}
20382040

2041+
@Override
2042+
protected boolean forbidPrivateIndexSettings() {
2043+
return false; // need to set index.version.created to test difference in behaviour on older indices
2044+
}
2045+
2046+
public void testPostRecoveryMergeDisabledOnOlderIndices() throws Exception {
2047+
internalCluster().startMasterOnlyNode();
2048+
final var dataNode = internalCluster().startDataOnlyNode();
2049+
final var indexName = randomIdentifier();
2050+
createIndex(
2051+
indexName,
2052+
indexSettings(1, 0).put(INDEX_MERGE_ENABLED, false)
2053+
.put(
2054+
IndexMetadata.SETTING_VERSION_CREATED,
2055+
IndexVersionUtils.randomVersionBetween(
2056+
random(),
2057+
IndexVersionUtils.getFirstVersion(),
2058+
IndexVersionUtils.getPreviousVersion(IndexVersions.MERGE_ON_RECOVERY_VERSION)
2059+
)
2060+
)
2061+
.build()
2062+
);
2063+
2064+
final var initialSegmentCount = 20;
2065+
for (int i = 0; i < initialSegmentCount; i++) {
2066+
indexDoc(indexName, Integer.toString(i), "f", randomAlphaOfLength(10));
2067+
refresh(indexName); // force a one-doc segment
2068+
}
2069+
flush(indexName); // commit all the one-doc segments
2070+
2071+
final LongSupplier searchableSegmentCountSupplier = () -> indicesAdmin().prepareSegments(indexName)
2072+
.get(SAFE_AWAIT_TIMEOUT)
2073+
.getIndices()
2074+
.get(indexName)
2075+
.getShards()
2076+
.get(0)
2077+
.shards()[0].getSegments()
2078+
.stream()
2079+
.filter(Segment::isSearch)
2080+
.count();
2081+
2082+
assertEquals(initialSegmentCount, searchableSegmentCountSupplier.getAsLong());
2083+
2084+
// force a recovery by restarting the node, re-enabling merges while the node is down
2085+
internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
2086+
@Override
2087+
public Settings onNodeStopped(String nodeName) {
2088+
final var request = new UpdateSettingsRequest(Settings.builder().putNull(INDEX_MERGE_ENABLED).build(), indexName);
2089+
request.reopen(true);
2090+
safeGet(indicesAdmin().updateSettings(request));
2091+
return Settings.EMPTY;
2092+
}
2093+
});
2094+
2095+
ensureGreen(indexName);
2096+
final var mergeStats = indicesAdmin().prepareStats(indexName).clear().setMerge(true).get().getIndex(indexName).getShards()[0]
2097+
.getStats()
2098+
.getMerge();
2099+
assertEquals(0, mergeStats.getCurrent());
2100+
assertEquals(0, mergeStats.getTotal());
2101+
assertEquals(initialSegmentCount, searchableSegmentCountSupplier.getAsLong());
2102+
}
2103+
20392104
private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
20402105
assertThat(nodes, is(not(empty())));
20412106

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ private static IndexVersion def(int id, Version luceneVersion) {
115115
public static final IndexVersion INDEX_SORTING_ON_NESTED = def(8_512_00_0, Version.LUCENE_9_11_1);
116116
public static final IndexVersion LENIENT_UPDATEABLE_SYNONYMS = def(8_513_00_0, Version.LUCENE_9_11_1);
117117
public static final IndexVersion ENABLE_IGNORE_MALFORMED_LOGSDB = def(8_514_00_0, Version.LUCENE_9_11_1);
118+
public static final IndexVersion MERGE_ON_RECOVERY_VERSION = def(8_515_00_0, Version.LUCENE_9_11_1);
118119

119120
/*
120121
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -896,7 +896,7 @@ public void createShard(
896896
indexShard.startRecovery(
897897
recoveryState,
898898
recoveryTargetService,
899-
postRecoveryMerger.maybeMergeAfterRecovery(shardRouting, recoveryListener),
899+
postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener),
900900
repositoriesService,
901901
(mapping, listener) -> {
902902
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS

server/src/main/java/org/elasticsearch/indices/PostRecoveryMerger.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111

1212
import org.apache.lucene.index.IndexWriter;
1313
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1415
import org.elasticsearch.cluster.node.DiscoveryNode;
1516
import org.elasticsearch.cluster.routing.ShardRouting;
1617
import org.elasticsearch.common.settings.Settings;
1718
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
1819
import org.elasticsearch.core.Releasable;
1920
import org.elasticsearch.core.Strings;
21+
import org.elasticsearch.index.IndexVersions;
2022
import org.elasticsearch.index.shard.IndexShard;
2123
import org.elasticsearch.index.shard.ShardId;
2224
import org.elasticsearch.index.shard.ShardLongFieldRange;
@@ -44,14 +46,14 @@ class PostRecoveryMerger {
4446
private static final boolean TRIGGER_MERGE_AFTER_RECOVERY;
4547

4648
static {
47-
final var propertyValue = System.getProperty("es.trigger_merge_after_recovery");
49+
final var propertyValue = System.getProperty("es.trigger_merge_after_recovery_8_515_00_0");
4850
if (propertyValue == null) {
4951
TRIGGER_MERGE_AFTER_RECOVERY = true;
5052
} else if ("false".equals(propertyValue)) {
5153
TRIGGER_MERGE_AFTER_RECOVERY = false;
5254
} else {
5355
throw new IllegalStateException(
54-
"system property [es.trigger_merge_after_recovery] may only be set to [false], but was [" + propertyValue + "]"
56+
"system property [es.trigger_merge_after_recovery_8_515_00_0] may only be set to [false], but was [" + propertyValue + "]"
5557
);
5658
}
5759
}
@@ -81,6 +83,7 @@ class PostRecoveryMerger {
8183
}
8284

8385
PeerRecoveryTargetService.RecoveryListener maybeMergeAfterRecovery(
86+
IndexMetadata indexMetadata,
8487
ShardRouting shardRouting,
8588
PeerRecoveryTargetService.RecoveryListener recoveryListener
8689
) {
@@ -92,6 +95,10 @@ PeerRecoveryTargetService.RecoveryListener maybeMergeAfterRecovery(
9295
return recoveryListener;
9396
}
9497

98+
if (indexMetadata.getCreationVersion().before(IndexVersions.MERGE_ON_RECOVERY_VERSION)) {
99+
return recoveryListener;
100+
}
101+
95102
final var shardId = shardRouting.shardId();
96103
return new PeerRecoveryTargetService.RecoveryListener() {
97104
@Override

0 commit comments

Comments
 (0)