Merged
Changes from 9 commits
Commits (40)
2d60021  Add nodeHeapUsage field to ClusterInfo (nicktindall, Jun 2, 2025)
542c256  Populate nodesHeapUsage, make HeapUsageSupplier pluggable (nicktindall, Jun 2, 2025)
356beb5  Fix tests (nicktindall, Jun 2, 2025)
81fd063  Allow deferred creation of HeapUsageSupplier (nicktindall, Jun 3, 2025)
bc0682c  Default HeapUsageSupplier (nicktindall, Jun 3, 2025)
747b5a2  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 3, 2025)
13d1de8  Clarify that heap usage is a minimum (nicktindall, Jun 3, 2025)
f4d9db5  Test that InternalClusterInfoService polls for heap usage (nicktindall, Jun 3, 2025)
bf51e85  Test that getNodesHeapUsage returns heap usage (nicktindall, Jun 3, 2025)
c47c0ca  More caveats for #getNodesHeapUsage() (nicktindall, Jun 3, 2025)
23eb8e6  Remove HeapUsageSupplier from ClusterPlugin interface (nicktindall, Jun 4, 2025)
887bcaf  Swap free for used in HeapUsage (nicktindall, Jun 4, 2025)
7275acb  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 4, 2025)
85fd019  Don't report heap usage in ClusterInfo serialization (nicktindall, Jun 4, 2025)
f112a3b  Fix tests (nicktindall, Jun 4, 2025)
3a1ada2  Only skip disk usage fetches when disk usage is disabled (nicktindall, Jun 4, 2025)
8fa587f  HeapUsage -> ShardHeapUsage (nicktindall, Jun 4, 2025)
6d4b204  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 4, 2025)
2c42a82  icis -> internalClusterInfoService (nicktindall, Jun 4, 2025)
58402bd  diskUsage -> shardHeapUsage (nicktindall, Jun 4, 2025)
63bbea8  Note about not serializing shardHeapUsages (nicktindall, Jun 4, 2025)
0cacdc7  Remove unused serialization interface/methods (nicktindall, Jun 4, 2025)
dd73d37  Additional assertions (nicktindall, Jun 4, 2025)
765ade8  Clear shardHeapUsages on failure to fetch (nicktindall, Jun 4, 2025)
e26b62f  Fix naming (nicktindall, Jun 4, 2025)
55637b6  Restore + test percentage methods (nicktindall, Jun 5, 2025)
f4b90b5  Load ShardHeapUsageSupplier via SPI (nicktindall, Jun 5, 2025)
0789fef  Move SPI config to internalClusterTest (nicktindall, Jun 5, 2025)
2d475c8  Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info (nicktindall, Jun 5, 2025)
f56f00e  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 6, 2025)
7b5bf95  *Supplier -> *Collector (nicktindall, Jun 7, 2025)
08a5ca3  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 7, 2025)
09ca9dc  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 8, 2025)
cd6b7e9  Don't assert estimate <= max heap (nicktindall, Jun 10, 2025)
5e2cb9f  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 10, 2025)
c529194  Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info (nicktindall, Jun 10, 2025)
d831194  Use node stats to retrieve max heap size (nicktindall, Jun 11, 2025)
b8387bb  [CI] Auto commit changes from spotless (Jun 11, 2025)
26dba4d  Fix build (nicktindall, Jun 11, 2025)
f15fca2  Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem… (nicktindall, Jun 12, 2025)
@@ -18,6 +18,8 @@
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.HeapUsage;
import org.elasticsearch.cluster.HeapUsageSupplier;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -61,6 +63,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock;
@@ -81,6 +84,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
@@ -89,6 +93,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -110,12 +115,13 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class IndexShardIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class);
return pluginList(InternalSettingsPlugin.class, BogusHeapUsagePlugin.class);
}

public void testLockTryingToDelete() throws Exception {
@@ -253,6 +259,20 @@ public void testExpectedShardSizeIsPresent() throws InterruptedException {
assertThat(dataSetSize.get(), greaterThan(0L));
}

public void testHeapUsageEstimateIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Map<String, HeapUsage> heapUsages = clusterInfoService.getClusterInfo().getNodesHeapUsage();
assertNotNull(heapUsages);
assertEquals(state.nodes().size(), heapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(heapUsages.containsKey(node.getId()));
HeapUsage heapUsage = heapUsages.get(node.getId());
assertThat(heapUsage.freeBytes(), lessThanOrEqualTo(heapUsage.totalBytes()));
}
}

public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@@ -795,4 +815,43 @@ private static void assertAllIndicesRemovedAndDeletionCompleted(Iterable<Indices
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
}
}

private static class BogusHeapUsageSupplier implements HeapUsageSupplier {

private final ClusterService clusterService;

private BogusHeapUsageSupplier(ClusterService clusterService) {
this.clusterService = clusterService;
}

@Override
public void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener) {
ActionListener.completeWith(
listener,
() -> clusterService.state().nodes().stream().collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> {
final long maxHeap = randomNonNegativeLong();
final long freeHeap = (long) (randomFloat() * maxHeap);
return new HeapUsage(node.getId(), node.getName(), maxHeap, freeHeap);
}))
);
}
}

public static class BogusHeapUsagePlugin extends Plugin implements ClusterPlugin {

private BogusHeapUsageSupplier bogusHeapUsageSupplier;

public BogusHeapUsagePlugin() {}

@Override
public Collection<?> createComponents(PluginServices services) {
bogusHeapUsageSupplier = new BogusHeapUsageSupplier(services.clusterService());
return Collections.singletonList(bogusHeapUsageSupplier);
}

@Override
public HeapUsageSupplier getHeapUsageSupplier() {
return bogusHeapUsageSupplier;
}
}
}
@@ -274,6 +274,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
public static final TransportVersion IDP_CUSTOM_SAML_ATTRIBUTES = def(9_087_0_00);
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_088_0_00);

/*
* STOP! READ THIS FIRST! No, really,
42 changes: 37 additions & 5 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
@@ -36,7 +36,7 @@
import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endArray;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject;

/**
Expand All @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, HeapUsage> nodesHeapUsage;
Member: Can we add a comment to say this field is deliberately ignored in toXContentChunked, so that another reader knows this is intentional and not a bug?

Contributor Author (nicktindall): Done in 63bbea8
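
A minimal sketch of the kind of field-level note being requested; the actual comment added in 63bbea8 is not visible in this 9-commit view, so the wording below is an assumption:

// Deliberately not emitted by toXContentChunked: the omission is intentional
// while this field is still being iterated on, not a bug.
final Map<String, HeapUsage> nodesHeapUsage;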


protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
@@ -71,6 +72,7 @@
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
* @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path
* @param nodesHeapUsage heap usage broken down by node
* @see #shardIdentifierFromRouting
*/
public ClusterInfo(
@@ -79,14 +81,16 @@
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, HeapUsage> nodesHeapUsage
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
this.shardSizes = Map.copyOf(shardSizes);
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.nodesHeapUsage = Map.copyOf(nodesHeapUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
@@ -98,6 +102,11 @@ public ClusterInfo(StreamInput in) throws IOException {
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.nodesHeapUsage = in.readImmutableMap(HeapUsage::new);
} else {
this.nodesHeapUsage = Map.of();
}
}

@Override
@@ -112,6 +121,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
}
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.nodesHeapUsage, StreamOutput::writeWriteable);
}
}

/**
@@ -191,10 +203,29 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
chunk(
(builder, p) -> builder.endArray() // end "reserved_sizes"
.startObject("heap_usage")
),
Iterators.map(nodesHeapUsage.entrySet().iterator(), c -> (builder, p) -> {
builder.startObject(c.getKey());
c.getValue().toShortXContent(builder);
builder.endObject();
return builder;
}),
endObject() // end "heap_usage"
Member: We probably don't want to include the new heap usage in the xcontent just yet. This will be a visible stateful change via the AllocationExplain API. Since we may still want to iterate on this field once we get more data from serverless, not including it here gives us flexibility.

Contributor Author (nicktindall): Removed in 85fd019

);
}

/**
* Returns a node id to estimated heap usage mapping for all nodes that we have such data for.
* Note that these estimates should be considered minimums. They may be used to determine whether
* there IS NOT capacity to do something, but not to determine that there IS capacity to do something.
*/
public Map<String, HeapUsage> getNodesHeapUsage() {
return nodesHeapUsage;
}

/**
* Returns a node id to disk usage mapping for the path that has the least available space on the node.
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
@@ -301,7 +332,8 @@ public String toString() {

// exposed for tests, computed here rather than exposing all the collections separately
int getChunkCount() {
return leastAvailableSpaceUsage.size() + shardSizes.size() + shardDataSetSizes.size() + dataPath.size() + reservedSpace.size() + 6;
return leastAvailableSpaceUsage.size() + shardSizes.size() + shardDataSetSizes.size() + dataPath.size() + reservedSpace.size()
+ nodesHeapUsage.size() + 7;
}

public record NodeAndShard(String nodeId, ShardId shardId) implements Writeable {
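
The "minimum estimate" contract documented on getNodesHeapUsage() above lends itself to a one-directional check. A minimal sketch of a hypothetical caller follows; the helper name and threshold parameter are invented for illustration:

// Hypothetical helper, not part of this PR: because the per-node usage figures are
// minimums, the estimated free heap is an upper bound and can only prove a shortfall.
static boolean definitelyLacksHeap(ClusterInfo clusterInfo, String nodeId, long requiredBytes) {
    HeapUsage usage = clusterInfo.getNodesHeapUsage().get(nodeId);
    if (usage == null) {
        return false; // no estimate for this node, so nothing can be concluded
    }
    // usage.freeBytes() >= the actual free heap, so a shortfall here is conclusive;
    // the converse (freeBytes() >= requiredBytes) would NOT prove capacity exists.
    return usage.freeBytes() < requiredBytes;
}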
@@ -153,6 +153,7 @@ public ClusterInfo getClusterInfo() {
shardSizes.toImmutableMap(),
shardDataSetSizes,
dataPath,
Map.of(),
Map.of()
);
}
70 changes: 70 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/HeapUsage.java
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Record representing the heap usage for a single cluster node
*/
public record HeapUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) implements ToXContentFragment, Writeable {
Member: I guess the freeBytes is probably modelled after DiskUsage, but I feel usedBytes (and in turn usedPercent) is more suitable here, since that is what we actually measure and what is more commonly talked about. I think DiskUsage uses freeBytes because of the methods available via Java NIO's FileStore class.

Contributor Author (nicktindall): Done in 887bcaf
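
A minimal sketch of the used-bytes shape the reviewer suggests; the actual change landed in 887bcaf, outside this 9-commit view, so the exact signature below is an assumption:

public record HeapUsage(String nodeId, String nodeName, long totalBytes, long usedBytes) {

    public long freeBytes() {
        return totalBytes - usedBytes; // derived rather than stored
    }

    public double usedHeapAsPercentage() {
        return 100.0 * usedBytes / (double) totalBytes;
    }
}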


public HeapUsage(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeString(this.nodeName);
out.writeVLong(this.totalBytes);
out.writeVLong(this.freeBytes);
}

public XContentBuilder toShortXContent(XContentBuilder builder) throws IOException {
builder.field("node_name", this.nodeName);
builder.humanReadableField("total_heap_bytes", "total", ByteSizeValue.ofBytes(this.totalBytes));
builder.humanReadableField("used_heap_bytes", "used", ByteSizeValue.ofBytes(this.usedBytes()));
builder.humanReadableField("free_heap_bytes", "free", ByteSizeValue.ofBytes(this.freeBytes));
builder.field("free_heap_percent", truncatePercent(this.freeHeapAsPercentage()));
builder.field("used_heap_percent", truncatePercent(this.usedHeapAsPercentage()));
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node_id", this.nodeId);
toShortXContent(builder);
return builder;
}

public double freeHeapAsPercentage() {
return 100.0 * freeBytes / (double) totalBytes;
}

public double usedHeapAsPercentage() {
return 100.0 - freeHeapAsPercentage();
}

public long usedBytes() {
return totalBytes - freeBytes;
}

private static double truncatePercent(double pct) {
return Math.round(pct * 10.0) / 10.0;
}
}
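
A quick worked example of the helpers above, using invented values:

// 1 GiB max heap of which 256 MiB is free: 25% free, 75% used.
HeapUsage usage = new HeapUsage("node-1", "node_t0", 1073741824L, 268435456L);
assert usage.usedBytes() == 805306368L;      // 768 MiB
assert usage.freeHeapAsPercentage() == 25.0;
assert usage.usedHeapAsPercentage() == 75.0;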
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;

import java.util.Map;

public interface HeapUsageSupplier {

/**
* This will be used when there are no heap usage suppliers available
*/
HeapUsageSupplier EMPTY = listener -> listener.onResponse(Map.of());

/**
* Get the heap usage for every node in the cluster
*
* @param listener The listener which will receive the results
*/
void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener);
}
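
As a usage illustration, a minimal local-only implementation of this interface might look like the sketch below. The class name and constructor parameters are invented; the PR's production collector (later loaded via SPI and renamed *Collector, per the commit list) is not shown in this view.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.Map;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.HeapUsage;
import org.elasticsearch.cluster.HeapUsageSupplier;

// Hypothetical supplier that reports only the local node's JVM heap.
public class LocalJvmHeapUsageSupplier implements HeapUsageSupplier {

    private final String localNodeId;
    private final String localNodeName;

    public LocalJvmHeapUsageSupplier(String localNodeId, String localNodeName) {
        this.localNodeId = localNodeId;
        this.localNodeName = localNodeName;
    }

    @Override
    public void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener) {
        ActionListener.completeWith(listener, () -> {
            MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
            long totalBytes = heap.getMax(); // -1 when undefined; real code would guard this
            long freeBytes = totalBytes - heap.getUsed();
            return Map.of(localNodeId, new HeapUsage(localNodeId, localNodeName, totalBytes, freeBytes));
        });
    }
}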