diff --git a/muted-tests.yml b/muted-tests.yml index 2885e754a5cc0..5524779ac44af 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -519,9 +519,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT method: testGroupBySubset issue: https://github.com/elastic/elasticsearch/issues/133220 -- class: org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDeciderIT - method: testHighNodeWriteLoadPreventsNewShardAllocation - issue: https://github.com/elastic/elasticsearch/issues/133857 - class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests method: testParseOrQuery issue: https://github.com/elastic/elasticsearch/issues/133863 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index e419387acfd02..c8bd9ce10652b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -50,6 +50,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { @@ -68,6 +69,8 @@ protected Collection> getMockPlugins() { */ public void testHighNodeWriteLoadPreventsNewShardAllocation() { int randomUtilizationThresholdPercent = randomIntBetween(50, 100); + int numberOfWritePoolThreads = randomIntBetween(2, 20); + float shardWriteLoad = 
randomFloatBetween(0.0f, 0.01f, false); Settings settings = Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), @@ -115,7 +118,14 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { ); String indexName = randomIdentifier(); - int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental. + int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental. + + // Calculate the maximum utilization a node can report while still being able to accept all relocating shards + double additionalLoadFromAllShards = calculateUtilizationForWriteLoad( + shardWriteLoad * randomNumberOfShards, + numberOfWritePoolThreads + ); + int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1; var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { var indexRoutingTable = clusterState.routingTable().index(indexName); @@ -154,20 +164,20 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( firstDiscoveryNode, - 2, - 0.5f, + numberOfWritePoolThreads, + randomIntBetween(0, maxUtilizationPercent) / 100f, 0 ); final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( secondDiscoveryNode, - 2, - 0.5f, + numberOfWritePoolThreads, + randomIntBetween(0, maxUtilizationPercent) / 100f, 0 ); final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( thirdDiscoveryNode, - 2, - randomUtilizationThresholdPercent + 1 / 100, + numberOfWritePoolThreads, + (randomUtilizationThresholdPercent + 1) / 100f, 0 ); @@ -197,12 +207,11 
@@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { .getMetadata() .getProject() .index(indexName); - double shardWriteLoadDefault = 0.2; MockTransportService.getInstance(firstDataNodeName) .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId)); + shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId)); } TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of())); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java index 72a3e85055891..81eace527d04a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ShardMovementWriteLoadSimulator.java @@ -124,10 +124,21 @@ public static float updateNodeUtilizationWithShardMovements( float shardWriteLoadDelta, int numberOfWriteThreads ) { - float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads); + float newNodeUtilization = nodeUtilization + calculateUtilizationForWriteLoad(shardWriteLoadDelta, numberOfWriteThreads); return (float) Math.max(newNodeUtilization, 0.0); } + /** + * Calculate the change in utilization, as a fraction of write thread pool capacity, caused by adding or removing some write-load + * + * @param totalShardWriteLoad The write-load being added/removed + * @param numberOfThreads The number of threads in the node-being-added-to's write thread pool + * @return
The change in utilization as a fraction of the pool's capacity (1.0 == 100%), not a percentage + */ + public static float calculateUtilizationForWriteLoad(float totalShardWriteLoad, int numberOfThreads) { + return totalShardWriteLoad / numberOfThreads; + } + /** * Adjust the max thread pool queue latency by accounting for whether shard has moved away from the node. * @param maxThreadPoolQueueLatencyMillis The current max thread pool queue latency.