Skip to content

Commit 9c7e04c

Browse files
change NodeLoad data structure
1 parent ac350d9 commit 9c7e04c

File tree

12 files changed

+216
-152
lines changed

12 files changed

+216
-152
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
import org.elasticsearch.cluster.EstimatedHeapUsage;
2525
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
2626
import org.elasticsearch.cluster.InternalClusterInfoService;
27-
import org.elasticsearch.cluster.NodeWriteLoad;
28-
import org.elasticsearch.cluster.WriteLoadCollector;
27+
import org.elasticsearch.cluster.NodeExecutionLoad;
28+
import org.elasticsearch.cluster.NodeUsageLoadCollector;
2929
import org.elasticsearch.cluster.metadata.IndexMetadata;
3030
import org.elasticsearch.cluster.node.DiscoveryNode;
3131
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -78,6 +78,7 @@
7878
import org.elasticsearch.test.ESSingleNodeTestCase;
7979
import org.elasticsearch.test.IndexSettingsModule;
8080
import org.elasticsearch.test.InternalSettingsPlugin;
81+
import org.elasticsearch.threadpool.ThreadPool;
8182
import org.elasticsearch.xcontent.XContentType;
8283
import org.junit.Assert;
8384

@@ -90,6 +91,7 @@
9091
import java.util.Arrays;
9192
import java.util.Collection;
9293
import java.util.Collections;
94+
import java.util.HashMap;
9395
import java.util.List;
9496
import java.util.Locale;
9597
import java.util.Map;
@@ -131,7 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
131133

132134
@Override
133135
protected Collection<Class<? extends Plugin>> getPlugins() {
134-
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusWriteLoadCollectorPlugin.class);
136+
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeUsageLoadCollectorPlugin.class);
135137
}
136138

