Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
2d60021
Add nodeHeapUsage field to ClusterInfo
nicktindall Jun 2, 2025
542c256
Populate nodesHeapUsage, make HeapUsageSupplier pluggable
nicktindall Jun 2, 2025
356beb5
Fix tests
nicktindall Jun 2, 2025
81fd063
Allow deferred creation of HeapUsageSupplier
nicktindall Jun 3, 2025
bc0682c
Default HeapUsageSupplier
nicktindall Jun 3, 2025
747b5a2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 3, 2025
13d1de8
Clarify that heap usage is a minimum
nicktindall Jun 3, 2025
f4d9db5
Test that InternalClusterInfoService polls for heap usage
nicktindall Jun 3, 2025
bf51e85
Test that getNodesHeapUsage returns heap usage
nicktindall Jun 3, 2025
c47c0ca
More caveats for #getNodesHeapUsage()
nicktindall Jun 3, 2025
23eb8e6
Remove HeapUsageSupplier from ClusterPlugin interface
nicktindall Jun 4, 2025
887bcaf
Swap free for used in HeapUsage
nicktindall Jun 4, 2025
7275acb
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
85fd019
Don't report heap usage in ClusterInfo serialization
nicktindall Jun 4, 2025
f112a3b
Fix tests
nicktindall Jun 4, 2025
3a1ada2
Only skip disk usage fetches when disk usage is disabled
nicktindall Jun 4, 2025
8fa587f
HeapUsage -> ShardHeapUsage
nicktindall Jun 4, 2025
6d4b204
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 4, 2025
2c42a82
icis -> internalClusterInfoService
nicktindall Jun 4, 2025
58402bd
diskUsage -> shardHeapUsage
nicktindall Jun 4, 2025
63bbea8
Note about not serializing shardHeapUsages
nicktindall Jun 4, 2025
0cacdc7
Remove unused serialization interface/methods
nicktindall Jun 4, 2025
dd73d37
Additional assertions
nicktindall Jun 4, 2025
765ade8
Clear shardHeapUsages on failure to fetch
nicktindall Jun 4, 2025
e26b62f
Fix naming
nicktindall Jun 4, 2025
55637b6
Restore + test percentage methods
nicktindall Jun 5, 2025
f4b90b5
Load ShardHeapUsageSupplier via SPI
nicktindall Jun 5, 2025
0789fef
Move SPI config to internalClusterTest
nicktindall Jun 5, 2025
2d475c8
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 5, 2025
f56f00e
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 6, 2025
7b5bf95
*Supplier -> *Collector
nicktindall Jun 7, 2025
08a5ca3
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 7, 2025
09ca9dc
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 8, 2025
cd6b7e9
Don't assert estimate <= max heap
nicktindall Jun 10, 2025
5e2cb9f
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 10, 2025
c529194
Merge branch 'main' into ES_11445_add_heap_memory_to_cluster_info
nicktindall Jun 10, 2025
d831194
Use node stats to retrieve max heap size
nicktindall Jun 11, 2025
b8387bb
[CI] Auto commit changes from spotless
Jun 11, 2025
26dba4d
Fix build
nicktindall Jun 11, 2025
f15fca2
Merge remote-tracking branch 'origin/main' into ES_11445_add_heap_mem…
nicktindall Jun 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INFERENCE_CUSTOM_SERVICE_ADDED = def(9_084_0_00);
public static final TransportVersion ESQL_LIMIT_ROW_SIZE = def(9_085_0_00);
public static final TransportVersion ESQL_REGEX_MATCH_WITH_CASE_INSENSITIVITY = def(9_086_0_00);
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_087_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
29 changes: 25 additions & 4 deletions server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned;
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endArray;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject;
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject;

