Skip to content

Commit 4db0011

Browse files
authored
Expose node heap size in cluster info (#134436)
This is needed for a new Stateless decider that limits concurrent recoveries based on node heap size. Relates ES-12554
1 parent 2a1176a commit 4db0011

File tree

11 files changed

+75
-12
lines changed

11 files changed

+75
-12
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,19 @@ public void testShardWriteLoadsArePresent() {
410410
}
411411
}
412412

413+
public void testMaxHeapPerNodeIsPresent() {
414+
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
415+
ClusterInfoServiceUtils.refresh(clusterInfoService);
416+
Map<String, ByteSizeValue> maxHeapSizePerNode = clusterInfoService.getClusterInfo().getMaxHeapSizePerNode();
417+
assertNotNull(maxHeapSizePerNode);
418+
ClusterState state = getInstanceFromNode(ClusterService.class).state();
419+
assertEquals(state.nodes().size(), maxHeapSizePerNode.size());
420+
for (DiscoveryNode node : state.nodes()) {
421+
assertTrue(maxHeapSizePerNode.containsKey(node.getId()));
422+
assertThat(maxHeapSizePerNode.get(node.getId()), greaterThan(ByteSizeValue.ZERO));
423+
}
424+
}
425+
413426
public void testIndexCanChangeCustomDataPath() throws Exception {
414427
final String index = "test-custom-data-path";
415428
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ static TransportVersion def(int id) {
324324
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_156_0_00);
325325
public static final TransportVersion ML_INFERENCE_ENDPOINT_CACHE = def(9_157_0_00);
326326
public static final TransportVersion INDEX_SOURCE = def(9_158_0_00);
327+
public static final TransportVersion MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO = def(9_159_0_00);
327328

328329
/*
329330
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,11 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
6363
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
6464
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
6565
final Map<ShardId, Double> shardWriteLoads;
66+
// max heap size per node ID
67+
final Map<String, ByteSizeValue> maxHeapSizePerNode;
6668

6769
protected ClusterInfo() {
68-
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
70+
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
6971
}
7072

7173
/**
@@ -80,6 +82,7 @@ protected ClusterInfo() {
8082
* @param estimatedHeapUsages estimated heap usage broken down by node
8183
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
8284
* @see #shardIdentifierFromRouting
85+
* @param maxHeapSizePerNode node id to max heap size
8386
*/
8487
public ClusterInfo(
8588
Map<String, DiskUsage> leastAvailableSpaceUsage,
@@ -90,7 +93,8 @@ public ClusterInfo(
9093
Map<NodeAndPath, ReservedSpace> reservedSpace,
9194
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
9295
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
93-
Map<ShardId, Double> shardWriteLoads
96+
Map<ShardId, Double> shardWriteLoads,
97+
Map<String, ByteSizeValue> maxHeapSizePerNode
9498
) {
9599
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
96100
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -101,6 +105,7 @@ public ClusterInfo(
101105
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
102106
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
103107
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
108+
this.maxHeapSizePerNode = Map.copyOf(maxHeapSizePerNode);
104109
}
105110

106111
public ClusterInfo(StreamInput in) throws IOException {
@@ -125,6 +130,11 @@ public ClusterInfo(StreamInput in) throws IOException {
125130
} else {
126131
this.shardWriteLoads = Map.of();
127132
}
133+
if (in.getTransportVersion().onOrAfter(TransportVersions.MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO)) {
134+
this.maxHeapSizePerNode = in.readImmutableMap(ByteSizeValue::readFrom);
135+
} else {
136+
this.maxHeapSizePerNode = Map.of();
137+
}
128138
}
129139

130140
@Override
@@ -144,6 +154,9 @@ public void writeTo(StreamOutput out) throws IOException {
144154
if (out.getTransportVersion().supports(SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
145155
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
146156
}
157+
if (out.getTransportVersion().onOrAfter(TransportVersions.MAX_HEAP_SIZE_PER_NODE_IN_CLUSTER_INFO)) {
158+
out.writeMap(this.maxHeapSizePerNode, StreamOutput::writeWriteable);
159+
}
147160
}
148161

149162
/**
@@ -224,8 +237,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
224237
return builder.endObject(); // NodeAndPath
225238
}),
226239
endArray() // end "reserved_sizes"
227-
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
228-
// committing to API payloads until the features are settled
240+
// NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads/maxHeapSizePerNode at this stage,
241+
// to avoid committing to API payloads until the features are settled
229242
);
230243
}
231244

@@ -326,6 +339,10 @@ public ReservedSpace getReservedSpace(String nodeId, String dataPath) {
326339
return result == null ? ReservedSpace.EMPTY : result;
327340
}
328341

342+
public Map<String, ByteSizeValue> getMaxHeapSizePerNode() {
343+
return this.maxHeapSizePerNode;
344+
}
345+
329346
/**
330347
* Method that incorporates the ShardId for the shard into a string that
331348
* includes a 'p' or 'r' depending on whether the shard is a primary.
@@ -351,7 +368,8 @@ public boolean equals(Object o) {
351368
&& reservedSpace.equals(that.reservedSpace)
352369
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
353370
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
354-
&& shardWriteLoads.equals(that.shardWriteLoads);
371+
&& shardWriteLoads.equals(that.shardWriteLoads)
372+
&& maxHeapSizePerNode.equals(that.maxHeapSizePerNode);
355373
}
356374

357375
@Override
@@ -365,7 +383,8 @@ public int hashCode() {
365383
reservedSpace,
366384
estimatedHeapUsages,
367385
nodeUsageStatsForThreadPools,
368-
shardWriteLoads
386+
shardWriteLoads,
387+
maxHeapSizePerNode
369388
);
370389
}
371390

@@ -489,6 +508,7 @@ public static class Builder {
489508
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
490509
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
491510
private Map<ShardId, Double> shardWriteLoads = Map.of();
511+
private Map<String, ByteSizeValue> maxHeapSizePerNode = Map.of();
492512

493513
public ClusterInfo build() {
494514
return new ClusterInfo(
@@ -500,7 +520,8 @@ public ClusterInfo build() {
500520
reservedSpace,
501521
estimatedHeapUsages,
502522
nodeUsageStatsForThreadPools,
503-
shardWriteLoads
523+
shardWriteLoads,
524+
maxHeapSizePerNode
504525
);
505526
}
506527

@@ -548,5 +569,10 @@ public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
548569
this.shardWriteLoads = shardWriteLoads;
549570
return this;
550571
}
572+
573+
public Builder maxHeapSizePerNode(Map<String, ByteSizeValue> maxHeapSizePerNode) {
574+
this.maxHeapSizePerNode = maxHeapSizePerNode;
575+
return this;
576+
}
551577
}
552578
}

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
1414
import org.elasticsearch.cluster.routing.ShardRouting;
1515
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
16+
import org.elasticsearch.common.unit.ByteSizeValue;
1617
import org.elasticsearch.common.util.CopyOnFirstWriteMap;
1718
import org.elasticsearch.index.shard.ShardId;
1819

@@ -35,6 +36,7 @@ public class ClusterInfoSimulator {
3536
private final Map<ShardId, Long> shardDataSetSizes;
3637
private final Map<NodeAndShard, String> dataPath;
3738
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
39+
private final Map<String, ByteSizeValue> maxHeapSizePerNode;
3840
private final ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator;
3941

4042
public ClusterInfoSimulator(RoutingAllocation allocation) {
@@ -45,6 +47,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4547
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4648
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4749
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
50+
this.maxHeapSizePerNode = Map.copyOf(allocation.clusterInfo().maxHeapSizePerNode);
4851
this.shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);
4952
}
5053

@@ -162,7 +165,8 @@ public ClusterInfo getClusterInfo() {
162165
Map.of(),
163166
estimatedHeapUsages,
164167
shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(),
165-
allocation.clusterInfo().getShardWriteLoads()
168+
allocation.clusterInfo().getShardWriteLoads(),
169+
maxHeapSizePerNode
166170
);
167171
}
168172
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,8 @@ private boolean shouldRefresh() {
539539
public ClusterInfo getClusterInfo() {
540540
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
541541
final Map<String, EstimatedHeapUsage> estimatedHeapUsages = new HashMap<>();
542-
maxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
542+
final var currentMaxHeapPerNode = this.maxHeapPerNode; // Make sure we use a consistent view
543+
currentMaxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
543544
final Long estimatedHeapUsage = estimatedHeapUsagePerNode.get(nodeId);
544545
if (estimatedHeapUsage != null) {
545546
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
@@ -554,7 +555,8 @@ public ClusterInfo getClusterInfo() {
554555
indicesStatsSummary.reservedSpace,
555556
estimatedHeapUsages,
556557
nodeThreadPoolUsageStatsPerNode,
557-
indicesStatsSummary.shardWriteLoads()
558+
indicesStatsSummary.shardWriteLoads(),
559+
currentMaxHeapPerNode
558560
);
559561
}
560562

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
210210
*
211211
* This method returns the corresponding initializing shard that would be allocated to this node.
212212
*/
213-
private static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
213+
public static ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
214214
final ShardRouting initializingShard;
215215
if (shardRouting.unassigned()) {
216216
initializingShard = shardRouting.initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);

server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.cluster;
1010

1111
import org.elasticsearch.common.io.stream.Writeable;
12+
import org.elasticsearch.common.unit.ByteSizeValue;
1213
import org.elasticsearch.common.util.Maps;
1314
import org.elasticsearch.index.shard.ShardId;
1415
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
@@ -45,7 +46,8 @@ public static ClusterInfo randomClusterInfo() {
4546
randomReservedSpace(),
4647
randomNodeHeapUsage(),
4748
randomNodeUsageStatsForThreadPools(),
48-
randomShardWriteLoad()
49+
randomShardWriteLoad(),
50+
randomMaxHeapSizes()
4951
);
5052
}
5153

@@ -58,6 +60,15 @@ private static Map<ShardId, Double> randomShardWriteLoad() {
5860
return builder;
5961
}
6062

63+
private static Map<String, ByteSizeValue> randomMaxHeapSizes() {
64+
int numEntries = randomIntBetween(0, 128);
65+
Map<String, ByteSizeValue> nodeMaxHeapSizes = new HashMap<>(numEntries);
66+
for (int i = 0; i < numEntries; i++) {
67+
nodeMaxHeapSizes.put(randomAlphaOfLength(32), randomByteSizeValue());
68+
}
69+
return nodeMaxHeapSizes;
70+
}
71+
6172
private static Map<String, EstimatedHeapUsage> randomNodeHeapUsage() {
6273
int numEntries = randomIntBetween(0, 128);
6374
Map<String, EstimatedHeapUsage> nodeHeapUsage = new HashMap<>(numEntries);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -622,6 +622,7 @@ public void testUnassignedAllocationPredictsDiskUsage() {
622622
ImmutableOpenMap.of(),
623623
ImmutableOpenMap.of(),
624624
ImmutableOpenMap.of(),
625+
ImmutableOpenMap.of(),
625626
ImmutableOpenMap.of()
626627
);
627628

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1415,6 +1415,7 @@ static class DevNullClusterInfo extends ClusterInfo {
14151415
reservedSpace,
14161416
Map.of(),
14171417
Map.of(),
1418+
Map.of(),
14181419
Map.of()
14191420
);
14201421
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public void testCanAllocateUsesMaxAvailableSpace() {
111111
Map.of(),
112112
Map.of(),
113113
Map.of(),
114+
Map.of(),
114115
Map.of()
115116
);
116117
RoutingAllocation allocation = new RoutingAllocation(
@@ -185,6 +186,7 @@ private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroo
185186
Map.of(),
186187
Map.of(),
187188
Map.of(),
189+
Map.of(),
188190
Map.of()
189191
);
190192
RoutingAllocation allocation = new RoutingAllocation(
@@ -333,6 +335,7 @@ private void doTestCanRemainUsesLeastAvailableSpace(boolean testMaxHeadroom) {
333335
Map.of(),
334336
Map.of(),
335337
Map.of(),
338+
Map.of(),
336339
Map.of()
337340
);
338341
RoutingAllocation allocation = new RoutingAllocation(

0 commit comments

Comments
 (0)