Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@

import com.carrotsearch.hppc.ObjectDoubleHashMap;
import com.carrotsearch.hppc.ObjectDoubleMap;
import com.carrotsearch.hppc.procedures.ObjectDoubleProcedure;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.util.Maps;
Expand All @@ -21,6 +24,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

/**
Expand All @@ -29,6 +33,7 @@
*/
public class ShardMovementWriteLoadSimulator {

private static final Logger logger = LogManager.getLogger(ShardMovementWriteLoadSimulator.class);
private final Map<String, NodeUsageStatsForThreadPools> originalNodeUsageStatsForThreadPools;
private final ObjectDoubleMap<String> simulatedNodeWriteLoadDeltas;
private final Map<ShardId, Double> writeLoadsPerShard;
Expand Down Expand Up @@ -87,9 +92,46 @@ public Map<String, NodeUsageStatsForThreadPools> simulatedNodeUsageStatsForThrea
adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue());
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of adding a second non-trivial for-loop in addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools, could we replace the above original for-loop with iteration of the simulatedNodeWriteLoadDeltas data structure to begin with?
Then use something like originalNodeUsageStatsForThreadPools.forEach(adjustedNodeUsageStatsForThreadPools::putIfAbsent) to straight copy the remainder.

This would be in combination with my other suggestion to add the new node to originalNodeUsageStatsForThreadPools beforehand for simplicity.

// Add `NodeUsageStatsForThreadPools` for any nodes not present in the original `NodeUsageStatsForThreadPools` map.
addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools(adjustedNodeUsageStatsForThreadPools);

return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools);
}

private void addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools(
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
) {
// Assume the new node has the same size thread pool as the largest existing node
final OptionalInt largestWriteThreadPool = originalNodeUsageStatsForThreadPools.values()
Copy link
Contributor

@DiannaHohensee DiannaHohensee Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we have to build the new node basis every time we calculated an updated ClusterInfo. Could we instead immediately add a new 0 utilization node to originalNodeUsageStatsForThreadPools? Then we can move forward always expecting an entry in originalNodeUsageStatsForThreadPools for every node. Might simplify the code.

.stream()
.map(NodeUsageStatsForThreadPools::threadPoolUsageStatsMap)
.map(m -> m.get(ThreadPool.Names.WRITE))
.mapToInt(NodeUsageStatsForThreadPools.ThreadPoolUsageStats::totalThreadPoolThreads)
.max();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assuming the max thread pool size is probably "optimistic", we could also be pessimistic and assume the minimum pool size

Copy link
Contributor

@DiannaHohensee DiannaHohensee Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't really know what would be best, can we pick the first node in the map? Then we avoid any performance slow downs with streams or iteration, since this path is going to be hit a lot.


if (largestWriteThreadPool.isPresent()) {
simulatedNodeWriteLoadDeltas.forEach((ObjectDoubleProcedure<? super String>) (nodeId, writeLoadDelta) -> {
nodeUsageStatsForThreadPools.computeIfAbsent(
nodeId,
missingNodeId -> new NodeUsageStatsForThreadPools(
missingNodeId,
Map.of(
ThreadPool.Names.WRITE,
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
largestWriteThreadPool.getAsInt(),
updateNodeUtilizationWithShardMovements(0.0f, (float) writeLoadDelta, largestWriteThreadPool.getAsInt()),
0
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fudge a NodeUsageStatsForThreadPools that is good for our usage (i.e. it has a single entry for the WRITE pool). Nobody else relies on these stats at the moment, so that's probably OK, but if another decider starts using these stats we'd need to be careful about doing this maybe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also probably do better in the event a node with no stats has a shard moved off of it. In that case it doesn't make sense to assume the node was empty before the move, because clearly it was not, but then it seems very unlikely we'd find ourselves in that situation so I don't know how much effort we want to put into improving that estimate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fudge a NodeUsageStatsForThreadPools that is good for our usage (i.e. it has a single entry for the WRITE pool). Nobody else relies on these stats at the moment, so that's probably OK, but if another decider starts using these stats we'd need to be careful about doing this maybe.

Just FYI, the Decider does take some action based on the ClusterInfo it receives.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also probably do better in the event a node with no stats has a shard moved off of it. In that case it doesn't make sense to assume the node was empty before the move, because clearly it was not, but then it seems very unlikely we'd find ourselves in that situation so I don't know how much effort we want to put into improving that estimate.

I agree, not worth the effort. Not sure the scenario can even happen, actually. A new node joins, we'd have to assign shards to the node, first, which means simulation begins. The simulation might be a little off because of the thread pool thread count guess, but otherwise fine.

)
)
);
});
} else {
logger.debug("No nodes found to estimate write thread pool size, skipping");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we get here, there were no other nodes in the ClusterInfo to base our estimate off of. In this case we could also assume the same pool size as the local node perhaps? but we don't have that information in here. This also seems very unlikely to actually occur.

}
}

