Skip to content

Commit 542c256

Browse files
committed
Populate nodesHeapUsage, make HeapUsageSupplier pluggable
1 parent 2d60021 commit 542c256

File tree

8 files changed

+127
-18
lines changed

8 files changed

+127
-18
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned;
3737
import static org.elasticsearch.cluster.routing.UnassignedInfo.Reason.REINITIALIZED;
3838
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
39-
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endArray;
39+
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.endObject;
4040
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.startObject;
4141

4242
/**
@@ -57,7 +57,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5757
final Map<ShardId, Long> shardDataSetSizes;
5858
final Map<NodeAndShard, String> dataPath;
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
60-
final Map<String, HeapUsage> nodeHeapUsage;
60+
final Map<String, HeapUsage> nodesHeapUsage;
6161

6262
protected ClusterInfo() {
6363
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -81,15 +81,15 @@ public ClusterInfo(
8181
Map<ShardId, Long> shardDataSetSizes,
8282
Map<NodeAndShard, String> dataPath,
8383
Map<NodeAndPath, ReservedSpace> reservedSpace,
84-
Map<String, HeapUsage> nodeHeapUsage
84+
Map<String, HeapUsage> nodesHeapUsage
8585
) {
8686
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
8787
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
8888
this.shardSizes = Map.copyOf(shardSizes);
8989
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
9090
this.dataPath = Map.copyOf(dataPath);
9191
this.reservedSpace = Map.copyOf(reservedSpace);
92-
this.nodeHeapUsage = Map.copyOf(nodeHeapUsage);
92+
this.nodesHeapUsage = Map.copyOf(nodesHeapUsage);
9393
}
9494

9595
public ClusterInfo(StreamInput in) throws IOException {
@@ -102,9 +102,9 @@ public ClusterInfo(StreamInput in) throws IOException {
102102
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
103103
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
104104
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
105-
this.nodeHeapUsage = in.readImmutableMap(HeapUsage::new);
105+
this.nodesHeapUsage = in.readImmutableMap(HeapUsage::new);
106106
} else {
107-
this.nodeHeapUsage = Map.of();
107+
this.nodesHeapUsage = Map.of();
108108
}
109109
}
110110

@@ -121,7 +121,7 @@ public void writeTo(StreamOutput out) throws IOException {
121121
}
122122
out.writeMap(this.reservedSpace);
123123
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
124-
out.writeMap(this.nodeHeapUsage, StreamOutput::writeWriteable);
124+
out.writeMap(this.nodesHeapUsage, StreamOutput::writeWriteable);
125125
}
126126
}
127127

@@ -202,7 +202,17 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
202202
}
203203
return builder.endObject(); // NodeAndPath
204204
}),
205-
endArray() // end "reserved_sizes"
205+
chunk(
206+
(builder, p) -> builder.endArray() // end "reserved_sizes"
207+
.startObject("heap_usage")
208+
),
209+
Iterators.map(nodesHeapUsage.entrySet().iterator(), c -> (builder, p) -> {
210+
builder.startObject(c.getKey());
211+
c.getValue().toShortXContent(builder);
212+
builder.endObject();
213+
return builder;
214+
}),
215+
endObject() // end "heap_usage"
206216
);
207217
}
208218

server/src/main/java/org/elasticsearch/cluster/HeapUsage.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ public void writeTo(StreamOutput out) throws IOException {
3535
out.writeVLong(this.freeBytes);
3636
}
3737

38-
@Override
39-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
40-
builder.field("node_id", this.nodeId);
38+
public XContentBuilder toShortXContent(XContentBuilder builder) throws IOException {
4139
builder.field("node_name", this.nodeName);
4240
builder.humanReadableField("total_heap_bytes", "total", ByteSizeValue.ofBytes(this.totalBytes));
4341
builder.humanReadableField("used_heap_bytes", "used", ByteSizeValue.ofBytes(this.usedBytes()));
@@ -47,6 +45,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
4745
return builder;
4846
}
4947

48+
@Override
49+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
50+
builder.field("node_id", this.nodeId);
51+
toShortXContent(builder);
52+
return builder;
53+
}
54+
5055
public double freeHeapAsPercentage() {
5156
return 100.0 * freeBytes / (double) totalBytes;
5257
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
14+
import java.util.Map;
15+
16+
public interface HeapUsageSupplier {
17+
18+
/**
19+
* This will be used when there are no heap usage suppliers available
20+
*/
21+
HeapUsageSupplier EMPTY = listener -> listener.onResponse(Map.of());
22+
23+
/**
24+
* Get the heap usage for every node in the cluster
25+
*
26+
* @param listener The listener which will receive the results
27+
*/
28+
void getClusterHeapUsage(ActionListener<Map<String, HeapUsage>> listener);
29+
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,12 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
8989
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
9090
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
9191
private volatile IndicesStatsSummary indicesStatsSummary;
92-
private volatile Map<String, HeapUsage> nodeHeapUsages;
92+
private volatile Map<String, HeapUsage> nodesHeapUsage;
9393

