diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 507724bbaeb3c..72baecc593db6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.MapperMetrics; @@ -638,7 +639,8 @@ public static final IndexShard newIndexShard( System::nanoTime, null, MapperMetrics.NOOP, - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index a5166a5e68da9..520df8a8ebeca 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 3418d8a9b7b2e..c32e7c7532e56 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperMetrics; @@ -179,6 +180,7 @@ public interface DirectoryWrapper { private final SetOnce indexCommitListener = new SetOnce<>(); private final MapperMetrics mapperMetrics; private final IndexingStatsSettings indexingStatsSettings; + private final MergeMetrics mergeMetrics; /** * Construct the index module for the index with the specified index settings. The index module contains extension points for plugins @@ -188,6 +190,7 @@ public interface DirectoryWrapper { * @param analysisRegistry the analysis registry * @param engineFactory the engine factory * @param directoryFactories the available store types + * @param mergeMetrics */ public IndexModule( final IndexSettings indexSettings, @@ -200,7 +203,8 @@ public IndexModule( final SlowLogFieldProvider slowLogFieldProvider, final MapperMetrics mapperMetrics, final List searchOperationListeners, - final IndexingStatsSettings indexingStatsSettings + final IndexingStatsSettings indexingStatsSettings, + final MergeMetrics mergeMetrics ) { this.indexSettings = indexSettings; this.analysisRegistry = analysisRegistry; @@ -216,6 +220,7 @@ public IndexModule( this.recoveryStateFactories = recoveryStateFactories; this.mapperMetrics = mapperMetrics; this.indexingStatsSettings = indexingStatsSettings; + this.mergeMetrics = mergeMetrics; } /** @@ -552,7 +557,8 @@ public IndexService newIndexService( indexCommitListener.get(), mapperMetrics, queryRewriteInterceptor, - indexingStatsSettings + indexingStatsSettings, + mergeMetrics ); success = true; return indexService; diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index d5c00294aa6b8..c31c7490d0ee0 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; @@ -170,6 +171,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final MapperMetrics mapperMetrics; private final QueryRewriteInterceptor queryRewriteInterceptor; private final IndexingStatsSettings indexingStatsSettings; + private final MergeMetrics mergeMetrics; @SuppressWarnings("this-escape") public IndexService( @@ -207,7 +209,8 @@ public IndexService( Engine.IndexCommitListener indexCommitListener, MapperMetrics mapperMetrics, QueryRewriteInterceptor queryRewriteInterceptor, - IndexingStatsSettings indexingStatsSettings + IndexingStatsSettings indexingStatsSettings, + MergeMetrics mergeMetrics ) { super(indexSettings); assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS @@ -293,6 +296,7 @@ public IndexService( this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); } this.indexingStatsSettings = indexingStatsSettings; + this.mergeMetrics = mergeMetrics; updateFsyncTaskIfNecessary(); } @@ -583,7 +587,8 @@ public synchronized IndexShard createShard( System::nanoTime, indexCommitListener, mapperMetrics, - indexingStatsSettings + indexingStatsSettings, + mergeMetrics ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 6137aed83ec7b..cdb8a39d4713b 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -149,6 +149,8 @@ public Supplier retentionLeasesSupplier() { private final EngineResetLock engineResetLock; + private final MergeMetrics mergeMetrics; + /** * Creates a new {@link org.elasticsearch.index.engine.EngineConfig} */ @@ -181,7 +183,8 @@ public EngineConfig( Engine.IndexCommitListener indexCommitListener, boolean promotableToPrimary, MapperService mapperService, - EngineResetLock engineResetLock + EngineResetLock engineResetLock, + MergeMetrics mergeMetrics ) { this.shardId = shardId; this.indexSettings = indexSettings; @@ -229,6 +232,7 @@ public EngineConfig( // always use compound on flush - reduces # of file-handles on refresh this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true); this.engineResetLock = engineResetLock; + this.mergeMetrics = mergeMetrics; } /** @@ -477,4 +481,8 @@ public MapperService getMapperService() { public EngineResetLock getEngineResetLock() { return engineResetLock; } + + public MergeMetrics getMergeMetrics() { + return mergeMetrics; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index f48318e519e74..f16fe5ccf9bbe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -187,6 +187,7 @@ public class InternalEngine extends Engine { private final CounterMetric numDocAppends = new CounterMetric(); private final CounterMetric numDocUpdates = new CounterMetric(); private final MeanMetric totalFlushTimeExcludingWaitingOnLock = new MeanMetric(); + private final MergeMetrics mergeMetrics; private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField(); private final SoftDeletesPolicy softDeletesPolicy; @@ -239,6 +240,7 @@ public InternalEngine(EngineConfig engineConfig) { InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction localCheckpointTrackerSupplier) { super(engineConfig); this.maxDocs = maxDocs; + this.mergeMetrics = engineConfig.getMergeMetrics(); this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier(); this.lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); // default to creation timestamp this.liveVersionMapArchive = createLiveVersionMapArchive(); @@ -2914,7 +2916,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes); + super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java b/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java new file mode 100644 index 0000000000000..f381f442d05a0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.telemetry.TelemetryProvider; +import org.elasticsearch.telemetry.metric.LongCounter; +import org.elasticsearch.telemetry.metric.LongHistogram; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.util.concurrent.atomic.AtomicLong; + +public class MergeMetrics { + + public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size"; + public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total"; + public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage"; + public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage"; + public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size"; + public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size"; + public static final String MERGE_TIME_IN_SECONDS = "es.merge.time"; + public static MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry()); + + private final LongCounter mergeSizeInBytes; + private final LongCounter mergeMergedSegmentSizeInBytes; + private final LongCounter mergeNumDocs; + private final LongHistogram mergeTimeInSeconds; + + private final AtomicLong runningMergeSizeInBytes = new AtomicLong(); + private final AtomicLong queuedMergeSizeInBytes = new AtomicLong(); + private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong(); + + public MergeMetrics(MeterRegistry meterRegistry) { + mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes"); + meterRegistry.registerLongGauge( + MERGE_SEGMENTS_QUEUED_USAGE, + "Total usage of segments queued to be merged", + "bytes", + () -> new LongWithAttributes(queuedMergeSizeInBytes.get()) + ); + meterRegistry.registerLongGauge( + MERGE_SEGMENTS_RUNNING_USAGE, + "Total usage of segments currently being merged", + "bytes", + () -> new LongWithAttributes(runningMergeSizeInBytes.get()) + ); + mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter( + MERGE_SEGMENTS_MERGED_SIZE, + "Total size of the new merged segments", + "bytes" + ); + mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents"); + mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds"); + meterRegistry.registerLongGauge( + MERGE_QUEUED_ESTIMATED_MEMORY_SIZE, + "Estimated memory usage for queued merges", + "bytes", + () -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get()) + ); + } + + public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) { + queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize()); + queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize); + } + + public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) { + long totalSize = currentMerge.getTotalBytesSize(); + queuedMergeSizeInBytes.getAndAdd(-totalSize); + runningMergeSizeInBytes.getAndAdd(totalSize); + queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize); + } + + public void decrementRunningMergeBytes(OnGoingMerge currentMerge) { + runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize()); + } + + public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) { + mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize()); + mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize); + mergeNumDocs.incrementBy(currentMerge.totalNumDocs()); + mergeTimeInSeconds.record(tookMillis / 1000); + } + + public long getQueuedMergeSizeInBytes() { + return queuedMergeSizeInBytes.get(); + } + + public long getRunningMergeSizeInBytes() { + return runningMergeSizeInBytes.get(); + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index 33ef06699c8c7..3c1278c336e3a 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -30,7 +30,9 @@ import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.shard.ShardId; +import java.io.FileNotFoundException; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.util.Comparator; import java.util.HashMap; import java.util.Locale; @@ -52,6 +54,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private final MergeSchedulerConfig config; protected final Logger logger; private final MergeTracking mergeTracking; + private final MergeMetrics mergeMetrics; private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( 16, @@ -74,16 +77,19 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics * @param indexSettings used to obtain the {@link MergeSchedulerConfig} * @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler * @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take + * @param mergeMetrics */ public ThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService, - MergeMemoryEstimateProvider mergeMemoryEstimateProvider + MergeMemoryEstimateProvider mergeMemoryEstimateProvider, + MergeMetrics mergeMetrics ) { this.shardId = shardId; this.config = indexSettings.getMergeSchedulerConfig(); this.logger = Loggers.getLogger(getClass(), shardId); + this.mergeMetrics = mergeMetrics; this.mergeTracking = new MergeTracking( logger, () -> this.config.isAutoThrottle() @@ -214,6 +220,7 @@ protected void handleMergeException(Throwable t) { boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { try { MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); + mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes()); mergeQueued(mergeTask.onGoingMerge); return threadPoolMergeExecutorService.submitMergeTask(mergeTask); } finally { @@ -297,6 +304,7 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) { private void mergeTaskDone(OnGoingMerge merge) { doneMergeTaskCount.incrementAndGet(); + mergeMetrics.decrementRunningMergeBytes(merge); mergeExecutedOrAborted(merge); checkMergeTaskThrottling(); } @@ -418,6 +426,7 @@ public void run() { assert isRunning() == false; assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) : "runNowOrBacklog must be invoked before actually running the merge task"; + boolean success = false; try { beforeMerge(onGoingMerge); try { @@ -425,11 +434,13 @@ public void run() { throw new IllegalStateException("The merge task is already started or aborted"); } mergeTracking.mergeStarted(onGoingMerge); + mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes); if (verbose()) { message(String.format(Locale.ROOT, "merge task %s start", this)); } try { doMerge(mergeSource, onGoingMerge.getMerge()); + success = onGoingMerge.getMerge().isAborted() == false; if (verbose()) { message( String.format( @@ -449,6 +460,10 @@ public void run() { } } finally { long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get()); + if (success) { + long newSegmentSize = getNewSegmentSize(onGoingMerge.getMerge()); + mergeMetrics.markMergeMetrics(onGoingMerge.getMerge(), newSegmentSize, tookMS); + } mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS); } } finally { @@ -523,6 +538,24 @@ public OnGoingMerge getOnGoingMerge() { return onGoingMerge; } + private static long getNewSegmentSize(MergePolicy.OneMerge currentMerge) { + try { + return currentMerge.getMergeInfo().sizeInBytes(); + } catch (FileNotFoundException | NoSuchFileException e) { + // It is (rarely) possible that the merged segment could be merged away by the IndexWriter prior to reaching this point. + // Once the IW creates the new segment, it could be exposed to be included in a new merge. That merge can be executed + // concurrently if more than 1 merge threads are configured. That new merge allows this IW to delete segment created by + // this merge. Although the files may still be available in the object store for executing searches, the IndexDirectory + // will no longer have references to the underlying segment files and will throw file not found if we try to read them. + // In this case, we will ignore that exception (which would otherwise fail the shard) and use the originally estimated + // merge size for metrics. + return currentMerge.estimatedMergeBytes; + } catch (IOException e) { + // TODO how to handle? + return currentMerge.estimatedMergeBytes; + } + } + @Override public String toString() { return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : ""); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 4b38a5d378ecf..ca2be7d75a28f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -91,6 +91,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; @@ -268,6 +269,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final MeanMetric externalRefreshMetric = new MeanMetric(); private final MeanMetric flushMetric = new MeanMetric(); private final CounterMetric periodicFlushMetric = new CounterMetric(); + private final MergeMetrics mergeMetrics; private final ShardEventListener shardEventListener = new ShardEventListener(); @@ -341,7 +343,8 @@ public IndexShard( final LongSupplier relativeTimeInNanosSupplier, final Engine.IndexCommitListener indexCommitListener, final MapperMetrics mapperMetrics, - final IndexingStatsSettings indexingStatsSettings + final IndexingStatsSettings indexingStatsSettings, + final MergeMetrics mergeMetrics ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -429,6 +432,7 @@ public IndexShard( this.refreshFieldHasValueListener = new RefreshFieldHasValueListener(); this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier; this.indexCommitListener = indexCommitListener; + this.mergeMetrics = mergeMetrics; } public ThreadPool getThreadPool() { @@ -3744,7 +3748,8 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) { indexCommitListener, routingEntry().isPromotableToPrimary(), mapperService(), - engineResetLock + engineResetLock, + mergeMetrics ); } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 73747bc798d30..5d21c6c90a786 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -98,6 +98,7 @@ import org.elasticsearch.index.engine.CommitStats; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; @@ -281,6 +282,7 @@ public class IndicesService extends AbstractLifecycleComponent private final QueryRewriteInterceptor queryRewriteInterceptor; final SlowLogFieldProvider slowLogFieldProvider; // pkg-private for testingå private final IndexingStatsSettings indexStatsSettings; + private final MergeMetrics mergeMetrics; @Override protected void doStart() { @@ -355,6 +357,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon this.requestCacheKeyDifferentiator = builder.requestCacheKeyDifferentiator; this.queryRewriteInterceptor = builder.queryRewriteInterceptor; this.mapperMetrics = builder.mapperMetrics; + this.mergeMetrics = builder.mergeMetrics; // doClose() is called when shutting down a node, yet there might still be ongoing requests // that we need to wait for before closing some resources such as the caches. In order to // avoid closing these resources while ongoing requests are still being processed, we use a @@ -795,7 +798,8 @@ private synchronized IndexService createIndexService( slowLogFieldProvider, mapperMetrics, searchOperationListeners, - indexStatsSettings + indexStatsSettings, + mergeMetrics ); for (IndexingOperationListener operationListener : indexingOperationListeners) { indexModule.addIndexOperationListener(operationListener); @@ -893,7 +897,8 @@ public synchronized MapperService createIndexMapperServiceForValidation(IndexMet slowLogFieldProvider, mapperMetrics, searchOperationListeners, - indexStatsSettings + indexStatsSettings, + mergeMetrics ); pluginsService.forEach(p -> p.onIndexModule(indexModule)); return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService); diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java index df3ed42db37a0..3b7f4d24869f2 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesServiceBuilder.java @@ -27,6 +27,7 @@ import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.shard.SearchOperationListener; @@ -79,6 +80,7 @@ public class IndicesServiceBuilder { @Nullable CheckedBiConsumer requestCacheKeyDifferentiator; MapperMetrics mapperMetrics; + MergeMetrics mergeMetrics; List searchOperationListener = List.of(); QueryRewriteInterceptor queryRewriteInterceptor = null; SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() { @@ -206,6 +208,11 @@ public IndicesServiceBuilder mapperMetrics(MapperMetrics mapperMetrics) { return this; } + public IndicesServiceBuilder mergeMetrics(MergeMetrics mergeMetrics) { + this.mergeMetrics = mergeMetrics; + return this; + } + public List searchOperationListeners() { return searchOperationListener; } @@ -244,6 +251,7 @@ public IndicesService build() { Objects.requireNonNull(indexFoldersDeletionListeners); Objects.requireNonNull(snapshotCommitSuppliers); Objects.requireNonNull(mapperMetrics); + Objects.requireNonNull(mergeMetrics); Objects.requireNonNull(searchOperationListener); Objects.requireNonNull(slowLogFieldProvider); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index eb4f2c2543475..71ea23725bb4a 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -117,6 +117,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.SlowLogFields; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.SourceFieldMetrics; import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics; @@ -799,6 +800,9 @@ private void construct( threadPool::relativeTimeInMillis ); MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics); + + MergeMetrics mergeMetrics = new MergeMetrics(telemetryProvider.getMeterRegistry()); + final List searchOperationListeners = List.of( new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry()) ); @@ -887,6 +891,7 @@ public Map queryFields() { .valuesSourceRegistry(searchModule.getValuesSourceRegistry()) .requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator()) .mapperMetrics(mapperMetrics) + .mergeMetrics(mergeMetrics) .searchOperationListeners(searchOperationListeners) .slowLogFieldProvider(slowLogFieldProvider) .build(); diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 043b982ad4344..d3db5257287b6 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -60,6 +60,7 @@ import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -250,7 +251,8 @@ public void testWrapperIsBound() throws IOException { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); module.setReaderWrapper(s -> new Wrapper()); @@ -279,7 +281,8 @@ public void testRegisterIndexStore() throws IOException { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final IndexService indexService = newIndexService(module); @@ -306,7 +309,8 @@ public void testDirectoryWrapper() throws IOException { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); module.setDirectoryWrapper(new TestDirectoryWrapper()); @@ -661,7 +665,8 @@ public void testRegisterCustomRecoveryStateFactory() throws IOException { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final IndexService indexService = newIndexService(module); @@ -685,7 +690,8 @@ public void testIndexCommitListenerIsBound() throws IOException, ExecutionExcept mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong(); @@ -789,7 +795,8 @@ private static IndexModule createIndexModule( mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, emptyList(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 2f2d7fa142976..5a08c2b5d71b1 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3633,7 +3633,8 @@ public void testRecoverFromForeignTranslog() throws IOException { null, true, config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); @@ -7197,7 +7198,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); try (InternalEngine engine = createEngine(configWithWarmer)) { assertThat(warmedUpReaders, empty()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 01a6150fd140c..6523890c61af1 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -62,7 +62,8 @@ public void testMergesExecuteInSizeOrder() throws IOException { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { List executedMergesList = new ArrayList<>(); @@ -105,7 +106,8 @@ public void testSimpleMergeTaskBacklogging() { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ); // more merge tasks than merge threads int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5); @@ -139,7 +141,8 @@ public void testSimpleMergeTaskReEnqueueingBySize() { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ); // sort backlogged merges by size PriorityQueue backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize)); @@ -351,7 +354,8 @@ public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { MergeSource mergeSource = mock(MergeSource.class); @@ -425,7 +429,8 @@ public void testMergesRunConcurrently() throws Exception { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + MergeMetrics.NOOP ) ) { // at least 1 extra merge than there are concurrently allowed @@ -510,7 +515,8 @@ public void testSchedulerCloseWaitsForRunningMerge() throws Exception { new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", settings), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mock(MergeMetrics.class) ) ) { CountDownLatch mergeDoneLatch = new CountDownLatch(1); @@ -574,6 +580,7 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress(); OneMerge oneMerge = mock(OneMerge.class); + MergeMetrics mergeMetrics = mock(MergeMetrics.class); when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong())); when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress); MergeSource mergeSource = mock(MergeSource.class); @@ -583,7 +590,8 @@ public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exce new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -602,6 +610,7 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settingsBuilder.build()); MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress(); OneMerge oneMerge = mock(OneMerge.class); + MergeMetrics mergeMetrics = mock(MergeMetrics.class); // forced merge with a set number of segments when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong(), randomNonNegativeInt())); when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress); @@ -613,7 +622,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); @@ -630,7 +640,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { // merge submitted upon closing @@ -647,7 +658,8 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { new ShardId("index", "_na_", 1), indexSettings, threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) ) { // merge submitted upon closing @@ -664,12 +676,14 @@ public void testAutoIOThrottleForMergeTasks() throws Exception { public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() { ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + MergeMetrics mergeMetrics = mock(MergeMetrics.class); // build a scheduler that always returns true for shouldSkipMerge ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( new ShardId("index", "_na_", 1), IndexSettingsModule.newIndexSettings("index", Settings.builder().build()), threadPoolMergeExecutorService, - merge -> 0 + merge -> 0, + mergeMetrics ) { @Override protected boolean shouldSkipMerge() { @@ -703,7 +717,7 @@ static class TestThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { IndexSettings indexSettings, ThreadPoolMergeExecutorService threadPoolMergeExecutorService ) { - super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0); + super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0, MergeMetrics.NOOP); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 10df837c8d4f2..cc682901876b6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5064,7 +5064,8 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); return new InternalEngine(configWithWarmer); }); @@ -5346,7 +5347,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {} config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); lazyEngineConfig.set(engineConfigWithBlockingRefreshListener); return new InternalEngine(engineConfigWithBlockingRefreshListener) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 699b5e93d79f9..06a4bcc91affc 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -167,7 +168,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) { null, true, EngineTestCase.createMapperService(), - new EngineResetLock() + new EngineResetLock(), + MergeMetrics.NOOP ); engine = new InternalEngine(config); EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 2daf1222748b4..d4d0460713001 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -144,6 +144,7 @@ import org.elasticsearch.index.IndexSettingProviders; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.analysis.AnalysisRegistry; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -2256,6 +2257,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { .client(client) .metaStateService(new MetaStateService(nodeEnv, namedXContentRegistry)) .mapperMetrics(MapperMetrics.NOOP) + .mergeMetrics(MergeMetrics.NOOP) .build(); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); snapshotShardsService = new SnapshotShardsService( diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 9bb6696b1ee6d..10517afd05d83 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -166,6 +166,7 @@ public abstract class EngineTestCase extends ESTestCase { protected InternalEngine engine; protected InternalEngine replicaEngine; + protected MergeMetrics mergeMetrics; protected IndexSettings defaultSettings; protected String codecName; @@ -258,6 +259,7 @@ public void setUp() throws Exception { primaryTranslogDir = createTempDir("translog-primary"); mapperService = createMapperService(defaultSettings.getSettings(), defaultMapping(), extraMappers()); translogHandler = createTranslogHandler(mapperService); + mergeMetrics = MergeMetrics.NOOP; engine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy()); LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig(); @@ -308,7 +310,8 @@ public static EngineConfig copy(EngineConfig config, LongSupplier globalCheckpoi config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -342,7 +345,8 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -376,7 +380,8 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } @@ -882,7 +887,8 @@ public EngineConfig config( indexCommitListener, true, mapperService, - new EngineResetLock() + new EngineResetLock(), + mergeMetrics ); } @@ -924,7 +930,8 @@ protected EngineConfig config(EngineConfig config, Store store, Path translogPat config.getIndexCommitListener(), config.isPromotableToPrimary(), config.getMapperService(), - config.getEngineResetLock() + config.getEngineResetLock(), + config.getMergeMetrics() ); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 89ce1f4eb06cd..9e2004f1602a0 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.MapperMetrics; @@ -554,7 +555,8 @@ protected IndexShard newShard( relativeTimeSupplier, null, MapperMetrics.NOOP, - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); success = true; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index b6ced318c1699..f70d72044328d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.engine.TranslogHandler; @@ -275,7 +276,8 @@ public void onFailedEngine(String reason, Exception e) { null, true, mapperService, - new EngineResetLock() + new EngineResetLock(), + MergeMetrics.NOOP ); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java index 4eac3ddf85f1b..12bfdc5258480 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.TestIndexNameExpressionResolver; @@ -461,7 +462,8 @@ public void testOnIndexModuleIsNoOpWithSecurityDisabled() throws Exception { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, List.of(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); security.onIndexModule(indexModule); // indexReaderWrapper is a SetOnce so if Security#onIndexModule had already set an ReaderWrapper we would get an exception here diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java index bd8d15ea809fe..9c2f7b54562e3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherPluginTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.index.SlowLogFieldProvider; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.MergeMetrics; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.shard.IndexingStatsSettings; import org.elasticsearch.indices.SystemIndexDescriptor; @@ -74,7 +75,8 @@ public void testWatcherDisabledTests() throws Exception { mock(SlowLogFieldProvider.class), MapperMetrics.NOOP, List.of(), - new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()) + new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()), + MergeMetrics.NOOP ); // this will trip an assertion if the watcher indexing operation listener is null (which it is) but we try to add it watcher.onIndexModule(indexModule);