137139
public void testLockTryingToDelete() throws Exception {
@@ -305,11 +307,11 @@ public void testHeapUsageEstimateIsPresent() {
305307
public void testNodeWriteLoadsArePresent() {
306308
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
307309
ClusterInfoServiceUtils.refresh(clusterInfoService);
308-
Map<String, NodeWriteLoad> nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads();
309-
assertNotNull(nodeWriteLoads);
310+
Map<String, NodeExecutionLoad> nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeExecutionStats();
311+
assertNotNull(nodeThreadPoolStats);
310312
/** Not collecting stats yet because allocation write load stats collection is disabled by default.
311313
* see {@link WriteLoadConstraintSettings#WRITE_LOAD_DECIDER_ENABLED_SETTING} */
312-
assertTrue(nodeWriteLoads.isEmpty());
314+
assertTrue(nodeThreadPoolStats.isEmpty());
313315

314316
// Enable collection for node write loads.
315317
updateClusterSettings(
@@ -323,18 +325,21 @@ public void testNodeWriteLoadsArePresent() {
323325
try {
324326
// Force a ClusterInfo refresh to run collection of the node write loads.
325327
ClusterInfoServiceUtils.refresh(clusterInfoService);
326-
nodeWriteLoads = clusterInfoService.getClusterInfo().getNodeWriteLoads();
328+
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeExecutionStats();
327329

328-
/** Verify that each node has a write load reported. The test {@link BogusWriteLoadCollector} generates random load values */
330+
/** Verify that each node has a write load reported. The test {@link BogusNodeUsageLoadCollector} generates random load values */
329331
ClusterState state = getInstanceFromNode(ClusterService.class).state();
330-
assertEquals(state.nodes().size(), nodeWriteLoads.size());
332+
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
331333
for (DiscoveryNode node : state.nodes()) {
332-
assertTrue(nodeWriteLoads.containsKey(node.getId()));
333-
NodeWriteLoad nodeWriteLoad = nodeWriteLoads.get(node.getId());
334-
assertThat(nodeWriteLoad.nodeId(), equalTo(node.getId()));
335-
assertThat(nodeWriteLoad.totalWriteThreadPoolThreads(), greaterThanOrEqualTo(0));
336-
assertThat(nodeWriteLoad.averageWriteThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
337-
assertThat(nodeWriteLoad.averageWriteThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
334+
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
335+
NodeExecutionLoad nodeExecutionLoad = nodeThreadPoolStats.get(node.getId());
336+
assertThat(nodeExecutionLoad.nodeId(), equalTo(node.getId()));
337+
NodeExecutionLoad.ThreadPoolUsageStats writeThreadPoolStats = nodeExecutionLoad.threadPoolUsageStatsMap()
338+
.get(ThreadPool.Names.WRITE);
339+
assertNotNull(writeThreadPoolStats);
340+
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
341+
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
342+
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
338343
}
339344
} finally {
340345
updateClusterSettings(
@@ -925,42 +930,48 @@ public ClusterService getClusterService() {
925930
}
926931

927932
/**
928-
* A simple {@link WriteLoadCollector} implementation that creates and returns random {@link NodeWriteLoad} for each node in the
933+
* A simple {@link NodeUsageLoadCollector} implementation that creates and returns random {@link NodeExecutionLoad} for each node in the
929934
* cluster.
930935
* <p>
931936
* Note: there's an 'org.elasticsearch.cluster.NodeUsageLoadCollector' file that declares this implementation so that the plugin system can
932937
* pick it up and use it for the test set-up.
933938
*/
934-
public static class BogusWriteLoadCollector implements WriteLoadCollector {
939+
public static class BogusNodeUsageLoadCollector implements NodeUsageLoadCollector {
935940

936-
private final BogusWriteLoadCollectorPlugin plugin;
941+
private final BogusNodeUsageLoadCollectorPlugin plugin;
937942

938-
public BogusWriteLoadCollector(BogusWriteLoadCollectorPlugin plugin) {
943+
public BogusNodeUsageLoadCollector(BogusNodeUsageLoadCollectorPlugin plugin) {
939944
this.plugin = plugin;
940945
}
941946

942947
@Override
943-
public void collectWriteLoads(ActionListener<Map<String, NodeWriteLoad>> listener) {
948+
public void collectUsageStats(ActionListener<Map<String, NodeExecutionLoad>> listener) {
944949
ActionListener.completeWith(
945950
listener,
946951
() -> plugin.getClusterService()
947952
.state()
948953
.nodes()
949954
.stream()
950-
.collect(
951-
Collectors.toUnmodifiableMap(
952-
DiscoveryNode::getId,
953-
node -> new NodeWriteLoad(node.getId(), randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeLong())
954-
)
955-
)
955+
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeLoad(node.getId())))
956+
);
957+
}
958+
959+
private NodeExecutionLoad makeRandomNodeLoad(String nodeId) {
960+
NodeExecutionLoad.ThreadPoolUsageStats writeThreadPoolStats = new NodeExecutionLoad.ThreadPoolUsageStats(
961+
randomNonNegativeInt(),
962+
randomFloat(),
963+
randomNonNegativeLong()
956964
);
965+
Map<String, NodeExecutionLoad.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
966+
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
967+
return new NodeExecutionLoad(nodeId, statsForThreadPools);
957968
}
958969
}
959970

960971
/**
961972
* Make a plugin to gain access to the {@link ClusterService} instance.
962973
*/
963-
public static class BogusWriteLoadCollectorPlugin extends Plugin implements ClusterPlugin {
974+
public static class BogusNodeUsageLoadCollectorPlugin extends Plugin implements ClusterPlugin {
964975

965976
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
966977

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
# License v3.0 only", or the "Server Side Public License, v 1".
88
#
99

10-
org.elasticsearch.index.shard.IndexShardIT$BogusWriteLoadCollector
10+
org.elasticsearch.index.shard.IndexShardIT$BogusNodeUsageLoadCollector

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5858
final Map<NodeAndShard, String> dataPath;
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
6060
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
61-
final Map<String, NodeWriteLoad> nodeWriteLoads;
61+
final Map<String, NodeExecutionLoad> nodeExecutionStats;
6262

6363
protected ClusterInfo() {
6464
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
@@ -74,7 +74,7 @@ protected ClusterInfo() {
7474
* @param dataPath the shard routing to datapath mapping
7575
* @param reservedSpace reserved space per shard broken down by node and data path
7676
* @param estimatedHeapUsages estimated heap usage broken down by node
77-
* @param nodeWriteLoads estimated node-level write load broken down by node
77+
* @param nodeExecutionStats estimated node-level execution load broken down by node
7878
* @see #shardIdentifierFromRouting
7979
*/
8080
public ClusterInfo(
@@ -85,7 +85,7 @@ public ClusterInfo(
8585
Map<NodeAndShard, String> dataPath,
8686
Map<NodeAndPath, ReservedSpace> reservedSpace,
8787
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
88-
Map<String, NodeWriteLoad> nodeWriteLoads
88+
Map<String, NodeExecutionLoad> nodeExecutionStats
8989
) {
9090
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
9191
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -94,7 +94,7 @@ public ClusterInfo(
9494
this.dataPath = Map.copyOf(dataPath);
9595
this.reservedSpace = Map.copyOf(reservedSpace);
9696
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
97-
this.nodeWriteLoads = Map.copyOf(nodeWriteLoads);
97+
this.nodeExecutionStats = Map.copyOf(nodeExecutionStats);
9898
}
9999

100100
public ClusterInfo(StreamInput in) throws IOException {
@@ -112,9 +112,9 @@ public ClusterInfo(StreamInput in) throws IOException {
112112
this.estimatedHeapUsages = Map.of();
113113
}
114114
if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
115-
this.nodeWriteLoads = in.readImmutableMap(NodeWriteLoad::new);
115+
this.nodeExecutionStats = in.readImmutableMap(NodeExecutionLoad::new);
116116
} else {
117-
this.nodeWriteLoads = Map.of();
117+
this.nodeExecutionStats = Map.of();
118118
}
119119
}
120120

@@ -134,7 +134,7 @@ public void writeTo(StreamOutput out) throws IOException {
134134
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
135135
}
136136
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_WRITE_LOAD_IN_CLUSTER_INFO)) {
137-
out.writeMap(this.nodeWriteLoads, StreamOutput::writeWriteable);
137+
out.writeMap(this.nodeExecutionStats, StreamOutput::writeWriteable);
138138
}
139139
}
140140

@@ -216,7 +216,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
216216
return builder.endObject(); // NodeAndPath
217217
}),
218218
endArray() // end "reserved_sizes"
219-
// NOTE: We don't serialize estimatedHeapUsages/nodeWriteLoads at this stage, to avoid
219+
// NOTE: We don't serialize estimatedHeapUsages/nodeExecutionStats at this stage, to avoid
220220
// committing to API payloads until the features are settled
221221
);
222222
}
@@ -235,8 +235,8 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
235235
/**
236236
* Returns a map containing the node-level execution load estimate for each node by node ID.
237237
*/
238-
public Map<String, NodeWriteLoad> getNodeWriteLoads() {
239-
return nodeWriteLoads;
238+
public Map<String, NodeExecutionLoad> getNodeExecutionStats() {
239+
return nodeExecutionStats;
240240
}
241241

242242
/**
@@ -331,7 +331,7 @@ public boolean equals(Object o) {
331331
&& shardDataSetSizes.equals(that.shardDataSetSizes)
332332
&& dataPath.equals(that.dataPath)
333333
&& reservedSpace.equals(that.reservedSpace)
334-
&& nodeWriteLoads.equals(that.nodeWriteLoads);
334+
&& nodeExecutionStats.equals(that.nodeExecutionStats);
335335
}
336336

337337
@Override
@@ -343,7 +343,7 @@ public int hashCode() {
343343
shardDataSetSizes,
344344
dataPath,
345345
reservedSpace,
346-
nodeWriteLoads
346+
nodeExecutionStats
347347
);
348348
}
349349

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class ClusterInfoSimulator {
3434
private final Map<ShardId, Long> shardDataSetSizes;
3535
private final Map<NodeAndShard, String> dataPath;
3636
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
37-
private final Map<String, NodeWriteLoad> nodeWriteLoads;
37+
private final Map<String, NodeExecutionLoad> nodeExecutionStats;
3838

3939
public ClusterInfoSimulator(RoutingAllocation allocation) {
4040
this.allocation = allocation;
@@ -44,7 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4444
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4545
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4646
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
47-
this.nodeWriteLoads = allocation.clusterInfo().getNodeWriteLoads();
47+
this.nodeExecutionStats = allocation.clusterInfo().getNodeExecutionStats();
4848
}
4949

5050
/**
@@ -159,7 +159,7 @@ public ClusterInfo getClusterInfo() {
159159
dataPath,
160160
Map.of(),
161161
estimatedHeapUsages,
162-
nodeWriteLoads
162+
nodeExecutionStats
163163
);
164164
}
165165
}

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
102102
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
103103
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
104104
private volatile Map<String, Long> estimatedHeapUsagePerNode;
105-
private volatile Map<String, NodeWriteLoad> nodeWriteLoadPerNode;
105+
private volatile Map<String, NodeExecutionLoad> nodeExecutionLoadPerNode;
106106
private volatile IndicesStatsSummary indicesStatsSummary;
107107

108108
private final ThreadPool threadPool;
@@ -112,7 +112,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
112112
private final Object mutex = new Object();
113113
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
114114
private final EstimatedHeapUsageCollector estimatedHeapUsageCollector;
115-
private final WriteLoadCollector writeLoadCollector;
115+
private final NodeUsageLoadCollector nodeUsageLoadCollector;
116116

117117
private AsyncRefresh currentRefresh;
118118
private RefreshScheduler refreshScheduler;
@@ -124,18 +124,18 @@ public InternalClusterInfoService(
124124
ThreadPool threadPool,
125125
Client client,
126126
EstimatedHeapUsageCollector estimatedHeapUsageCollector,
127-
WriteLoadCollector writeLoadCollector
127+
NodeUsageLoadCollector nodeUsageLoadCollector
128128
) {
129129
this.leastAvailableSpaceUsages = Map.of();
130130
this.mostAvailableSpaceUsages = Map.of();
131131
this.maxHeapPerNode = Map.of();
132132
this.estimatedHeapUsagePerNode = Map.of();
133-
this.nodeWriteLoadPerNode = Map.of();
133+
this.nodeExecutionLoadPerNode = Map.of();
134134
this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
135135
this.threadPool = threadPool;
136136
this.client = client;
137137
this.estimatedHeapUsageCollector = estimatedHeapUsageCollector;
138-
this.writeLoadCollector = writeLoadCollector;
138+
this.nodeUsageLoadCollector = nodeUsageLoadCollector;
139139
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
140140
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
141141
this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
@@ -264,21 +264,21 @@ private void maybeFetchNodesWriteLoadStats(WriteLoadDeciderStatus writeLoadConst
264264
}
265265
} else {
266266
logger.trace("skipping collecting shard/node write load estimates from cluster, feature currently disabled");
267-
nodeWriteLoadPerNode = Map.of();
267+
nodeExecutionLoadPerNode = Map.of();
268268
}
269269
}
270270

271271
private void fetchNodesWriteLoadStats() {
272-
writeLoadCollector.collectWriteLoads(ActionListener.releaseAfter(new ActionListener<>() {
272+
nodeUsageLoadCollector.collectUsageStats(ActionListener.releaseAfter(new ActionListener<>() {
273273
@Override
274-
public void onResponse(Map<String, NodeWriteLoad> writeLoads) {
275-
nodeWriteLoadPerNode = writeLoads;
274+
public void onResponse(Map<String, NodeExecutionLoad> writeLoads) {
275+
nodeExecutionLoadPerNode = writeLoads;
276276
}
277277

278278
@Override
279279
public void onFailure(Exception e) {
280280
logger.warn("failed to fetch write load estimates for nodes", e);
281-
nodeWriteLoadPerNode = Map.of();
281+
nodeExecutionLoadPerNode = Map.of();
282282
}
283283
}, fetchRefs.acquire()));
284284
}
@@ -527,8 +527,8 @@ public ClusterInfo getClusterInfo() {
527527
estimatedHeapUsages.put(nodeId, new EstimatedHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
528528
}
529529
});
530-
final Map<String, NodeWriteLoad> nodeWriteLoads = new HashMap<>();
531-
nodeWriteLoadPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); });
530+
final Map<String, NodeExecutionLoad> nodeWriteLoads = new HashMap<>();
531+
nodeExecutionLoadPerNode.forEach((nodeId, nodeWriteLoad) -> { nodeWriteLoads.put(nodeId, nodeWriteLoad); });
532532
return new ClusterInfo(
533533
leastAvailableSpaceUsages,
534534
mostAvailableSpaceUsages,

0 commit comments

Comments
 (0)