9494
private final ThreadPool threadPool;
9595
private final Client client;
9696
private final List<Consumer<ClusterInfo>> listeners = new CopyOnWriteArrayList<>();
97+
private final HeapUsageSupplier heapUsageSupplier;
9798

9899
private final Object mutex = new Object();
99100
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
@@ -102,13 +103,20 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
102103
private RefreshScheduler refreshScheduler;
103104

104105
@SuppressWarnings("this-escape")
105-
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
106+
public InternalClusterInfoService(
107+
Settings settings,
108+
ClusterService clusterService,
109+
ThreadPool threadPool,
110+
Client client,
111+
HeapUsageSupplier heapUsageSupplier
112+
) {
106113
this.leastAvailableSpaceUsages = Map.of();
107114
this.mostAvailableSpaceUsages = Map.of();
108-
this.nodeHeapUsages = Map.of();
115+
this.nodesHeapUsage = Map.of();
109116
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
110117
this.threadPool = threadPool;
111118
this.client = client;
119+
this.heapUsageSupplier = heapUsageSupplier;
112120
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
113121
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
114122
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
@@ -175,6 +183,7 @@ void execute() {
175183
logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info");
176184
leastAvailableSpaceUsages = Map.of();
177185
mostAvailableSpaceUsages = Map.of();
186+
nodesHeapUsage = Map.of();
178187
indicesStatsSummary = IndicesStatsSummary.EMPTY;
179188
callListeners();
180189
return;
@@ -189,9 +198,26 @@ void execute() {
189198
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
190199
fetchIndicesStats();
191200
}
201+
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
202+
fetchNodesHeapUsage();
203+
}
192204
}
193205
}
194206

207+
private void fetchNodesHeapUsage() {
208+
heapUsageSupplier.getClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
209+
@Override
210+
public void onResponse(Map<String, HeapUsage> stringHeapUsageMap) {
211+
nodesHeapUsage = stringHeapUsageMap;
212+
}
213+
214+
@Override
215+
public void onFailure(Exception e) {
216+
logger.warn("failed to fetch heap usage for nodes", e);
217+
}
218+
}, fetchRefs.acquire()));
219+
}
220+
195221
private void fetchIndicesStats() {
196222
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
197223
indicesStatsRequest.clear();
@@ -416,7 +442,7 @@ public ClusterInfo getClusterInfo() {
416442
indicesStatsSummary.shardDataSetSizes,
417443
indicesStatsSummary.dataPath,
418444
indicesStatsSummary.reservedSpace,
419-
nodeHeapUsages
445+
nodesHeapUsage
420446
);
421447
}
422448

