28
28
import org .elasticsearch .cluster .routing .ShardRouting ;
29
29
import org .elasticsearch .cluster .routing .UnassignedInfo ;
30
30
import org .elasticsearch .cluster .routing .allocation .WriteLoadConstraintSettings ;
31
+ import org .elasticsearch .cluster .routing .allocation .allocator .BalancedShardsAllocator ;
31
32
import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceMetrics ;
32
33
import org .elasticsearch .cluster .routing .allocation .allocator .DesiredBalanceShardsAllocator ;
33
34
import org .elasticsearch .cluster .service .ClusterService ;
54
55
import java .nio .file .Path ;
55
56
import java .util .ArrayList ;
56
57
import java .util .Collection ;
58
+ import java .util .Collections ;
57
59
import java .util .HashMap ;
58
60
import java .util .List ;
59
61
import java .util .Map ;
62
+ import java .util .Set ;
60
63
import java .util .concurrent .CountDownLatch ;
64
+ import java .util .stream .StreamSupport ;
61
65
62
66
import static java .util .stream .IntStream .range ;
63
67
import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_REPLICAS ;
64
68
import static org .elasticsearch .cluster .metadata .IndexMetadata .SETTING_NUMBER_OF_SHARDS ;
69
+ import static org .hamcrest .Matchers .equalTo ;
65
70
import static org .hamcrest .Matchers .everyItem ;
66
71
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
67
72
import static org .hamcrest .Matchers .hasSize ;
@@ -130,7 +135,7 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
130
135
setUpMockTransportIndicesStatsResponse (
131
136
harness .firstDiscoveryNode ,
132
137
indexMetadata .getNumberOfShards (),
133
- createShardStatsResponseForIndex (indexMetadata , harness .randomShardWriteLoad , harness .firstDataNodeId )
138
+ createShardStatsResponseForIndex (indexMetadata , harness .maxShardWriteLoad , harness .firstDataNodeId )
134
139
);
135
140
setUpMockTransportIndicesStatsResponse (harness .secondDiscoveryNode , 0 , List .of ());
136
141
setUpMockTransportIndicesStatsResponse (harness .thirdDiscoveryNode , 0 , List .of ());
@@ -235,7 +240,7 @@ public void testShardsAreAssignedToNotPreferredWhenAlternativeIsNo() {
235
240
setUpMockTransportIndicesStatsResponse (
236
241
harness .firstDiscoveryNode ,
237
242
indexMetadata .getNumberOfShards (),
238
- createShardStatsResponseForIndex (indexMetadata , harness .randomShardWriteLoad , harness .firstDataNodeId )
243
+ createShardStatsResponseForIndex (indexMetadata , harness .maxShardWriteLoad , harness .firstDataNodeId )
239
244
);
240
245
setUpMockTransportIndicesStatsResponse (harness .secondDiscoveryNode , 0 , List .of ());
241
246
setUpMockTransportIndicesStatsResponse (harness .thirdDiscoveryNode , 0 , List .of ());
@@ -333,7 +338,7 @@ public void testCanRemainNotPreferredIsIgnoredWhenAllOtherNodesReturnNotPreferre
333
338
setUpMockTransportIndicesStatsResponse (
334
339
harness .firstDiscoveryNode ,
335
340
indexMetadata .getNumberOfShards (),
336
- createShardStatsResponseForIndex (indexMetadata , harness .randomShardWriteLoad , harness .firstDataNodeId )
341
+ createShardStatsResponseForIndex (indexMetadata , harness .maxShardWriteLoad , harness .firstDataNodeId )
337
342
);
338
343
setUpMockTransportIndicesStatsResponse (harness .secondDiscoveryNode , 0 , List .of ());
339
344
setUpMockTransportIndicesStatsResponse (harness .thirdDiscoveryNode , 0 , List .of ());
@@ -429,15 +434,12 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
429
434
* will show that all shards have non-empty write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
430
435
*/
431
436
432
- IndexMetadata indexMetadata = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class )
433
- .state ()
434
- .getMetadata ()
435
- .getProject ()
436
- .index (harness .indexName );
437
+ final ClusterState originalClusterState = internalCluster ().getCurrentMasterNodeInstance (ClusterService .class ).state ();
438
+ final IndexMetadata indexMetadata = originalClusterState .getMetadata ().getProject ().index (harness .indexName );
437
439
setUpMockTransportIndicesStatsResponse (
438
440
harness .firstDiscoveryNode ,
439
441
indexMetadata .getNumberOfShards (),
440
- createShardStatsResponseForIndex (indexMetadata , harness .randomShardWriteLoad , harness .firstDataNodeId )
442
+ createShardStatsResponseForIndex (indexMetadata , harness .maxShardWriteLoad , harness .firstDataNodeId )
441
443
);
442
444
setUpMockTransportIndicesStatsResponse (harness .secondDiscoveryNode , 0 , List .of ());
443
445
setUpMockTransportIndicesStatsResponse (harness .thirdDiscoveryNode , 0 , List .of ());
@@ -483,6 +485,7 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
483
485
harness .randomNumberOfShards ,
484
486
countShardsStillAssignedToFirstNode + 1
485
487
);
488
+ assertThatTheBestShardWasMoved (harness , originalClusterState , desiredBalanceResponse );
486
489
} catch (AssertionError error ) {
487
490
ClusterState state = client ().admin ()
488
491
.cluster ()
@@ -498,6 +501,36 @@ public void testCanRemainRelocatesOneShardWhenAHotSpotOccurs() {
498
501
}
499
502
}
500
503
504
+ /**
505
+ * Determine which shard was moved and check that it's the "best" according to
506
+ * {@link org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer.PrioritiseByShardWriteLoadComparator}
507
+ */
508
+ private void assertThatTheBestShardWasMoved (
509
+ TestHarness harness ,
510
+ ClusterState originalClusterState ,
511
+ DesiredBalanceResponse desiredBalanceResponse
512
+ ) {
513
+ int movedShardId = desiredBalanceResponse .getRoutingTable ().get (harness .indexName ).entrySet ().stream ().filter (e -> {
514
+ Set <String > desiredNodeIds = e .getValue ().desired ().nodeIds ();
515
+ return desiredNodeIds .contains (harness .secondDiscoveryNode .getId ())
516
+ || desiredNodeIds .contains (harness .thirdDiscoveryNode .getId ());
517
+ }).findFirst ().map (Map .Entry ::getKey ).orElseThrow (() -> new AssertionError ("No shard was moved to a non-hot-spotting node" ));
518
+
519
+ final BalancedShardsAllocator .Balancer .PrioritiseByShardWriteLoadComparator comparator =
520
+ new BalancedShardsAllocator .Balancer .PrioritiseByShardWriteLoadComparator (
521
+ desiredBalanceResponse .getClusterInfo (),
522
+ originalClusterState .getRoutingNodes ().node (harness .firstDataNodeId )
523
+ );
524
+
525
+ final List <ShardRouting > bestShardsToMove = StreamSupport .stream (
526
+ originalClusterState .getRoutingNodes ().node (harness .firstDataNodeId ).spliterator (),
527
+ false
528
+ ).sorted (comparator ).toList ();
529
+
530
+ // The moved shard should be at the head of the sorted list
531
+ assertThat (movedShardId , equalTo (bestShardsToMove .get (0 ).shardId ().id ()));
532
+ }
533
+
501
534
public void testMaxQueueLatencyMetricIsPublished () {
502
535
final Settings settings = Settings .builder ()
503
536
.put (
@@ -659,16 +692,35 @@ private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
659
692
}
660
693
661
694
/**
662
- * Helper to create a list of dummy {@link ShardStats} for the given index, each shard reporting a {@code peakShardWriteLoad} stat.
695
+ * Helper to create a list of dummy {@link ShardStats} for the given index, each shard being randomly allocated a peak write load
696
+ * between 0 and {@code maximumShardWriteLoad}. There will always be at least one shard reporting the specified
697
+ * {@code maximumShardWriteLoad}.
663
698
*/
664
699
private List <ShardStats > createShardStatsResponseForIndex (
665
700
IndexMetadata indexMetadata ,
666
- float peakShardWriteLoad ,
701
+ float maximumShardWriteLoad ,
667
702
String assignedShardNodeId
668
703
) {
669
- List <ShardStats > shardStats = new ArrayList <>(indexMetadata .getNumberOfShards ());
704
+ // Randomly distribute shards' peak write-loads so that we can check later that shard movements are prioritized correctly
705
+ final double writeLoadThreshold = maximumShardWriteLoad
706
+ * BalancedShardsAllocator .Balancer .PrioritiseByShardWriteLoadComparator .THRESHOLD_RATIO ;
707
+ final List <Double > shardPeakWriteLoads = new ArrayList <>();
708
+ // Need at least one with the maximum write-load
709
+ shardPeakWriteLoads .add ((double ) maximumShardWriteLoad );
710
+ final int remainingShards = indexMetadata .getNumberOfShards () - 1 ;
711
+ // Some over-threshold, some under
712
+ for (int i = 0 ; i < remainingShards ; ++i ) {
713
+ if (randomBoolean ()) {
714
+ shardPeakWriteLoads .add (randomDoubleBetween (writeLoadThreshold , maximumShardWriteLoad , true ));
715
+ } else {
716
+ shardPeakWriteLoads .add (randomDoubleBetween (0.0 , writeLoadThreshold , true ));
717
+ }
718
+ }
719
+ assertThat (shardPeakWriteLoads , hasSize (indexMetadata .getNumberOfShards ()));
720
+ Collections .shuffle (shardPeakWriteLoads , random ());
721
+ final List <ShardStats > shardStats = new ArrayList <>(indexMetadata .getNumberOfShards ());
670
722
for (int i = 0 ; i < indexMetadata .getNumberOfShards (); i ++) {
671
- shardStats .add (createShardStats (indexMetadata , i , peakShardWriteLoad , assignedShardNodeId ));
723
+ shardStats .add (createShardStats (indexMetadata , i , shardPeakWriteLoads . get ( i ) , assignedShardNodeId ));
672
724
}
673
725
return shardStats ;
674
726
}
@@ -719,7 +771,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
719
771
int randomUtilizationThresholdPercent = randomIntBetween (50 , 100 );
720
772
int randomNumberOfWritePoolThreads = randomIntBetween (2 , 20 );
721
773
long randomQueueLatencyThresholdMillis = randomLongBetween (1 , 20_000 );
722
- float randomShardWriteLoad = randomFloatBetween (0.0f , 0.01f , false );
774
+ float maximumShardWriteLoad = randomFloatBetween (0.0f , 0.01f , false );
723
775
Settings settings = enabledWriteLoadDeciderSettings (randomUtilizationThresholdPercent , randomQueueLatencyThresholdMillis );
724
776
725
777
internalCluster ().startMasterOnlyNode (settings );
@@ -756,8 +808,8 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
756
808
+ randomUtilizationThresholdPercent
757
809
+ ", write threads: "
758
810
+ randomNumberOfWritePoolThreads
759
- + ", individual shard write loads : "
760
- + randomShardWriteLoad
811
+ + ", maximum shard write load : "
812
+ + maximumShardWriteLoad
761
813
);
762
814
763
815
/**
@@ -775,7 +827,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
775
827
776
828
// Calculate the maximum utilization a node can report while still being able to accept all relocating shards
777
829
int shardWriteLoadOverhead = shardLoadUtilizationOverhead (
778
- randomShardWriteLoad * randomNumberOfShards ,
830
+ maximumShardWriteLoad * randomNumberOfShards ,
779
831
randomNumberOfWritePoolThreads
780
832
);
781
833
int maxUtilBelowThresholdThatAllowsAllShardsToRelocate = randomUtilizationThresholdPercent - shardWriteLoadOverhead - 1 ;
@@ -819,7 +871,7 @@ private TestHarness setUpThreeTestNodesAndAllIndexShardsOnFirstNode() {
819
871
randomUtilizationThresholdPercent ,
820
872
randomNumberOfWritePoolThreads ,
821
873
randomQueueLatencyThresholdMillis ,
822
- randomShardWriteLoad ,
874
+ maximumShardWriteLoad ,
823
875
indexName ,
824
876
randomNumberOfShards ,
825
877
maxUtilBelowThresholdThatAllowsAllShardsToRelocate
@@ -842,7 +894,7 @@ record TestHarness(
842
894
int randomUtilizationThresholdPercent ,
843
895
int randomNumberOfWritePoolThreads ,
844
896
long randomQueueLatencyThresholdMillis ,
845
- float randomShardWriteLoad ,
897
+ float maxShardWriteLoad ,
846
898
String indexName ,
847
899
int randomNumberOfShards ,
848
900
int maxUtilBelowThresholdThatAllowsAllShardsToRelocate
0 commit comments