|
13 | 13 | import org.apache.lucene.util.SetOnce; |
14 | 14 | import org.elasticsearch.action.ActionListener; |
15 | 15 | import org.elasticsearch.action.support.ActionTestUtils; |
| 16 | +import org.elasticsearch.action.support.PlainActionFuture; |
16 | 17 | import org.elasticsearch.cluster.ClusterInfo; |
17 | 18 | import org.elasticsearch.cluster.ClusterName; |
18 | 19 | import org.elasticsearch.cluster.ClusterState; |
|
56 | 57 | import org.elasticsearch.test.MockLog; |
57 | 58 | import org.elasticsearch.threadpool.TestThreadPool; |
58 | 59 |
|
| 60 | +import java.util.Collections; |
59 | 61 | import java.util.List; |
60 | 62 | import java.util.Map; |
61 | 63 | import java.util.Queue; |
|
78 | 80 | import static org.hamcrest.Matchers.equalTo; |
79 | 81 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
80 | 82 | import static org.hamcrest.Matchers.hasItem; |
| 83 | +import static org.hamcrest.Matchers.is; |
81 | 84 | import static org.hamcrest.Matchers.not; |
| 85 | +import static org.hamcrest.Matchers.sameInstance; |
82 | 86 |
|
83 | 87 | public class DesiredBalanceShardsAllocatorTests extends ESAllocationTestCase { |
84 | 88 |
|
@@ -906,6 +910,76 @@ public void resetDesiredBalance() { |
906 | 910 | } |
907 | 911 | } |
908 | 912 |
|
| 913 | + public void testNotReconcileEagerlyForEmptyRoutingTable() { |
| 914 | + final var threadPool = new TestThreadPool(getTestName()); |
| 915 | + final var clusterService = ClusterServiceUtils.createClusterService(ClusterState.EMPTY_STATE, threadPool); |
| 916 | + final var clusterSettings = createBuiltInClusterSettings(); |
| 917 | + final var shardsAllocator = createShardsAllocator(); |
| 918 | + final var reconciliationTaskSubmitted = new AtomicBoolean(); |
| 919 | + final var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator( |
| 920 | + shardsAllocator, |
| 921 | + threadPool, |
| 922 | + clusterService, |
| 923 | + new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) { |
| 924 | + @Override |
| 925 | + public DesiredBalance compute( |
| 926 | + DesiredBalance previousDesiredBalance, |
| 927 | + DesiredBalanceInput desiredBalanceInput, |
| 928 | + Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves, |
| 929 | + Predicate<DesiredBalanceInput> isFresh |
| 930 | + ) { |
| 931 | + assertThat(previousDesiredBalance, sameInstance(DesiredBalance.INITIAL)); |
| 932 | + return new DesiredBalance(desiredBalanceInput.index(), Map.of()); |
| 933 | + } |
| 934 | + }, |
| 935 | + (clusterState, rerouteStrategy) -> null, |
| 936 | + TelemetryProvider.NOOP |
| 937 | + ) { |
| 938 | + |
| 939 | + private ActionListener<Void> lastListener; |
| 940 | + |
| 941 | + @Override |
| 942 | + public void allocate(RoutingAllocation allocation, ActionListener<Void> listener) { |
| 943 | + lastListener = listener; |
| 944 | + super.allocate(allocation, listener); |
| 945 | + } |
| 946 | + |
| 947 | + @Override |
| 948 | + protected void reconcile(DesiredBalance desiredBalance, RoutingAllocation allocation) { |
| 949 | + fail("should not call reconcile"); |
| 950 | + } |
| 951 | + |
| 952 | + @Override |
| 953 | + protected void submitReconcileTask(DesiredBalance desiredBalance) { |
| 954 | + assertThat(desiredBalance.lastConvergedIndex(), equalTo(0L)); |
| 955 | + reconciliationTaskSubmitted.set(true); |
| 956 | + lastListener.onResponse(null); |
| 957 | + } |
| 958 | + }; |
| 959 | + assertThat(desiredBalanceShardsAllocator.getDesiredBalance(), sameInstance(DesiredBalance.INITIAL)); |
| 960 | + try { |
| 961 | + final PlainActionFuture<Void> future = new PlainActionFuture<>(); |
| 962 | + desiredBalanceShardsAllocator.allocate( |
| 963 | + new RoutingAllocation( |
| 964 | + new AllocationDeciders(Collections.emptyList()), |
| 965 | + clusterService.state(), |
| 966 | + null, |
| 967 | + null, |
| 968 | + randomNonNegativeLong() |
| 969 | + ), |
| 970 | + future |
| 971 | + ); |
| 972 | + safeGet(future); |
| 973 | + assertThat(desiredBalanceShardsAllocator.getStats().computationSubmitted(), equalTo(1L)); |
| 974 | + assertThat(desiredBalanceShardsAllocator.getStats().computationExecuted(), equalTo(1L)); |
| 975 | + assertThat(reconciliationTaskSubmitted.get(), is(true)); |
| 976 | + assertThat(desiredBalanceShardsAllocator.getDesiredBalance().lastConvergedIndex(), equalTo(0L)); |
| 977 | + } finally { |
| 978 | + clusterService.close(); |
| 979 | + terminate(threadPool); |
| 980 | + } |
| 981 | + } |
| 982 | + |
909 | 983 | private static IndexMetadata createIndex(String name) { |
910 | 984 | return IndexMetadata.builder(name).settings(indexSettings(IndexVersion.current(), 1, 0)).build(); |
911 | 985 | } |
|
0 commit comments