|
88 | 88 | import java.util.concurrent.atomic.AtomicReference; |
89 | 89 | import java.util.function.BiPredicate; |
90 | 90 | import java.util.function.Consumer; |
| 91 | +import java.util.function.Function; |
91 | 92 | import java.util.function.Predicate; |
92 | 93 | import java.util.stream.Collectors; |
93 | 94 | import java.util.stream.IntStream; |
|
107 | 108 | import static org.hamcrest.Matchers.notNullValue; |
108 | 109 | import static org.hamcrest.Matchers.nullValue; |
109 | 110 | import static org.hamcrest.Matchers.oneOf; |
| 111 | +import static org.junit.Assert.assertTrue; |
110 | 112 | import static org.mockito.Mockito.mock; |
111 | 113 | import static org.mockito.Mockito.when; |
112 | 114 |
|
@@ -982,6 +984,76 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocat |
982 | 984 | assertThat(shuttingDownState.getRoutingNodes().node("node-2").numberOfShardsWithState(ShardRoutingState.INITIALIZING), equalTo(1)); |
983 | 985 | } |
984 | 986 |
|
| 987 | + /** |
| 988 | + * Simulate many nodes leaving a cluster, ensure that |
| 989 | + * shards that are reallocated evenly among desired |
| 990 | + * candidates |
| 991 | + */ |
| 992 | + public void testShardsAreRelocatedEvenly() { |
| 993 | + final var numNodes = randomIntBetween(6, 10); |
| 994 | + final var numToRemain = randomIntBetween(2, numNodes - 1); |
| 995 | + final var discoveryNodes = discoveryNodes(numNodes); |
| 996 | + final var numberOfIndices = randomIntBetween(6, 20); |
| 997 | + |
| 998 | + final Metadata.Builder metadataBuilder = Metadata.builder(); |
| 999 | + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY); |
| 1000 | + for (int i = 0; i < numberOfIndices; i++) { |
| 1001 | + final String indexName = "index-" + i; |
| 1002 | + final IndexMetadata indexMetadata = randomPriorityIndex(indexName, 1, 0); |
| 1003 | + metadataBuilder.put(indexMetadata, true); |
| 1004 | + routingTableBuilder.addAsNew(indexMetadata); |
| 1005 | + } |
| 1006 | + |
| 1007 | + var clusterState = ClusterState.builder(ClusterName.DEFAULT) |
| 1008 | + .nodes(discoveryNodes) |
| 1009 | + .metadata(metadataBuilder.build()) |
| 1010 | + .routingTable(routingTableBuilder.build()) |
| 1011 | + .build(); |
| 1012 | + |
| 1013 | + Function<String, Integer> nodeOrdinal = (String nodeName) -> Integer.parseInt(nodeName.split("-")[1]); |
| 1014 | + |
| 1015 | + // Initially put all shards on the nodes that are going to leave the cluster |
| 1016 | + AtomicReference<DesiredBalance> db = new AtomicReference<>( |
| 1017 | + desiredBalance(clusterState, (shardId, nodeId) -> nodeOrdinal.apply(nodeId) >= numToRemain) |
| 1018 | + ); |
| 1019 | + |
| 1020 | + final var allocationService = createTestAllocationService(routingAllocation -> reconcile(routingAllocation, db.get())); |
| 1021 | + |
| 1022 | + clusterState = fullyReconcile(allocationService, clusterState); |
| 1023 | + logger.info("Initial state: {}", shardCounts(clusterState)); |
| 1024 | + |
| 1025 | + // Recalculate desired balance |
| 1026 | + db.set(desiredBalance(clusterState, (shardId, nodeId) -> nodeOrdinal.apply(nodeId) < numToRemain)); |
| 1027 | + |
| 1028 | + // Reconcile it |
| 1029 | + clusterState = fullyReconcile(allocationService, clusterState); |
| 1030 | + logger.info("State after shutdowns: {}", shardCounts(clusterState)); |
| 1031 | + |
| 1032 | + Map<String, Integer> allocationCounts = shardCounts(clusterState); |
| 1033 | + // All allocations should be on the remaining nodes |
| 1034 | + assertTrue(allocationCounts.keySet().stream().allMatch(nodeId -> nodeOrdinal.apply(nodeId) < numToRemain)); |
| 1035 | + // Allocations should be spread evenly amongst them |
| 1036 | + int minimumAllocationCount = allocationCounts.values().stream().min(Integer::compareTo).orElse(0); |
| 1037 | + int maximumAllocationCount = allocationCounts.values().stream().max(Integer::compareTo).orElse(Integer.MAX_VALUE); |
| 1038 | + assertThat(maximumAllocationCount - minimumAllocationCount, lessThanOrEqualTo(1)); |
| 1039 | + } |
| 1040 | + |
| 1041 | + private Map<String, Integer> shardCounts(ClusterState clusterState) { |
| 1042 | + Map<String, Integer> shardCounts = new HashMap<>(); |
| 1043 | + clusterState.routingTable().allShards().forEach(sr -> shardCounts.compute(sr.currentNodeId(), (v, e) -> e == null ? 0 : e + 1)); |
| 1044 | + return shardCounts; |
| 1045 | + } |
| 1046 | + |
| 1047 | + private static ClusterState fullyReconcile(AllocationService allocationService, ClusterState clusterState) { |
| 1048 | + boolean changed; |
| 1049 | + do { |
| 1050 | + final var newState = startInitializingShardsAndReroute(allocationService, clusterState); |
| 1051 | + changed = newState != clusterState; |
| 1052 | + clusterState = newState; |
| 1053 | + } while (changed); |
| 1054 | + return clusterState; |
| 1055 | + } |
| 1056 | + |
985 | 1057 | public void testRebalance() { |
986 | 1058 | final var discoveryNodes = discoveryNodes(4); |
987 | 1059 | final var metadata = Metadata.builder(); |
|
0 commit comments