Skip to content

Commit b371590

Browse files
authored
Add shard write-load to cluster info (#131496)
Relates: ES-12419, ES-12420
1 parent 47bb535 commit b371590

File tree

11 files changed

+219
-37
lines changed

11 files changed

+219
-37
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
2828
import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
2929
import org.elasticsearch.cluster.metadata.IndexMetadata;
30+
import org.elasticsearch.cluster.metadata.ProjectId;
3031
import org.elasticsearch.cluster.node.DiscoveryNode;
3132
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
3233
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -104,6 +105,7 @@
104105
import java.util.concurrent.atomic.AtomicReference;
105106
import java.util.function.Predicate;
106107
import java.util.stream.Collectors;
108+
import java.util.stream.IntStream;
107109
import java.util.stream.Stream;
108110

109111
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() {
355357
}
356358
}
357359

360+
public void testShardWriteLoadsArePresent() {
361+
// Create some indices and some write-load
362+
final int numIndices = randomIntBetween(1, 5);
363+
final String indexPrefix = randomIdentifier();
364+
IntStream.range(0, numIndices).forEach(i -> {
365+
final String indexName = indexPrefix + "_" + i;
366+
createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build());
367+
IntStream.range(0, randomIntBetween(1, 500))
368+
.forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get());
369+
});
370+
371+
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
372+
373+
// Not collecting stats yet because allocation write load stats collection is disabled by default.
374+
{
375+
ClusterInfoServiceUtils.refresh(clusterInfoService);
376+
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
377+
assertNotNull(shardWriteLoads);
378+
assertTrue(shardWriteLoads.isEmpty());
379+
}
380+
381+
// Turn on collection of write-load stats.
382+
updateClusterSettings(
383+
Settings.builder()
384+
.put(
385+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
386+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
387+
)
388+
.build()
389+
);
390+
391+
try {
392+
// Force a ClusterInfo refresh to run collection of the write-load stats.
393+
ClusterInfoServiceUtils.refresh(clusterInfoService);
394+
final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
395+
396+
// Verify that each shard has write-load reported.
397+
final ClusterState state = getInstanceFromNode(ClusterService.class).state();
398+
assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size());
399+
double maximumLoadRecorded = 0;
400+
for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) {
401+
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
402+
final ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
403+
assertTrue(shardWriteLoads.containsKey(shardId));
404+
maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded);
405+
}
406+
}
407+
// And that at least one is greater than zero
408+
assertThat(maximumLoadRecorded, greaterThan(0.0));
409+
} finally {
410+
updateClusterSettings(
411+
Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
412+
);
413+
}
414+
}
415+
358416
public void testIndexCanChangeCustomDataPath() throws Exception {
359417
final String index = "test-custom-data-path";
360418
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ static TransportVersion def(int id) {
344344
public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
345345
public static final TransportVersion PROJECT_STATE_REGISTRY_ENTRY = def(9_124_0_00);
346346
public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
347+
public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
347348

348349
/*
349350
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141

4242
/**
4343
* ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
44-
* and a map of shard ids to shard sizes, see
44+
* and a map of shard ids to shard sizes and shard write-loads, see
4545
* <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
4646
* for the key used in the shardSizes map
4747
*/
@@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
6060
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
6161
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
62+
final Map<ShardId, Double> shardWriteLoads;
6263

6364
protected ClusterInfo() {
64-
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
65+
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
6566
}
6667

6768
/**
@@ -85,7 +86,8 @@ public ClusterInfo(
8586
Map<NodeAndShard, String> dataPath,
8687
Map<NodeAndPath, ReservedSpace> reservedSpace,
8788
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
88-
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
89+
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
90+
Map<ShardId, Double> shardWriteLoads
8991
) {
9092
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
9193
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -95,6 +97,7 @@ public ClusterInfo(
9597
this.reservedSpace = Map.copyOf(reservedSpace);
9698
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
9799
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
100+
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
98101
}
99102

100103
public ClusterInfo(StreamInput in) throws IOException {
@@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException {
116119
} else {
117120
this.nodeUsageStatsForThreadPools = Map.of();
118121
}
122+
if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
123+
this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
124+
} else {
125+
this.shardWriteLoads = Map.of();
126+
}
119127
}
120128

121129
@Override
@@ -136,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException {
136144
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
137145
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
138146
}
147+
if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
148+
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
149+
}
139150
}
140151

141152
/**
@@ -216,7 +227,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
216227
return builder.endObject(); // NodeAndPath
217228
}),
218229
endArray() // end "reserved_sizes"
219-
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid
230+
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
220231
// committing to API payloads until the features are settled
221232
);
222233
}
@@ -255,6 +266,16 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
255266
return this.mostAvailableSpaceUsage;
256267
}
257268

269+
/**
270+
* Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted
271+
* as the average number of threads that ingestion to the shard will consume.
272+
* This information may be partial or missing altogether under some circumstances. The absence of a shard
273+
* write load from the map should be interpreted as "unknown".
274+
*/
275+
public Map<ShardId, Double> getShardWriteLoads() {
276+
return shardWriteLoads;
277+
}
278+
258279
/**
259280
* Returns the shard size for the given shardId or <code>null</code> if that metric is not available.
260281
*/
@@ -331,7 +352,9 @@ public boolean equals(Object o) {
331352
&& shardDataSetSizes.equals(that.shardDataSetSizes)
332353
&& dataPath.equals(that.dataPath)
333354
&& reservedSpace.equals(that.reservedSpace)
334-
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools);
355+
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
356+
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
357+
&& shardWriteLoads.equals(that.shardWriteLoads);
335358
}
336359

337360
@Override
@@ -343,7 +366,9 @@ public int hashCode() {
343366
shardDataSetSizes,
344367
dataPath,
345368
reservedSpace,
346-
nodeUsageStatsForThreadPools
369+
estimatedHeapUsages,
370+
nodeUsageStatsForThreadPools,
371+
shardWriteLoads
347372
);
348373
}
349374

