35
35
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
36
36
import org .elasticsearch .cluster .routing .RecoverySource ;
37
37
import org .elasticsearch .cluster .routing .RoutingChangesObserver ;
38
+ import org .elasticsearch .cluster .routing .RoutingNode ;
38
39
import org .elasticsearch .cluster .routing .RoutingNodes ;
39
40
import org .elasticsearch .cluster .routing .RoutingTable ;
40
41
import org .elasticsearch .cluster .routing .ShardRouting ;
44
45
import org .elasticsearch .cluster .routing .allocation .RoutingAllocation ;
45
46
import org .elasticsearch .cluster .routing .allocation .ShardAllocationDecision ;
46
47
import org .elasticsearch .cluster .routing .allocation .command .MoveAllocationCommand ;
48
+ import org .elasticsearch .cluster .routing .allocation .decider .AllocationDecider ;
47
49
import org .elasticsearch .cluster .routing .allocation .decider .AllocationDeciders ;
50
+ import org .elasticsearch .cluster .routing .allocation .decider .Decision ;
48
51
import org .elasticsearch .cluster .routing .allocation .decider .ThrottlingAllocationDecider ;
49
52
import org .elasticsearch .common .Randomness ;
50
53
import org .elasticsearch .common .UUIDs ;
54
57
import org .elasticsearch .common .time .TimeProviderUtils ;
55
58
import org .elasticsearch .common .unit .ByteSizeValue ;
56
59
import org .elasticsearch .common .util .Maps ;
60
+ import org .elasticsearch .common .util .set .Sets ;
57
61
import org .elasticsearch .core .Strings ;
58
62
import org .elasticsearch .core .TimeValue ;
59
63
import org .elasticsearch .core .Tuple ;
99
103
import static org .hamcrest .Matchers .aMapWithSize ;
100
104
import static org .hamcrest .Matchers .allOf ;
101
105
import static org .hamcrest .Matchers .anyOf ;
106
+ import static org .hamcrest .Matchers .arrayWithSize ;
102
107
import static org .hamcrest .Matchers .equalTo ;
103
108
import static org .hamcrest .Matchers .everyItem ;
104
109
import static org .hamcrest .Matchers .hasEntry ;
@@ -566,7 +571,24 @@ public void testNoDataNodes() {
566
571
}
567
572
568
573
public void testAppliesMoveCommands () {
569
- var desiredBalanceComputer = createDesiredBalanceComputer ();
574
+ var desiredBalanceComputer = createDesiredBalanceComputer (new ShardsAllocator () {
575
+ @ Override
576
+ public void allocate (RoutingAllocation allocation ) {
577
+ // This runs after the move commands have been applied, we assert that the relocating shards caused by the move
578
+ // commands are all started by the simulation.
579
+ assertThat (
580
+ "unexpected relocating shards: " + allocation .routingNodes (),
581
+ allocation .routingNodes ().getRelocatingShardCount (),
582
+ equalTo (0 )
583
+ );
584
+ assertThat (allocation .routingNodes ().node ("node-2" ).started (), arrayWithSize (2 ));
585
+ }
586
+
587
+ @ Override
588
+ public ShardAllocationDecision decideShardAllocation (ShardRouting shard , RoutingAllocation allocation ) {
589
+ throw new AssertionError ("only used for allocation explain" );
590
+ }
591
+ });
570
592
var clusterState = createInitialClusterState (3 );
571
593
var index = clusterState .metadata ().getProject ().index (TEST_INDEX ).getIndex ();
572
594
@@ -578,23 +600,99 @@ public void testAppliesMoveCommands() {
578
600
}
579
601
clusterState = rebuildRoutingTable (clusterState , routingNodes );
580
602
603
+ final var dataNodeIds = clusterState .nodes ().getDataNodes ().keySet ();
604
+ for (var nodeId : List .of ("node-0" , "node-1" )) {
605
+ final var desiredBalanceInput = DesiredBalanceInput .create (
606
+ randomInt (),
607
+ new RoutingAllocation (new AllocationDeciders (List .of (new AllocationDecider () {
608
+ @ Override
609
+ public Decision canAllocate (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
610
+ // Move command works every decision except NO
611
+ return randomFrom (Decision .YES , Decision .THROTTLE , Decision .NOT_PREFERRED );
612
+ }
613
+ })), clusterState , ClusterInfo .EMPTY , SnapshotShardSizeInfo .EMPTY , 0L )
614
+ );
615
+ var desiredBalance = desiredBalanceComputer .compute (
616
+ DesiredBalance .BECOME_MASTER_INITIAL ,
617
+ desiredBalanceInput ,
618
+ queue (
619
+ new MoveAllocationCommand (index .getName (), 0 , nodeId , "node-2" ),
620
+ new MoveAllocationCommand (index .getName (), 1 , nodeId , "node-2" )
621
+ ),
622
+ input -> true
623
+ );
624
+
625
+ final Set <String > expectedNodeIds = Sets .difference (dataNodeIds , Set .of (nodeId ));
626
+ assertDesiredAssignments (
627
+ desiredBalance ,
628
+ Map .of (
629
+ new ShardId (index , 0 ),
630
+ new ShardAssignment (expectedNodeIds , 2 , 0 , 0 ),
631
+ new ShardId (index , 1 ),
632
+ new ShardAssignment (expectedNodeIds , 2 , 0 , 0 )
633
+ )
634
+ );
635
+ }
636
+ }
637
+
638
+ public void testCannotApplyMoveCommand () {
639
+ var desiredBalanceComputer = createDesiredBalanceComputer (new ShardsAllocator () {
640
+ @ Override
641
+ public void allocate (RoutingAllocation allocation ) {
642
+ // This runs after the move commands have been executed and failed, we assert that no movement should be seen
643
+ // in the routing nodes.
644
+ assertThat (
645
+ "unexpected relocating shards: " + allocation .routingNodes (),
646
+ allocation .routingNodes ().getRelocatingShardCount (),
647
+ equalTo (0 )
648
+ );
649
+ assertThat (allocation .routingNodes ().node ("node-2" ).isEmpty (), equalTo (true ));
650
+ }
651
+
652
+ @ Override
653
+ public ShardAllocationDecision decideShardAllocation (ShardRouting shard , RoutingAllocation allocation ) {
654
+ throw new AssertionError ("only used for allocation explain" );
655
+ }
656
+ });
657
+ var clusterState = createInitialClusterState (3 );
658
+ var index = clusterState .metadata ().getProject ().index (TEST_INDEX ).getIndex ();
659
+
660
+ var changes = new RoutingChangesObserver .DelegatingRoutingChangesObserver ();
661
+ var routingNodes = clusterState .mutableRoutingNodes ();
662
+ for (var iterator = routingNodes .unassigned ().iterator (); iterator .hasNext ();) {
663
+ var shardRouting = iterator .next ();
664
+ routingNodes .startShard (iterator .initialize (shardRouting .primary () ? "node-0" : "node-1" , null , 0L , changes ), changes , 0L );
665
+ }
666
+ clusterState = rebuildRoutingTable (clusterState , routingNodes );
667
+
668
+ final var desiredBalanceInput = DesiredBalanceInput .create (
669
+ randomInt (),
670
+ new RoutingAllocation (new AllocationDeciders (List .of (new AllocationDecider () {
671
+ @ Override
672
+ public Decision canAllocate (ShardRouting shardRouting , RoutingNode node , RoutingAllocation allocation ) {
673
+ // Always return NO so that AllocationCommands will silently fail.
674
+ return Decision .NO ;
675
+ }
676
+ })), clusterState , ClusterInfo .EMPTY , SnapshotShardSizeInfo .EMPTY , 0L )
677
+ );
581
678
var desiredBalance = desiredBalanceComputer .compute (
582
679
DesiredBalance .BECOME_MASTER_INITIAL ,
583
- createInput ( clusterState ) ,
680
+ desiredBalanceInput ,
584
681
queue (
585
- new MoveAllocationCommand (index .getName (), 0 , "node-1" , "node-2" ),
586
- new MoveAllocationCommand (index .getName (), 1 , "node-1" , "node-2" )
682
+ new MoveAllocationCommand (index .getName (), 0 , randomFrom ( "node-0" , "node-1" ) , "node-2" ),
683
+ new MoveAllocationCommand (index .getName (), 1 , randomFrom ( "node-0" , "node-1" ) , "node-2" )
587
684
),
588
685
input -> true
589
686
);
590
687
688
+ final Set <String > expectedNodeIds = Set .of ("node-0" , "node-1" );
591
689
assertDesiredAssignments (
592
690
desiredBalance ,
593
691
Map .of (
594
692
new ShardId (index , 0 ),
595
- new ShardAssignment (Set . of ( "node-0" , "node-2" ) , 2 , 0 , 0 ),
693
+ new ShardAssignment (expectedNodeIds , 2 , 0 , 0 ),
596
694
new ShardId (index , 1 ),
597
- new ShardAssignment (Set . of ( "node-0" , "node-2" ) , 2 , 0 , 0 )
695
+ new ShardAssignment (expectedNodeIds , 2 , 0 , 0 )
598
696
)
599
697
);
600
698
}
0 commit comments