Skip to content

Commit 51e52f3

Browse files
BrianRothermichmridula-s109
authored andcommitted
Bring over merge metrics from stateless (elastic#128617)
Relates to an effort to combine the merge schedulers from stateless and stateful. The stateless merge scheduler has MergeMetrics that we want in both stateless and stateful. This PR copies over the merge metrics from the stateless merge scheduler into the combined merge scheduler. Relates ES-9687
1 parent 6341175 commit 51e52f3

File tree

24 files changed

+353
-71
lines changed

24 files changed

+353
-71
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,14 @@ class TestInternalEngine extends org.elasticsearch.index.engine.InternalEngine {
8888
protected ElasticsearchMergeScheduler createMergeScheduler(
8989
ShardId shardId,
9090
IndexSettings indexSettings,
91-
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
91+
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
92+
MergeMetrics mergeMetrics
9293
) {
9394
ElasticsearchMergeScheduler mergeScheduler = super.createMergeScheduler(
9495
shardId,
9596
indexSettings,
96-
threadPoolMergeExecutorService
97+
threadPoolMergeExecutorService,
98+
mergeMetrics
9799
);
98100
assertThat(mergeScheduler, instanceOf(ThreadPoolMergeScheduler.class));
99101
// assert there is a single merge executor service for all shards

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.index.VersionType;
5353
import org.elasticsearch.index.engine.CommitStats;
5454
import org.elasticsearch.index.engine.Engine;
55+
import org.elasticsearch.index.engine.MergeMetrics;
5556
import org.elasticsearch.index.engine.NoOpEngine;
5657
import org.elasticsearch.index.flush.FlushStats;
5758
import org.elasticsearch.index.mapper.MapperMetrics;
@@ -680,7 +681,8 @@ public static final IndexShard newIndexShard(
680681
null,
681682
MapperMetrics.NOOP,
682683
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
683-
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
684+
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
685+
MergeMetrics.NOOP
684686
);
685687
}
686688

server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ EngineConfig engineConfigWithLargerIndexingMemory(EngineConfig config) {
8888
config.getIndexCommitListener(),
8989
config.isPromotableToPrimary(),
9090
config.getMapperService(),
91-
config.getEngineResetLock()
91+
config.getEngineResetLock(),
92+
config.getMergeMetrics()
9293
);
9394
}
9495

server/src/main/java/org/elasticsearch/index/IndexModule.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.cache.query.QueryCache;
4444
import org.elasticsearch.index.engine.Engine;
4545
import org.elasticsearch.index.engine.EngineFactory;
46+
import org.elasticsearch.index.engine.MergeMetrics;
4647
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
4748
import org.elasticsearch.index.mapper.IdFieldMapper;
4849
import org.elasticsearch.index.mapper.MapperMetrics;
@@ -181,6 +182,7 @@ public interface DirectoryWrapper {
181182
private final MapperMetrics mapperMetrics;
182183
private final IndexingStatsSettings indexingStatsSettings;
183184
private final SearchStatsSettings searchStatsSettings;
185+
private final MergeMetrics mergeMetrics;
184186

185187
/**
186188
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@@ -190,6 +192,7 @@ public interface DirectoryWrapper {
190192
* @param analysisRegistry the analysis registry
191193
* @param engineFactory the engine factory
192194
* @param directoryFactories the available store types
195+
* @param mergeMetrics
193196
*/
194197
public IndexModule(
195198
final IndexSettings indexSettings,
@@ -203,7 +206,8 @@ public IndexModule(
203206
final MapperMetrics mapperMetrics,
204207
final List<SearchOperationListener> searchOperationListeners,
205208
final IndexingStatsSettings indexingStatsSettings,
206-
final SearchStatsSettings searchStatsSettings
209+
final SearchStatsSettings searchStatsSettings,
210+
final MergeMetrics mergeMetrics
207211
) {
208212
this.indexSettings = indexSettings;
209213
this.analysisRegistry = analysisRegistry;
@@ -220,6 +224,7 @@ public IndexModule(
220224
this.mapperMetrics = mapperMetrics;
221225
this.indexingStatsSettings = indexingStatsSettings;
222226
this.searchStatsSettings = searchStatsSettings;
227+
this.mergeMetrics = mergeMetrics;
223228
}
224229

225230
/**
@@ -557,7 +562,8 @@ public IndexService newIndexService(
557562
mapperMetrics,
558563
queryRewriteInterceptor,
559564
indexingStatsSettings,
560-
searchStatsSettings
565+
searchStatsSettings,
566+
mergeMetrics
561567
);
562568
success = true;
563569
return indexService;

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.index.cache.query.QueryCache;
5050
import org.elasticsearch.index.engine.Engine;
5151
import org.elasticsearch.index.engine.EngineFactory;
52+
import org.elasticsearch.index.engine.MergeMetrics;
5253
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
5354
import org.elasticsearch.index.fielddata.FieldDataContext;
5455
import org.elasticsearch.index.fielddata.IndexFieldData;
@@ -172,6 +173,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
172173
private final QueryRewriteInterceptor queryRewriteInterceptor;
173174
private final IndexingStatsSettings indexingStatsSettings;
174175
private final SearchStatsSettings searchStatsSettings;
176+
private final MergeMetrics mergeMetrics;
175177

176178
@SuppressWarnings("this-escape")
177179
public IndexService(
@@ -210,7 +212,8 @@ public IndexService(
210212
MapperMetrics mapperMetrics,
211213
QueryRewriteInterceptor queryRewriteInterceptor,
212214
IndexingStatsSettings indexingStatsSettings,
213-
SearchStatsSettings searchStatsSettings
215+
SearchStatsSettings searchStatsSettings,
216+
MergeMetrics mergeMetrics
214217
) {
215218
super(indexSettings);
216219
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@@ -297,6 +300,7 @@ public IndexService(
297300
}
298301
this.indexingStatsSettings = indexingStatsSettings;
299302
this.searchStatsSettings = searchStatsSettings;
303+
this.mergeMetrics = mergeMetrics;
300304
updateFsyncTaskIfNecessary();
301305
}
302306

@@ -588,7 +592,8 @@ public synchronized IndexShard createShard(
588592
indexCommitListener,
589593
mapperMetrics,
590594
indexingStatsSettings,
591-
searchStatsSettings
595+
searchStatsSettings,
596+
mergeMetrics
592597
);
593598
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
594599
eventListener.afterIndexShardCreated(indexShard);

server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
149149

150150
private final EngineResetLock engineResetLock;
151151

152+
private final MergeMetrics mergeMetrics;
153+
152154
/**
153155
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
154156
*/
@@ -181,7 +183,8 @@ public EngineConfig(
181183
Engine.IndexCommitListener indexCommitListener,
182184
boolean promotableToPrimary,
183185
MapperService mapperService,
184-
EngineResetLock engineResetLock
186+
EngineResetLock engineResetLock,
187+
MergeMetrics mergeMetrics
185188
) {
186189
this.shardId = shardId;
187190
this.indexSettings = indexSettings;
@@ -229,6 +232,7 @@ public EngineConfig(
229232
// always use compound on flush - reduces # of file-handles on refresh
230233
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
231234
this.engineResetLock = engineResetLock;
235+
this.mergeMetrics = mergeMetrics;
232236
}
233237

234238
/**
@@ -477,4 +481,8 @@ public MapperService getMapperService() {
477481
public EngineResetLock getEngineResetLock() {
478482
return engineResetLock;
479483
}
484+
485+
public MergeMetrics getMergeMetrics() {
486+
return mergeMetrics;
487+
}
480488
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,8 @@ public InternalEngine(EngineConfig engineConfig) {
257257
mergeScheduler = createMergeScheduler(
258258
engineConfig.getShardId(),
259259
engineConfig.getIndexSettings(),
260-
engineConfig.getThreadPoolMergeExecutorService()
260+
engineConfig.getThreadPoolMergeExecutorService(),
261+
engineConfig.getMergeMetrics()
261262
);
262263
scheduler = mergeScheduler.getMergeScheduler();
263264
throttle = new IndexThrottle(pauseIndexingOnThrottle);
@@ -2908,10 +2909,11 @@ protected void doRun() {
29082909
protected ElasticsearchMergeScheduler createMergeScheduler(
29092910
ShardId shardId,
29102911
IndexSettings indexSettings,
2911-
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
2912+
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
2913+
MergeMetrics mergeMetrics
29122914
) {
29132915
if (threadPoolMergeExecutorService != null) {
2914-
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService);
2916+
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService, mergeMetrics);
29152917
} else {
29162918
return new EngineConcurrentMergeScheduler(shardId, indexSettings);
29172919
}
@@ -2921,9 +2923,10 @@ private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeSchedu
29212923
EngineThreadPoolMergeScheduler(
29222924
ShardId shardId,
29232925
IndexSettings indexSettings,
2924-
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
2926+
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
2927+
MergeMetrics mergeMetrics
29252928
) {
2926-
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
2929+
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics);
29272930
}
29282931

29292932
@Override
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.engine;
11+
12+
import org.apache.lucene.index.MergePolicy;
13+
import org.elasticsearch.index.merge.OnGoingMerge;
14+
import org.elasticsearch.telemetry.TelemetryProvider;
15+
import org.elasticsearch.telemetry.metric.LongCounter;
16+
import org.elasticsearch.telemetry.metric.LongHistogram;
17+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
18+
import org.elasticsearch.telemetry.metric.MeterRegistry;
19+
20+
import java.util.concurrent.atomic.AtomicLong;
21+
22+
public class MergeMetrics {
23+
24+
public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size";
25+
public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total";
26+
public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage";
27+
public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage";
28+
public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size";
29+
public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size";
30+
public static final String MERGE_TIME_IN_SECONDS = "es.merge.time";
31+
public static MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry());
32+
33+
private final LongCounter mergeSizeInBytes;
34+
private final LongCounter mergeMergedSegmentSizeInBytes;
35+
private final LongCounter mergeNumDocs;
36+
private final LongHistogram mergeTimeInSeconds;
37+
38+
private final AtomicLong runningMergeSizeInBytes = new AtomicLong();
39+
private final AtomicLong queuedMergeSizeInBytes = new AtomicLong();
40+
private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong();
41+
42+
public MergeMetrics(MeterRegistry meterRegistry) {
43+
mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes");
44+
meterRegistry.registerLongGauge(
45+
MERGE_SEGMENTS_QUEUED_USAGE,
46+
"Total usage of segments queued to be merged",
47+
"bytes",
48+
() -> new LongWithAttributes(queuedMergeSizeInBytes.get())
49+
);
50+
meterRegistry.registerLongGauge(
51+
MERGE_SEGMENTS_RUNNING_USAGE,
52+
"Total usage of segments currently being merged",
53+
"bytes",
54+
() -> new LongWithAttributes(runningMergeSizeInBytes.get())
55+
);
56+
mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter(
57+
MERGE_SEGMENTS_MERGED_SIZE,
58+
"Total size of the new merged segments",
59+
"bytes"
60+
);
61+
mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents");
62+
mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds");
63+
meterRegistry.registerLongGauge(
64+
MERGE_QUEUED_ESTIMATED_MEMORY_SIZE,
65+
"Estimated memory usage for queued merges",
66+
"bytes",
67+
() -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get())
68+
);
69+
}
70+
71+
public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) {
72+
queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize());
73+
queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize);
74+
}
75+
76+
public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) {
77+
long totalSize = currentMerge.getTotalBytesSize();
78+
queuedMergeSizeInBytes.getAndAdd(-totalSize);
79+
runningMergeSizeInBytes.getAndAdd(totalSize);
80+
queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize);
81+
}
82+
83+
public void decrementRunningMergeBytes(OnGoingMerge currentMerge) {
84+
runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize());
85+
}
86+
87+
public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) {
88+
mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize());
89+
mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize);
90+
mergeNumDocs.incrementBy(currentMerge.totalNumDocs());
91+
mergeTimeInSeconds.record(tookMillis / 1000);
92+
}
93+
94+
public long getQueuedMergeSizeInBytes() {
95+
return queuedMergeSizeInBytes.get();
96+
}
97+
98+
public long getRunningMergeSizeInBytes() {
99+
return runningMergeSizeInBytes.get();
100+
}
101+
}

0 commit comments

Comments
 (0)