Skip to content

Commit 6bf7c46

Browse files
committed
change write-thread-pool-stats to list of samples
1 parent 8171d5b commit 6bf7c46

File tree

13 files changed

+151
-228
lines changed

13 files changed

+151
-228
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 31 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.elasticsearch.cluster.EstimatedHeapUsage;
2525
import org.elasticsearch.cluster.EstimatedHeapUsageCollector;
2626
import org.elasticsearch.cluster.InternalClusterInfoService;
27-
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
27+
import org.elasticsearch.cluster.ThreadPoolUsage;
2828
import org.elasticsearch.cluster.ThreadPoolUsageCollector;
2929
import org.elasticsearch.cluster.metadata.IndexMetadata;
3030
import org.elasticsearch.cluster.metadata.ProjectId;
@@ -79,7 +79,6 @@
7979
import org.elasticsearch.test.ESSingleNodeTestCase;
8080
import org.elasticsearch.test.IndexSettingsModule;
8181
import org.elasticsearch.test.InternalSettingsPlugin;
82-
import org.elasticsearch.threadpool.ThreadPool;
8382
import org.elasticsearch.xcontent.XContentType;
8483
import org.junit.Assert;
8584

@@ -92,7 +91,6 @@
9291
import java.util.Arrays;
9392
import java.util.Collection;
9493
import java.util.Collections;
95-
import java.util.HashMap;
9694
import java.util.List;
9795
import java.util.Locale;
9896
import java.util.Map;
@@ -135,11 +133,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
135133

136134
@Override
137135
protected Collection<Class<? extends Plugin>> getPlugins() {
138-
return pluginList(
139-
InternalSettingsPlugin.class,
140-
BogusEstimatedHeapUsagePlugin.class,
141-
BogusNodeUsageStatsForThreadPoolsCollectorPlugin.class
142-
);
136+
return pluginList(InternalSettingsPlugin.class, BogusEstimatedHeapUsagePlugin.class, BogusNodeThreadPoolUsageCollectorPlugin.class);
143137
}
144138

