Skip to content

Commit 33a73a8

Browse files
authored
Trigger merges after recovery (#113102)
We may have shut a shard down while merges were still pending (or adjusted the merge policy while the shard was down) meaning that after recovery its segments do not reflect the desired state according to the merge policy. With this commit we invoke `IndexWriter#maybeMerge()` at the end of recovery to check for, and execute, any such lost merges.
1 parent 634d0e7 commit 33a73a8

File tree

6 files changed

+273
-19
lines changed

6 files changed

+273
-19
lines changed

docs/changelog/113102.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 113102
2+
summary: Trigger merges after recovery
3+
area: Recovery
4+
type: enhancement
5+
issues: []

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,9 @@ private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
255255
/**
256256
* Index documents until all the shards are at least WATERMARK_BYTES in size, and return the one with the smallest size
257257
*/
258-
private ShardSizes createReasonableSizedShards(final String indexName) throws InterruptedException {
258+
private ShardSizes createReasonableSizedShards(final String indexName) {
259259
while (true) {
260-
indexRandom(true, indexName, scaledRandomIntBetween(100, 10000));
260+
indexRandom(false, indexName, scaledRandomIntBetween(100, 10000));
261261
forceMerge();
262262
refresh();
263263

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
3535
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
3636
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
37+
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
3738
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
3839
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
3940
import org.elasticsearch.action.admin.indices.stats.ShardStats;
@@ -86,6 +87,7 @@
8687
import org.elasticsearch.index.analysis.AbstractTokenFilterFactory;
8788
import org.elasticsearch.index.analysis.TokenFilterFactory;
8889
import org.elasticsearch.index.engine.Engine;
90+
import org.elasticsearch.index.engine.Segment;
8991
import org.elasticsearch.index.mapper.MapperParsingException;
9092
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
9193
import org.elasticsearch.index.recovery.RecoveryStats;
@@ -137,16 +139,21 @@
137139
import java.util.concurrent.TimeUnit;
138140
import java.util.concurrent.atomic.AtomicBoolean;
139141
import java.util.function.Consumer;
142+
import java.util.function.LongSupplier;
140143
import java.util.stream.Collectors;
141144
import java.util.stream.IntStream;
145+
import java.util.stream.Stream;
142146

143147
import static java.util.Collections.singletonMap;
144148
import static java.util.stream.Collectors.toList;
145149
import static org.elasticsearch.action.DocWriteResponse.Result.CREATED;
146150
import static org.elasticsearch.action.DocWriteResponse.Result.UPDATED;
147151
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
148152
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
153+
import static org.elasticsearch.index.MergePolicyConfig.INDEX_MERGE_ENABLED;
149154
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
155+
import static org.elasticsearch.indices.IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING;
156+
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
150157
import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING;
151158
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
152159
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -158,6 +165,7 @@
158165
import static org.hamcrest.Matchers.hasSize;
159166
import static org.hamcrest.Matchers.instanceOf;
160167
import static org.hamcrest.Matchers.is;
168+
import static org.hamcrest.Matchers.lessThan;
161169
import static org.hamcrest.Matchers.lessThanOrEqualTo;
162170
import static org.hamcrest.Matchers.not;
163171
import static org.hamcrest.Matchers.notNullValue;
@@ -1957,6 +1965,77 @@ public void accept(long globalCheckpoint, Exception e) {
19571965
recoveryCompleteListener.onResponse(null);
19581966
}
19591967

1968+
public void testPostRecoveryMerge() throws Exception {
1969+
internalCluster().startMasterOnlyNode();
1970+
final var dataNode = internalCluster().startDataOnlyNode();
1971+
final var indexName = randomIdentifier();
1972+
createIndex(indexName, indexSettings(1, 0).put(INDEX_MERGE_ENABLED, false).build());
1973+
1974+
final var initialSegmentCount = 20;
1975+
for (int i = 0; i < initialSegmentCount; i++) {
1976+
indexDoc(indexName, Integer.toString(i), "f", randomAlphaOfLength(10));
1977+
refresh(indexName); // force a one-doc segment
1978+
}
1979+
flush(indexName); // commit all the one-doc segments
1980+
1981+
final LongSupplier searchableSegmentCountSupplier = () -> indicesAdmin().prepareSegments(indexName)
1982+
.get(SAFE_AWAIT_TIMEOUT)
1983+
.getIndices()
1984+
.get(indexName)
1985+
.getShards()
1986+
.get(0)
1987+
.shards()[0].getSegments()
1988+
.stream()
1989+
.filter(Segment::isSearch)
1990+
.count();
1991+
1992+
assertEquals(initialSegmentCount, searchableSegmentCountSupplier.getAsLong());
1993+
1994+
// force a recovery by restarting the node, re-enabling merges while the node is down, but configure the node not to be in the hot
1995+
// or content tiers so that it does not do any post-recovery merge
1996+
internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
1997+
@Override
1998+
public Settings onNodeStopped(String nodeName) {
1999+
final var request = new UpdateSettingsRequest(Settings.builder().putNull(INDEX_MERGE_ENABLED).build(), indexName);
2000+
request.reopen(true);
2001+
safeGet(indicesAdmin().updateSettings(request));
2002+
return Settings.builder()
2003+
.putList(NODE_ROLES_SETTING.getKey(), randomNonEmptySubsetOf(List.of("data_warm", "data_cold")))
2004+
.build();
2005+
}
2006+
});
2007+
2008+
ensureGreen(indexName);
2009+
final var mergeStats = indicesAdmin().prepareStats(indexName).clear().setMerge(true).get().getIndex(indexName).getShards()[0]
2010+
.getStats()
2011+
.getMerge();
2012+
assertEquals(0, mergeStats.getCurrent());
2013+
assertEquals(0, mergeStats.getTotal());
2014+
assertEquals(initialSegmentCount, searchableSegmentCountSupplier.getAsLong());
2015+
2016+
// force a recovery by restarting the node again, but this time putting it into the hot or content tiers to enable post-recovery
2017+
// merges
2018+
internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
2019+
@Override
2020+
public Settings onNodeStopped(String nodeName) {
2021+
return Settings.builder()
2022+
.putList(
2023+
NODE_ROLES_SETTING.getKey(),
2024+
Stream.concat(
2025+
Stream.of(randomFrom("data", "data_content", "data_hot")),
2026+
Stream.of("data", "data_content", "data_hot", "data_warm", "data_cold").filter(p -> randomBoolean())
2027+
).distinct().toList()
2028+
)
2029+
// set the inactive time to zero so that we flush immediately after every merge, rather than having the test wait 5min
2030+
.put(SHARD_INACTIVE_TIME_SETTING.getKey(), TimeValue.ZERO)
2031+
.build();
2032+
}
2033+
});
2034+
2035+
ensureGreen(indexName);
2036+
assertBusy(() -> assertThat(searchableSegmentCountSupplier.getAsLong(), lessThan((long) initialSegmentCount)));
2037+
}
2038+
19602039
private void assertGlobalCheckpointIsStableAndSyncedInAllNodes(String indexName, List<String> nodes, int shard) throws Exception {
19612040
assertThat(nodes, is(not(empty())));
19622041

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1514,6 +1514,22 @@ public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
15141514
engine.forceMerge(forceMerge.flush(), forceMerge.maxNumSegments(), forceMerge.onlyExpungeDeletes(), forceMerge.forceMergeUUID());
15151515
}
15161516