private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats(
NodeUsageStatsForThreadPools value,
double writeLoadDelta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;

import java.util.Arrays;
Expand All @@ -27,12 +28,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.DoubleSupplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;

public class ShardMovementWriteLoadSimulatorTests extends ESTestCase {
Expand Down Expand Up @@ -151,8 +155,9 @@ public void testMovementOfAShardWillMoveThreadPoolStats() {
}

public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null;
final var nodesHaveStats = randomBoolean();
final var originalNode0ThreadPoolStats = nodesHaveStats ? randomThreadPoolUsageStats() : null;
final var originalNode1ThreadPoolStats = nodesHaveStats ? randomThreadPoolUsageStats() : null;
final var allocation = createRoutingAllocationWithRandomisedWriteLoads(
new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))),
originalNode0ThreadPoolStats,
Expand All @@ -168,8 +173,43 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() {
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);

final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools();
assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null));
assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null));
assertThat(simulated.containsKey("node_0"), equalTo(nodesHaveStats));
assertThat(simulated.containsKey("node_1"), equalTo(nodesHaveStats));
}

public void testMovementToANodeWithNoThreadPoolStats() {
final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats();
final var allocation = createRoutingAllocationWithNonZeroRandomisedWriteLoads(Set.of(), originalNode0ThreadPoolStats);
final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation);

// Relocate a random shard from node_0 to node_1
final var expectedShardSize = randomNonNegativeLong();
final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList());
final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP);
shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2());
allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize);

final Map<String, NodeUsageStatsForThreadPools> simulatedStats = shardMovementWriteLoadSimulator
.simulatedNodeUsageStatsForThreadPools();

// We should have created stats assuming node_1 was empty before the move
final NodeUsageStatsForThreadPools node1Stats = simulatedStats.get("node_1");
assertNotNull(node1Stats);
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writePoolStats = node1Stats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
final double movedShardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId());
assertThat(writePoolStats.totalThreadPoolThreads(), equalTo(originalNode0ThreadPoolStats.totalThreadPoolThreads()));
assertThat(
(double) writePoolStats.averageThreadPoolUtilization(),
closeTo((movedShardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads()), 0.0001f)
);
assertThat(writePoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L));

// We should have deducted the shard write-load from node_0
assertThat(
getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"),
either(equalTo(0.0f)).or(lessThan(originalNode0ThreadPoolStats.averageThreadPoolUtilization()))
);
}

public void testUpdateThreadPoolQueueLatencyWithShardMovements() {
Expand Down Expand Up @@ -203,16 +243,39 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageS
);
}

private RoutingAllocation createRoutingAllocationWithNonZeroRandomisedWriteLoads(
Set<String> indicesWithNoWriteLoad,
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
) {
return createRoutingAllocationWithRandomisedWriteLoads(
indicesWithNoWriteLoad,
() -> randomDoubleBetween(0.1, 5.0, true),
arrayOfNodeThreadPoolStats
);
}

private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
Set<String> indicesWithNoWriteLoad,
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
) {
return createRoutingAllocationWithRandomisedWriteLoads(
indicesWithNoWriteLoad,
() -> randomBoolean() ? 0.0 : randomDoubleBetween(0.1, 5.0, true),
arrayOfNodeThreadPoolStats
);
}

private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
Set<String> indicesWithNoWriteLoad,
DoubleSupplier writeLoadSupplier,
NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats
) {
final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = new HashMap<>();
for (int i = 0; i < arrayOfNodeThreadPoolStats.length; i++) {
final var nodeThreadPoolStats = arrayOfNodeThreadPoolStats[i];
if (nodeThreadPoolStats != null) {
final var nodeId = "node_" + i;
nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats)));
nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of(ThreadPool.Names.WRITE, nodeThreadPoolStats)));
}
}

Expand All @@ -225,12 +288,7 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads(
.stream()
.filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false)
.flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum)))
.collect(
Collectors.toUnmodifiableMap(
shardId -> shardId,
shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true)
)
)
.collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> writeLoadSupplier.getAsDouble()))
)
.build();

Expand Down