145139
public void testLockTryingToDelete() throws Exception {
@@ -313,12 +307,11 @@ public void testHeapUsageEstimateIsPresent() {
313307
public void testNodeWriteLoadsArePresent() {
314308
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
315309
ClusterInfoServiceUtils.refresh(clusterInfoService);
316-
Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolStats = clusterInfoService.getClusterInfo()
317-
.getNodeUsageStatsForThreadPools();
318-
assertNotNull(nodeThreadPoolStats);
310+
var writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages();
311+
assertNotNull(writeThreadPoolUsages);
319312
/** Not collecting stats yet because allocation write load stats collection is disabled by default.
320313
* see {@link WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING} */
321-
assertTrue(nodeThreadPoolStats.isEmpty());
314+
assertTrue(writeThreadPoolUsages.isEmpty());
322315

323316
// Enable collection for node write loads.
324317
updateClusterSettings(
@@ -332,23 +325,20 @@ public void testNodeWriteLoadsArePresent() {
332325
try {
333326
// Force a ClusterInfo refresh to run collection of the node thread pool usage stats.
334327
ClusterInfoServiceUtils.refresh(clusterInfoService);
335-
nodeThreadPoolStats = clusterInfoService.getClusterInfo().getNodeUsageStatsForThreadPools();
328+
writeThreadPoolUsages = clusterInfoService.getClusterInfo().getWriteThreadPoolUsages();
336329

337-
/** Verify that each node has usage stats reported. The test {@link BogusThreadPoolUsageCollector} implementation
330+
/** Verify that each node has usage stats reported. The test {@link BogusNodeThreadPoolUsageCollector} implementation
338331
* generates random usage values */
339332
ClusterState state = getInstanceFromNode(ClusterService.class).state();
340-
assertEquals(state.nodes().size(), nodeThreadPoolStats.size());
333+
assertEquals(state.nodes().size(), writeThreadPoolUsages.size());
341334
for (DiscoveryNode node : state.nodes()) {
342-
assertTrue(nodeThreadPoolStats.containsKey(node.getId()));
343-
NodeUsageStatsForThreadPools nodeUsageStatsForThreadPools = nodeThreadPoolStats.get(node.getId());
344-
assertThat(nodeUsageStatsForThreadPools.nodeId(), equalTo(node.getId()));
345-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = nodeUsageStatsForThreadPools
346-
.threadPoolUsageStatsMap()
347-
.get(ThreadPool.Names.WRITE);
348-
assertNotNull(writeThreadPoolStats);
349-
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
350-
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
351-
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
335+
assertTrue(writeThreadPoolUsages.containsKey(node.getId()));
336+
var sample = writeThreadPoolUsages.get(node.getId()).samples().getFirst();
337+
assertNotNull(sample);
338+
assertThat(sample.timeNano(), greaterThanOrEqualTo(0L));
339+
assertThat(sample.totalThreads(), greaterThanOrEqualTo(0));
340+
assertThat(sample.averageUtilization(), greaterThanOrEqualTo(0.0f));
341+
assertThat(sample.averageQueueLatency(), greaterThanOrEqualTo(0L));
352342
}
353343
} finally {
354344
updateClusterSettings(
@@ -996,47 +986,45 @@ public ClusterService getClusterService() {
996986

997987
/**
998988
* A simple {@link ThreadPoolUsageCollector} implementation that creates and returns random
999-
* {@link NodeUsageStatsForThreadPools} for each node in the cluster.
989+
* {@link ThreadPoolUsage} for each node in the cluster.
1000990
* <p>
1001991
* Note: there's an 'org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector' file that declares this implementation so that the
1002992
* plugin system can pick it up and use it for the test set-up.
1003993
*/
1004-
public static class BogusThreadPoolUsageCollector implements ThreadPoolUsageCollector {
994+
public static class BogusNodeThreadPoolUsageCollector implements ThreadPoolUsageCollector {
1005995

1006-
private final BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin;
996+
private final BogusNodeThreadPoolUsageCollectorPlugin plugin;
1007997

1008-
public BogusThreadPoolUsageCollector(BogusNodeUsageStatsForThreadPoolsCollectorPlugin plugin) {
998+
public BogusNodeThreadPoolUsageCollector(BogusNodeThreadPoolUsageCollectorPlugin plugin) {
1009999
this.plugin = plugin;
10101000
}
10111001

10121002
@Override
1013-
public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThreadPools>> listener) {
1003+
public void collectUsageStats(String threadPoolName, ActionListener<Map<String, ThreadPoolUsage>> listener) {
10141004
ActionListener.completeWith(
10151005
listener,
10161006
() -> plugin.getClusterService()
10171007
.state()
10181008
.nodes()
10191009
.stream()
1020-
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> makeRandomNodeUsageStats(node.getId())))
1021-
);
1022-
}
1023-
1024-
private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
1025-
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
1026-
randomNonNegativeInt(),
1027-
randomFloat(),
1028-
randomNonNegativeLong()
1010+
.collect(
1011+
Collectors.toUnmodifiableMap(
1012+
DiscoveryNode::getId,
1013+
node -> new ThreadPoolUsage(
1014+
List.of(
1015+
new ThreadPoolUsage.Sample(randomNonNegativeLong(), randomInt(), randomFloat(), randomNonNegativeLong())
1016+
)
1017+
)
1018+
)
1019+
)
10291020
);
1030-
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
1031-
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
1032-
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
10331021
}
10341022
}
10351023

10361024
/**
10371025
* Make a plugin to gain access to the {@link ClusterService} instance.
10381026
*/
1039-
public static class BogusNodeUsageStatsForThreadPoolsCollectorPlugin extends Plugin implements ClusterPlugin {
1027+
public static class BogusNodeThreadPoolUsageCollectorPlugin extends Plugin implements ClusterPlugin {
10401028

10411029
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
10421030

server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ThreadPoolUsageCollector

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@
77
# License v3.0 only", or the "Server Side Public License, v 1".
88
#
99

10-
org.elasticsearch.index.shard.IndexShardIT$BogusThreadPoolUsageCollector
10+
org.elasticsearch.index.shard.IndexShardIT$BogusNodeThreadPoolUsageCollector

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
5858
final Map<NodeAndShard, String> dataPath;
5959
final Map<NodeAndPath, ReservedSpace> reservedSpace;
6060
final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
61-
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
61+
final Map<String, ThreadPoolUsage> writeThreadPoolUsages;
6262
final Map<ShardId, Double> shardWriteLoads;
6363

6464
protected ClusterInfo() {
@@ -75,7 +75,7 @@ protected ClusterInfo() {
7575
* @param dataPath the shard routing to datapath mapping
7676
* @param reservedSpace reserved space per shard broken down by node and data path
7777
* @param estimatedHeapUsages estimated heap usage broken down by node
78-
* @param nodeUsageStatsForThreadPools node-level usage stats (operational load) broken down by node
78+
* @param writeThreadPoolUsages node-level usage stats (operational load) broken down by node
7979
* @see #shardIdentifierFromRouting
8080
*/
8181
public ClusterInfo(
@@ -86,7 +86,7 @@ public ClusterInfo(
8686
Map<NodeAndShard, String> dataPath,
8787
Map<NodeAndPath, ReservedSpace> reservedSpace,
8888
Map<String, EstimatedHeapUsage> estimatedHeapUsages,
89-
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
89+
Map<String, ThreadPoolUsage> writeThreadPoolUsages,
9090
Map<ShardId, Double> shardWriteLoads
9191
) {
9292
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
@@ -96,7 +96,7 @@ public ClusterInfo(
9696
this.dataPath = Map.copyOf(dataPath);
9797
this.reservedSpace = Map.copyOf(reservedSpace);
9898
this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
99-
this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
99+
this.writeThreadPoolUsages = Map.copyOf(writeThreadPoolUsages);
100100
this.shardWriteLoads = Map.copyOf(shardWriteLoads);
101101
}
102102

@@ -115,9 +115,9 @@ public ClusterInfo(StreamInput in) throws IOException {
115115
this.estimatedHeapUsages = Map.of();
116116
}
117117
if (in.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
118-
this.nodeUsageStatsForThreadPools = in.readImmutableMap(NodeUsageStatsForThreadPools::new);
118+
this.writeThreadPoolUsages = in.readImmutableMap(ThreadPoolUsage::fromStreamInput);
119119
} else {
120-
this.nodeUsageStatsForThreadPools = Map.of();
120+
this.writeThreadPoolUsages = Map.of();
121121
}
122122
if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
123123
this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
@@ -142,7 +142,7 @@ public void writeTo(StreamOutput out) throws IOException {
142142
out.writeMap(this.estimatedHeapUsages, StreamOutput::writeWriteable);
143143
}
144144
if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
145-
out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
145+
out.writeMap(this.writeThreadPoolUsages, StreamOutput::writeWriteable);
146146
}
147147
if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
148148
out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
@@ -244,10 +244,10 @@ public Map<String, EstimatedHeapUsage> getEstimatedHeapUsages() {
244244
}
245245

246246
/**
247-
* Returns a map containing thread pool usage stats for each node, keyed by node ID.
247+
* Returns a map containing write thread pool usage stats for each node, keyed by node ID.
248248
*/
249-
public Map<String, NodeUsageStatsForThreadPools> getNodeUsageStatsForThreadPools() {
250-
return nodeUsageStatsForThreadPools;
249+
public Map<String, ThreadPoolUsage> getWriteThreadPoolUsages() {
250+
return writeThreadPoolUsages;
251251
}
252252

253253
/**
@@ -353,7 +353,7 @@ public boolean equals(Object o) {
353353
&& dataPath.equals(that.dataPath)
354354
&& reservedSpace.equals(that.reservedSpace)
355355
&& estimatedHeapUsages.equals(that.estimatedHeapUsages)
356-
&& nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
356+
&& writeThreadPoolUsages.equals(that.writeThreadPoolUsages)
357357
&& shardWriteLoads.equals(that.shardWriteLoads);
358358
}
359359

@@ -367,7 +367,7 @@ public int hashCode() {
367367
dataPath,
368368
reservedSpace,
369369
estimatedHeapUsages,
370-
nodeUsageStatsForThreadPools,
370+
writeThreadPoolUsages,
371371
shardWriteLoads
372372
);
373373
}
@@ -490,7 +490,7 @@ public static class Builder {
490490
private Map<NodeAndShard, String> dataPath = Map.of();
491491
private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
492492
private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
493-
private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
493+
private Map<String, ThreadPoolUsage> writeThreadPoolUsages = Map.of();
494494
private Map<ShardId, Double> shardWriteLoads = Map.of();
495495

496496
public ClusterInfo build() {
@@ -502,7 +502,7 @@ public ClusterInfo build() {
502502
dataPath,
503503
reservedSpace,
504504
estimatedHeapUsages,
505-
nodeUsageStatsForThreadPools,
505+
writeThreadPoolUsages,
506506
shardWriteLoads
507507
);
508508
}
@@ -542,8 +542,8 @@ public Builder estimatedHeapUsages(Map<String, EstimatedHeapUsage> estimatedHeap
542542
return this;
543543
}
544544

545-
public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools) {
546-
this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
545+
public Builder writeThreadPoolUsages(Map<String, ThreadPoolUsage> writeThreadPoolUsages) {
546+
this.writeThreadPoolUsages = writeThreadPoolUsages;
547547
return this;
548548
}
549549

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class ClusterInfoSimulator {
3434
private final Map<ShardId, Long> shardDataSetSizes;
3535
private final Map<NodeAndShard, String> dataPath;
3636
private final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
37-
private final Map<String, NodeUsageStatsForThreadPools> nodeThreadPoolUsageStats;
37+
private final Map<String, ThreadPoolUsage> writeThreadPoolUsages;
3838

3939
public ClusterInfoSimulator(RoutingAllocation allocation) {
4040
this.allocation = allocation;
@@ -44,7 +44,7 @@ public ClusterInfoSimulator(RoutingAllocation allocation) {
4444
this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
4545
this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
4646
this.estimatedHeapUsages = allocation.clusterInfo().getEstimatedHeapUsages();
47-
this.nodeThreadPoolUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
47+
this.writeThreadPoolUsages = allocation.clusterInfo().getWriteThreadPoolUsages();
4848
}
4949

5050
/**
@@ -159,7 +159,7 @@ public ClusterInfo getClusterInfo() {
159159
dataPath,
160160
Map.of(),
161161
estimatedHeapUsages,
162-
nodeThreadPoolUsageStats,
162+
writeThreadPoolUsages,
163163
allocation.clusterInfo().getShardWriteLoads()
164164
);
165165
}

0 commit comments

Comments
 (0)