diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 520df8a8ebeca..c41b6926b3cae 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -89,7 +89,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } diff --git a/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java b/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java index 684dab5a8243a..3a8bd32559c61 100644 --- a/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java +++ b/server/src/main/java/org/elasticsearch/common/lucene/FilterIndexCommit.java @@ -71,4 +71,11 @@ public Map getUserData() throws IOException { public String toString() { return "FilterIndexCommit{" + "in=" + in + '}'; } + + public static IndexCommit unwrap(IndexCommit in) { + while (in instanceof FilterIndexCommit) { + in = ((FilterIndexCommit) in).getIndexCommit(); + } + return in; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java index 38caddd57f67a..d6ea943cc050b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java @@ -38,7 +38,7 @@ * In particular, this policy will delete index commits whose max sequence number is at most * the current global checkpoint except the index commit which has the highest max sequence number among those. */ -public class CombinedDeletionPolicy extends IndexDeletionPolicy { +public class CombinedDeletionPolicy extends ElasticsearchIndexDeletionPolicy { private final Logger logger; private final TranslogDeletionPolicy translogDeletionPolicy; private final SoftDeletesPolicy softDeletesPolicy; @@ -48,13 +48,6 @@ public class CombinedDeletionPolicy extends IndexDeletionPolicy { // when checking for externally acquired index commits that haven't been released private final Set internallyAcquiredIndexCommits; - interface CommitsListener { - - void onNewAcquiredCommit(IndexCommit commit, Set additionalFiles); - - void onDeletedCommit(IndexCommit commit); - } - @Nullable private final CommitsListener commitsListener; @@ -187,7 +180,6 @@ private void deleteCommit(IndexCommit commit) throws IOException { assert commit.isDeleted() == false : "Index commit [" + commitDescription(commit) + "] is deleted twice"; logger.debug("Delete index commit [{}]", commitDescription(commit)); commit.delete(); - assert commit.isDeleted() : "Deletion commit [" + commitDescription(commit) + "] was suppressed"; } private void updateRetentionPolicy() throws IOException { @@ -204,7 +196,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException { return SegmentInfos.readCommit(indexCommit.getDirectory(), indexCommit.getSegmentsFileName()).totalMaxDoc(); } - SafeCommitInfo getSafeCommitInfo() { + @Override + public SafeCommitInfo getSafeCommitInfo() { return safeCommitInfo; } @@ -214,7 +207,8 @@ SafeCommitInfo getSafeCommitInfo() { * * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. */ - synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { + @Override + public synchronized IndexCommit acquireIndexCommit(boolean acquiringSafeCommit) { return acquireIndexCommit(acquiringSafeCommit, false); } @@ -241,7 +235,8 @@ protected IndexCommit wrapCommit(IndexCommit indexCommit, boolean acquiredIntern * * @return true if the acquired commit can be clean up. */ - synchronized boolean releaseCommit(final IndexCommit acquiredCommit) { + @Override + public synchronized boolean releaseIndexCommit(final IndexCommit acquiredCommit) { final SnapshotIndexCommit snapshotIndexCommit = (SnapshotIndexCommit) acquiredCommit; final IndexCommit releasingCommit = snapshotIndexCommit.getIndexCommit(); assert acquiredIndexCommits.containsKey(releasingCommit) @@ -316,7 +311,8 @@ private static Set listOfNewFileNames(IndexCommit previous, IndexCommit /** * Checks whether the deletion policy is holding on to externally acquired index commits */ - synchronized boolean hasAcquiredIndexCommitsForTesting() { + @Override + public synchronized boolean hasAcquiredIndexCommitsForTesting() { // We explicitly check only external commits and disregard internal commits acquired by the commits listener for (var e : acquiredIndexCommits.entrySet()) { if (internallyAcquiredIndexCommits.contains(e.getKey()) == false || e.getValue() > 1) { @@ -329,7 +325,8 @@ synchronized boolean hasAcquiredIndexCommitsForTesting() { /** * Checks if the deletion policy can delete some index commits with the latest global checkpoint. */ - boolean hasUnreferencedCommits() { + @Override + public boolean hasUnreferencedCommits() { return maxSeqNoOfNextSafeCommit <= globalCheckpointSupplier.getAsLong(); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java new file mode 100644 index 0000000000000..0294dbc519781 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ElasticsearchIndexDeletionPolicy.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.IndexDeletionPolicy; + +import java.util.Set; + +public abstract class ElasticsearchIndexDeletionPolicy extends IndexDeletionPolicy { + + /** + * Captures the most recent commit point or the most recent safe commit point. + * Index files of the capturing commit point won't be released until the commit reference is closed. + * + * @param acquiringSafeCommit captures the most recent safe commit point if true; otherwise captures the most recent commit point. + */ + public abstract IndexCommit acquireIndexCommit(boolean acquiringSafeCommit); + + /** + * Releases an index commit that was acquired by {@link #acquireIndexCommit(boolean)}. + * + * @return true if the acquired commit can be clean up. + */ + public abstract boolean releaseIndexCommit(IndexCommit acquiredIndexCommit); + + /** + * @return information about the safe commit + */ + public abstract SafeCommitInfo getSafeCommitInfo(); + + public abstract boolean hasAcquiredIndexCommitsForTesting(); + + public abstract boolean hasUnreferencedCommits(); + + public interface CommitsListener { + + void onNewAcquiredCommit(IndexCommit commit, Set additionalFiles); + + void onDeletedCommit(IndexCommit commit); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index cdb8a39d4713b..f91deeccaea0d 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -41,6 +41,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -151,6 +152,11 @@ public Supplier retentionLeasesSupplier() { private final MergeMetrics mergeMetrics; + /** + * Allows to pass an {@link ElasticsearchIndexDeletionPolicy} wrapper to egine implementations. + */ + private final Function indexDeletionPolicyWrapper; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -184,7 +190,8 @@ public EngineConfig( boolean promotableToPrimary, MapperService mapperService, EngineResetLock engineResetLock, - MergeMetrics mergeMetrics + MergeMetrics mergeMetrics, + Function indexDeletionPolicyWrapper ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -233,6 +240,7 @@ public EngineConfig( this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); this.engineResetLock = engineResetLock; this.mergeMetrics = mergeMetrics; + this.indexDeletionPolicyWrapper = indexDeletionPolicyWrapper; } /** @@ -485,4 +493,11 @@ public EngineResetLock getEngineResetLock() { public MergeMetrics getMergeMetrics() { return mergeMetrics; } + + /** + * @return an {@link ElasticsearchIndexDeletionPolicy} wrapper, to be use by engine implementations. + */ + public Function getIndexDeletionPolicyWrapper() { + return indexDeletionPolicyWrapper; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 48c56a73f17ef..7753decfc4854 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -170,7 +170,7 @@ public class InternalEngine extends Engine { private final LocalCheckpointTracker localCheckpointTracker; - private final CombinedDeletionPolicy combinedDeletionPolicy; + private final ElasticsearchIndexDeletionPolicy indexDeletionPolicy; // How many callers are currently requesting index throttling. Currently there are only two situations where we do this: when merges // are falling behind and when writing indexing buffer to disk is too slow. When this is 0, there is no throttling, else we throttle @@ -277,13 +277,7 @@ public InternalEngine(EngineConfig engineConfig) { this.totalDiskSpace = ByteSizeValue.of(Environment.getFileStore(translog.location()).getTotalSpace(), ByteSizeUnit.BYTES); this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); this.softDeletesPolicy = newSoftDeletesPolicy(); - this.combinedDeletionPolicy = new CombinedDeletionPolicy( - logger, - translogDeletionPolicy, - softDeletesPolicy, - translog::getLastSyncedGlobalCheckpoint, - newCommitsListener() - ); + this.indexDeletionPolicy = newIndexDeletionPolicy(engineConfig, logger, translog, softDeletesPolicy); this.localCheckpointTracker = createLocalCheckpointTracker(localCheckpointTrackerSupplier); writer = createWriter(); bootstrapAppendOnlyInfoFromWriter(writer); @@ -391,6 +385,25 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException { ); } + protected ElasticsearchIndexDeletionPolicy newIndexDeletionPolicy( + EngineConfig engineConfig, + Logger logger, + Translog translog, + SoftDeletesPolicy softDeletesPolicy + ) { + var wrapper = engineConfig.getIndexDeletionPolicyWrapper(); + assert wrapper != null : "no index deletion policy wrapper for " + engineConfig.getShardId(); + return wrapper.apply( + new CombinedDeletionPolicy( + logger, + translog.getDeletionPolicy(), + softDeletesPolicy, + translog::getLastSyncedGlobalCheckpoint, + newCommitsListener() + ) + ); + } + @Nullable private CombinedDeletionPolicy.CommitsListener newCommitsListener() { IndexCommitListener listener = engineConfig.getIndexCommitListener(); @@ -682,7 +695,7 @@ Translog getTranslog() { // Package private for testing purposes only boolean hasAcquiredIndexCommitsForTesting() { - return combinedDeletionPolicy.hasAcquiredIndexCommitsForTesting(); + return indexDeletionPolicy.hasAcquiredIndexCommitsForTesting(); } @Override @@ -748,7 +761,7 @@ public Translog.Location getTranslogLastWriteLocation() { } private void revisitIndexDeletionPolicyOnTranslogSynced() throws IOException { - if (combinedDeletionPolicy.hasUnreferencedCommits()) { + if (indexDeletionPolicy.hasUnreferencedCommits()) { indexWriter.deleteUnusedFiles(); } translog.trimUnreferencedReaders(); @@ -2555,17 +2568,17 @@ public IndexCommitRef acquireLastIndexCommit(final boolean flushFirst) throws En future.actionGet(); logger.trace("finish flush for snapshot"); } - return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(false)); + return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(false)); } @Override public IndexCommitRef acquireSafeIndexCommit() throws EngineException { - return acquireIndexCommitRef(() -> combinedDeletionPolicy.acquireIndexCommit(true)); + return acquireIndexCommitRef(() -> indexDeletionPolicy.acquireIndexCommit(true)); } private void releaseIndexCommit(IndexCommit snapshot) throws IOException { // Revisit the deletion policy if we can clean up the snapshotting commit. - if (combinedDeletionPolicy.releaseCommit(snapshot)) { + if (indexDeletionPolicy.releaseIndexCommit(snapshot)) { try { // Here we don't have to trim translog because snapshotting an index commit // does not lock translog or prevents unreferenced files from trimming. @@ -2578,7 +2591,7 @@ private void releaseIndexCommit(IndexCommit snapshot) throws IOException { @Override public SafeCommitInfo getSafeCommitInfo() { - return combinedDeletionPolicy.getSafeCommitInfo(); + return indexDeletionPolicy.getSafeCommitInfo(); } private boolean failOnTragicEvent(AlreadyClosedException ex) { @@ -2756,7 +2769,7 @@ private IndexWriterConfig getIndexWriterConfig() { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); - iwc.setIndexDeletionPolicy(combinedDeletionPolicy); + iwc.setIndexDeletionPolicy(indexDeletionPolicy); iwc.setInfoStream(TESTS_VERBOSE ? InfoStream.getDefault() : new LoggerInfoStream(logger)); iwc.setMergeScheduler(mergeScheduler.getMergeScheduler()); // Give us the opportunity to upgrade old segments while performing diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8dc7440c5dccf..59129f9911da2 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3765,7 +3765,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { routingEntry().isPromotableToPrimary(), mapperService(), engineResetLock, - mergeMetrics + mergeMetrics, + Function.identity() ); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 699a34b312592..535a2dc100955 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -193,7 +193,7 @@ public void testAcquireIndexCommit() throws Exception { final IndexCommit lastCommit = commitList.get(commitList.size() - 1); safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commitList, globalCheckpoint.get()); assertThat( - indexPolicy.releaseCommit(snapshot), + indexPolicy.releaseIndexCommit(snapshot), equalTo(pendingSnapshots == 0 && snapshot.equals(lastCommit) == false && snapshot.equals(safeCommit) == false) ); } @@ -211,7 +211,7 @@ public void testAcquireIndexCommit() throws Exception { ) ); } - snapshottingCommits.forEach(indexPolicy::releaseCommit); + snapshottingCommits.forEach(indexPolicy::releaseIndexCommit); globalCheckpoint.set(randomLongBetween(lastMaxSeqNo, Long.MAX_VALUE)); commitList.forEach(this::resetDeletion); indexPolicy.onCommit(commitList); @@ -350,8 +350,8 @@ protected int getDocCountOfCommit(IndexCommit indexCommit) { } @Override - synchronized boolean releaseCommit(IndexCommit acquiredCommit) { - return super.releaseCommit(wrapCommit(acquiredCommit)); + public synchronized boolean releaseIndexCommit(IndexCommit acquiredCommit) { + return super.releaseIndexCommit(wrapCommit(acquiredCommit)); } }; @@ -383,7 +383,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { assertThat(deletedCommits, hasSize(0)); assertThat(newCommitFiles, containsInAnyOrder(equalTo("_1.cfe"), equalTo("_1.si"), equalTo("_1.cfs"), equalTo("segments_2"))); - boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit0); + boolean maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit0); assertThat(maybeCleanUpCommits, equalTo(true)); globalCheckpoint.set(20L); @@ -454,7 +454,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { ) ); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit2); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit2); assertThat("No commits to clean up (commit #2 is the safe commit)", maybeCleanUpCommits, equalTo(false)); globalCheckpoint.set(30L); @@ -499,13 +499,13 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { assertThat(deletedCommits, contains(commit0, commit2)); assertThat(newCommitFiles, containsInAnyOrder(equalTo("_4.cfe"), equalTo("_4.si"), equalTo("_4.cfs"), equalTo("segments_4"))); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit3); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit3); assertThat("No commits to clean up (commit #3 is the safe commit)", maybeCleanUpCommits, equalTo(false)); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit4); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit4); assertThat("No commits to clean up (commit #4 is the last commit)", maybeCleanUpCommits, equalTo(false)); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit1); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit1); assertThat(maybeCleanUpCommits, equalTo(true)); final boolean globalCheckpointCatchUp = randomBoolean(); @@ -560,7 +560,7 @@ synchronized boolean releaseCommit(IndexCommit acquiredCommit) { } assertThat(newCommitFiles, containsInAnyOrder(equalTo("_5.cfe"), equalTo("_5.si"), equalTo("_5.cfs"), equalTo("segments_5"))); - maybeCleanUpCommits = combinedDeletionPolicy.releaseCommit(commit5); + maybeCleanUpCommits = combinedDeletionPolicy.releaseIndexCommit(commit5); assertThat("No commits to clean up (commit #5 is the last commit)", maybeCleanUpCommits, equalTo(false)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a41a5dd9394b4..cfce840cda9f0 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3636,7 +3636,8 @@ public void testRecoverFromForeignTranslog() throws IOException { true, config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7245,7 +7246,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); @@ -7525,7 +7527,8 @@ public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) { globalCheckpoint::get, () -> RetentionLeases.EMPTY, new NoneCircuitBreakerService(), - indexCommitListener + indexCommitListener, + Function.identity() ) ) ) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 1d59d44c3def7..d3330cfc17216 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5087,7 +5087,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); return new InternalEngine(configWithWarmer); }); @@ -5370,7 +5371,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {} config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + Function.identity() ); lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); return new InternalEngine(engineConfigWithBlockingRefreshListener) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 8f0604956a98b..bf6a4f4ec2d56 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.function.Consumer; +import java.util.function.Function; import static org.elasticsearch.core.TimeValue.timeValueMillis; import static org.hamcrest.Matchers.arrayContaining; @@ -177,7 +178,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { true, EngineTestCase.createMapperService(), new EngineResetLock(), - MergeMetrics.NOOP + MergeMetrics.NOOP, + Function.identity() ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index e31a0391b66ed..36f50f3ce958e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -316,7 +316,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -351,7 +352,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -386,7 +388,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } @@ -788,7 +791,8 @@ public EngineConfig config( globalCheckpointSupplier, retentionLeasesSupplier, new NoneCircuitBreakerService(), - null + null, + Function.identity() ); } @@ -814,7 +818,8 @@ public EngineConfig config( maybeGlobalCheckpointSupplier, maybeGlobalCheckpointSupplier == null ? null : () -> RetentionLeases.EMPTY, breakerService, - null + null, + Function.identity() ); } @@ -829,7 +834,8 @@ public EngineConfig config( final @Nullable LongSupplier maybeGlobalCheckpointSupplier, final @Nullable Supplier maybeRetentionLeasesSupplier, final CircuitBreakerService breakerService, - final @Nullable Engine.IndexCommitListener indexCommitListener + final @Nullable Engine.IndexCommitListener indexCommitListener, + final @Nullable Function indexDeletionPolicyWrapper ) { final IndexWriterConfig iwc = newIndexWriterConfig(); final TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); @@ -893,7 +899,8 @@ public EngineConfig config( true, mapperService, new EngineResetLock(), - mergeMetrics + mergeMetrics, + indexDeletionPolicyWrapper == null ? Function.identity() : indexDeletionPolicyWrapper ); } @@ -936,7 +943,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.isPromotableToPrimary(), config.getMapperService(), config.getEngineResetLock(), - config.getMergeMetrics() + config.getMergeMetrics(), + config.getIndexDeletionPolicyWrapper() ); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 54bdbd0c0e91c..93b3a00c1019a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -75,6 +75,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.index.engine.EngineTestCase.getDocIds; @@ -286,7 +287,8 @@ public void onFailedEngine(String reason, Exception e) { true, mapperService, new EngineResetLock(), - MergeMetrics.NOOP + MergeMetrics.NOOP, + Function.identity() ); }