Skip to content

Commit a980ef7

Browse files
committed
Check if merge is aborted before continuing merging
Relates #107513 Relates ES-11749 Relates ES-11384
1 parent d90055e commit a980ef7

File tree

4 files changed

+627
-0
lines changed

4 files changed

+627
-0
lines changed
Lines changed: 364 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,364 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.codecs.DocValuesProducer;
13+
import org.apache.lucene.codecs.StoredFieldsReader;
14+
import org.apache.lucene.index.BinaryDocValues;
15+
import org.apache.lucene.index.CodecReader;
16+
import org.apache.lucene.index.FieldInfo;
17+
import org.apache.lucene.index.FilterCodecReader;
18+
import org.apache.lucene.index.MergePolicy;
19+
import org.apache.lucene.index.NumericDocValues;
20+
import org.apache.lucene.index.OneMergeWrappingMergePolicy;
21+
import org.apache.lucene.index.SortedDocValues;
22+
import org.apache.lucene.index.SortedNumericDocValues;
23+
import org.apache.lucene.index.SortedSetDocValues;
24+
import org.apache.lucene.index.StoredFieldVisitor;
25+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
26+
import org.elasticsearch.action.bulk.BulkResponse;
27+
import org.elasticsearch.action.support.WriteRequest;
28+
import org.elasticsearch.cluster.metadata.ProjectId;
29+
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
30+
import org.elasticsearch.core.TimeValue;
31+
import org.elasticsearch.index.IndexSettings;
32+
import org.elasticsearch.index.codec.FilterDocValuesProducer;
33+
import org.elasticsearch.indices.IndicesService;
34+
import org.elasticsearch.plugins.EnginePlugin;
35+
import org.elasticsearch.plugins.Plugin;
36+
import org.elasticsearch.plugins.PluginsService;
37+
import org.elasticsearch.test.ESIntegTestCase;
38+
39+
import java.io.IOException;
40+
import java.util.ArrayList;
41+
import java.util.Collection;
42+
import java.util.Map;
43+
import java.util.Optional;
44+
import java.util.concurrent.CountDownLatch;
45+
import java.util.concurrent.atomic.AtomicBoolean;
46+
import java.util.concurrent.atomic.AtomicLong;
47+
import java.util.stream.StreamSupport;
48+
49+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
50+
import static org.hamcrest.Matchers.equalTo;
51+
import static org.hamcrest.Matchers.greaterThan;
52+
53+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
54+
public class CheckAbortedMergesIT extends ESIntegTestCase {
55+
56+
@Override
57+
protected boolean addMockInternalEngine() {
58+
return false;
59+
}
60+
61+
@Override
62+
protected Collection<Class<? extends Plugin>> nodePlugins() {
63+
var plugins = new ArrayList<>(super.nodePlugins());
64+
plugins.add(BlockRunningMergesEngineTestPlugin.class);
65+
return plugins;
66+
}
67+
68+
public void testAbortedMerges() throws Exception {
69+
internalCluster().startMasterOnlyNode();
70+
var nodeA = internalCluster().startDataOnlyNode();
71+
72+
var pluginA = internalCluster().getInstance(PluginsService.class, nodeA)
73+
.filterPlugins(BlockRunningMergesEngineTestPlugin.class)
74+
.findFirst()
75+
.orElseThrow(() -> new AssertionError("Plugin not found"));
76+
77+
final boolean checkAbortedMerges = false;randomBoolean();
78+
pluginA.blockMerges();
79+
80+
final var indexName = randomIdentifier();
81+
createIndex(
82+
indexName,
83+
indexSettings(1, 0).put(CheckAbortedDuringMergePolicy.ENABLE_CHECK_ABORTED_DURING_MERGE.getKey(), checkAbortedMerges)
84+
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
85+
.build()
86+
);
87+
88+
var indexServiceA = internalCluster().getInstance(IndicesService.class, nodeA).indexService(resolveIndex(indexName));
89+
assertThat(indexServiceA.hasShard(0), equalTo(true));
90+
91+
indexDocs(indexName, 10);
92+
flush(indexName);
93+
94+
while (true) {
95+
indexDocs(indexName, 10);
96+
flush(indexName);
97+
98+
var mergesStats = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get();
99+
if (mergesStats.getIndices().get(indexName).getPrimaries().getMerge().getCurrent() > 0) {
100+
break;
101+
}
102+
}
103+
104+
var nodeB = internalCluster().startDataOnlyNode();
105+
ensureStableCluster(3);
106+
107+
pluginA.waitForMergesBlocked();
108+
109+
ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, nodeA, nodeB, ProjectId.DEFAULT));
110+
ensureGreen(indexName);
111+
112+
var indexServiceB = internalCluster().getInstance(IndicesService.class, nodeB).indexService(resolveIndex(indexName));
113+
assertBusy(() -> assertThat(indexServiceB.hasShard(0), equalTo(true)));
114+
assertBusy(() -> assertThat(indexServiceA.hasShard(0), equalTo(false)));
115+
if (randomBoolean()) {
116+
forceMerge();
117+
}
118+
119+
assertThat(pluginA.mergedDocsCount.get(), equalTo(0L));
120+
assertThat(pluginA.mergedFieldsCount.get(), equalTo(0L));
121+
assertThat(pluginA.checkIntegrityCount.get(), equalTo(0L));
122+
123+
pluginA.unblockMerges();
124+
125+
var mergeMetrics = internalCluster().getDataNodeInstances(MergeMetrics.class);
126+
assertBusy(
127+
() -> assertThat(
128+
StreamSupport.stream(mergeMetrics.spliterator(), false)
129+
.mapToLong(m -> m.getQueuedMergeSizeInBytes() + m.getRunningMergeSizeInBytes())
130+
.sum(),
131+
equalTo(0L)
132+
)
133+
);
134+
135+
assertBusy(() -> {
136+
if (checkAbortedMerges) {
137+
assertThat(pluginA.mergedDocsCount.get(), equalTo(0L));
138+
assertThat(pluginA.mergedFieldsCount.get(), equalTo(0L));
139+
// Only the first integrity check is completed, the following ones should have been aborted
140+
assertThat(pluginA.checkIntegrityCount.get(), equalTo(1L));
141+
} else {
142+
assertThat(pluginA.mergedDocsCount.get(), greaterThan(0L));
143+
assertThat(pluginA.mergedFieldsCount.get(), greaterThan(0L));
144+
assertThat(pluginA.checkIntegrityCount.get(), greaterThan(1L));
145+
}
146+
});
147+
}
148+
149+
private static BulkResponse indexDocs(String indexName, int numDocs) {
150+
final var client = client();
151+
var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
152+
for (int i = 0; i < numDocs; i++) {
153+
var indexRequest = client.prepareIndex(indexName)
154+
.setSource(Map.of("text", randomUnicodeOfCodepointLengthBetween(1, 25), "integer", randomIntBetween(0, 100)));
155+
bulkRequest.add(indexRequest);
156+
}
157+
var bulkResponse = bulkRequest.get();
158+
assertNoFailures(bulkResponse);
159+
return bulkResponse;
160+
}
161+
162+
/**
163+
* An engine plugin that allows running merges to be blocked.
164+
*
165+
* Note: merges are blocked before executing the first integrity check on stored fields of the first segment to be merged
166+
*/
167+
public static class BlockRunningMergesEngineTestPlugin extends Plugin implements EnginePlugin {
168+
169+
// Merges are not blocked by default
170+
private final AtomicBoolean blockMerges = new AtomicBoolean(false);
171+
172+
// Number of checkIntegrity() method calls that have been executed
173+
private final AtomicLong checkIntegrityCount = new AtomicLong(0L);
174+
175+
// Number of times a field has been accessed during merges
176+
private final AtomicLong mergedFieldsCount = new AtomicLong(0L);
177+
178+
// Number of times a doc has been accessed during merges
179+
private final AtomicLong mergedDocsCount = new AtomicLong(0L);
180+
181+
// Used to block merges from running immediately
182+
private final AtomicBoolean mergesStarted = new AtomicBoolean();
183+
private final CountDownLatch mergesStartedLatch = new CountDownLatch(1);
184+
private final CountDownLatch resumeMerges = new CountDownLatch(1);
185+
186+
void blockMerges() {
187+
if (blockMerges.compareAndSet(false, true) == false) {
188+
throw new AssertionError("Merges already blocked");
189+
}
190+
}
191+
192+
void waitForMergesBlocked() {
193+
safeAwait(mergesStartedLatch);
194+
}
195+
196+
void unblockMerges() {
197+
if (blockMerges.compareAndSet(true, false) == false) {
198+
throw new AssertionError("Merges already unblocked");
199+
}
200+
resumeMerges.countDown();
201+
}
202+
203+
@Override
204+
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
205+
return Optional.of(
206+
config -> new InternalEngine(
207+
new EngineConfig(
208+
config.getShardId(),
209+
config.getThreadPool(),
210+
config.getThreadPoolMergeExecutorService(),
211+
config.getIndexSettings(),
212+
config.getWarmer(),
213+
config.getStore(),
214+
wrapMergePolicy(config.getMergePolicy()),
215+
config.getAnalyzer(),
216+
config.getSimilarity(),
217+
config.getCodecProvider(),
218+
config.getEventListener(),
219+
config.getQueryCache(),
220+
config.getQueryCachingPolicy(),
221+
config.getTranslogConfig(),
222+
config.getFlushMergesAfter(),
223+
config.getExternalRefreshListener(),
224+
config.getInternalRefreshListener(),
225+
config.getIndexSort(),
226+
config.getCircuitBreakerService(),
227+
config.getGlobalCheckpointSupplier(),
228+
config.retentionLeasesSupplier(),
229+
config.getPrimaryTermSupplier(),
230+
config.getSnapshotCommitSupplier(),
231+
config.getLeafSorter(),
232+
config.getRelativeTimeInNanosSupplier(),
233+
config.getIndexCommitListener(),
234+
config.isPromotableToPrimary(),
235+
config.getMapperService(),
236+
config.getEngineResetLock(),
237+
config.getMergeMetrics(),
238+
config.getIndexDeletionPolicyWrapper()
239+
)
240+
)
241+
);
242+
}
243+
244+
private MergePolicy wrapMergePolicy(MergePolicy policy) {
245+
if (blockMerges.get() == false) {
246+
return policy;
247+
}
248+
return new OneMergeWrappingMergePolicy(policy, toWrap -> new MergePolicy.OneMerge(toWrap) {
249+
250+
void maybeBlockMerge() {
251+
if (mergesStarted.compareAndSet(false, true)) {
252+
mergesStartedLatch.countDown();
253+
}
254+
safeAwait(resumeMerges, TimeValue.ONE_HOUR);
255+
}
256+
257+
@Override
258+
public CodecReader wrapForMerge(CodecReader reader) throws IOException {
259+
return new FilterCodecReader(toWrap.wrapForMerge(reader)) {
260+
261+
@Override
262+
public CacheHelper getReaderCacheHelper() {
263+
return in.getReaderCacheHelper();
264+
}
265+
266+
@Override
267+
public CacheHelper getCoreCacheHelper() {
268+
return in.getCoreCacheHelper();
269+
}
270+
271+
@Override
272+
public StoredFieldsReader getFieldsReader() {
273+
return new WrappedStoredFieldsReader(super.getFieldsReader());
274+
}
275+
276+
private class WrappedStoredFieldsReader extends StoredFieldsReader {
277+
278+
private final StoredFieldsReader delegate;
279+
280+
private WrappedStoredFieldsReader(StoredFieldsReader delegate) {
281+
this.delegate = delegate;
282+
}
283+
284+
@Override
285+
public void checkIntegrity() throws IOException {
286+
maybeBlockMerge();
287+
delegate.checkIntegrity();
288+
checkIntegrityCount.incrementAndGet();
289+
}
290+
291+
@Override
292+
public void close() throws IOException {
293+
delegate.close();
294+
}
295+
296+
@Override
297+
public void document(int docID, StoredFieldVisitor visitor) throws IOException {
298+
delegate.document(docID, visitor);
299+
mergedDocsCount.incrementAndGet();
300+
}
301+
302+
@Override
303+
public StoredFieldsReader clone() {
304+
return new WrappedStoredFieldsReader(delegate.clone());
305+
}
306+
307+
@Override
308+
public StoredFieldsReader getMergeInstance() {
309+
return new WrappedStoredFieldsReader(delegate.getMergeInstance());
310+
}
311+
}
312+
313+
@Override
314+
public DocValuesProducer getDocValuesReader() {
315+
return new FilterDocValuesProducer(super.getDocValuesReader()) {
316+
@Override
317+
public NumericDocValues getNumeric(FieldInfo field) throws IOException {
318+
var result = super.getNumeric(field);
319+
mergedFieldsCount.incrementAndGet();
320+
return result;
321+
}
322+
323+
@Override
324+
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
325+
var result = super.getBinary(field);
326+
mergedFieldsCount.incrementAndGet();
327+
return result;
328+
}
329+
330+
@Override
331+
public SortedDocValues getSorted(FieldInfo field) throws IOException {
332+
var result = super.getSorted(field);
333+
mergedFieldsCount.incrementAndGet();
334+
return result;
335+
}
336+
337+
@Override
338+
public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException {
339+
var result = super.getSortedNumeric(field);
340+
mergedFieldsCount.incrementAndGet();
341+
return result;
342+
}
343+
344+
@Override
345+
public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException {
346+
var result = super.getSortedSet(field);
347+
mergedFieldsCount.incrementAndGet();
348+
return result;
349+
}
350+
351+
@Override
352+
public void checkIntegrity() throws IOException {
353+
maybeBlockMerge();
354+
super.checkIntegrity();
355+
checkIntegrityCount.incrementAndGet();
356+
}
357+
};
358+
}
359+
};
360+
}
361+
});
362+
}
363+
}
364+
}

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.index.MergeSchedulerConfig;
3131
import org.elasticsearch.index.SearchSlowLog;
3232
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
33+
import org.elasticsearch.index.engine.CheckAbortedDuringMergePolicy;
3334
import org.elasticsearch.index.engine.EngineConfig;
3435
import org.elasticsearch.index.fielddata.IndexFieldDataService;
3536
import org.elasticsearch.index.mapper.FieldMapper;
@@ -206,6 +207,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
206207
IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING,
207208
IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING,
208209
InferenceMetadataFieldsMapper.USE_LEGACY_SEMANTIC_TEXT_FORMAT,
210+
CheckAbortedDuringMergePolicy.ENABLE_CHECK_ABORTED_DURING_MERGE,
209211

210212
// validate that built-in similarities don't get redefined
211213
Setting.groupSetting("index.similarity.", (s) -> {

0 commit comments

Comments
 (0)