Commit f81f9f9

Add node write load to the ClusterInfo
Sets up ClusterInfoService to collect node write load and pass it into ClusterInfo. The node write load stats are not yet supplied; they'll be zero/empty in the ClusterInfo for now. Relates ES-11990
1 parent a6dfe64 commit f81f9f9
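
For orientation, here is a minimal sketch of how the data added by this commit might be read once collection is enabled. It mirrors the new testNodeWriteLoadsArePresent() test in IndexShardIT.java below and assumes the same ESSingleNodeTestCase context; the NodeWriteLoad accessor names are taken from that test, and everything else is illustrative rather than part of the diff:

    // Illustrative sketch (not part of this commit): read the per-node write loads out of
    // ClusterInfo the same way the new IndexShardIT test does. Assumes an ESSingleNodeTestCase,
    // so getInstanceFromNode() and the test logger are available.
    public void testReadNodeWriteLoadsSketch() {
        InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
        ClusterInfoServiceUtils.refresh(clusterInfoService);

        Map<String, NodeWriteLoad> nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads();

        // With collection disabled (the default for now) the map is present but empty.
        // Once a collector supplies stats, each entry describes one node's write thread pool.
        for (NodeWriteLoad load : nodeWriteLoads.values()) {
            logger.info(
                "node [{}]: write threads [{}], utilization [{}], max queue time [{}] ms",
                load.nodeId(),
                load.totalWriteThreadPoolThreads(),
                load.percentWriteThreadPoolUtilization(),
                load.maxTaskTimeInWriteQueueMillis()
            );
        }
    }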

File tree: 30 files changed, +372 −28 lines

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 100 additions & 1 deletion
@@ -8,6 +8,8 @@
  */
 package org.elasticsearch.index.shard;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.util.SetOnce;
@@ -22,13 +24,16 @@
 import org.elasticsearch.cluster.EstimatedHeapUsage;
 import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
 import org.elasticsearch.cluster.InternalClusterInfoService;
+import org.elasticsearch.cluster.NodeWriteLoad;
+import org.elasticsearch.cluster.WriteLoadCollector;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.UUIDs;
@@ -117,14 +122,16 @@
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class IndexShardIT extends ESSingleNodeTestCase {
+    private static final Logger logger = LogManager.getLogger(IndexShardIT.class);
 
     @Override
     protected Collection<Class<? extends Plugin>> getPlugins() {
-        return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class);
+        return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusWriteLoadCollectorPlugin.class);
     }
 
     public void testLockTryingToDelete() throws Exception {
@@ -295,6 +302,47 @@ public void testHeapUsageEstimateIsPresent() {
         }
     }
 
+    public void testNodeWriteLoadsArePresent() {
+        InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
+        ClusterInfoServiceUtils.refresh(clusterInfoService);
+        Map<String, NodeWriteLoad> nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads();
+        assertNotNull(nodeWriteLoads);
+        /** Not collecting stats yet because allocation write load stats collection is disabled by default.
+         * see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */
+        assertTrue(nodeWriteLoads.isEmpty());
+
+        // Enable collection for node write loads.
+        updateClusterSettings(
+            Settings.builder()
+                .put(
+                    WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                    WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                )
+                .build()
+        );
+        try {
+            // Force a ClusterInfo refresh to run collection of the node write loads.
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads();
+
+            /** Verify that each node has a write load reported. The test {@link BogusWriteLoadCollector} generates random load values */
+            ClusterState state = getInstanceFromNode(ClusterService.class).state();
+            assertEquals(state.nodes().size(), nodeWriteLoads.size());
+            for (DiscoveryNode node : state.nodes()) {
+                assertTrue(nodeWriteLoads.containsKey(node.getId()));
+                NodeWriteLoad nodeWriteLoad = nodeWriteLoads.get(node.getId());
+                assertThat(nodeWriteLoad.nodeId(), equalTo(node.getId()));
+                assertThat(nodeWriteLoad.totalWriteThreadPoolThreads(), greaterThanOrEqualTo(0));
+                assertThat(nodeWriteLoad.percentWriteThreadPoolUtilization(), greaterThanOrEqualTo(0));
+                assertThat(nodeWriteLoad.maxTaskTimeInWriteQueueMillis(), greaterThanOrEqualTo(0L));
+            }
+        } finally {
+            updateClusterSettings(
+                Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
+            );
+        }
+    }
+
     public void testIndexCanChangeCustomDataPath() throws Exception {
         final String index = "test-custom-data-path";
         final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@@ -875,4 +923,55 @@ public ClusterService getClusterService() {
             return clusterService.get();
         }
     }
+
+    /**
+     * A simple {@link WriteLoadCollector} implementation that creates and returns random {@link NodeWriteLoad} for each node in the
+     * cluster.
+     * <p>
+     * Note: there's an 'org.elasticsearch.cluster.WriteLoadCollector' file that declares this implementation so that the plugin system can
+     * pick it up and use it for the test set-up.
+     */
+    public static class BogusWriteLoadCollector implements WriteLoadCollector {
+
+        private final BogusWriteLoadCollectorPlugin plugin;
+
+        public BogusWriteLoadCollector(BogusWriteLoadCollectorPlugin plugin) {
+            this.plugin = plugin;
+        }
+
+        @Override
+        public void collectWriteLoads(ActionListener<Map<String, NodeWriteLoad>> listener) {
+            ActionListener.completeWith(
+                listener,
+                () -> plugin.getClusterService()
+                    .state()
+                    .nodes()
+                    .stream()
+                    .collect(
+                        Collectors.toUnmodifiableMap(
+                            DiscoveryNode::getId,
+                            node -> new NodeWriteLoad(node.getId(), randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeLong())
+                        )
+                    )
+            );
+        }
+    }
+
+    /**
+     * Make a plugin to gain access to the {@link ClusterService} instance.
+     */
+    public static class BogusWriteLoadCollectorPlugin extends Plugin implements ClusterPlugin {
+
+        private final SetOnce<ClusterService> clusterService = new SetOnce<>();
+
+        @Override
+        public Collection<?> createComponents(PluginServices services) {
+            clusterService.set(services.clusterService());
+            return List.of();
+        }
+
+        public ClusterService getClusterService() {
+            return clusterService.get();
+        }
+    }
 }
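
The BogusWriteLoadCollector above is test-only; the commit deliberately ships no production collector, so the write loads in ClusterInfo stay empty. Purely as a hypothetical illustration of the contract the WriteLoadCollector interface implies (the interface, ActionListener, and NodeWriteLoad come from this diff; the class below does not), a no-op implementation would look roughly like:

    // Hypothetical, not part of this commit: the simplest possible WriteLoadCollector.
    // Reporting an empty map matches the default behaviour described in the commit message.
    public static class NoopWriteLoadCollector implements WriteLoadCollector {
        @Override
        public void collectWriteLoads(ActionListener<Map<String, NodeWriteLoad>> listener) {
            listener.onResponse(Map.of()); // no per-node write load to report
        }
    }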
New file: the 'org.elasticsearch.cluster.WriteLoadCollector' service declaration referenced in the test Javadoc above (full path not shown in this view)

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+#
+# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+# or more contributor license agreements. Licensed under the "Elastic License
+# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+# Public License v 1"; you may not use this file except in compliance with, at
+# your election, the "Elastic License 2.0", the "GNU Affero General Public
+# License v3.0 only", or the "Server Side Public License, v 1".
+#
+
+org.elasticsearch.index.shard.IndexShardIT$BogusWriteLoadCollector

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
@@ -326,6 +326,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
     public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
+    public static final TransportVersion NODE_WRITE_LOAD_IN_CLUSTER_INFO = def(9_113_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 34 additions & 6 deletions
@@ -58,9 +58,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
     final Map<NodeAndShard, String> dataPath;
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
     final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
+    final Map<String, NodeWriteLoad> nodeWriteLoads;
 
     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 
     /**
@@ -73,6 +74,7 @@ protected ClusterInfo() {
      * @param dataPath the shard routing to datapath mapping
      * @param reservedSpace reserved space per shard broken down by node and data path
      * @param estimatedHeapUsages estimated heap usage broken down by node
+     * @param nodeWriteLoads estimated node-level write load broken down by node
      * @see #shardIdentifierFromRouting
      */
     public ClusterInfo(
@@ -82,7 +84,8 @@ public ClusterInfo(
         Map<ShardId, Long> shardDataSetSizes,
         Map<NodeAndShard, String> dataPath,
         Map<NodeAndPath, ReservedSpace> reservedSpace,
-        Map<String, EstimatedHeapUsage> estimatedHeapUsages
+        Map<String, EstimatedHeapUsage> estimatedHeapUsages,
+        Map<String, NodeWriteLoad> nodeWriteLoads
     ) {
         this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
         this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -91,6 +94,7 @@ public ClusterInfo(
         this.dataPath = Map.copyOf(dataPath);
         this.reservedSpace = Map.copyOf(reservedSpace);
         this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
+        this.nodeWriteLoads = Map.copyOf(nodeWriteLoads);
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
@@ -107,6 +111,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         } else {
             this.estimatedHeapUsages = Map.of();
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            this.nodeWriteLoads = in.readImmutableMap(NodeWriteLoad::new);
+        } else {
+            this.nodeWriteLoads = Map.of();
+        }
     }
 
     @Override
@@ -124,6 +133,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
             out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            out.writeMap(this.nodeWriteLoads, StreamOutput::writeWriteable);
+        }
     }
 
     /**
@@ -204,8 +216,8 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
                 return builder.endObject(); // NodeAndPath
             }),
             endArray() // end "reserved_sizes"
-            // NOTE: We don't serialize estimatedHeapUsages at this stage, to avoid
-            // committing to API payloads until the feature is settled
+            // NOTE: We don't serialize estimatedHeapUsages/nodeWriteLoads at this stage, to avoid
+            // committing to API payloads until the features are settled
         );
     }
 
@@ -220,6 +232,13 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
         return estimatedHeapUsages;
     }
 
+    /**
+     * Returns a map containing the node-level write load estimate for each node by node ID.
+     */
+    public Map<String, NodeWriteLoad> getNodeWriteLoads() {
+        return nodeWriteLoads;
+    }
+
     /**
      * Returns a node id to disk usage mapping for the path that has the least available space on the node.
      * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.
@@ -311,12 +330,21 @@ public boolean equals(Object o) {
             && shardSizes.equals(that.shardSizes)
             && shardDataSetSizes.equals(that.shardDataSetSizes)
             && dataPath.equals(that.dataPath)
-            && reservedSpace.equals(that.reservedSpace);
+            && reservedSpace.equals(that.reservedSpace)
+            && nodeWriteLoads.equals(that.nodeWriteLoads);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, shardDataSetSizes, dataPath, reservedSpace);
+        return Objects.hash(
+            leastAvailableSpaceUsage,
+            mostAvailableSpaceUsage,
+            shardSizes,
+            shardDataSetSizes,
+            dataPath,
+            reservedSpace,
+            nodeWriteLoads
+        );
    }
 
     @Override
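
For reference, a sketch of the widened constructor in use. The argument order follows the diff above; the NodeWriteLoad constructor arguments (node id, total write threads, percent utilization, max queue time in millis) are inferred from the IndexShardIT test changes, and the values here are made up:

    // Sketch only: build a ClusterInfo that carries a write load entry for one node.
    Map<String, NodeWriteLoad> nodeWriteLoads = Map.of(
        "node-1",
        new NodeWriteLoad("node-1", 8, 42, 150L) // threads, % utilization, max queue millis (illustrative values)
    );
    ClusterInfo info = new ClusterInfo(
        Map.of(), // leastAvailableSpaceUsage
        Map.of(), // mostAvailableSpaceUsage
        Map.of(), // shardSizes
        Map.of(), // shardDataSetSizes
        Map.of(), // dataPath
        Map.of(), // reservedSpace
        Map.of(), // estimatedHeapUsages
        nodeWriteLoads
    );
    assert info.getNodeWriteLoads().get("node-1").percentWriteThreadPoolUtilization() == 42;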

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 4 additions & 1 deletion
@@ -34,6 +34,7 @@ public class ClusterInfoSimulator {
     private final Map<ShardId, Long> shardDataSetSizes;
     private final Map<NodeAndShard, String> dataPath;
     private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
+    private final Map<String, NodeWriteLoad> nodeWriteLoads;
 
     public ClusterInfoSimulator(RoutingAllocation allocation) {
         this.allocation = allocation;
@@ -43,6 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
         this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
         this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
         this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
+        this.nodeWriteLoads = allocation.clusterInfo().getNodeWriteLoads();
     }
 
     /**
@@ -156,7 +158,8 @@ public ClusterInfo getClusterInfo() {
             shardDataSetSizes,
             dataPath,
             Map.of(),
-            estimatedHeapUsages
+            estimatedHeapUsages,
+            nodeWriteLoads
         );
     }
 }
