diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 72a3e85055891..57009a97b9450 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -11,7 +11,10 @@ import com.carrotsearch.hppc.ObjectDoubleHashMap; import com.carrotsearch.hppc.ObjectDoubleMap; +import com.carrotsearch.hppc.procedures.ObjectDoubleProcedure; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.util.Maps; @@ -21,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map; +import java.util.OptionalInt; import java.util.Set; /** @@ -29,6 +33,7 @@ */ public class ShardMovementWriteLoadSimulator { + private static final Logger logger = LogManager.getLogger(ShardMovementWriteLoadSimulator.class); private final Map originalNodeUsageStatsForThreadPools; private final ObjectDoubleMap simulatedNodeWriteLoadDeltas; private final Map writeLoadsPerShard; @@ -87,9 +92,46 @@ public Map simulatedNodeUsageStatsForThrea adjustedNodeUsageStatsForThreadPools.put(entry.getKey(), entry.getValue()); } } + + // Add `NodeUsageStatsForThreadPools` for any nodes not present in the original `NodeUsageStatsForThreadPools` map. + addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools(adjustedNodeUsageStatsForThreadPools); + return Collections.unmodifiableMap(adjustedNodeUsageStatsForThreadPools); } + private void addUsageStatsForAnyNodesNotPresentInOriginalNodeUsageStatsForThreadPools( + Map nodeUsageStatsForThreadPools + ) { + // Assume the new node has the same size thread pool as the largest existing node + final OptionalInt largestWriteThreadPool = originalNodeUsageStatsForThreadPools.values() + .stream() + .map(NodeUsageStatsForThreadPools::threadPoolUsageStatsMap) + .map(m -> m.get(ThreadPool.Names.WRITE)) + .mapToInt(NodeUsageStatsForThreadPools.ThreadPoolUsageStats::totalThreadPoolThreads) + .max(); + + if (largestWriteThreadPool.isPresent()) { + simulatedNodeWriteLoadDeltas.forEach((ObjectDoubleProcedure) (nodeId, writeLoadDelta) -> { + nodeUsageStatsForThreadPools.computeIfAbsent( + nodeId, + missingNodeId -> new NodeUsageStatsForThreadPools( + missingNodeId, + Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + largestWriteThreadPool.getAsInt(), + updateNodeUtilizationWithShardMovements(0.0f, (float) writeLoadDelta, largestWriteThreadPool.getAsInt()), + 0 + ) + ) + ) + ); + }); + } else { + logger.debug("No nodes found to estimate write thread pool size, skipping"); + } + } + private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoolStats( NodeUsageStatsForThreadPools value, double writeLoadDelta, diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java index 6178f25976f51..fb79497b4f209 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulatorTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.snapshots.SnapshotShardSizeInfo; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import java.util.Arrays; @@ -27,12 +28,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.DoubleSupplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.StreamSupport; import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.sameInstance; public class ShardMovementWriteLoadSimulatorTests extends ESTestCase { @@ -151,8 +155,9 @@ public void testMovementOfAShardWillMoveThreadPoolStats() { } public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { - final var originalNode0ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; - final var originalNode1ThreadPoolStats = randomBoolean() ? randomThreadPoolUsageStats() : null; + final var nodesHaveStats = randomBoolean(); + final var originalNode0ThreadPoolStats = nodesHaveStats ? randomThreadPoolUsageStats() : null; + final var originalNode1ThreadPoolStats = nodesHaveStats ? randomThreadPoolUsageStats() : null; final var allocation = createRoutingAllocationWithRandomisedWriteLoads( new HashSet<>(randomSubsetOf(Arrays.asList(INDICES))), originalNode0ThreadPoolStats, @@ -168,8 +173,43 @@ public void testMovementBetweenNodesWithNoThreadPoolAndWriteLoadStats() { allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); final var simulated = shardMovementWriteLoadSimulator.simulatedNodeUsageStatsForThreadPools(); - assertThat(simulated.containsKey("node_0"), equalTo(originalNode0ThreadPoolStats != null)); - assertThat(simulated.containsKey("node_1"), equalTo(originalNode1ThreadPoolStats != null)); + assertThat(simulated.containsKey("node_0"), equalTo(nodesHaveStats)); + assertThat(simulated.containsKey("node_1"), equalTo(nodesHaveStats)); + } + + public void testMovementToANodeWithNoThreadPoolStats() { + final var originalNode0ThreadPoolStats = randomThreadPoolUsageStats(); + final var allocation = createRoutingAllocationWithNonZeroRandomisedWriteLoads(Set.of(), originalNode0ThreadPoolStats); + final var shardMovementWriteLoadSimulator = new ShardMovementWriteLoadSimulator(allocation); + + // Relocate a random shard from node_0 to node_1 + final var expectedShardSize = randomNonNegativeLong(); + final var randomShard = randomFrom(StreamSupport.stream(allocation.routingNodes().node("node_0").spliterator(), false).toList()); + final var moveShardTuple = allocation.routingNodes().relocateShard(randomShard, "node_1", expectedShardSize, "testing", NOOP); + shardMovementWriteLoadSimulator.simulateShardStarted(moveShardTuple.v2()); + allocation.routingNodes().startShard(moveShardTuple.v2(), NOOP, expectedShardSize); + + final Map simulatedStats = shardMovementWriteLoadSimulator + .simulatedNodeUsageStatsForThreadPools(); + + // We should have created stats assuming node_1 was empty before the move + final NodeUsageStatsForThreadPools node1Stats = simulatedStats.get("node_1"); + assertNotNull(node1Stats); + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writePoolStats = node1Stats.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + final double movedShardWriteLoad = allocation.clusterInfo().getShardWriteLoads().get(randomShard.shardId()); + assertThat(writePoolStats.totalThreadPoolThreads(), equalTo(originalNode0ThreadPoolStats.totalThreadPoolThreads())); + assertThat( + (double) writePoolStats.averageThreadPoolUtilization(), + closeTo((movedShardWriteLoad / originalNode0ThreadPoolStats.totalThreadPoolThreads()), 0.0001f) + ); + assertThat(writePoolStats.maxThreadPoolQueueLatencyMillis(), equalTo(0L)); + + // We should have deducted the shard write-load from node_0 + assertThat( + getAverageWritePoolUtilization(shardMovementWriteLoadSimulator, "node_0"), + either(equalTo(0.0f)).or(lessThan(originalNode0ThreadPoolStats.averageThreadPoolUtilization())) + ); } public void testUpdateThreadPoolQueueLatencyWithShardMovements() { @@ -203,8 +243,31 @@ private NodeUsageStatsForThreadPools.ThreadPoolUsageStats randomThreadPoolUsageS ); } + private RoutingAllocation createRoutingAllocationWithNonZeroRandomisedWriteLoads( + Set indicesWithNoWriteLoad, + NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats + ) { + return createRoutingAllocationWithRandomisedWriteLoads( + indicesWithNoWriteLoad, + () -> randomDoubleBetween(0.1, 5.0, true), + arrayOfNodeThreadPoolStats + ); + } + + private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( + Set indicesWithNoWriteLoad, + NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats + ) { + return createRoutingAllocationWithRandomisedWriteLoads( + indicesWithNoWriteLoad, + () -> randomBoolean() ? 0.0 : randomDoubleBetween(0.1, 5.0, true), + arrayOfNodeThreadPoolStats + ); + } + private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( Set indicesWithNoWriteLoad, + DoubleSupplier writeLoadSupplier, NodeUsageStatsForThreadPools.ThreadPoolUsageStats... arrayOfNodeThreadPoolStats ) { final Map nodeUsageStats = new HashMap<>(); @@ -212,7 +275,7 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( final var nodeThreadPoolStats = arrayOfNodeThreadPoolStats[i]; if (nodeThreadPoolStats != null) { final var nodeId = "node_" + i; - nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of("write", nodeThreadPoolStats))); + nodeUsageStats.put(nodeId, new NodeUsageStatsForThreadPools(nodeId, Map.of(ThreadPool.Names.WRITE, nodeThreadPoolStats))); } } @@ -225,12 +288,7 @@ private RoutingAllocation createRoutingAllocationWithRandomisedWriteLoads( .stream() .filter(index -> indicesWithNoWriteLoad.contains(index.getIndex().getName()) == false) .flatMap(index -> IntStream.range(0, 3).mapToObj(shardNum -> new ShardId(index.getIndex(), shardNum))) - .collect( - Collectors.toUnmodifiableMap( - shardId -> shardId, - shardId -> randomBoolean() ? 0.0f : randomDoubleBetween(0.1, 5.0, true) - ) - ) + .collect(Collectors.toUnmodifiableMap(shardId -> shardId, shardId -> writeLoadSupplier.getAsDouble())) ) .build();