1517+
public void triggerPendingMerges() throws IOException {
1518+
switch (state /* single volatile read */) {
1519+
case STARTED, POST_RECOVERY -> getEngine().forceMerge(
1520+
// don't immediately flush - if any merging happens then we don't wait for it anyway
1521+
false,
1522+
// don't apply any segment count limit, we only want to call IndexWriter#maybeMerge
1523+
ForceMergeRequest.Defaults.MAX_NUM_SEGMENTS,
1524+
// don't look for expunge-delete merges, we only want to call IndexWriter#maybeMerge
1525+
false,
1526+
// force-merge UUID is not used when calling IndexWriter#maybeMerge
1527+
null
1528+
);
1529+
// otherwise shard likely closed and maybe reopened, nothing to do
1530+
}
1531+
}
1532+
15171533
/**
15181534
* Creates a new {@link IndexCommit} snapshot from the currently running engine. All resources referenced by this
15191535
* commit won't be freed until the commit / snapshot is closed.

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ public class IndicesService extends AbstractLifecycleComponent
262262
private final TimestampFieldMapperService timestampFieldMapperService;
263263
private final CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
264264
private final MapperMetrics mapperMetrics;
265+
private final PostRecoveryMerger postRecoveryMerger;
265266

266267
@Override
267268
protected void doStart() {
@@ -378,6 +379,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
378379
clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries);
379380

380381
this.timestampFieldMapperService = new TimestampFieldMapperService(settings, threadPool, this);
382+
383+
this.postRecoveryMerger = new PostRecoveryMerger(settings, threadPool.executor(ThreadPool.Names.FORCE_MERGE), this::getShardOrNull);
381384
}
382385

383386
private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";
@@ -890,23 +893,29 @@ public void createShard(
890893
RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode);
891894
IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer);
892895
indexShard.addShardFailureCallback(onShardFailure);
893-
indexShard.startRecovery(recoveryState, recoveryTargetService, recoveryListener, repositoriesService, (mapping, listener) -> {
894-
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
895-
: "mapping update consumer only required by local shards recovery";
896-
AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest().setConcreteIndex(
897-
shardRouting.index()
898-
)
899-
.setConcreteIndex(shardRouting.index()) // concrete index - no name clash, it uses uuid
900-
.source(mapping.source().string(), XContentType.JSON);
901-
// concrete index - no name clash, it uses uuid
902-
client.execute(
903-
featureService.clusterHasFeature(clusterService.state(), SUPPORTS_AUTO_PUT)
904-
? TransportAutoPutMappingAction.TYPE
905-
: TransportPutMappingAction.TYPE,
906-
putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE),
907-
new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null))
908-
);
909-
}, this, clusterStateVersion);
896+
indexShard.startRecovery(
897+
recoveryState,
898+
recoveryTargetService,
899+
postRecoveryMerger.maybeMergeAfterRecovery(shardRouting, recoveryListener),
900+
repositoriesService,
901+
(mapping, listener) -> {
902+
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS
903+
: "mapping update consumer only required by local shards recovery";
904+
AcknowledgedRequest<PutMappingRequest> putMappingRequestAcknowledgedRequest = new PutMappingRequest()
905+
// concrete index - no name clash, it uses uuid
906+
.setConcreteIndex(shardRouting.index())
907+
.source(mapping.source().string(), XContentType.JSON);
908+
client.execute(
909+
featureService.clusterHasFeature(clusterService.state(), SUPPORTS_AUTO_PUT)
910+
? TransportAutoPutMappingAction.TYPE
911+
: TransportPutMappingAction.TYPE,
912+
putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE),
913+
new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null))
914+
);
915+
},
916+
this,
917+
clusterStateVersion
918+
);
910919
}
911920

912921
@Override
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.indices;
11+
12+
import org.apache.lucene.index.IndexWriter;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.cluster.node.DiscoveryNode;
15+
import org.elasticsearch.cluster.routing.ShardRouting;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
18+
import org.elasticsearch.core.Releasable;
19+
import org.elasticsearch.core.Strings;
20+
import org.elasticsearch.index.shard.IndexShard;
21+
import org.elasticsearch.index.shard.ShardId;
22+
import org.elasticsearch.index.shard.ShardLongFieldRange;
23+
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
24+
import org.elasticsearch.indices.recovery.RecoveryFailedException;
25+
import org.elasticsearch.indices.recovery.RecoveryState;
26+
import org.elasticsearch.logging.LogManager;
27+
import org.elasticsearch.logging.Logger;
28+
29+
import java.util.concurrent.Executor;
30+
import java.util.function.Function;
31+
32+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE;
33+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_HOT_NODE_ROLE;
34+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.DATA_ROLE;
35+
import static org.elasticsearch.cluster.node.DiscoveryNodeRole.INDEX_ROLE;
36+
37+
/**
38+
* Triggers a check for pending merges when a shard completes recovery.
39+
*/
40+
class PostRecoveryMerger {
41+
42+
private static final Logger logger = LogManager.getLogger(PostRecoveryMerger.class);
43+
44+
private static final boolean TRIGGER_MERGE_AFTER_RECOVERY;
45+
46+
static {
47+
final var propertyValue = System.getProperty("es.trigger_merge_after_recovery");
48+
if (propertyValue == null) {
49+
TRIGGER_MERGE_AFTER_RECOVERY = true;
50+
} else if ("false".equals(propertyValue)) {
51+
TRIGGER_MERGE_AFTER_RECOVERY = false;
52+
} else {
53+
throw new IllegalStateException(
54+
"system property [es.trigger_merge_after_recovery] may only be set to [false], but was [" + propertyValue + "]"
55+
);
56+
}
57+
}
58+
59+
/**
60+
* Throttled runner to avoid multiple concurrent calls to {@link IndexWriter#maybeMerge()}: we do not need to execute these things
61+
* especially quickly, as long as they happen eventually, and each such call may involve some IO (reading the soft-deletes doc values to
62+
* count deleted docs). Note that we're not throttling any actual merges, just the checks to see what merges might be needed. Throttling
63+
* merges across shards is a separate issue, but normally this mechanism won't trigger any new merges anyway.
64+
*/
65+
private final ThrottledTaskRunner postRecoveryMergeRunner;
66+
67+
private final Function<ShardId, IndexShard> shardFunction;
68+
private final boolean enabled;
69+
70+
PostRecoveryMerger(Settings settings, Executor executor, Function<ShardId, IndexShard> shardFunction) {
71+
this.postRecoveryMergeRunner = new ThrottledTaskRunner(getClass().getCanonicalName(), 1, executor);
72+
this.shardFunction = shardFunction;
73+
this.enabled =
74+
// enabled globally ...
75+
TRIGGER_MERGE_AFTER_RECOVERY
76+
// ... and we are a node that expects nontrivial amounts of indexing work
77+
&& (DiscoveryNode.hasRole(settings, DATA_HOT_NODE_ROLE)
78+
|| DiscoveryNode.hasRole(settings, DATA_CONTENT_NODE_ROLE)
79+
|| DiscoveryNode.hasRole(settings, DATA_ROLE)
80+
|| DiscoveryNode.hasRole(settings, INDEX_ROLE));
81+
}
82+
83+
PeerRecoveryTargetService.RecoveryListener maybeMergeAfterRecovery(
84+
ShardRouting shardRouting,
85+
PeerRecoveryTargetService.RecoveryListener recoveryListener
86+
) {
87+
if (enabled == false) {
88+
return recoveryListener;
89+
}
90+
91+
if (shardRouting.isPromotableToPrimary() == false) {
92+
return recoveryListener;
93+
}
94+
95+
final var shardId = shardRouting.shardId();
96+
return new PeerRecoveryTargetService.RecoveryListener() {
97+
@Override
98+
public void onRecoveryDone(
99+
RecoveryState state,
100+
ShardLongFieldRange timestampMillisFieldRange,
101+
ShardLongFieldRange eventIngestedMillisFieldRange
102+
) {
103+
postRecoveryMergeRunner.enqueueTask(new PostRecoveryMerge(shardId));
104+
recoveryListener.onRecoveryDone(state, timestampMillisFieldRange, eventIngestedMillisFieldRange);
105+
}
106+
107+
@Override
108+
public void onRecoveryFailure(RecoveryFailedException e, boolean sendShardFailure) {
109+
recoveryListener.onRecoveryFailure(e, sendShardFailure);
110+
}
111+
};
112+
}
113+
114+
class PostRecoveryMerge implements ActionListener<Releasable> {
115+
private final ShardId shardId;
116+
117+
PostRecoveryMerge(ShardId shardId) {
118+
this.shardId = shardId;
119+
}
120+
121+
@Override
122+
public void onResponse(Releasable releasable) {
123+
try (releasable) {
124+
final var indexShard = shardFunction.apply(shardId);
125+
if (indexShard == null) {
126+
return;
127+
}
128+
129+
indexShard.triggerPendingMerges();
130+
} catch (Exception e) {
131+
logFailure(e);
132+
}
133+
}
134+
135+
@Override
136+
public void onFailure(Exception e) {
137+
logFailure(e);
138+
}
139+
140+
private void logFailure(Exception e) {
141+
// post-recovery merge is a best-effort thing, failure needs no special handling
142+
logger.debug(() -> Strings.format("failed to execute post-recovery merge of [%s]", shardId), e);
143+
}
144+
}
145+
}

0 commit comments

Comments
 (0)