Skip to content

Commit 1b15e55

Browse files
Bring over merge metrics from stateless
1 parent 41f186d commit 1b15e55

File tree

23 files changed

+270
-45
lines changed

23 files changed

+270
-45
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.index.VersionType;
5050
import org.elasticsearch.index.engine.CommitStats;
5151
import org.elasticsearch.index.engine.Engine;
52+
import org.elasticsearch.index.engine.MergeMetrics;
5253
import org.elasticsearch.index.engine.NoOpEngine;
5354
import org.elasticsearch.index.flush.FlushStats;
5455
import org.elasticsearch.index.mapper.MapperMetrics;
@@ -638,7 +639,8 @@ public static final IndexShard newIndexShard(
638639
System::nanoTime,
639640
null,
640641
MapperMetrics.NOOP,
641-
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings())
642+
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
643+
MergeMetrics.NOOP
642644
);
643645
}
644646

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8888
config.getIndexCommitListener(),
8989
config.isPromotableToPrimary(),
9090
config.getMapperService(),
91-
config.getEngineResetLock()
91+
config.getEngineResetLock(),
92+
config.getMergeMetrics()
9293
);
9394
}
9495

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.cache.query.QueryCache;
4444
import org.elasticsearch.index.engine.Engine;
4545
import org.elasticsearch.index.engine.EngineFactory;
46+
import org.elasticsearch.index.engine.MergeMetrics;
4647
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
4748
import org.elasticsearch.index.mapper.IdFieldMapper;
4849
import org.elasticsearch.index.mapper.MapperMetrics;
@@ -179,6 +180,7 @@ public interface DirectoryWrapper {
179180
private final SetOnce<Engine.IndexCommitListener> indexCommitListener = new SetOnce<>();
180181
private final MapperMetrics mapperMetrics;
181182
private final IndexingStatsSettings indexingStatsSettings;
183+
private final MergeMetrics mergeMetrics;
182184

183185
/**
184186
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -188,6 +190,7 @@ public interface DirectoryWrapper {
188190
* @param analysisRegistry the analysis registry
189191
* @param engineFactory the engine factory
190192
* @param directoryFactories the available store types
193+
* @param mergeMetrics the merge metrics handed to every index service created by this module
191194
*/
192195
public IndexModule(
193196
final IndexSettings indexSettings,
@@ -200,7 +203,8 @@ public IndexModule(
200203
final SlowLogFieldProvider slowLogFieldProvider,
201204
final MapperMetrics mapperMetrics,
202205
final List<SearchOperationListener> searchOperationListeners,
203-
final IndexingStatsSettings indexingStatsSettings
206+
final IndexingStatsSettings indexingStatsSettings,
207+
final MergeMetrics mergeMetrics
204208
) {
205209
this.indexSettings = indexSettings;
206210
this.analysisRegistry = analysisRegistry;
@@ -216,6 +220,7 @@ public IndexModule(
216220
this.recoveryStateFactories = recoveryStateFactories;
217221
this.mapperMetrics = mapperMetrics;
218222
this.indexingStatsSettings = indexingStatsSettings;
223+
this.mergeMetrics = mergeMetrics;
219224
}
220225

221226
/**
@@ -552,7 +557,8 @@ public IndexService newIndexService(
552557
indexCommitListener.get(),
553558
mapperMetrics,
554559
queryRewriteInterceptor,
555-
indexingStatsSettings
560+
indexingStatsSettings,
561+
mergeMetrics
556562
);
557563
success = true;
558564
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.index.cache.query.QueryCache;
5050
import org.elasticsearch.index.engine.Engine;
5151
import org.elasticsearch.index.engine.EngineFactory;
52+
import org.elasticsearch.index.engine.MergeMetrics;
5253
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
5354
import org.elasticsearch.index.fielddata.FieldDataContext;
5455
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -170,6 +171,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
170171
private final MapperMetrics mapperMetrics;
171172
private final QueryRewriteInterceptor queryRewriteInterceptor;
172173
private final IndexingStatsSettings indexingStatsSettings;
174+
private final MergeMetrics mergeMetrics;
173175

174176
@SuppressWarnings("this-escape")
175177
public IndexService(
@@ -207,7 +209,8 @@ public IndexService(
207209
Engine.IndexCommitListener indexCommitListener,
208210
MapperMetrics mapperMetrics,
209211
QueryRewriteInterceptor queryRewriteInterceptor,
210-
IndexingStatsSettings indexingStatsSettings
212+
IndexingStatsSettings indexingStatsSettings,
213+
MergeMetrics mergeMetrics
211214
) {
212215
super(indexSettings);
213216
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -293,6 +296,7 @@ public IndexService(
293296
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
294297
}
295298
this.indexingStatsSettings = indexingStatsSettings;
299+
this.mergeMetrics = mergeMetrics;
296300
updateFsyncTaskIfNecessary();
297301
}
298302

@@ -583,7 +587,8 @@ public synchronized IndexShard createShard(
583587
System::nanoTime,
584588
indexCommitListener,
585589
mapperMetrics,
586-
indexingStatsSettings
590+
indexingStatsSettings,
591+
mergeMetrics
587592
);
588593
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
589594
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
149149

150150
private final EngineResetLock engineResetLock;
151151

152+
private final MergeMetrics mergeMetrics;
153+
152154
/**
153155
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
154156
*/
@@ -181,7 +183,8 @@ public EngineConfig(
181183
Engine.IndexCommitListener indexCommitListener,
182184
boolean promotableToPrimary,
183185
MapperService mapperService,
184-
EngineResetLock engineResetLock
186+
EngineResetLock engineResetLock,
187+
MergeMetrics mergeMetrics
185188
) {
186189
this.shardId = shardId;
187190
this.indexSettings = indexSettings;
@@ -229,6 +232,7 @@ public EngineConfig(
229232
// always use compound on flush - reduces # of file-handles on refresh
230233
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
231234
this.engineResetLock = engineResetLock;
235+
this.mergeMetrics = mergeMetrics;
232236
}
233237

234238
/**
@@ -477,4 +481,8 @@ public MapperService getMapperService() {
477481
public EngineResetLock getEngineResetLock() {
478482
return engineResetLock;
479483
}
484+
485+
public MergeMetrics getMergeMetrics() {
486+
return mergeMetrics;
487+
}
480488
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ public class InternalEngine extends Engine {
188188
private final CounterMetric numDocAppends = new CounterMetric();
189189
private final CounterMetric numDocUpdates = new CounterMetric();
190190
private final MeanMetric totalFlushTimeExcludingWaitingOnLock = new MeanMetric();
191+
private final MergeMetrics mergeMetrics;
191192

192193
private final NumericDocValuesField softDeletesField = Lucene.newSoftDeletesField();
193194
private final SoftDeletesPolicy softDeletesPolicy;
@@ -240,6 +241,7 @@ public InternalEngine(EngineConfig engineConfig) {
240241
InternalEngine(EngineConfig engineConfig, int maxDocs, BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
241242
super(engineConfig);
242243
this.maxDocs = maxDocs;
244+
this.mergeMetrics = engineConfig.getMergeMetrics();
243245
this.relativeTimeInNanosSupplier = config().getRelativeTimeInNanosSupplier();
244246
this.lastFlushTimestamp = relativeTimeInNanosSupplier.getAsLong(); // default to creation timestamp
245247
this.liveVersionMapArchive = createLiveVersionMapArchive();
@@ -2909,7 +2911,7 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
29092911
IndexSettings indexSettings,
29102912
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
29112913
) {
2912-
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
2914+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics);
29132915
}
29142916

29152917
@Override
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
import org.elasticsearch.index.merge.OnGoingMerge;
14+
import org.elasticsearch.telemetry.TelemetryProvider;
15+
import org.elasticsearch.telemetry.metric.LongCounter;
16+
import org.elasticsearch.telemetry.metric.LongHistogram;
17+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
18+
import org.elasticsearch.telemetry.metric.MeterRegistry;
19+
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
public class MergeMetrics {
23+
24+
public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size";
25+
public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total";
26+
public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage";
27+
public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage";
28+
public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size";
29+
public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size";
30+
public static final String MERGE_TIME_IN_SECONDS = "es.merge.time";
31+
/** Shared instance registered against the no-op meter registry; declared final so the singleton cannot be reassigned. */
public static final MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry());
32+
33+
private final LongCounter mergeSizeInBytes;
34+
private final LongCounter mergeMergedSegmentSizeInBytes;
35+
private final LongCounter mergeNumDocs;
36+
private final LongHistogram mergeTimeInSeconds;
37+
38+
private final AtomicLong runningMergeSizeInBytes = new AtomicLong();
39+
private final AtomicLong queuedMergeSizeInBytes = new AtomicLong();
40+
private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong();
41+
42+
public MergeMetrics(MeterRegistry meterRegistry) {
43+
mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes");
44+
meterRegistry.registerLongGauge(
45+
MERGE_SEGMENTS_QUEUED_USAGE,
46+
"Total usage of segments queued to be merged",
47+
"bytes",
48+
() -> new LongWithAttributes(queuedMergeSizeInBytes.get())
49+
);
50+
meterRegistry.registerLongGauge(
51+
MERGE_SEGMENTS_RUNNING_USAGE,
52+
"Total usage of segments currently being merged",
53+
"bytes",
54+
() -> new LongWithAttributes(runningMergeSizeInBytes.get())
55+
);
56+
mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter(
57+
MERGE_SEGMENTS_MERGED_SIZE,
58+
"Total size of the new merged segments",
59+
"bytes"
60+
);
61+
mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents");
62+
mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds");
63+
meterRegistry.registerLongGauge(
64+
MERGE_QUEUED_ESTIMATED_MEMORY_SIZE,
65+
"Estimated memory usage for queued merges",
66+
"bytes",
67+
() -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get())
68+
);
69+
}
70+
71+
public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) {
72+
queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize());
73+
queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize);
74+
}
75+
76+
public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) {
77+
long totalSize = currentMerge.getTotalBytesSize();
78+
queuedMergeSizeInBytes.getAndAdd(-totalSize);
79+
runningMergeSizeInBytes.getAndAdd(totalSize);
80+
queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize);
81+
}
82+
83+
public void decrementRunningMergeBytes(OnGoingMerge currentMerge) {
84+
runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize());
85+
}
86+
87+
public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) {
88+
mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize());
89+
mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize);
90+
mergeNumDocs.incrementBy(currentMerge.totalNumDocs());
91+
mergeTimeInSeconds.record(tookMillis / 1000);
92+
}
93+
94+
public long getQueuedMergeSizeInBytes() {
95+
return queuedMergeSizeInBytes.get();
96+
}
97+
98+
public long getRunningMergeSizeInBytes() {
99+
return runningMergeSizeInBytes.get();
100+
}
101+
}