@@ -466,6 +491,7 @@ public static class Builder {
466491
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
467492
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
468493
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
494+
private Map<ShardId, Double> shardWriteLoads = Map.of();
469495

470496
public ClusterInfo build() {
471497
return new ClusterInfo(
@@ -476,7 +502,8 @@ public ClusterInfo build() {
476502
dataPath,
477503
reservedSpace,
478504
estimatedHeapUsages,
479-
nodeUsageStatsForThreadPools
505+
nodeUsageStatsForThreadPools,
506+
shardWriteLoads
480507
);
481508
}
482509

@@ -519,5 +546,10 @@ public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadP
519546
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
520547
return this;
521548
}
549+
550+
public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
551+
this.shardWriteLoads = shardWriteLoads;
552+
return this;
553+
}
522554
}
523555
}

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() {
159159
dataPath,
160160
Map.of(),
161161
estimatedHeapUsages,
162-
nodeThreadPoolUsageStats
162+
nodeThreadPoolUsageStats,
163+
allocation.clusterInfo().getShardWriteLoads()
163164
);
164165
}
165166
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.unit.ByteSizeValue;
3939
import org.elasticsearch.common.util.concurrent.EsExecutors;
4040
import org.elasticsearch.core.TimeValue;
41+
import org.elasticsearch.index.shard.IndexingStats;
4142
import org.elasticsearch.index.shard.ShardId;
4243
import org.elasticsearch.index.store.StoreStats;
4344
import org.elasticsearch.threadpool.ThreadPool;
@@ -215,7 +216,7 @@ void execute() {
215216
logger.trace("starting async refresh");
216217

217218
try (var ignoredRefs = fetchRefs) {
218-
maybeFetchIndicesStats(diskThresholdEnabled);
219+
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
219220
maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
220221
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
221222
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
@@ -301,7 +302,14 @@ public void onFailure(Exception e) {
301302
private void fetchIndicesStats() {
302303
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
303304
indicesStatsRequest.clear();
304-
indicesStatsRequest.store(true);
305+
if (diskThresholdEnabled) {
306+
// This returns the shard sizes on disk
307+
indicesStatsRequest.store(true);
308+
}
309+
if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
310+
// This returns the shard write-loads
311+
indicesStatsRequest.indexing(true);
312+
}
305313
indicesStatsRequest.indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_CLOSED_HIDDEN);
306314
indicesStatsRequest.timeout(fetchTimeout);
307315
client.admin()
@@ -350,13 +358,15 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
350358
}
351359

352360
final ShardStats[] stats = indicesStatsResponse.getShards();
361+
final Map<ShardId, Double> shardWriteLoadByIdentifierBuilder = new HashMap<>();
353362
final Map<String, Long> shardSizeByIdentifierBuilder = new HashMap<>();
354363
final Map<ShardId, Long> shardDataSetSizeBuilder = new HashMap<>();
355364
final Map<ClusterInfo.NodeAndShard, String> dataPath = new HashMap<>();
356365
final Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace.Builder> reservedSpaceBuilders =
357366
new HashMap<>();
358367
buildShardLevelInfo(
359368
adjustShardStats(stats),
369+
shardWriteLoadByIdentifierBuilder,
360370
shardSizeByIdentifierBuilder,
361371
shardDataSetSizeBuilder,
362372
dataPath,
@@ -370,7 +380,8 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) {
370380
Map.copyOf(shardSizeByIdentifierBuilder),
371381
Map.copyOf(shardDataSetSizeBuilder),
372382
Map.copyOf(dataPath),
373-
Map.copyOf(reservedSpace)
383+
Map.copyOf(reservedSpace),
384+
Map.copyOf(shardWriteLoadByIdentifierBuilder)
374385
);
375386
}
376387