/**
Expand All @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, HeapUsage> nodesHeapUsage;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add comment to say this field is deliberately ignored in toXContentChunked so that another reader knows this is intentional and not a bug.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 63bbea8


protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
}

/**
Expand All @@ -79,14 +80,16 @@ public ClusterInfo(
Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace
Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, HeapUsage> nodesHeapUsage
) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
this.shardSizes = Map.copyOf(shardSizes);
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace);
this.nodesHeapUsage = Map.copyOf(nodesHeapUsage);
}

public ClusterInfo(StreamInput in) throws IOException {
Expand All @@ -98,6 +101,11 @@ public ClusterInfo(StreamInput in) throws IOException {
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.nodesHeapUsage = in.readImmutableMap(HeapUsage::new);
} else {
this.nodesHeapUsage = Map.of();
}
}

@Override
Expand All @@ -112,6 +120,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
}
out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.nodesHeapUsage, StreamOutput::writeWriteable);
}
}

/**
Expand Down Expand Up @@ -191,7 +202,17 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
}
return builder.endObject(); // NodeAndPath
}),
endArray() // end "reserved_sizes"
chunk(
(builder, p) -> builder.endArray() // end "reserved_sizes"
.startObject("heap_usage")
),
Iterators.map(nodesHeapUsage.entrySet().iterator(), c -> (builder, p) -> {
builder.startObject(c.getKey());
c.getValue().toShortXContent(builder);
builder.endObject();
return builder;
}),
endObject() // end "heap_usage"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't want include the new heap usage in the xcontent just yet? This will be a visible stateful change via the AllocationExplain API. Since we may still want to iterate on this field once we get more data from serverless, not including it here gives us flexibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed in 85fd019

);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ public ClusterInfo getClusterInfo() {
shardSizes.toImmutableMap(),
shardDataSetSizes,
dataPath,
Map.of(),
Map.of()
);
}
Expand Down
70 changes: 70 additions & 0 deletions server/src/main/java/org/elasticsearch/cluster/HeapUsage.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Record representing the heap usage for a single cluster node
*/
public record HeapUsage(String nodeId, String nodeName, long totalBytes, long freeBytes) implements ToXContentFragment, Writeable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the freeBytes is probably modelled after DiskUsage. But I feel usedBytes (and in turn usedPercent) is more suitable here since that is what we actually measure and more commonly talked about? I think DiskUsage uses freeBytes due to the methods available via Java NIO's FileStore class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 887bcaf


public HeapUsage(StreamInput in) throws IOException {
this(in.readString(), in.readString(), in.readVLong(), in.readVLong());
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeString(this.nodeName);
out.writeVLong(this.totalBytes);
out.writeVLong(this.freeBytes);
}

public XContentBuilder toShortXContent(XContentBuilder builder) throws IOException {
builder.field("node_name", this.nodeName);
builder.humanReadableField("total_heap_bytes", "total", ByteSizeValue.ofBytes(this.totalBytes));
builder.humanReadableField("used_heap_bytes", "used", ByteSizeValue.ofBytes(this.usedBytes()));
builder.humanReadableField("free_heap_bytes", "free", ByteSizeValue.ofBytes(this.freeBytes));
builder.field("free_heap_percent", truncatePercent(this.freeHeapAsPercentage()));
builder.field("used_heap_percent", truncatePercent(this.usedHeapAsPercentage()));
return builder;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("node_id", this.nodeId);
toShortXContent(builder);
return builder;
}

public double freeHeapAsPercentage() {
return 100.0 * freeBytes / (double) totalBytes;
}

public double usedHeapAsPercentage() {
return 100.0 - freeHeapAsPercentage();
}

public long usedBytes() {
return totalBytes - freeBytes;
}

private static double truncatePercent(double pct) {
return Math.round(pct * 10.0) / 10.0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster;

import org.elasticsearch.action.ActionListener;

import java.util.Map;

public interface HeapUsageSupplier {

/**
* This will be used when there are no heap usage suppliers available
*/
HeapUsageSupplier EMPTY = listener -> listener.onResponse(Map.of());

/**
* Get the heap usage for every node in the cluster
*
* @param listener The listener which will receive the results
*/
void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile IndicesStatsSummary indicesStatsSummary;
private volatile Map<String, HeapUsage> nodesHeapUsage;

private final ThreadPool threadPool;
private final Client client;
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
private final HeapUsageSupplier heapUsageSupplier;

private final Object mutex = new Object();
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
Expand All @@ -101,12 +103,20 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private RefreshScheduler refreshScheduler;

@SuppressWarnings("this-escape")
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
public InternalClusterInfoService(
Settings settings,
ClusterService clusterService,
ThreadPool threadPool,
Client client,
HeapUsageSupplier heapUsageSupplier
) {
this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of();
this.nodesHeapUsage = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool;
this.client = client;
this.heapUsageSupplier = heapUsageSupplier;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
Expand Down Expand Up @@ -173,6 +183,7 @@ void execute() {
logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info");
leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of();
nodesHeapUsage = Map.of();
indicesStatsSummary = IndicesStatsSummary.EMPTY;
callListeners();
return;
Expand All @@ -187,9 +198,26 @@ void execute() {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats();
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesHeapUsage();
}
}
}

private void fetchNodesHeapUsage() {
heapUsageSupplier.getClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Map<String, HeapUsage> stringHeapUsageMap) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming nit:

Suggested change
public void onResponse(Map<String, HeapUsage> stringHeapUsageMap) {
public void onResponse(Map<String, HeapUsage> currentHeapUsageMap) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in e26b62f

nodesHeapUsage = stringHeapUsageMap;
}

@Override
public void onFailure(Exception e) {
logger.warn("failed to fetch heap usage for nodes", e);
}
}, fetchRefs.acquire()));
}

