diff --git a/docs/changelog/132450.yaml b/docs/changelog/132450.yaml new file mode 100644 index 0000000000000..8c334ce65dd89 --- /dev/null +++ b/docs/changelog/132450.yaml @@ -0,0 +1,6 @@ +pr: 132450 +summary: Check if merge is aborted before executing integrity checks and access fields + and documents +area: Engine +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/CheckAbortedMergesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/CheckAbortedMergesIT.java new file mode 100644 index 0000000000000..ccff6a6725bae --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/CheckAbortedMergesIT.java @@ -0,0 +1,364 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.codec.FilterDocValuesProducer; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.StreamSupport; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, 
numDataNodes = 0) +public class CheckAbortedMergesIT extends ESIntegTestCase { + + @Override + protected boolean addMockInternalEngine() { + return false; + } + + @Override + protected Collection> nodePlugins() { + var plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(BlockRunningMergesEngineTestPlugin.class); + return plugins; + } + + public void testAbortedMerges() throws Exception { + internalCluster().startMasterOnlyNode(); + var nodeA = internalCluster().startDataOnlyNode(); + + var pluginA = internalCluster().getInstance(PluginsService.class, nodeA) + .filterPlugins(BlockRunningMergesEngineTestPlugin.class) + .findFirst() + .orElseThrow(() -> new AssertionError("Plugin not found")); + + final boolean checkAbortedMerges = randomBoolean(); + pluginA.blockMerges(); + + final var indexName = randomIdentifier(); + createIndex( + indexName, + indexSettings(1, 0).put(CheckAbortedDuringMergePolicy.ENABLE_CHECK_ABORTED_DURING_MERGE.getKey(), checkAbortedMerges) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE) + .build() + ); + + var indexServiceA = internalCluster().getInstance(IndicesService.class, nodeA).indexService(resolveIndex(indexName)); + assertThat(indexServiceA.hasShard(0), equalTo(true)); + + indexDocs(indexName, 10); + flush(indexName); + + while (true) { + indexDocs(indexName, 10); + flush(indexName); + + var mergesStats = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get(); + if (mergesStats.getIndices().get(indexName).getPrimaries().getMerge().getCurrent() > 0) { + break; + } + } + + var nodeB = internalCluster().startDataOnlyNode(); + ensureStableCluster(3); + + pluginA.waitForMergesBlocked(); + + ClusterRerouteUtils.reroute(client(), new MoveAllocationCommand(indexName, 0, nodeA, nodeB, ProjectId.DEFAULT)); + ensureGreen(indexName); + + var indexServiceB = internalCluster().getInstance(IndicesService.class, nodeB).indexService(resolveIndex(indexName)); + assertBusy(() -> 
assertThat(indexServiceB.hasShard(0), equalTo(true))); + assertBusy(() -> assertThat(indexServiceA.hasShard(0), equalTo(false))); + if (randomBoolean()) { + forceMerge(); + } + + assertThat(pluginA.mergedDocsCount.get(), equalTo(0L)); + assertThat(pluginA.mergedFieldsCount.get(), equalTo(0L)); + assertThat(pluginA.checkIntegrityCount.get(), equalTo(0L)); + + pluginA.unblockMerges(); + + var mergeMetrics = internalCluster().getDataNodeInstances(MergeMetrics.class); + assertBusy( + () -> assertThat( + StreamSupport.stream(mergeMetrics.spliterator(), false) + .mapToLong(m -> m.getQueuedMergeSizeInBytes() + m.getRunningMergeSizeInBytes()) + .sum(), + equalTo(0L) + ) + ); + + assertBusy(() -> { + if (checkAbortedMerges) { + assertThat(pluginA.mergedDocsCount.get(), equalTo(0L)); + assertThat(pluginA.mergedFieldsCount.get(), equalTo(0L)); + // Only the first integrity check is completed, the following ones should have been aborted + assertThat(pluginA.checkIntegrityCount.get(), equalTo(1L)); + } else { + assertThat(pluginA.mergedDocsCount.get(), greaterThan(0L)); + assertThat(pluginA.mergedFieldsCount.get(), greaterThan(0L)); + assertThat(pluginA.checkIntegrityCount.get(), greaterThan(1L)); + } + }); + } + + private static BulkResponse indexDocs(String indexName, int numDocs) { + final var client = client(); + var bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (int i = 0; i < numDocs; i++) { + var indexRequest = client.prepareIndex(indexName) + .setSource(Map.of("text", randomUnicodeOfCodepointLengthBetween(1, 25), "integer", randomIntBetween(0, 100))); + bulkRequest.add(indexRequest); + } + var bulkResponse = bulkRequest.get(); + assertNoFailures(bulkResponse); + return bulkResponse; + } + + /** + * An engine plugin that allows to block running merges. 
+ * + * Note: merges are blocked before executing the first integrity check on stored fields of the first segment to be merged + */ + public static class BlockRunningMergesEngineTestPlugin extends Plugin implements EnginePlugin { + + // Merges are not blocked by default + private final AtomicBoolean blockMerges = new AtomicBoolean(false); + + // Number of checkIntegrity() method calls that have been executed + private final AtomicLong checkIntegrityCount = new AtomicLong(0L); + + // Number of time a field has been accessed during merges + private final AtomicLong mergedFieldsCount = new AtomicLong(0L); + + // Number of time a doc has been accessed during merges + private final AtomicLong mergedDocsCount = new AtomicLong(0L); + + // Used to block merges from running immediately + private final AtomicBoolean mergesStarted = new AtomicBoolean(); + private final CountDownLatch mergesStartedLatch = new CountDownLatch(1); + private final CountDownLatch resumeMerges = new CountDownLatch(1); + + void blockMerges() { + if (blockMerges.compareAndSet(false, true) == false) { + throw new AssertionError("Merges already blocked"); + } + } + + void waitForMergesBlocked() { + safeAwait(mergesStartedLatch); + } + + void unblockMerges() { + if (blockMerges.compareAndSet(true, false) == false) { + throw new AssertionError("Merges already unblocked"); + } + resumeMerges.countDown(); + } + + @Override + public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) { + return Optional.of( + config -> new InternalEngine( + new EngineConfig( + config.getShardId(), + config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), + config.getIndexSettings(), + config.getWarmer(), + config.getStore(), + wrapMergePolicy(config.getMergePolicy()), + config.getAnalyzer(), + config.getSimilarity(), + config.getCodecProvider(), + config.getEventListener(), + config.getQueryCache(), + config.getQueryCachingPolicy(), + config.getTranslogConfig(), + config.getFlushMergesAfter(), +
config.getExternalRefreshListener(), + config.getInternalRefreshListener(), + config.getIndexSort(), + config.getCircuitBreakerService(), + config.getGlobalCheckpointSupplier(), + config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), + config.getSnapshotCommitSupplier(), + config.getLeafSorter(), + config.getRelativeTimeInNanosSupplier(), + config.getIndexCommitListener(), + config.isPromotableToPrimary(), + config.getMapperService(), + config.getEngineResetLock(), + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() + ) + ) + ); + } + + private MergePolicy wrapMergePolicy(MergePolicy policy) { + if (blockMerges.get() == false) { + return policy; + } + return new OneMergeWrappingMergePolicy(policy, toWrap -> new MergePolicy.OneMerge(toWrap) { + + void maybeBlockMerge() { + if (mergesStarted.compareAndSet(false, true)) { + mergesStartedLatch.countDown(); + } + safeAwait(resumeMerges, TimeValue.ONE_HOUR); + } + + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + return new FilterCodecReader(toWrap.wrapForMerge(reader)) { + + @Override + public CacheHelper getReaderCacheHelper() { + return in.getReaderCacheHelper(); + } + + @Override + public CacheHelper getCoreCacheHelper() { + return in.getCoreCacheHelper(); + } + + @Override + public StoredFieldsReader getFieldsReader() { + return new WrappedStoredFieldsReader(super.getFieldsReader()); + } + + private class WrappedStoredFieldsReader extends StoredFieldsReader { + + private final StoredFieldsReader delegate; + + private WrappedStoredFieldsReader(StoredFieldsReader delegate) { + this.delegate = delegate; + } + + @Override + public void checkIntegrity() throws IOException { + maybeBlockMerge(); + delegate.checkIntegrity(); + checkIntegrityCount.incrementAndGet(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + 
delegate.document(docID, visitor); + mergedDocsCount.incrementAndGet(); + } + + @Override + public StoredFieldsReader clone() { + return new WrappedStoredFieldsReader(delegate.clone()); + } + + @Override + public StoredFieldsReader getMergeInstance() { + return new WrappedStoredFieldsReader(delegate.getMergeInstance()); + } + } + + @Override + public DocValuesProducer getDocValuesReader() { + return new FilterDocValuesProducer(super.getDocValuesReader()) { + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + var result = super.getNumeric(field); + mergedFieldsCount.incrementAndGet(); + return result; + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + var result = super.getBinary(field); + mergedFieldsCount.incrementAndGet(); + return result; + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + var result = super.getSorted(field); + mergedFieldsCount.incrementAndGet(); + return result; + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + var result = super.getSortedNumeric(field); + mergedFieldsCount.incrementAndGet(); + return result; + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + var result = super.getSortedSet(field); + mergedFieldsCount.incrementAndGet(); + return result; + } + + @Override + public void checkIntegrity() throws IOException { + maybeBlockMerge(); + super.checkIntegrity(); + checkIntegrityCount.incrementAndGet(); + } + }; + } + }; + } + }); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java index 9f4c5b80ccf23..d43a03dea4dfc 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java +++ 
b/server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.cache.bitset.BitsetFilterCache; +import org.elasticsearch.index.engine.CheckAbortedDuringMergePolicy; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.mapper.FieldMapper; @@ -206,6 +207,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MAPPER_SOURCE_MODE_SETTING, IndexSettings.RECOVERY_USE_SYNTHETIC_SOURCE_SETTING, InferenceMetadataFieldsMapper.USE_LEGACY_SEMANTIC_TEXT_FORMAT, + CheckAbortedDuringMergePolicy.ENABLE_CHECK_ABORTED_DURING_MERGE, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { diff --git a/server/src/main/java/org/elasticsearch/index/engine/CheckAbortedDuringMergePolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CheckAbortedDuringMergePolicy.java new file mode 100644 index 0000000000000..702753cb6af98 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/CheckAbortedDuringMergePolicy.java @@ -0,0 +1,253 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.CodecReader; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilterCodecReader; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OneMergeWrappingMergePolicy; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.StoredFieldVisitor; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.core.CheckedRunnable; +import org.elasticsearch.index.codec.FilterDocValuesProducer; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; + +import java.io.IOException; +import java.util.function.LongSupplier; + +import static org.elasticsearch.common.Strings.format; + +/** + * A policy that checks if a merge is aborted before accessing segment files to merge. 
+ */ +public class CheckAbortedDuringMergePolicy extends OneMergeWrappingMergePolicy { + + private interface Checker { + + void ensureNotAborted() throws MergeAbortedException; + + void ensureNotAborted(CheckedRunnable<IOException> runnable, String method) throws IOException; + } + + public static final Setting<Boolean> ENABLE_CHECK_ABORTED_DURING_MERGE = Setting.boolSetting( + "index.merge.check_aborted_during_merge_policy.enabled", + false, + Setting.Property.IndexScope, + Setting.Property.Dynamic, + Setting.Property.ServerlessPublic + ); + + private static final Logger logger = LogManager.getLogger(CheckAbortedDuringMergePolicy.class); + + public CheckAbortedDuringMergePolicy(final ShardId shardId, final MergePolicy delegate, LongSupplier relativeTimeInMillis) { + super(delegate, toWrap -> new OneMerge(toWrap) { + @Override + public CodecReader wrapForMerge(CodecReader reader) throws IOException { + return wrapReader(shardId, toWrap.wrapForMerge(reader), this, relativeTimeInMillis); + } + }); + } + + private static CodecReader wrapReader(ShardId shardId, CodecReader reader, OneMerge oneMerge, LongSupplier relativeTimeInMillis) { + return new CheckingCodecReader(logger, shardId, reader, oneMerge, relativeTimeInMillis); + } + + private static class CheckingCodecReader extends FilterCodecReader implements Checker { + + private final Logger logger; + private final ShardId shardId; + private final OneMerge oneMerge; + private final LongSupplier relativeTimeInMillis; + + private CheckingCodecReader( + Logger logger, + ShardId shardId, + CodecReader delegate, + OneMerge oneMerge, + LongSupplier relativeTimeInMillis + ) { + super(delegate); + this.logger = logger; + this.shardId = shardId; + this.oneMerge = oneMerge; + this.relativeTimeInMillis = relativeTimeInMillis; + } + + @Override + public final void ensureNotAborted() throws MergeAbortedException { + try { + oneMerge.checkAborted(); + } catch (MergeAbortedException e) { + logger.debug(() -> format("%s merge is aborted", shardId)); +
throw e; + } + + @Override + public final void ensureNotAborted(CheckedRunnable<IOException> runnable, String method) throws IOException { + logger.debug(() -> format("%s %s - start", shardId, method)); + boolean executed = false; + long startTimeInMillis = relativeTimeInMillis.getAsLong(); + try { + ensureNotAborted(); + runnable.run(); + executed = true; + } finally { + long endTimeInMillis = relativeTimeInMillis.getAsLong(); + boolean success = executed; + logger.debug( + () -> format("%s %s - end (success: %s, took: %s ms)", shardId, method, success, (endTimeInMillis - startTimeInMillis)) + ); + } + } + + @Override + public CacheHelper getCoreCacheHelper() { + return getDelegate().getCoreCacheHelper(); + } + + @Override + public CacheHelper getReaderCacheHelper() { + return getDelegate().getReaderCacheHelper(); + } + + @Override + public StoredFieldsReader getFieldsReader() { + return new CheckingStoredFieldsReader(super.getFieldsReader(), this); + } + + @Override + public DocValuesProducer getDocValuesReader() { + return new CheckingDocValuesProducer(super.getDocValuesReader(), this); + } + + @Override + public void checkIntegrity() throws IOException { + ensureNotAborted(() -> super.checkIntegrity(), "CodecReader#checkIntegrity"); + } + } + + private static class CheckingStoredFieldsReader extends StoredFieldsReader { + + private final StoredFieldsReader delegate; + private final Checker checker; + private long visitedDocs; + + private CheckingStoredFieldsReader(StoredFieldsReader delegate, Checker checker) { + this.delegate = delegate; + this.checker = checker; + } + + @Override + public void document(int docID, StoredFieldVisitor visitor) throws IOException { + if (visitedDocs % 100L == 0L) { + checker.ensureNotAborted(); + } + delegate.document(docID, visitor); + visitedDocs += 1L; + } + + @Override + public StoredFieldsReader clone() { + return new CheckingStoredFieldsReader(delegate.clone(), checker); + } + + @Override + public StoredFieldsReader
getMergeInstance() { + return new CheckingStoredFieldsReader(delegate.getMergeInstance(), checker); + } + + @Override + public void checkIntegrity() throws IOException { + checker.ensureNotAborted(() -> delegate.checkIntegrity(), "StoredFieldsReader#checkIntegrity"); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + } + + private static class CheckingDocValuesProducer extends FilterDocValuesProducer { + + private final Checker checker; + private long visitedFields; + + protected CheckingDocValuesProducer(DocValuesProducer delegate, Checker checker) { + super(delegate); + this.checker = checker; + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + if (visitedFields % 100L == 0L) { + checker.ensureNotAborted(); + } + var values = super.getNumeric(field); + visitedFields += 1; + return values; + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + if (visitedFields % 100L == 0L) { + checker.ensureNotAborted(); + } + var values = super.getBinary(field); + visitedFields += 1; + return values; + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + if (visitedFields % 100L == 0L) { + checker.ensureNotAborted(); + } + var values = super.getSorted(field); + visitedFields += 1; + return values; + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + if (visitedFields % 100L == 0L) { + checker.ensureNotAborted(); + } + var values = super.getSortedNumeric(field); + visitedFields += 1; + return values; + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + if (visitedFields % 100L == 0L) { + checker.ensureNotAborted(); + } + var values = super.getSortedSet(field); + visitedFields += 1; + return values; + } + + @Override + public void checkIntegrity() throws IOException { + checker.ensureNotAborted(() -> super.checkIntegrity(), 
"DocValuesProducer#checkIntegrity"); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index db4686bb4faf1..96a1c6249fd59 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -134,6 +134,7 @@ import java.util.stream.Stream; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.index.engine.CheckAbortedDuringMergePolicy.ENABLE_CHECK_ABORTED_DURING_MERGE; public class InternalEngine extends Engine { @@ -2796,6 +2797,13 @@ private IndexWriterConfig getIndexWriterConfig() { // to enable it. mergePolicy = new ShuffleForcedMergePolicy(mergePolicy); } + if (ENABLE_CHECK_ABORTED_DURING_MERGE.get(engineConfig.getIndexSettings().getSettings())) { + mergePolicy = new CheckAbortedDuringMergePolicy( + shardId, + mergePolicy, + engineConfig.getThreadPool().relativeTimeInMillisSupplier() + ); + } iwc.setMergePolicy(mergePolicy); // TODO: Introduce an index setting for setMaxFullFlushMergeWaitMillis iwc.setMaxFullFlushMergeWaitMillis(-1);