diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java index 2325168daed01..2548836b9e634 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java @@ -9,6 +9,10 @@ package org.elasticsearch.cluster.routing.allocation.decider; +import org.apache.logging.log4j.Level; +import org.elasticsearch.action.admin.cluster.allocation.DesiredBalanceRequest; +import org.elasticsearch.action.admin.cluster.allocation.DesiredBalanceResponse; +import org.elasticsearch.action.admin.cluster.allocation.TransportGetDesiredBalanceAction; import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; import org.elasticsearch.action.admin.indices.stats.CommonStats; @@ -25,9 +29,11 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics; +import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.shard.IndexingStats; @@ -39,6 +45,8 @@ import org.elasticsearch.telemetry.TestTelemetryPlugin; import org.elasticsearch.test.ClusterServiceUtils; 
import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -54,9 +62,6 @@ import static java.util.stream.IntStream.range; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING; -import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING; -import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING; import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -77,7 +82,7 @@ protected Collection> getMockPlugins() { /** * Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the * balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. - * + *

* Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of * Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. */ @@ -108,28 +113,9 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { 0 ); - MockTransportService.getInstance(harness.firstDataNodeName).< - NodeUsageStatsForThreadPoolsAction - .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(harness.secondDataNodeName) - .addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(harness.thirdDataNodeName) - .addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) - ) - ); + setUpMockTransportNodeUsageStatsResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats); /** * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. 
The stats @@ -141,38 +127,13 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() { .getMetadata() .getProject() .index(harness.indexName); - MockTransportService.getInstance(harness.firstDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); - for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); - } - TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.firstDataNodeName - ); - channel.sendResponse( - instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) - ); - }); - MockTransportService.getInstance(harness.secondDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.secondDataNodeName - ); - channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); - }); - MockTransportService.getInstance(harness.thirdDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - // Return no stats for the index because none are assigned to this node. 
- TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.thirdDataNodeName - ); - channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); - }); + setUpMockTransportIndicesStatsResponse( + harness.firstDiscoveryNode, + indexMetadata.getNumberOfShards(), + createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + ); + setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); + setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); /** * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and @@ -183,26 +144,28 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); refreshClusterInfo(); - logger.info( - "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" - ); - // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. 
- updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); + var temporaryClusterStateListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); + return checkShardAssignment( + clusterState.getRoutingNodes(), + index, + harness.firstDataNodeId, + harness.secondDataNodeId, + harness.thirdDataNodeId, + 0, + harness.randomNumberOfShards, + 0 + ); + }); try { - safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - return checkShardAssignment( - clusterState.getRoutingNodes(), - index, - harness.firstDataNodeId, - harness.secondDataNodeId, - harness.thirdDataNodeId, - 0, - harness.randomNumberOfShards, - 0 - ); - })); + logger.info( + "---> Update the filter to exclude " + harness.firstDataNodeName + " so shards will be reassigned away to the other nodes" + ); + // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); + + safeAwait(temporaryClusterStateListener); } catch (AssertionError error) { ClusterState state = internalCluster().client() .admin() @@ -223,7 +186,7 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar * Tests that {@link AllocationDecider#canRemain} returning {@link Decision.Type#NO} for a {@code NodeX} will ignore a * {@link AllocationDecider#canAllocate} response of {@link Decision.Type#NOT_PREFERRED} from a {@code NodeY} and reassign the shard * when there are no better node options. - * + *

* Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the * balancer. Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards * off of Node1 while Node2 and Node3 are hot-spotting, resulting in overriding not-preferred and relocating shards to Node2 and Node3. @@ -255,28 +218,9 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { 0 ); - MockTransportService.getInstance(harness.firstDataNodeName).< - NodeUsageStatsForThreadPoolsAction - .NodeRequest>addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(harness.secondDataNodeName) - .addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) - ) - ); - MockTransportService.getInstance(harness.thirdDataNodeName) - .addRequestHandlingBehavior( - TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", - (handler, request, channel, task) -> channel.sendResponse( - new NodeUsageStatsForThreadPoolsAction.NodeResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) - ) - ); + setUpMockTransportNodeUsageStatsResponse(harness.firstDiscoveryNode, firstNodeNonHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.thirdDiscoveryNode, thirdNodeHotSpottingNodeStats); /** * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load 
stats. The stats @@ -288,38 +232,13 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() { .getMetadata() .getProject() .index(harness.indexName); - MockTransportService.getInstance(harness.firstDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); - for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { - shardStats.add(createShardStats(indexMetadata, i, harness.randomShardWriteLoad, harness.firstDataNodeId)); - } - TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.firstDataNodeName - ); - channel.sendResponse( - instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()) - ); - }); - MockTransportService.getInstance(harness.secondDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - // Return no stats for the index because none are assigned to this node. - TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.secondDataNodeName - ); - channel.sendResponse(instance.new NodeResponse(harness.secondDataNodeId, 0, List.of(), List.of())); - }); - MockTransportService.getInstance(harness.thirdDataNodeName) - .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { - // Return no stats for the index because none are assigned to this node. 
- TransportIndicesStatsAction instance = internalCluster().getInstance( - TransportIndicesStatsAction.class, - harness.thirdDataNodeName - ); - channel.sendResponse(instance.new NodeResponse(harness.thirdDataNodeId, 0, List.of(), List.of())); - }); + setUpMockTransportIndicesStatsResponse( + harness.firstDiscoveryNode, + indexMetadata.getNumberOfShards(), + createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + ); + setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); + setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); /** * Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired and @@ -330,20 +249,24 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); refreshClusterInfo(); - logger.info( - "---> Update the filter to exclude " + harness.firstDataNodeName + " so that shards will be reassigned away to the other nodes" - ); - // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. 
- updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", harness.firstDataNodeName)); + var temporaryClusterStateListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { + Index index = clusterState.routingTable().index(harness.indexName).getIndex(); + if (clusterState.getRoutingNodes() + .node(harness.firstDataNodeId) + .numberOfOwningShardsForIndex(index) == harness.randomNumberOfShards) { + return true; + } + return false; + }); try { - safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { - Index index = clusterState.routingTable().index(harness.indexName).getIndex(); - if (clusterState.getRoutingNodes().node(harness.firstDataNodeId).numberOfOwningShardsForIndex(index) == 0) { - return true; - } - return false; - })); + logger.info( + "---> Update the filter to remove exclusions so that shards can be reassigned based on the write load decider only" + ); + // Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", "")); + + safeAwait(temporaryClusterStateListener); } catch (AssertionError error) { ClusterState state = internalCluster().client() .admin() @@ -360,6 +283,221 @@ instance.new NodeResponse(harness.firstDataNodeId, indexMetadata.getNumberOfShar } } + @TestLogging( + reason = "track when reconciliation has completed", + value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:DEBUG" + ) + public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferred() { + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); + + /** + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. 
The stats will show all the nodes above the high utilization threshold, so they do not accept new shards, while the + * first node will show queue latency above the threshold and request a shard to move away. However, there will be nowhere to + * reassign any shards. + */ + + final NodeUsageStatsForThreadPools firstNodeUtilHotSpottingAndQueuingNodeStats = createNodeUsageStatsForThreadPools( + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + randomLongBetween(harness.randomQueueLatencyThresholdMillis, harness.randomQueueLatencyThresholdMillis + 1_000) + ); + final NodeUsageStatsForThreadPools secondNodeUtilHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + 0 + ); + final NodeUsageStatsForThreadPools thirdNodeUtilHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + 0 + ); + + setUpMockTransportNodeUsageStatsResponse(harness.firstDiscoveryNode, firstNodeUtilHotSpottingAndQueuingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.secondDiscoveryNode, secondNodeUtilHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.thirdDiscoveryNode, thirdNodeUtilHotSpottingNodeStats); + + /** + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). 
+ */ + + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .state() + .getMetadata() + .getProject() + .index(harness.indexName); + setUpMockTransportIndicesStatsResponse( + harness.firstDiscoveryNode, + indexMetadata.getNumberOfShards(), + createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + ); + setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); + setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); + + /** + * Refresh the ClusterInfo to pull in the new dummy hot-spot stats. Then remove the filter restricting the shards to the first node. + * Then wait for the DesiredBalance computation to finish running after the cluster settings update. All the shards should remain on + * the first node, despite hot-spotting with queuing, because no other node has below utilization threshold stats. + */ + + // Wait for the DesiredBalance to be recomputed as a result of the ClusterInfo refresh. Ensures no async computation. + MockLog.awaitLogger(() -> { + logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with hot-spot stats"); + refreshClusterInfo(); + }, DesiredBalanceShardsAllocator.class, createBalancerConvergedSeenEvent()); + + // Wait for the DesiredBalance to be recomputed as a result of the settings change. + MockLog.awaitLogger(() -> { + logger.info( + "---> Update the filter to remove exclusions so that shards can be reassigned based on the write load decider only" + ); + // Updating the cluster settings will trigger a reroute request. 
+ updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", "")); + }, DesiredBalanceShardsAllocator.class, createBalancerConvergedSeenEvent()); + + try { + // Now check that all the shards remain on the first node because the other two nodes have too high write thread pool + // utilization to accept additional shards. + var desiredBalanceResponse = safeGet( + client().execute(TransportGetDesiredBalanceAction.TYPE, new DesiredBalanceRequest(TEST_REQUEST_TIMEOUT)) + ); + Map shardsMap = desiredBalanceResponse.getRoutingTable().get(harness.indexName); + logger.info("---> Checking desired shard assignments are still on the first data node. Desired assignments: " + shardsMap); + for (var desiredShard : shardsMap.values()) { + for (var desiredNodeId : desiredShard.desired().nodeIds()) { + assertEquals("Found a shard assigned to an unexpected node: " + shardsMap, desiredNodeId, harness.firstDataNodeId); + } + } + } catch (AssertionError error) { + ClusterState state = client().admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. Dumping assignments: " + state.getRoutingNodes()); + throw error; + } + } + + @TestLogging( + reason = "track when reconciliation has completed", + value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:DEBUG" + ) + public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() { + TestHarness harness = setUpThreeTestNodesAndAllIndexShardsOnFirstNode(); + + /** + * Override the {@link TransportNodeUsageStatsForThreadPoolsAction} action on the data nodes to supply artificial thread pool write + * load stats. The stats will show all the nodes above the high utilization threshold, so they do not accept new shards, while the + * first node will show queue latency above the threshold and request a shard to move away. 
A single shard should move away from the + * queuing node, no more than one. + */ + + final NodeUsageStatsForThreadPools firstNodeUtilHotSpottingAndQueuingNodeStats = createNodeUsageStatsForThreadPools( + harness.firstDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + (harness.randomUtilizationThresholdPercent + 1) / 100f, + randomLongBetween(harness.randomQueueLatencyThresholdMillis, harness.randomQueueLatencyThresholdMillis + 1_000) + ); + final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.secondDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + 0 + ); + final NodeUsageStatsForThreadPools thirdNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( + harness.thirdDiscoveryNode, + harness.randomNumberOfWritePoolThreads, + randomIntBetween(0, harness.maxUtilBelowThresholdThatAllowsAllShardsToRelocate) / 100f, + 0 + ); + + setUpMockTransportNodeUsageStatsResponse(harness.firstDiscoveryNode, firstNodeUtilHotSpottingAndQueuingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.secondDiscoveryNode, secondNodeNonHotSpottingNodeStats); + setUpMockTransportNodeUsageStatsResponse(harness.thirdDiscoveryNode, thirdNodeNonHotSpottingNodeStats); + + /** + * Override the {@link TransportIndicesStatsAction} action on the data nodes to supply artificial shard write load stats. The stats + * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). 
+ */ + + IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .state() + .getMetadata() + .getProject() + .index(harness.indexName); + setUpMockTransportIndicesStatsResponse( + harness.firstDiscoveryNode, + indexMetadata.getNumberOfShards(), + createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId) + ); + setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of()); + setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of()); + + /** + * Refresh the ClusterInfo to pull in the new dummy hot-spot stats. Then remove the filter restricting the shards to the first node. + * Then wait for the DesiredBalance computation to finish running after the cluster settings update. All the shards should remain on + * the first node, despite hot-spotting, because no other node has below utilization threshold stats. + */ + + // Wait for the DesiredBalance to be recomputed as a result of the ClusterInfo refresh. This way nothing async is running. + MockLog.awaitLogger(() -> { + logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with hot-spot stats"); + refreshClusterInfo(); + }, DesiredBalanceShardsAllocator.class, createBalancerConvergedSeenEvent()); + + // Wait for the DesiredBalance to be recomputed as a result of the settings change. + MockLog.awaitLogger(() -> { + logger.info( + "---> Update the filter to remove exclusions so that shards can be reassigned based on the write load decider only" + ); + // Updating the cluster settings will trigger a reroute request. + updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", "")); + }, DesiredBalanceShardsAllocator.class, createBalancerConvergedSeenEvent()); + + try { + // Now check that all a single shard was moved off of the first node to address the queuing, but the rest of the shards remain. 
+ var desiredBalanceResponse = safeGet( + client().execute(TransportGetDesiredBalanceAction.TYPE, new DesiredBalanceRequest(TEST_REQUEST_TIMEOUT)) + ); + Map shardsMap = desiredBalanceResponse.getRoutingTable().get(harness.indexName); + logger.info("---> Checking desired shard assignments. Desired assignments: " + shardsMap); + int countShardsStillAssignedToFirstNode = 0; + for (var desiredShard : shardsMap.values()) { + for (var desiredNodeId : desiredShard.desired().nodeIds()) { + if (desiredNodeId.equals(harness.firstDataNodeId)) { + ++countShardsStillAssignedToFirstNode; + } + } + } + assertEquals( + "Expected all shards except one to still be on the first data node: " + shardsMap, + harness.randomNumberOfShards, + countShardsStillAssignedToFirstNode + 1 + ); + } catch (AssertionError error) { + ClusterState state = client().admin() + .cluster() + .prepareState(TEST_REQUEST_TIMEOUT) + .clear() + .setMetadata(true) + .setNodes(true) + .setRoutingTable(true) + .get() + .getState(); + logger.info("---> Failed to reach expected allocation state. 
Dumping assignments: " + state.getRoutingNodes()); + throw error; + } + } + public void testMaxQueueLatencyMetricIsPublished() { final Settings settings = Settings.builder() .put( @@ -411,6 +549,24 @@ private static Map getMostRecentQueueLatencyMetrics(List d return measurements; } + private void setUpMockTransportNodeUsageStatsResponse(DiscoveryNode node, NodeUsageStatsForThreadPools nodeUsageStats) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior( + TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", + (handler, request, channel, task) -> channel.sendResponse( + new NodeUsageStatsForThreadPoolsAction.NodeResponse(node, nodeUsageStats) + ) + ); + } + + private void setUpMockTransportIndicesStatsResponse(DiscoveryNode node, int totalShards, List shardStats) { + MockTransportService.getInstance(node.getName()) + .addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { + TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, node.getName()); + channel.sendResponse(instance.new NodeResponse(node.getId(), totalShards, shardStats, List.of())); + }); + } + /** * Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node. */ @@ -441,7 +597,13 @@ private boolean checkShardAssignment( return true; } - private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent) { + /** + * Enables the write load decider and overrides other write load decider settings. + * @param utilizationThresholdPercent Sets the write thread pool utilization threshold, controlling when canAllocate starts returning + * not-preferred. + * @param queueLatencyThresholdMillis Sets the queue latency threshold, controlling when canRemain starts returning not-preferred. 
+ */ + private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent, long queueLatencyThresholdMillis) { return Settings.builder() .put( WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), @@ -451,10 +613,12 @@ private Settings enabledWriteLoadDeciderSettings(int utilizationThresholdPercent WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), utilizationThresholdPercent + "%" ) - // TODO (ES-12862): remove these overrides when throttling is turned off for simulations. - .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 100) - .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 100) - .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 100) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(), + TimeValue.timeValueMillis(queueLatencyThresholdMillis) + ) + // Disable rebalancing so that testing can see Decider change outcomes only. + .put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none") .build(); } @@ -494,6 +658,21 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); } + /** + * Helper to create a list of dummy {@link ShardStats} for the given index, each shard reporting a {@code peakShardWriteLoad} stat. 
+ */ + private List createShardStatsResponseForIndex( + IndexMetadata indexMetadata, + float peakShardWriteLoad, + String assignedShardNodeId + ) { + List shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); + for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { + shardStats.add(createShardStats(indexMetadata, i, peakShardWriteLoad, assignedShardNodeId)); + } + return shardStats; + } + /** * Helper to create a dummy {@link ShardStats} for the given index shard with the supplied {@code peakWriteLoad} value. */ @@ -518,6 +697,18 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); } + /** + * Creates a MockLog.SeenEventExpectation for a log message indicating that the balancer computation has converged / finalized. + */ + private MockLog.SeenEventExpectation createBalancerConvergedSeenEvent() { + return new MockLog.SeenEventExpectation( + "desired balance computation ran and completed", + DesiredBalanceShardsAllocator.class.getCanonicalName(), + Level.DEBUG, + "Desired balance computation for * is completed, scheduling reconciliation" + ); + } + /** * Sets up common test infrastructure to deduplicate code across tests. *

@@ -527,8 +718,9 @@ private static ShardStats createShardStats(IndexMetadata indexMeta, int shardInd private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { int randomUtilizationThresholdPercent = randomIntBetween(50, 100); int randomNumberOfWritePoolThreads = randomIntBetween(2, 20); + long randomQueueLatencyThresholdMillis = randomLongBetween(1, 20_000); float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false); - Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent); + Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent, randomQueueLatencyThresholdMillis); internalCluster().startMasterOnlyNode(settings); final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); @@ -626,6 +818,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() { thirdDiscoveryNode, randomUtilizationThresholdPercent, randomNumberOfWritePoolThreads, + randomQueueLatencyThresholdMillis, randomShardWriteLoad, indexName, randomNumberOfShards, @@ -648,6 +841,7 @@ record TestHarness( DiscoveryNode thirdDiscoveryNode, int randomUtilizationThresholdPercent, int randomNumberOfWritePoolThreads, + long randomQueueLatencyThresholdMillis, float randomShardWriteLoad, String indexName, int randomNumberOfShards, diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index 57d1b668851dd..ec7471f3ca147 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -108,11 +108,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing node.nodeId(), newWriteThreadPoolUtilization ); - - if (logger.isTraceEnabled()) { - 
logger.trace(explanation); - } - + logger.trace(explanation); return allocation.decision(Decision.YES, NAME, explanation); } @@ -122,9 +118,46 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting return Decision.single(Decision.Type.YES, NAME, "canRemain() is not enabled"); } - // TODO: implement + // Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion. + var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads(); + var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId()); + if (shardWriteLoad == null || shardWriteLoad == 0) { + return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action."); + } + + var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools(); + var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId()); + if (nodeUsageStatsForThreadPools == null) { + // No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle + // setting a node-level write load for this node after this shard is assigned. + return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action."); + } + + assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; + assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; + var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); + var nodeWriteThreadPoolQueueLatencyThreshold = writeLoadConstraintSettings.getQueueLatencyThreshold(); + if (nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= nodeWriteThreadPoolQueueLatencyThreshold.millis()) { + String explain = Strings.format( + "Node [%s] has a queue latency of [%d] millis that exceeds the queue latency threshold of [%s]. 
This node is hot-spotting. " + + "Current thread pool utilization [%f]. Moving shard(s) away.", + node.nodeId(), + nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis(), + nodeWriteThreadPoolQueueLatencyThreshold.toHumanReadableString(2), + nodeWriteThreadPoolStats.averageThreadPoolUtilization() + ); + logger.debug(explain); + return Decision.single(Decision.Type.NOT_PREFERRED, NAME, explain); + } - return Decision.single(Decision.Type.YES, NAME, "canRemain() is not yet implemented"); + String explanation = Strings.format( + "Node [%s]'s queue latency of [%d] does not exceed the threshold of [%s]", + node.nodeId(), + nodeWriteThreadPoolStats.maxThreadPoolQueueLatencyMillis(), + nodeWriteThreadPoolQueueLatencyThreshold.toHumanReadableString(2) + ); + logger.trace(explanation); + return allocation.decision(Decision.YES, NAME, explanation); } /** @@ -139,4 +172,5 @@ private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadP nodeWriteThreadPoolStats.totalThreadPoolThreads() ); } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java index 9617297474693..c47848a88c266 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderTests.java @@ -57,25 +57,31 @@ public void testWriteLoadDeciderDisabled() { assertEquals( Decision.Type.YES, writeLoadDecider.canAllocate( - testHarness.shardRouting2, + testHarness.shardRoutingOnNodeBelowUtilThreshold, testHarness.exceedingThresholdRoutingNode, testHarness.routingAllocation ).type() ); assertEquals( Decision.Type.YES, - writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation) - .type() 
+ writeLoadDecider.canAllocate( + testHarness.shardRoutingOnNodeExceedingUtilThreshold, + testHarness.belowThresholdRoutingNode, + testHarness.routingAllocation + ).type() ); assertEquals( Decision.Type.YES, - writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation) - .type() + writeLoadDecider.canAllocate( + testHarness.shardRoutingOnNodeExceedingUtilThreshold, + testHarness.nearThresholdRoutingNode, + testHarness.routingAllocation + ).type() ); assertEquals( Decision.Type.YES, writeLoadDecider.canAllocate( - testHarness.thirdRoutingNoWriteLoad, + testHarness.shardRoutingNoWriteLoad, testHarness.exceedingThresholdRoutingNode, testHarness.routingAllocation ).type() @@ -85,7 +91,7 @@ public void testWriteLoadDeciderDisabled() { Decision.Type.YES, writeLoadDecider.canRemain( testHarness.clusterState.metadata().getProject().index(indexName), - testHarness.shardRouting1, + testHarness.shardRoutingOnNodeExceedingUtilThreshold, testHarness.exceedingThresholdRoutingNode, testHarness.routingAllocation ).type() @@ -112,7 +118,7 @@ public void testWriteLoadDeciderCanAllocate() { assertDecisionMatches( "Assigning a new shard to a node that is above the threshold should fail", writeLoadDecider.canAllocate( - testHarness.shardRouting2, + testHarness.shardRoutingOnNodeBelowUtilThreshold, testHarness.exceedingThresholdRoutingNode, testHarness.routingAllocation ), @@ -132,14 +138,18 @@ public void testWriteLoadDeciderCanAllocate() { ); assertDecisionMatches( "Assigning a new shard to a node that has capacity should succeed", - writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.belowThresholdRoutingNode, testHarness.routingAllocation), + writeLoadDecider.canAllocate( + testHarness.shardRoutingOnNodeExceedingUtilThreshold, + testHarness.belowThresholdRoutingNode, + testHarness.routingAllocation + ), Decision.Type.YES, null ); assertDecisionMatches( "Assigning a new shard without a write load 
estimate should _not_ be blocked by lack of capacity", writeLoadDecider.canAllocate( - testHarness.thirdRoutingNoWriteLoad, + testHarness.shardRoutingNoWriteLoad, testHarness.exceedingThresholdRoutingNode, testHarness.routingAllocation ), @@ -148,12 +158,83 @@ public void testWriteLoadDeciderCanAllocate() { ); assertDecisionMatches( "Assigning a new shard that would cause the node to exceed capacity should fail", - writeLoadDecider.canAllocate(testHarness.shardRouting1, testHarness.nearThresholdRoutingNode, testHarness.routingAllocation), + writeLoadDecider.canAllocate( + testHarness.shardRoutingOnNodeExceedingUtilThreshold, + testHarness.nearThresholdRoutingNode, + testHarness.routingAllocation + ), Decision.Type.NOT_PREFERRED, "The high utilization threshold of [0.900000] would be exceeded on node [*] with utilization [0.89] " + "if shard [[test-index][0]] with estimated additional utilisation [0.06250] (write load [0.50000] / threads [8]) were " + "assigned to it. Cannot allocate shard to node without risking increased write latencies." + ); + } + + /** + * Test the {@link WriteLoadConstraintDecider#canRemain} implementation. 
+ */ + public void testWriteLoadDeciderCanRemain() { + String indexName = "test-index"; + var testHarness = createClusterStateAndRoutingAllocation(indexName); + + var writeLoadDecider = createWriteLoadConstraintDecider( + Settings.builder() + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED + ) + .build() + ); + assertEquals( + "A shard on a node below the util threshold should remain on its node", + Decision.Type.YES, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRoutingOnNodeBelowUtilThreshold, + testHarness.belowThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "A shard on a node above the util threshold should remain on its node", + Decision.Type.YES, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRoutingOnNodeExceedingUtilThreshold, + testHarness.exceedingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "A shard on a node with queuing below the threshold should remain on its node", + Decision.Type.YES, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRoutingOnNodeBelowQueueThreshold, + testHarness.belowQueuingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "A shard on a node with queuing above the threshold should not remain", + Decision.Type.NOT_PREFERRED, + writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRoutingOnNodeAboveQueueThreshold, + testHarness.aboveQueuingThresholdRoutingNode, + testHarness.routingAllocation + ).type() + ); + assertEquals( + "A shard without write load should remain on a node with queuing above the threshold", + Decision.Type.YES, + 
writeLoadDecider.canRemain( + testHarness.clusterState.metadata().getProject().index(indexName), + testHarness.shardRoutingNoWriteLoad, + testHarness.aboveQueuingThresholdRoutingNode, + testHarness.routingAllocation + ).type() ); } @@ -178,9 +259,13 @@ private record TestHarness( RoutingNode exceedingThresholdRoutingNode, RoutingNode belowThresholdRoutingNode, RoutingNode nearThresholdRoutingNode, - ShardRouting shardRouting1, - ShardRouting shardRouting2, - ShardRouting thirdRoutingNoWriteLoad, + RoutingNode belowQueuingThresholdRoutingNode, + RoutingNode aboveQueuingThresholdRoutingNode, + ShardRouting shardRoutingOnNodeExceedingUtilThreshold, + ShardRouting shardRoutingOnNodeBelowUtilThreshold, + ShardRouting shardRoutingNoWriteLoad, + ShardRouting shardRoutingOnNodeBelowQueueThreshold, + ShardRouting shardRoutingOnNodeAboveQueueThreshold, ShardRouting unassignedShardRouting ) {} @@ -192,9 +277,14 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { * Create the ClusterState for multiple nodes and multiple index shards. */ - ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas(new String[] { indexName }, 3, 1); - // The number of data nodes the util method above creates is numberOfReplicas+1, and three data nodes are needed for this test. - assertEquals(3, clusterState.nodes().size()); + int numberOfShards = 6; + ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas( + new String[] { indexName }, + numberOfShards, + 3 + ); + // The number of data nodes the util method above creates is numberOfReplicas+2, and five data nodes are needed for this test. 
+ assertEquals(5, clusterState.nodes().size()); assertEquals(1, clusterState.metadata().getTotalNumberOfIndices()); /** @@ -208,6 +298,10 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { var belowThresholdDiscoveryNode2 = discoveryNodeIterator.next(); assertTrue(discoveryNodeIterator.hasNext()); var nearThresholdDiscoveryNode3 = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var queuingBelowThresholdDiscoveryNode4 = discoveryNodeIterator.next(); + assertTrue(discoveryNodeIterator.hasNext()); + var queuingAboveThresholdDiscoveryNode5 = discoveryNodeIterator.next(); assertFalse(discoveryNodeIterator.hasNext()); var indexIterator = clusterState.metadata().indicesAllProjects().iterator(); @@ -215,11 +309,13 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { IndexMetadata testIndexMetadata = indexIterator.next(); assertFalse(indexIterator.hasNext()); Index testIndex = testIndexMetadata.getIndex(); - assertEquals(3, testIndexMetadata.getNumberOfShards()); + assertEquals(numberOfShards, testIndexMetadata.getNumberOfShards()); ShardId testShardId1 = new ShardId(testIndex, 0); ShardId testShardId2 = new ShardId(testIndex, 1); ShardId testShardId3NoWriteLoad = new ShardId(testIndex, 2); - ShardId testShardId4Unassigned = new ShardId(testIndex, 3); + ShardId testShardId4 = new ShardId(testIndex, 3); + ShardId testShardId5 = new ShardId(testIndex, 4); + ShardId testShardId6Unassigned = new ShardId(testIndex, 5); /** * Create a ClusterInfo that includes the node and shard level write load estimates for a variety of node capacity situations. 
@@ -233,20 +329,36 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { ); var nodeThreadPoolStatsWithWriteBelowThreshold = createNodeUsageStatsForThreadPools(belowThresholdDiscoveryNode2, 8, 0.50f, 0); var nodeThreadPoolStatsWithWriteNearThreshold = createNodeUsageStatsForThreadPools(nearThresholdDiscoveryNode3, 8, 0.89f, 0); + var nodeThreadPoolStatsWithQueuingBelowThreshold = createNodeUsageStatsForThreadPools( + exceedingThresholdDiscoveryNode, + 8, + 0.99f, + 5_000 + ); + var nodeThreadPoolStatsWithQueuingAboveThreshold = createNodeUsageStatsForThreadPools( + exceedingThresholdDiscoveryNode, + 8, + 0.99f, + 15_000 + ); // Create a map of usage per node. var nodeIdToNodeUsageStatsForThreadPools = new HashMap<String, NodeUsageStatsForThreadPools>(); nodeIdToNodeUsageStatsForThreadPools.put(exceedingThresholdDiscoveryNode.getId(), nodeThreadPoolStatsWithWriteExceedingThreshold); nodeIdToNodeUsageStatsForThreadPools.put(belowThresholdDiscoveryNode2.getId(), nodeThreadPoolStatsWithWriteBelowThreshold); nodeIdToNodeUsageStatsForThreadPools.put(nearThresholdDiscoveryNode3.getId(), nodeThreadPoolStatsWithWriteNearThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(queuingBelowThresholdDiscoveryNode4.getId(), nodeThreadPoolStatsWithQueuingBelowThreshold); + nodeIdToNodeUsageStatsForThreadPools.put(queuingAboveThresholdDiscoveryNode5.getId(), nodeThreadPoolStatsWithQueuingAboveThreshold); // Create a map of usage per shard. 
var shardIdToWriteLoadEstimate = new HashMap<ShardId, Double>(); shardIdToWriteLoadEstimate.put(testShardId1, 0.5); shardIdToWriteLoadEstimate.put(testShardId2, 0.5); shardIdToWriteLoadEstimate.put(testShardId3NoWriteLoad, 0d); + shardIdToWriteLoadEstimate.put(testShardId4, 0.5); + shardIdToWriteLoadEstimate.put(testShardId5, 0.5d); if (randomBoolean()) { - shardIdToWriteLoadEstimate.put(testShardId4Unassigned, randomDoubleBetween(0.0, 2.0, true)); + shardIdToWriteLoadEstimate.put(testShardId6Unassigned, randomDoubleBetween(0.0, 2.0, true)); } ClusterInfo clusterInfo = ClusterInfo.builder() @@ -267,29 +379,43 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { System.nanoTime() ); - ShardRouting shardRouting1 = TestShardRouting.newShardRouting( + ShardRouting shardRoutingOnNodeExceedingUtilThreshold = TestShardRouting.newShardRouting( testShardId1, exceedingThresholdDiscoveryNode.getId(), null, true, ShardRoutingState.STARTED ); - ShardRouting shardRouting2 = TestShardRouting.newShardRouting( + ShardRouting shardRoutingOnNodeBelowUtilThreshold = TestShardRouting.newShardRouting( testShardId2, belowThresholdDiscoveryNode2.getId(), null, true, ShardRoutingState.STARTED ); - ShardRouting thirdRoutingNoWriteLoad = TestShardRouting.newShardRouting( + ShardRouting shardRoutingNoWriteLoad = TestShardRouting.newShardRouting( testShardId3NoWriteLoad, belowThresholdDiscoveryNode2.getId(), null, true, ShardRoutingState.STARTED ); + ShardRouting shardRoutingOnNodeBelowQueueThreshold = TestShardRouting.newShardRouting( + testShardId4, + queuingBelowThresholdDiscoveryNode4.getId(), + null, + true, + ShardRoutingState.STARTED + ); + ShardRouting shardRoutingOnNodeAboveQueueThreshold = TestShardRouting.newShardRouting( + testShardId5, + queuingAboveThresholdDiscoveryNode5.getId(), + null, + true, + ShardRoutingState.STARTED + ); ShardRouting unassignedShardRouting = TestShardRouting.newShardRouting( - testShardId4Unassigned, + testShardId6Unassigned, null, true, 
ShardRoutingState.UNASSIGNED @@ -298,17 +424,27 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { RoutingNode exceedingThresholdRoutingNode = RoutingNodesHelper.routingNode( exceedingThresholdDiscoveryNode.getId(), exceedingThresholdDiscoveryNode, - shardRouting1 + shardRoutingOnNodeExceedingUtilThreshold ); RoutingNode belowThresholdRoutingNode = RoutingNodesHelper.routingNode( belowThresholdDiscoveryNode2.getId(), belowThresholdDiscoveryNode2, - shardRouting2 + shardRoutingOnNodeBelowUtilThreshold ); RoutingNode nearThresholdRoutingNode = RoutingNodesHelper.routingNode( nearThresholdDiscoveryNode3.getId(), nearThresholdDiscoveryNode3 ); + RoutingNode belowQueuingThresholdRoutingNode = RoutingNodesHelper.routingNode( + queuingBelowThresholdDiscoveryNode4.getId(), + queuingBelowThresholdDiscoveryNode4, + shardRoutingOnNodeBelowQueueThreshold + ); + RoutingNode aboveQueuingThresholdRoutingNode = RoutingNodesHelper.routingNode( + queuingAboveThresholdDiscoveryNode5.getId(), + queuingAboveThresholdDiscoveryNode5, + shardRoutingOnNodeAboveQueueThreshold + ); return new TestHarness( clusterState, @@ -316,9 +452,13 @@ private TestHarness createClusterStateAndRoutingAllocation(String indexName) { exceedingThresholdRoutingNode, belowThresholdRoutingNode, nearThresholdRoutingNode, - shardRouting1, - shardRouting2, - thirdRoutingNoWriteLoad, + belowQueuingThresholdRoutingNode, + aboveQueuingThresholdRoutingNode, + shardRoutingOnNodeExceedingUtilThreshold, + shardRoutingOnNodeBelowUtilThreshold, + shardRoutingNoWriteLoad, + shardRoutingOnNodeBelowQueueThreshold, + shardRoutingOnNodeAboveQueueThreshold, unassignedShardRouting ); }