@@ -527,8 +538,6 @@ public ClusterInfo getClusterInfo() {
527538
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
528539
}
529540
});
530-
final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats = new HashMap<>();
531-
nodeThreadPoolUsageStatsPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeThreadPoolUsageStats.put(nodeId, nodeWriteLoad); });
532541
return new ClusterInfo(
533542
leastAvailableSpaceUsages,
534543
mostAvailableSpaceUsages,
@@ -537,7 +546,8 @@ public ClusterInfo getClusterInfo() {
537546
indicesStatsSummary.dataPath,
538547
indicesStatsSummary.reservedSpace,
539548
estimatedHeapUsages,
540-
nodeThreadPoolUsageStats
549+
nodeThreadPoolUsageStatsPerNode,
550+
indicesStatsSummary.shardWriteLoads()
541551
);
542552
}
543553

@@ -567,6 +577,7 @@ public void addListener(Consumer<ClusterInfo> clusterInfoConsumer) {
567577

568578
static void buildShardLevelInfo(
569579
ShardStats[] stats,
580+
Map<ShardId, Double> shardWriteLoads,
570581
Map<String, Long> shardSizes,
571582
Map<ShardId, Long> shardDataSetSizeBuilder,
572583
Map<ClusterInfo.NodeAndShard, String> dataPathByShard,
@@ -577,25 +588,31 @@ static void buildShardLevelInfo(
577588
dataPathByShard.put(ClusterInfo.NodeAndShard.from(shardRouting), s.getDataPath());
578589

579590
final StoreStats storeStats = s.getStats().getStore();
580-
if (storeStats == null) {
581-
continue;
582-
}
583-
final long size = storeStats.sizeInBytes();
584-
final long dataSetSize = storeStats.totalDataSetSizeInBytes();
585-
final long reserved = storeStats.reservedSizeInBytes();
586-
587-
final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
588-
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
589-
shardSizes.put(shardIdentifier, size);
590-
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
591-
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
591+
if (storeStats != null) {
592+
final long size = storeStats.sizeInBytes();
593+
final long dataSetSize = storeStats.totalDataSetSizeInBytes();
594+
final long reserved = storeStats.reservedSizeInBytes();
595+
596+
final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting);
597+
logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved);
598+
shardSizes.put(shardIdentifier, size);
599+
if (dataSetSize > shardDataSetSizeBuilder.getOrDefault(shardRouting.shardId(), -1L)) {
600+
shardDataSetSizeBuilder.put(shardRouting.shardId(), dataSetSize);
601+
}
602+
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
603+
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
604+
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
605+
t -> new ClusterInfo.ReservedSpace.Builder()
606+
);
607+
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
608+
}
592609
}
593-
if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) {
594-
final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent(
595-
new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()),
596-
t -> new ClusterInfo.ReservedSpace.Builder()
597-
);
598-
reservedSpaceBuilder.add(shardRouting.shardId(), reserved);
610+
final IndexingStats indexingStats = s.getStats().getIndexing();
611+
if (indexingStats != null) {
612+
final double shardWriteLoad = indexingStats.getTotal().getPeakWriteLoad();
613+
if (shardWriteLoad > shardWriteLoads.getOrDefault(shardRouting.shardId(), -1.0)) {
614+
shardWriteLoads.put(shardRouting.shardId(), shardWriteLoad);
615+
}
599616
}
600617
}
601618
}
@@ -623,9 +640,10 @@ private record IndicesStatsSummary(
623640
Map<String, Long> shardSizes,
624641
Map<ShardId, Long> shardDataSetSizes,
625642
Map<ClusterInfo.NodeAndShard, String> dataPath,
626-
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
643+
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace,
644+
Map<ShardId, Double> shardWriteLoads
627645
) {
628-
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of());
646+
static final IndicesStatsSummary EMPTY = new IndicesStatsSummary(Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
629647
}
630648

631649
}

0 commit comments

Comments
 (0)