@@ -28,6 +28,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
+import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceMetrics;
 import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -54,14 +55,18 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.stream.StreamSupport;

 import static java.util.stream.IntStream.range;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -130,7 +135,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -235,7 +240,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() {
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -333,7 +338,7 @@ public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferre
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -429,15 +434,12 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
         * will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
         */

-        IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
-            .state()
-            .getMetadata()
-            .getProject()
-            .index(harness.indexName);
+        final ClusterState originalClusterState = internalCluster().getCurrentMasterNodeInstance(ClusterService.class).state();
+        final IndexMetadata indexMetadata = originalClusterState.getMetadata().getProject().index(harness.indexName);
         setUpMockTransportIndicesStatsResponse(
             harness.firstDiscoveryNode,
             indexMetadata.getNumberOfShards(),
-            createShardStatsResponseForIndex(indexMetadata, harness.randomShardWriteLoad, harness.firstDataNodeId)
+            createShardStatsResponseForIndex(indexMetadata, harness.maxShardWriteLoad, harness.firstDataNodeId)
         );
         setUpMockTransportIndicesStatsResponse(harness.secondDiscoveryNode, 0, List.of());
         setUpMockTransportIndicesStatsResponse(harness.thirdDiscoveryNode, 0, List.of());
@@ -483,6 +485,7 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
                 harness.randomNumberOfShards,
                 countShardsStillAssignedToFirstNode + 1
             );
+            assertThatTheBestShardWasMoved(harness, originalClusterState, desiredBalanceResponse);
         } catch (AssertionError error) {
             ClusterState state = client().admin()
                 .cluster()
@@ -498,6 +501,36 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
         }
     }

+    /**
+     * Determine which shard was moved and check that it's the "best" according to
+     * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator}
+     */
+    private void assertThatTheBestShardWasMoved(
+        TestHarness harness,
+        ClusterState originalClusterState,
+        DesiredBalanceResponse desiredBalanceResponse
+    ) {
+        int movedShardId = desiredBalanceResponse.getRoutingTable().get(harness.indexName).entrySet().stream().filter(e -> {
+            Set<String> desiredNodeIds = e.getValue().desired().nodeIds();
+            return desiredNodeIds.contains(harness.secondDiscoveryNode.getId())
+                || desiredNodeIds.contains(harness.thirdDiscoveryNode.getId());
+        }).findFirst().map(Map.Entry::getKey).orElseThrow(() -> new AssertionError("No shard was moved to a non-hot-spotting node"));
+
+        final BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator comparator =
+            new BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator(
+                desiredBalanceResponse.getClusterInfo(),
+                originalClusterState.getRoutingNodes().node(harness.firstDataNodeId)
+            );
+
+        final List<ShardRouting> bestShardsToMove = StreamSupport.stream(
+            originalClusterState.getRoutingNodes().node(harness.firstDataNodeId).spliterator(),
+            false
+        ).sorted(comparator).toList();
+
+        // The moved shard should be at the head of the sorted list
+        assertThat(movedShardId, equalTo(bestShardsToMove.get(0).shardId().id()));
+    }
+
     public void testMaxQueueLatencyMetricIsPublished() {
         final Settings settings = Settings.builder()
             .put(
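The new assertion above reduces to: sort the hot node's shards with the same comparator the balancer uses, then expect the relocated shard to be the head of that ordering. A minimal self-contained sketch of that pattern in plain Java (the ShardLoad record and the descending-by-write-load ordering are illustrative stand-ins, not the Elasticsearch API; the real PrioritiseByShardWriteLoadComparator applies its own threshold-based ordering):

    import java.util.Comparator;
    import java.util.List;

    public class BestShardFirstSketch {
        // Hypothetical stand-in for a shard plus the peak write load reported for it in ClusterInfo
        record ShardLoad(int shardId, double peakWriteLoad) {}

        public static void main(String[] args) {
            List<ShardLoad> shardsOnHotNode = List.of(
                new ShardLoad(0, 0.002),
                new ShardLoad(1, 0.009), // the heaviest writer
                new ShardLoad(2, 0.004)
            );

            // Stand-in comparator: "best shard to move" first, here simply highest peak write load first
            Comparator<ShardLoad> bestToMoveFirst = Comparator.comparingDouble(ShardLoad::peakWriteLoad).reversed();
            List<ShardLoad> prioritised = shardsOnHotNode.stream().sorted(bestToMoveFirst).toList();

            int movedShardId = 1; // whichever shard the allocator actually relocated
            // The assertion pattern: the relocated shard must be the head of the prioritised ordering
            if (movedShardId != prioritised.get(0).shardId()) {
                throw new AssertionError("expected the top-priority shard to be the one that moved");
            }
            System.out.println("moved shard " + movedShardId + " is the top-priority shard");
        }
    }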
@@ -659,16 +692,35 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
     }

     /**
-     * Helper to create a list of dummy {@link ShardStats} for the given index, each shard reporting a {@code peakShardWriteLoad} stat.
+     * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load
+     * between 0 and {@code maximumShardWriteLoad}. There will always be at least one shard reporting the specified
+     * {@code maximumShardWriteLoad}.
      */
     private List<ShardStats> createShardStatsResponseForIndex(
         IndexMetadata indexMetadata,
-        float peakShardWriteLoad,
+        float maximumShardWriteLoad,
         String assignedShardNodeId
     ) {
-        List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
+        // Randomly distribute shards' peak write-loads so that we can check later that shard movements are prioritized correctly
+        final double writeLoadThreshold = maximumShardWriteLoad
+            * BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator.THRESHOLD_RATIO;
+        final List<Double> shardPeakWriteLoads = new ArrayList<>();
+        // Need at least one with the maximum write-load
+        shardPeakWriteLoads.add((double) maximumShardWriteLoad);
+        final int remainingShards = indexMetadata.getNumberOfShards() - 1;
+        // Some over-threshold, some under
+        for (int i = 0; i < remainingShards; ++i) {
+            if (randomBoolean()) {
+                shardPeakWriteLoads.add(randomDoubleBetween(writeLoadThreshold, maximumShardWriteLoad, true));
+            } else {
+                shardPeakWriteLoads.add(randomDoubleBetween(0.0, writeLoadThreshold, true));
+            }
+        }
+        assertThat(shardPeakWriteLoads, hasSize(indexMetadata.getNumberOfShards()));
+        Collections.shuffle(shardPeakWriteLoads, random());
+        final List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
         for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
-            shardStats.add(createShardStats(indexMetadata, i, peakShardWriteLoad, assignedShardNodeId));
+            shardStats.add(createShardStats(indexMetadata, i, shardPeakWriteLoads.get(i), assignedShardNodeId));
         }
         return shardStats;
     }
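For readers skimming the helper above: one shard always gets exactly the maximum write load, the remaining shards are split between values above and below a threshold derived from THRESHOLD_RATIO, and the list is shuffled so the heavy shard lands on a random shard id. A runnable sketch of that distribution using plain java.util.Random (the 0.9 ratio is an illustrative assumption standing in for THRESHOLD_RATIO, and Random replaces the test framework's randomBoolean/randomDoubleBetween):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    public class WriteLoadDistributionSketch {
        public static void main(String[] args) {
            Random random = new Random();
            float maximumShardWriteLoad = 0.01f; // illustrative upper bound, like the harness's randomFloatBetween(0.0f, 0.01f, false)
            double thresholdRatio = 0.9;         // assumption: stand-in for the comparator's THRESHOLD_RATIO
            double writeLoadThreshold = maximumShardWriteLoad * thresholdRatio;
            int numberOfShards = 5;

            List<Double> shardPeakWriteLoads = new ArrayList<>();
            shardPeakWriteLoads.add((double) maximumShardWriteLoad); // at least one shard at the maximum
            for (int i = 1; i < numberOfShards; i++) {
                double load = random.nextBoolean()
                    ? writeLoadThreshold + random.nextDouble() * (maximumShardWriteLoad - writeLoadThreshold) // over threshold
                    : random.nextDouble() * writeLoadThreshold;                                               // under threshold
                shardPeakWriteLoads.add(load);
            }
            Collections.shuffle(shardPeakWriteLoads, random); // heavy shard ends up at a random position/shard id
            System.out.println(shardPeakWriteLoads);
        }
    }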
@@ -719,7 +771,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
         int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
         int randomNumberOfWritePoolThreads = randomIntBetween(2, 20);
         long randomQueueLatencyThresholdMillis = randomLongBetween(1, 20_000);
-        float randomShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
+        float maximumShardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
         Settings settings = enabledWriteLoadDeciderSettings(randomUtilizationThresholdPercent, randomQueueLatencyThresholdMillis);

         internalCluster().startMasterOnlyNode(settings);
@@ -756,8 +808,8 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
                 + randomUtilizationThresholdPercent
                 + ", write threads: "
                 + randomNumberOfWritePoolThreads
-                + ", individual shard write loads: "
-                + randomShardWriteLoad
+                + ", maximum shard write load: "
+                + maximumShardWriteLoad
         );

         /**
@@ -775,7 +827,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {

         // Calculate the maximum utilization a node can report while still being able to accept all relocating shards
         int shardWriteLoadOverhead = shardLoadUtilizationOverhead(
-            randomShardWriteLoad * randomNumberOfShards,
+            maximumShardWriteLoad * randomNumberOfShards,
             randomNumberOfWritePoolThreads
         );
         int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1;
@@ -819,7 +871,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
             randomUtilizationThresholdPercent,
             randomNumberOfWritePoolThreads,
             randomQueueLatencyThresholdMillis,
-            randomShardWriteLoad,
+            maximumShardWriteLoad,
             indexName,
             randomNumberOfShards,
             maxUtilBelowThresholdThatAllowsAllShardsToRelocate
@@ -842,7 +894,7 @@ record TestHarness(
         int randomUtilizationThresholdPercent,
         int randomNumberOfWritePoolThreads,
         long randomQueueLatencyThresholdMillis,
-        float randomShardWriteLoad,
+        float maxShardWriteLoad,
         String indexName,
         int randomNumberOfShards,
         int maxUtilBelowThresholdThatAllowsAllShardsToRelocate