diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 58440c65f8d4d..4432363d847a1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -88,6 +88,7 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -124,11 +125,13 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.not; public class IndexShardIT extends ESSingleNodeTestCase { private static final Logger logger = LogManager.getLogger(IndexShardIT.class); @@ -346,9 +349,8 @@ public void testNodeWriteLoadsArePresent() { .threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); assertNotNull(writeThreadPoolStats); - assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0)); - assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f)); - assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L)); + assertThat(writeThreadPoolStats.numberOfThreads(), greaterThanOrEqualTo(0)); + assertThat(writeThreadPoolStats.utilizationSamples(), not(empty())); } } finally { updateClusterSettings( @@ -1022,11 +1024,16 @@ public void collectUsageStats(ActionListener utilizationSamples = IntStream.range(0, randomIntBetween(1, 10)) + .mapToObj( + i -> new NodeUsageStatsForThreadPools.UtilizationSample( + randomInstantBetween(Instant.MIN, Instant.MAX), + randomFloatBetween(0.0f, 1.0f, true) + ) + ) + .toList(); + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntBetween(1, 1_000), utilizationSamples); Map statsForThreadPools = new HashMap<>(); statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats); return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index e57cb485361b6..74d4ced9aacb6 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -355,6 +355,7 @@ static TransportVersion def(int id) { public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00); public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00); public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00); + public static final TransportVersion THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO = def(9_134_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index 5e84f29af8412..2dcfc92ff9813 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -9,13 +9,15 @@ package org.elasticsearch.cluster; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import java.io.IOException; +import java.time.Instant; +import java.util.List; import java.util.Map; -import java.util.Objects; /** * Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name. @@ -27,7 +29,7 @@ public record NodeUsageStatsForThreadPools(String nodeId, Map threadPoolUsageStatsMap) implements Writeable { public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { - this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); + this(in.readString(), in.readMap(ThreadPoolUsageStats::readFrom)); } @Override @@ -36,86 +38,60 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable); } - @Override - public int hashCode() { - return Objects.hash(nodeId, threadPoolUsageStatsMap); - } + /** + * One utilization sample + * + * @param instant The time we received the sample + * @param utilization The utilization value in the range [0.0, 1.0] + */ + public record UtilizationSample(Instant instant, float utilization) implements Writeable { - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o; - for (var entry : other.threadPoolUsageStatsMap.entrySet()) { - var loadStats = threadPoolUsageStatsMap.get(entry.getKey()); - if (loadStats == null || loadStats.equals(entry.getValue()) == false) { - return false; - } + public UtilizationSample(StreamInput in) throws IOException { + this(in.readInstant(), in.readFloat()); } - return true; - } - @Override - public String toString() { - StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=["); - for (var entry : threadPoolUsageStatsMap.entrySet()) { - builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}"); + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInstant(instant); + out.writeFloat(utilization); } - builder.append("]}"); - return builder.toString(); } /** * Record of usage stats for a thread pool. * - * @param totalThreadPoolThreads Total number of threads in the thread pool. - * @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time. - * @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued - * in the write thread pool. + * @param numberOfThreads Total number of threads in the thread pool. + * @param utilizationSamples The list of recent utilization samples */ - public record ThreadPoolUsageStats( - int totalThreadPoolThreads, - float averageThreadPoolUtilization, - long averageThreadPoolQueueLatencyMillis - ) implements Writeable { + public record ThreadPoolUsageStats(int numberOfThreads, List utilizationSamples) implements Writeable { - public ThreadPoolUsageStats(StreamInput in) throws IOException { - this(in.readVInt(), in.readFloat(), in.readVLong()); + public ThreadPoolUsageStats { + assert numberOfThreads > 0; + assert utilizationSamples.isEmpty() == false; } - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(this.totalThreadPoolThreads); - out.writeFloat(this.averageThreadPoolUtilization); - out.writeVLong(this.averageThreadPoolQueueLatencyMillis); - } - - @Override - public int hashCode() { - return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis); - } - - @Override - public String toString() { - return "[totalThreadPoolThreads=" - + totalThreadPoolThreads - + ", averageThreadPoolUtilization=" - + averageThreadPoolUtilization - + ", averageThreadPoolQueueLatencyMillis=" - + averageThreadPoolQueueLatencyMillis - + "]"; + public static ThreadPoolUsageStats readFrom(StreamInput in) throws IOException { + final int numberOfThreads = in.readVInt(); + final List utilizationSamples; + if (in.getTransportVersion().onOrAfter(TransportVersions.THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO)) { + utilizationSamples = in.readCollectionAsImmutableList(UtilizationSample::new); + } else { + utilizationSamples = List.of(new UtilizationSample(Instant.now(), in.readFloat())); + in.readVLong(); // Skip over the queue latency + } + return new ThreadPoolUsageStats(numberOfThreads, utilizationSamples); } @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; - return totalThreadPoolThreads == other.totalThreadPoolThreads - && averageThreadPoolUtilization == other.averageThreadPoolUtilization - && averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis; + public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(this.numberOfThreads); + if (out.getTransportVersion().onOrAfter(TransportVersions.THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO)) { + out.writeCollection(utilizationSamples, StreamOutput::writeWriteable); + } else { + out.writeFloat(this.utilizationSamples.getLast().utilization()); + out.writeVLong(0L); // Dummy queue latency value + } } - } // ThreadPoolUsageStats } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 1a729a992583c..cc14915a30423 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -18,7 +18,9 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -86,13 +88,17 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo ) { final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap() .get(ThreadPool.Names.WRITE); - return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( - writeThreadPoolStats.totalThreadPoolThreads(), - (float) Math.max( - (writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())), - 0.0 - ), - writeThreadPoolStats.averageThreadPoolQueueLatencyMillis() + final float newWritePoolUtilization = (float) Math.max( + (writeThreadPoolStats.utilizationSamples().getLast().utilization() + (writeLoadDelta / writeThreadPoolStats.numberOfThreads())), + 0.0 ); + final List newUtilizationSamples = new ArrayList<>( + writeThreadPoolStats.utilizationSamples() + ); + final NodeUsageStatsForThreadPools.UtilizationSample previousUtilization = newUtilizationSamples.removeLast(); + newUtilizationSamples.add( + new NodeUsageStatsForThreadPools.UtilizationSample(previousUtilization.instant(), newWritePoolUtilization) + ); + return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(writeThreadPoolStats.numberOfThreads(), newUtilizationSamples); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index e0e749aaa2360..def72e1983941 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -15,8 +15,10 @@ import org.elasticsearch.test.AbstractWireSerializingTestCase; import org.elasticsearch.threadpool.ThreadPool; +import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.stream.IntStream; public class ClusterInfoTests extends AbstractWireSerializingTestCase { @@ -81,8 +83,14 @@ private static Map randomNodeUsageStatsFor String nodeIdKey = randomAlphaOfLength(32); NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16), - /* averageThreadPoolUtilization= */ randomFloat(), - /* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000) + IntStream.range(0, randomIntBetween(1, 10)) + .mapToObj( + ignored -> new NodeUsageStatsForThreadPools.UtilizationSample( + randomInstantBetween(Instant.MIN, Instant.MAX), + randomFloatBetween(0.0f, 1.0f, true) + ) + ) + .toList() ); Map usageStatsForThreadPools = new HashMap<>(); usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 59a37747ae2fd..3db17bf18f1bc 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -87,16 +88,16 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2)); final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); - final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads(); - final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads(); + final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.numberOfThreads(); + final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.numberOfThreads(); // Some node_0 utilization should have been moved to node_1 - if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) { + if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization()) { // We don't return utilization less than zero because that makes no sense assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f)); } else { assertThat( - (double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization( + (double) originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization() - getAverageWritePoolUtilization( shardMovementWriteLoadSimulator, "node_0" ), @@ -105,7 +106,9 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { } assertThat( (double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats - .averageThreadPoolUtilization(), + .utilizationSamples() + .getLast() + .utilization(), closeTo(expectedUtilisationIncreaseAtDestination, 0.001f) ); @@ -117,11 +120,11 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() { // The utilization numbers should return to their original values assertThat( getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), - equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization()) + equalTo(originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization()) ); assertThat( getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"), - equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization()) + equalTo(originalNode1ThreadPoolStats.utilizationSamples().getLast().utilization()) ); } @@ -150,14 +153,18 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) { final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write"); - return node0WritePoolStats.averageThreadPoolUtilization(); + return node0WritePoolStats.utilizationSamples().getLast().utilization(); } private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() { return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( randomIntBetween(4, 16), - randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true), - randomLongBetween(0, 60_000) + List.of( + new NodeUsageStatsForThreadPools.UtilizationSample( + randomInstantBetween(Instant.MIN, Instant.MAX), + randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true) + ) + ) ); }