server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.elasticsearch.index.merge.OnGoingMerge;
3131
import org.elasticsearch.index.shard.ShardId;
3232

33+
import java.io.FileNotFoundException;
3334
import java.io.IOException;
35+
import java.nio.file.NoSuchFileException;
3436
import java.util.Comparator;
3537
import java.util.HashMap;
3638
import java.util.Locale;
@@ -52,6 +54,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
5254
private final MergeSchedulerConfig config;
5355
protected final Logger logger;
5456
private final MergeTracking mergeTracking;
57+
private final MergeMetrics mergeMetrics;
5558
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
5659
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
5760
16,
@@ -74,16 +77,19 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
7477
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
7578
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
7679
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
80+
* @param mergeMetrics records metrics for the merges that this scheduler queues, runs and completes
7781
*/
7882
public ThreadPoolMergeScheduler(
7983
ShardId shardId,
8084
IndexSettings indexSettings,
8185
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
82-
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
86+
MergeMemoryEstimateProvider mergeMemoryEstimateProvider,
87+
MergeMetrics mergeMetrics
8388
) {
8489
this.shardId = shardId;
8590
this.config = indexSettings.getMergeSchedulerConfig();
8691
this.logger = Loggers.getLogger(getClass(), shardId);
92+
this.mergeMetrics = mergeMetrics;
8793
this.mergeTracking = new MergeTracking(
8894
logger,
8995
() -> this.config.isAutoThrottle()
@@ -214,6 +220,7 @@ protected void handleMergeException(Throwable t) {
214220
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
215221
try {
216222
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
223+
mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
217224
mergeQueued(mergeTask.onGoingMerge);
218225
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
219226
} finally {
@@ -297,6 +304,7 @@ synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) {
297304

298305
private void mergeTaskDone(OnGoingMerge merge) {
299306
doneMergeTaskCount.incrementAndGet();
307+
mergeMetrics.decrementRunningMergeBytes(merge);
300308
mergeExecutedOrAborted(merge);
301309
checkMergeTaskThrottling();
302310
}
@@ -418,18 +426,21 @@ public void run() {
418426
assert isRunning() == false;
419427
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
420428
: "runNowOrBacklog must be invoked before actually running the merge task";
429+
boolean success = false;
421430
try {
422431
beforeMerge(onGoingMerge);
423432
try {
424433
if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) {
425434
throw new IllegalStateException("The merge task is already started or aborted");
426435
}
427436
mergeTracking.mergeStarted(onGoingMerge);
437+
mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes);
428438
if (verbose()) {
429439
message(String.format(Locale.ROOT, "merge task %s start", this));
430440
}
431441
try {
432442
doMerge(mergeSource, onGoingMerge.getMerge());
443+
success = onGoingMerge.getMerge().isAborted() == false;
433444
if (verbose()) {
434445
message(
435446
String.format(
@@ -449,6 +460,10 @@ public void run() {
449460
}
450461
} finally {
451462
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
463+
if (success) {
464+
long newSegmentSize = getNewSegmentSize(onGoingMerge.getMerge());
465+
mergeMetrics.markMergeMetrics(onGoingMerge.getMerge(), newSegmentSize, tookMS);
466+
}
452467
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
453468
}
454469
} finally {
@@ -523,6 +538,24 @@ public OnGoingMerge getOnGoingMerge() {
523538
return onGoingMerge;
524539
}
525540

541+
private static long getNewSegmentSize(MergePolicy.OneMerge currentMerge) {
542+
try {
543+
return currentMerge.getMergeInfo().sizeInBytes();
544+
} catch (FileNotFoundException | NoSuchFileException e) {
545+
// It is (rarely) possible that the merged segment could be merged away by the IndexWriter prior to reaching this point.
546+
// Once the IW creates the new segment, it could be exposed to be included in a new merge. That merge can be executed
547+
concurrently if more than 1 merge thread is configured. That new merge allows this IW to delete the segment created by
548+
// this merge. Although the files may still be available in the object store for executing searches, the IndexDirectory
549+
// will no longer have references to the underlying segment files and will throw file not found if we try to read them.
550+
// In this case, we will ignore that exception (which would otherwise fail the shard) and use the originally estimated
551+
// merge size for metrics.
552+
return currentMerge.estimatedMergeBytes;
553+
} catch (IOException e) {
554+
// TODO: decide how other I/O failures should be handled; for now fall back to the estimated merge size as well
555+
return currentMerge.estimatedMergeBytes;
556+
}
557+
}
558+
526559
@Override
527560
public String toString() {
528561
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

0 commit comments

Comments
 (0)