Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -638,7 +639,8 @@ public static final IndexShard newIndexShard(
System::nanoTime,
null,
MapperMetrics.NOOP,
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
}

Expand Down
10 changes: 8 additions & 2 deletions server/src/main/java/org/elasticsearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -179,6 +180,7 @@ public interface DirectoryWrapper {
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
private final MapperMetrics mapperMetrics;
private final IndexingStatsSettings indexingStatsSettings;
private final MergeMetrics mergeMetrics;

/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
Expand All @@ -188,6 +190,7 @@ public interface DirectoryWrapper {
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
* @param mergeMetrics the merge metrics instance passed on to the index services created by this module
*/
public IndexModule(
final IndexSettings indexSettings,
Expand All @@ -200,7 +203,8 @@ public IndexModule(
final SlowLogFieldProvider slowLogFieldProvider,
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners,
final IndexingStatsSettings indexingStatsSettings
final IndexingStatsSettings indexingStatsSettings,
final MergeMetrics mergeMetrics
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -216,6 +220,7 @@ public IndexModule(
this.recoveryStateFactories = recoveryStateFactories;
this.mapperMetrics = mapperMetrics;
this.indexingStatsSettings = indexingStatsSettings;
this.mergeMetrics = mergeMetrics;
}

/**
Expand Down Expand Up @@ -552,7 +557,8 @@ public IndexService newIndexService(
indexCommitListener.get(),
mapperMetrics,
queryRewriteInterceptor,
indexingStatsSettings
indexingStatsSettings,
mergeMetrics
);
success = true;
return indexService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -170,6 +171,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final MapperMetrics mapperMetrics;
private final QueryRewriteInterceptor queryRewriteInterceptor;
private final IndexingStatsSettings indexingStatsSettings;
private final MergeMetrics mergeMetrics;

@SuppressWarnings("this-escape")
public IndexService(
Expand Down Expand Up @@ -207,7 +209,8 @@ public IndexService(
Engine.IndexCommitListener indexCommitListener,
MapperMetrics mapperMetrics,
QueryRewriteInterceptor queryRewriteInterceptor,
IndexingStatsSettings indexingStatsSettings
IndexingStatsSettings indexingStatsSettings,
MergeMetrics mergeMetrics
) {
super(indexSettings);
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
Expand Down Expand Up @@ -293,6 +296,7 @@ public IndexService(
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
}
this.indexingStatsSettings = indexingStatsSettings;
this.mergeMetrics = mergeMetrics;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -583,7 +587,8 @@ public synchronized IndexShard createShard(
System::nanoTime,
indexCommitListener,
mapperMetrics,
indexingStatsSettings
indexingStatsSettings,
mergeMetrics
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {

private final EngineResetLock engineResetLock;

private final MergeMetrics mergeMetrics;

/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
Expand Down Expand Up @@ -181,7 +183,8 @@ public EngineConfig(
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary,
MapperService mapperService,
EngineResetLock engineResetLock
EngineResetLock engineResetLock,
MergeMetrics mergeMetrics
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand Down Expand Up @@ -229,6 +232,7 @@ public EngineConfig(
// always use compound on flush - reduces # of file-handles on refresh
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.engineResetLock = engineResetLock;
this.mergeMetrics = mergeMetrics;
}

/**
Expand Down Expand Up @@ -477,4 +481,8 @@ public MapperService getMapperService() {
/**
 * @return the {@link EngineResetLock} this configuration was constructed with
 */
public EngineResetLock getEngineResetLock() {
return engineResetLock;
}

/**
 * @return the {@link MergeMetrics} this configuration was constructed with
 */
public MergeMetrics getMergeMetrics() {
return mergeMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public class InternalEngine extends Engine {
private final CounterMetric numDocAppends = new CounterMetric();
private final CounterMetric numDocUpdates = new CounterMetric();
private final MeanMetric totalFlushTimeExcludingWaitingOnLock = new MeanMetric();
private final MergeMetrics mergeMetrics;

private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
private final SoftDeletesPolicy softDeletesPolicy;
Expand Down Expand Up @@ -239,6 +240,7 @@ public InternalEngine(EngineConfig engineConfig) {
InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
super(engineConfig);
this.maxDocs = maxDocs;
this.mergeMetrics = engineConfig.getMergeMetrics();
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
this.lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); // default to creation timestamp
this.liveVersionMapArchive = createLiveVersionMapArchive();
Expand Down Expand Up @@ -2914,7 +2916,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics);
}

@Override
Expand Down
101 changes: 101 additions & 0 deletions server/src/main/java/org/elasticsearch/index/engine/MergeMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index.engine;

import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;

import java.util.concurrent.atomic.AtomicLong;

/**
 * Registers the merge-related meters on a {@link MeterRegistry} and exposes the methods used to update them.
 * <p>
 * Counters and the histogram are updated once per completed merge through {@link #markMergeMetrics}. The three gauges
 * (queued bytes, running bytes and estimated memory of queued merges) report the current value of internal
 * {@link AtomicLong} accumulators, which are adjusted as a merge is queued, started and finished.
 */
public class MergeMetrics {

    public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size";
    public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total";
    public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage";
    public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage";
    public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size";
    public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size";
    public static final String MERGE_TIME_IN_SECONDS = "es.merge.time";

    /**
     * Shared instance built on the no-op telemetry provider's meter registry.
     * Declared {@code final} so the shared reference cannot be reassigned.
     */
    public static final MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry());

    private final LongCounter mergeSizeInBytes;
    private final LongCounter mergeMergedSegmentSizeInBytes;
    private final LongCounter mergeNumDocs;
    private final LongHistogram mergeTimeInSeconds;

    // Backing values for the gauges registered in the constructor.
    private final AtomicLong runningMergeSizeInBytes = new AtomicLong();
    private final AtomicLong queuedMergeSizeInBytes = new AtomicLong();
    private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong();

    /**
     * Registers all merge counters, gauges and the merge-time histogram on the given registry.
     *
     * @param meterRegistry the registry the meters are registered with
     */
    public MergeMetrics(MeterRegistry meterRegistry) {
        mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes");
        meterRegistry.registerLongGauge(
            MERGE_SEGMENTS_QUEUED_USAGE,
            "Total usage of segments queued to be merged",
            "bytes",
            () -> new LongWithAttributes(queuedMergeSizeInBytes.get())
        );
        meterRegistry.registerLongGauge(
            MERGE_SEGMENTS_RUNNING_USAGE,
            "Total usage of segments currently being merged",
            "bytes",
            () -> new LongWithAttributes(runningMergeSizeInBytes.get())
        );
        mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter(
            MERGE_SEGMENTS_MERGED_SIZE,
            "Total size of the new merged segments",
            "bytes"
        );
        mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents");
        mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds");
        meterRegistry.registerLongGauge(
            MERGE_QUEUED_ESTIMATED_MEMORY_SIZE,
            "Estimated memory usage for queued merges",
            "bytes",
            () -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get())
        );
    }

    /**
     * Accounts for a newly queued merge: adds its total byte size to the queued-bytes value and
     * {@code estimatedMemorySize} to the queued estimated-memory value.
     *
     * @param currentMerge        the merge that was queued
     * @param estimatedMemorySize the memory estimate, in bytes, to add for this merge
     */
    public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) {
        queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize());
        queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize);
    }

    /**
     * Transfers a merge from the queued to the running accounting: its total byte size is subtracted from the
     * queued-bytes value and added to the running-bytes value, and {@code estimatedMemorySize} is subtracted
     * from the queued estimated-memory value.
     *
     * @param currentMerge        the merge that starts running
     * @param estimatedMemorySize the memory estimate, in bytes, to subtract; callers are expected to pass the
     *                            same value they passed to {@link #incrementQueuedMergeBytes}
     */
    public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) {
        long totalSize = currentMerge.getTotalBytesSize();
        queuedMergeSizeInBytes.getAndAdd(-totalSize);
        runningMergeSizeInBytes.getAndAdd(totalSize);
        queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize);
    }

    /**
     * Subtracts the merge's total byte size from the running-bytes value.
     *
     * @param currentMerge the merge that is no longer running
     */
    public void decrementRunningMergeBytes(OnGoingMerge currentMerge) {
        runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize());
    }

    /**
     * Records the statistics of one merge on the counters and on the merge-time histogram.
     *
     * @param currentMerge      the merge whose total byte size and total document count are added to the counters
     * @param mergedSegmentSize size in bytes to add to the merged-segment size counter
     * @param tookMillis        duration of the merge in milliseconds
     */
    public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) {
        mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize());
        mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize);
        mergeNumDocs.incrementBy(currentMerge.totalNumDocs());
        // Integer division: the histogram receives whole seconds, so merges shorter than one second are recorded as 0.
        mergeTimeInSeconds.record(tookMillis / 1000);
    }

    /**
     * @return the current value backing the queued-bytes gauge
     */
    public long getQueuedMergeSizeInBytes() {
        return queuedMergeSizeInBytes.get();
    }

    /**
     * @return the current value backing the running-bytes gauge
     */
    public long getRunningMergeSizeInBytes() {
        return runningMergeSizeInBytes.get();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.ShardId;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Locale;
Expand All @@ -52,6 +54,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final MergeSchedulerConfig config;
protected final Logger logger;
private final MergeTracking mergeTracking;
private final MergeMetrics mergeMetrics;
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
16,
Expand All @@ -74,16 +77,19 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
* @param mergeMetrics records the sizes of queued and running merges and the statistics of completed merges
*/
public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
MergeMemoryEstimateProvider mergeMemoryEstimateProvider,
MergeMetrics mergeMetrics
) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
this.logger = Loggers.getLogger(getClass(), shardId);
this.mergeMetrics = mergeMetrics;
this.mergeTracking = new MergeTracking(
logger,
() -> this.config.isAutoThrottle()
Expand Down Expand Up @@ -214,6 +220,7 @@ protected void handleMergeException(Throwable t) {
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
try {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
mergeQueued(mergeTask.onGoingMerge);
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
} finally {
Expand Down Expand Up @@ -297,6 +304,7 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) {

/**
 * Bookkeeping for a merge task that is done: increments the done-task counter, subtracts the merge's bytes
 * from the running-merge metric, then calls {@code mergeExecutedOrAborted} and re-checks merge task throttling.
 *
 * @param merge the merge whose task is done
 */
private void mergeTaskDone(OnGoingMerge merge) {
doneMergeTaskCount.incrementAndGet();
// NOTE(review): the running-bytes metric is decremented unconditionally; this presumes the merge's bytes were
// moved from queued to running beforehand — confirm this also holds for merges aborted before they ran.
mergeMetrics.decrementRunningMergeBytes(merge);
mergeExecutedOrAborted(merge);
checkMergeTaskThrottling();
}
Expand Down Expand Up @@ -418,18 +426,21 @@ public void run() {
assert isRunning() == false;
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
: "runNowOrBacklog must be invoked before actually running the merge task";
boolean success = false;
try {
beforeMerge(onGoingMerge);
try {
if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) {
throw new IllegalStateException("The merge task is already started or aborted");
}
mergeTracking.mergeStarted(onGoingMerge);
mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes);
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s start", this));
}
try {
doMerge(mergeSource, onGoingMerge.getMerge());
success = onGoingMerge.getMerge().isAborted() == false;
if (verbose()) {
message(
String.format(
Expand All @@ -449,6 +460,10 @@ public void run() {
}
} finally {
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
if (success) {
long newSegmentSize = getNewSegmentSize(onGoingMerge.getMerge());
mergeMetrics.markMergeMetrics(onGoingMerge.getMerge(), newSegmentSize, tookMS);
}
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
}
} finally {
Expand Down Expand Up @@ -523,6 +538,24 @@ public OnGoingMerge getOnGoingMerge() {
return onGoingMerge;
}

/**
 * Determines the size, in bytes, to report for the segment produced by the given merge.
 * <p>
 * The size is read from the merge's segment info; if that read fails with any {@link IOException},
 * the merge's {@code estimatedMergeBytes} is reported instead.
 *
 * @param currentMerge the merge whose resulting segment size is wanted
 * @return the merged segment's size in bytes, or the merge's estimated size when the actual size cannot be read
 */
private static long getNewSegmentSize(MergePolicy.OneMerge currentMerge) {
    long newSegmentBytes;
    try {
        newSegmentBytes = currentMerge.getMergeInfo().sizeInBytes();
    } catch (FileNotFoundException | NoSuchFileException e) {
        // Rare but expected: as soon as the IndexWriter has created the new segment, that segment can be picked up by
        // another merge, which may run concurrently when more than one merge thread is configured. That later merge
        // lets the IndexWriter delete the segment created here. The files may still exist in the object store for
        // searches, but the IndexDirectory no longer references them and reports them as missing when read.
        // Rather than letting this exception fail the shard, fall back to the size estimated up front for the metrics.
        newSegmentBytes = currentMerge.estimatedMergeBytes;
    } catch (IOException e) {
        // TODO how to handle?
        newSegmentBytes = currentMerge.estimatedMergeBytes;
    }
    return newSegmentBytes;
}

@Override
public String toString() {
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");
Expand Down
Loading