Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -124,11 +125,13 @@
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;

public class IndexShardIT extends ESSingleNodeTestCase {
private static final Logger logger = LogManager.getLogger(IndexShardIT.class);
Expand Down Expand Up @@ -346,9 +349,8 @@ public void testNodeWriteLoadsArePresent() {
.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assertNotNull(writeThreadPoolStats);
assertThat(writeThreadPoolStats.totalThreadPoolThreads(), greaterThanOrEqualTo(0));
assertThat(writeThreadPoolStats.averageThreadPoolUtilization(), greaterThanOrEqualTo(0.0f));
assertThat(writeThreadPoolStats.averageThreadPoolQueueLatencyMillis(), greaterThanOrEqualTo(0L));
assertThat(writeThreadPoolStats.numberOfThreads(), greaterThanOrEqualTo(0));
assertThat(writeThreadPoolStats.utilizationSamples(), not(empty()));
}
} finally {
updateClusterSettings(
Expand Down Expand Up @@ -1022,11 +1024,16 @@ public void collectUsageStats(ActionListener<Map<String, NodeUsageStatsForThread
}

private NodeUsageStatsForThreadPools makeRandomNodeUsageStats(String nodeId) {
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
randomNonNegativeInt(),
randomFloat(),
randomNonNegativeLong()
);
final List<NodeUsageStatsForThreadPools.UtilizationSample> utilizationSamples = IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(
i -> new NodeUsageStatsForThreadPools.UtilizationSample(
randomInstantBetween(Instant.MIN, Instant.MAX),
randomFloatBetween(0.0f, 1.0f, true)
)
)
.toList();
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats =
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(randomIntBetween(1, 1_000), utilizationSamples);
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> statsForThreadPools = new HashMap<>();
statsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolStats);
return new NodeUsageStatsForThreadPools(nodeId, statsForThreadPools);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ static TransportVersion def(int id) {
public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00);
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO = def(9_133_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

package org.elasticsearch.cluster;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Record of a node's thread pool usage stats (operation load). Maps thread pool stats by thread pool name.
Expand All @@ -27,7 +29,7 @@
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable {

public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
this(in.readString(), in.readMap(ThreadPoolUsageStats::new));
this(in.readString(), in.readMap(ThreadPoolUsageStats::readFrom));
}

@Override
Expand All @@ -36,86 +38,60 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(threadPoolUsageStatsMap, StreamOutput::writeWriteable);
}

@Override
public int hashCode() {
return Objects.hash(nodeId, threadPoolUsageStatsMap);
}
/**
* One utilization sample
*
* @param instant The time we received the sample
* @param utilization The utilization value in the range [0.0, 1.0]
*/
public record UtilizationSample(Instant instant, float utilization) implements Writeable {

@Override
public boolean equals(Object o) {
Copy link
Contributor Author

@nicktindall nicktindall Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There were equals, hashCode and toString methods on these records, I'm not sure if they were once classes or we're doing something special here? I removed them in lieu of the ones you get for free with a record.

if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeUsageStatsForThreadPools other = (NodeUsageStatsForThreadPools) o;
for (var entry : other.threadPoolUsageStatsMap.entrySet()) {
var loadStats = threadPoolUsageStatsMap.get(entry.getKey());
if (loadStats == null || loadStats.equals(entry.getValue()) == false) {
return false;
}
public UtilizationSample(StreamInput in) throws IOException {
this(Instant.ofEpochMilli(in.readVLong()), in.readFloat());
}
return true;
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder(getClass().getSimpleName() + "{nodeId=" + nodeId + ", threadPoolUsageStatsMap=[");
for (var entry : threadPoolUsageStatsMap.entrySet()) {
builder.append("{ThreadPool.Names=" + entry.getKey() + ", ThreadPoolUsageStats=" + entry.getValue() + "}");
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(instant.toEpochMilli());
out.writeFloat(utilization);
}
builder.append("]}");
return builder.toString();
}

/**
* Record of usage stats for a thread pool.
*
* @param totalThreadPoolThreads Total number of threads in the thread pool.
* @param averageThreadPoolUtilization Percent of thread pool threads that are in use, averaged over some period of time.
* @param averageThreadPoolQueueLatencyMillis How much time tasks spend in the thread pool queue. Zero if there is nothing being queued
* in the write thread pool.
* @param numberOfThreads Total number of threads in the thread pool.
* @param utilizationSamples The list of recent utilization samples
*/
public record ThreadPoolUsageStats(
int totalThreadPoolThreads,
float averageThreadPoolUtilization,
long averageThreadPoolQueueLatencyMillis
) implements Writeable {
public record ThreadPoolUsageStats(int numberOfThreads, List<UtilizationSample> utilizationSamples) implements Writeable {

public ThreadPoolUsageStats(StreamInput in) throws IOException {
this(in.readVInt(), in.readFloat(), in.readVLong());
public ThreadPoolUsageStats {
assert numberOfThreads > 0;
assert utilizationSamples.isEmpty() == false;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.totalThreadPoolThreads);
out.writeFloat(this.averageThreadPoolUtilization);
out.writeVLong(this.averageThreadPoolQueueLatencyMillis);
}

@Override
public int hashCode() {
return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, averageThreadPoolQueueLatencyMillis);
}

@Override
public String toString() {
return "[totalThreadPoolThreads="
+ totalThreadPoolThreads
+ ", averageThreadPoolUtilization="
+ averageThreadPoolUtilization
+ ", averageThreadPoolQueueLatencyMillis="
+ averageThreadPoolQueueLatencyMillis
+ "]";
public static ThreadPoolUsageStats readFrom(StreamInput in) throws IOException {
final int numberOfThreads = in.readVInt();
final List<UtilizationSample> utilizationSamples;
if (in.getTransportVersion().onOrAfter(TransportVersions.THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO)) {
utilizationSamples = in.readCollectionAsImmutableList(UtilizationSample::new);
} else {
utilizationSamples = List.of(new UtilizationSample(Instant.now(), in.readFloat()));
in.readVLong(); // Skip over the queue latency
}
return new ThreadPoolUsageStats(numberOfThreads, utilizationSamples);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadPoolUsageStats other = (ThreadPoolUsageStats) o;
return totalThreadPoolThreads == other.totalThreadPoolThreads
&& averageThreadPoolUtilization == other.averageThreadPoolUtilization
&& averageThreadPoolQueueLatencyMillis == other.averageThreadPoolQueueLatencyMillis;
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(this.numberOfThreads);
if (out.getTransportVersion().onOrAfter(TransportVersions.THREAD_POOL_UTILIZATION_MULTI_SAMPLE_CLUSTER_INFO)) {
out.writeGenericList(utilizationSamples, StreamOutput::writeWriteable);
} else {
out.writeFloat(this.utilizationSamples.getLast().utilization());
out.writeVLong(0L); // Dummy queue latency value
}
}

} // ThreadPoolUsageStats

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -86,13 +88,17 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
) {
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = value.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
writeThreadPoolStats.totalThreadPoolThreads(),
(float) Math.max(
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
0.0
),
writeThreadPoolStats.averageThreadPoolQueueLatencyMillis()
final float newWritePoolUtilization = (float) Math.max(
(writeThreadPoolStats.utilizationSamples().getLast().utilization() + (writeLoadDelta / writeThreadPoolStats.numberOfThreads())),
0.0
);
final List<NodeUsageStatsForThreadPools.UtilizationSample> newUtilizationSamples = new ArrayList<>(
writeThreadPoolStats.utilizationSamples()
);
final NodeUsageStatsForThreadPools.UtilizationSample previousUtilization = newUtilizationSamples.removeLast();
newUtilizationSamples.add(
new NodeUsageStatsForThreadPools.UtilizationSample(previousUtilization.instant(), newWritePoolUtilization)
);
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(writeThreadPoolStats.numberOfThreads(), newUtilizationSamples);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simulator will replace the most recent utilization value with the new one. We could instead add one to the end, but then what timestamp would we put on it? 🤷

Hard to know the best strategy without knowing how the determination of "hot spotting" is made, and whether we care about that in the simulator.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think hot-spot detection should be same for real data and simulation. How about simulate samples? Maybe apply fixed value to all of them and then run hot-spot detection?

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;

public class ClusterInfoTests extends AbstractWireSerializingTestCase<ClusterInfo> {

Expand Down Expand Up @@ -81,8 +83,14 @@ private static Map<String, NodeUsageStatsForThreadPools> randomNodeUsageStatsFor
String nodeIdKey = randomAlphaOfLength(32);
NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolUsageStats =
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(/* totalThreadPoolThreads= */ randomIntBetween(1, 16),
/* averageThreadPoolUtilization= */ randomFloat(),
/* averageThreadPoolQueueLatencyMillis= */ randomLongBetween(0, 50000)
IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(
ignored -> new NodeUsageStatsForThreadPools.UtilizationSample(
randomInstantBetween(Instant.MIN, Instant.MAX),
randomFloatBetween(0.0f, 1.0f, true)
)
)
.toList()
);
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> usageStatsForThreadPools = new HashMap<>();
usageStatsForThreadPools.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -87,16 +88,16 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
assertThat(calculatedNodeUsageStats, Matchers.aMapWithSize(2));

final var shardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads();
final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.totalThreadPoolThreads();
final var expectedUtilisationReductionAtSource = shardWriteLoad / originalNode0ThreadPoolStats.numberOfThreads();
final var expectedUtilisationIncreaseAtDestination = shardWriteLoad / originalNode1ThreadPoolStats.numberOfThreads();

// Some node_0 utilization should have been moved to node_1
if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.averageThreadPoolUtilization()) {
if (expectedUtilisationReductionAtSource > originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization()) {
// We don't return utilization less than zero because that makes no sense
assertThat(getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), equalTo(0.0f));
} else {
assertThat(
(double) originalNode0ThreadPoolStats.averageThreadPoolUtilization() - getAverageWritePoolUtilization(
(double) originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization() - getAverageWritePoolUtilization(
shardMovementWriteLoadSimulator,
"node_0"
),
Expand All @@ -105,7 +106,9 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
}
assertThat(
(double) getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1") - originalNode1ThreadPoolStats
.averageThreadPoolUtilization(),
.utilizationSamples()
.getLast()
.utilization(),
closeTo(expectedUtilisationIncreaseAtDestination, 0.001f)
);

Expand All @@ -117,11 +120,11 @@ public void testMovementOfAShardWillMoveThreadPoolUtilisation() {
// The utilization numbers should return to their original values
assertThat(
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"),
equalTo(originalNode0ThreadPoolStats.averageThreadPoolUtilization())
equalTo(originalNode0ThreadPoolStats.utilizationSamples().getLast().utilization())
);
assertThat(
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_1"),
equalTo(originalNode1ThreadPoolStats.averageThreadPoolUtilization())
equalTo(originalNode1ThreadPoolStats.utilizationSamples().getLast().utilization())
);
}

Expand Down Expand Up @@ -150,14 +153,18 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
private float getAverageWritePoolUtilization(ShardMovementWriteLoadSimulator shardMovementWriteLoadSimulator, String nodeId) {
final var generatedNodeUsageStates = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
final var node0WritePoolStats = generatedNodeUsageStates.get(nodeId).threadPoolUsageStatsMap().get("write");
return node0WritePoolStats.averageThreadPoolUtilization();
return node0WritePoolStats.utilizationSamples().getLast().utilization();
}

private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageStats() {
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
randomIntBetween(4, 16),
randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true),
randomLongBetween(0, 60_000)
List.of(
new NodeUsageStatsForThreadPools.UtilizationSample(
randomInstantBetween(Instant.MIN, Instant.MAX),
randomBoolean() ? 0.0f : randomFloatBetween(0.1f, 1.0f, true)
)
)
);
}

Expand Down