private void fetchIndicesStats() {
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear();
Expand Down Expand Up @@ -413,7 +441,8 @@ public ClusterInfo getClusterInfo() {
indicesStatsSummary.shardSizes,
indicesStatsSummary.shardDataSetSizes,
indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace
indicesStatsSummary.reservedSpace,
nodesHeapUsage
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.HeapUsageSupplier;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -28,6 +29,7 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.PluginsLoader;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.readiness.ReadinessService;
Expand All @@ -44,6 +46,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.LongSupplier;

Expand Down Expand Up @@ -74,7 +77,14 @@ ClusterInfoService newClusterInfoService(
ThreadPool threadPool,
NodeClient client
) {
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
final HeapUsageSupplier heapUsageSupplier = getHeapUsageSupplier(pluginsService);
final InternalClusterInfoService service = new InternalClusterInfoService(
settings,
clusterService,
threadPool,
client,
heapUsageSupplier
);
if (DiscoveryNode.isMasterNode(settings)) {
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(service);
Expand Down Expand Up @@ -146,4 +156,16 @@ void processRecoverySettings(PluginsService pluginsService, ClusterSettings clus
ReadinessService newReadinessService(PluginsService pluginsService, ClusterService clusterService, Environment environment) {
return new ReadinessService(clusterService, environment);
}

private static HeapUsageSupplier getHeapUsageSupplier(PluginsService pluginsService) {
final var heapUsageSuppliers = pluginsService.filterPlugins(ClusterPlugin.class)
.map(ClusterPlugin::getHeapUsageSupplier)
.filter(Objects::nonNull)
.toList();
return switch (heapUsageSuppliers.size()) {
case 0 -> HeapUsageSupplier.EMPTY;
case 1 -> heapUsageSuppliers.getFirst();
default -> throw new IllegalArgumentException("multiple plugins define heap usage suppliers, which is not permitted");
};
}
}
11 changes: 11 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.elasticsearch.plugins;

import org.elasticsearch.cluster.HeapUsageSupplier;
import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy;
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
Expand Down Expand Up @@ -41,6 +42,16 @@ default Collection<AllocationDecider> createAllocationDeciders(Settings settings
return Collections.emptyList();
}

/**
* Create a {@link HeapUsageSupplier} that will be used to determine the approximate heap usage for all
* cluster nodes
* <p>
* Note: Only a single {@link ClusterPlugin} can define a heap usage supplier.
*/
default HeapUsageSupplier getHeapUsageSupplier() {
return null;
}

/**
* Return {@link ShardsAllocator} implementations added by this plugin.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,28 @@ public static ClusterInfo randomClusterInfo() {
randomShardSizes(),
randomDataSetSizes(),
randomRoutingToDataPath(),
randomReservedSpace()
randomReservedSpace(),
randomNodeHeapUsage()
);
}

private static Map<String, HeapUsage> randomNodeHeapUsage() {
int numEntries = randomIntBetween(0, 128);
Map<String, HeapUsage> nodeHeapUsage = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(32);
final int totalBytes = randomIntBetween(0, Integer.MAX_VALUE);
final HeapUsage diskUsage = new HeapUsage(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
final HeapUsage diskUsage = new HeapUsage(
final HeapUsage heapUsage = new HeapUsage(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 58402bd

randomAlphaOfLength(4),
randomAlphaOfLength(4),
totalBytes,
randomIntBetween(0, totalBytes)
);
nodeHeapUsage.put(key, diskUsage);
}
return nodeHeapUsage;
}

private static Map<String, DiskUsage> randomDiskUsage() {
int numEntries = randomIntBetween(0, 128);
Map<String, DiskUsage> builder = new HashMap<>(numEntries);
Expand Down
Loading