server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.search.OnlinePrewarmingService;
1313
import org.elasticsearch.client.internal.node.NodeClient;
1414
import org.elasticsearch.cluster.ClusterInfoService;
15+
import org.elasticsearch.cluster.HeapUsageSupplier;
1516
import org.elasticsearch.cluster.InternalClusterInfoService;
1617
import org.elasticsearch.cluster.node.DiscoveryNode;
1718
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,6 +29,7 @@
2829
import org.elasticsearch.indices.IndicesService;
2930
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3031
import org.elasticsearch.indices.recovery.RecoverySettings;
32+
import org.elasticsearch.plugins.ClusterPlugin;
3133
import org.elasticsearch.plugins.PluginsLoader;
3234
import org.elasticsearch.plugins.PluginsService;
3335
import org.elasticsearch.readiness.ReadinessService;
@@ -44,6 +46,7 @@
4446
import org.elasticsearch.transport.TransportService;
4547

4648
import java.util.Map;
49+
import java.util.Objects;
4750
import java.util.function.Function;
4851
import java.util.function.LongSupplier;
4952

@@ -74,7 +77,14 @@ ClusterInfoService newClusterInfoService(
7477
ThreadPool threadPool,
7578
NodeClient client
7679
) {
77-
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client);
80+
final HeapUsageSupplier heapUsageSupplier = getHeapUsageSupplier(pluginsService);
81+
final InternalClusterInfoService service = new InternalClusterInfoService(
82+
settings,
83+
clusterService,
84+
threadPool,
85+
client,
86+
heapUsageSupplier
87+
);
7888
if (DiscoveryNode.isMasterNode(settings)) {
7989
// listen for state changes (this node starts/stops being the elected master, or new nodes are added)
8090
clusterService.addListener(service);
@@ -146,4 +156,16 @@ void processRecoverySettings(PluginsService pluginsService, ClusterSettings clus
146156
ReadinessService newReadinessService(PluginsService pluginsService, ClusterService clusterService, Environment environment) {
147157
return new ReadinessService(clusterService, environment);
148158
}
159+
160+
private static HeapUsageSupplier getHeapUsageSupplier(PluginsService pluginsService) {
161+
final var heapUsageSuppliers = pluginsService.filterPlugins(ClusterPlugin.class)
162+
.map(ClusterPlugin::getHeapUsageSupplier)
163+
.filter(Objects::nonNull)
164+
.toList();
165+
return switch (heapUsageSuppliers.size()) {
166+
case 0 -> HeapUsageSupplier.EMPTY;
167+
case 1 -> heapUsageSuppliers.getFirst();
168+
default -> throw new IllegalArgumentException("multiple plugins define heap usage suppliers, which is not permitted");
169+
};
170+
}
149171
}

server/src/main/java/org/elasticsearch/plugins/ClusterPlugin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.plugins;
1111

12+
import org.elasticsearch.cluster.HeapUsageSupplier;
1213
import org.elasticsearch.cluster.routing.ShardRoutingRoleStrategy;
1314
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
1415
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
@@ -41,6 +42,16 @@ default Collection<AllocationDecider> createAllocationDeciders(Settings settings
4142
return Collections.emptyList();
4243
}
4344

45+
/**
46+
* Create a {@link HeapUsageSupplier} that will be used to determine the approximate heap usage for all
47+
* cluster nodes
48+
* <p>
49+
* Note: Only a single {@link ClusterPlugin} can define a heap usage supplier.
50+
*/
51+
default HeapUsageSupplier getHeapUsageSupplier() {
52+
return null;
53+
}
54+
4455
/**
4556
* Return {@link ShardsAllocator} implementations added by this plugin.
4657
*

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,13 @@ protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
7171
final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
7272

7373
final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
74-
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client);
74+
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(
75+
settings,
76+
clusterService,
77+
threadPool,
78+
client,
79+
HeapUsageSupplier.EMPTY
80+
);
7581
clusterService.addListener(clusterInfoService);
7682
clusterInfoService.addListener(ignored -> {});
7783

test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static class TestPlugin extends Plugin {}
4343
private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
4444

4545
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
46-
super(settings, clusterService, threadPool, client);
46+
super(settings, clusterService, threadPool, client, HeapUsageSupplier.EMPTY);
4747
}
4848

4949
public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFn) {

0 commit comments

Comments
 (0)