Skip to content

Commit 4f63e10

Browse files
Integration tests for allocateUnassigned and moveShards (elastic#141636)
Co-authored-by: Dianna Hohensee <artemisapple@gmail.com>
1 parent 79cbd1d commit 4f63e10

File tree

3 files changed

+399
-0
lines changed

3 files changed

+399
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.action.support.SubscribableListener;
13+
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.ProjectId;
15+
import org.elasticsearch.cluster.node.DiscoveryNode;
16+
import org.elasticsearch.cluster.routing.RoutingNode;
17+
import org.elasticsearch.cluster.routing.ShardRouting;
18+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
19+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
20+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
21+
import org.elasticsearch.common.settings.ClusterSettings;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.common.util.CollectionUtils;
24+
import org.elasticsearch.core.Nullable;
25+
import org.elasticsearch.plugins.ClusterPlugin;
26+
import org.elasticsearch.plugins.Plugin;
27+
import org.elasticsearch.test.ESIntegTestCase;
28+
import org.junit.Before;
29+
30+
import java.util.Collection;
31+
import java.util.Collections;
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Set;
35+
import java.util.concurrent.ConcurrentHashMap;
36+
import java.util.concurrent.atomic.AtomicReference;
37+
38+
import static org.elasticsearch.test.ClusterServiceUtils.addTemporaryStateListener;
39+
40+
/**
41+
* Supplies an {@link AllocationDecider} with settable sets of data nodes that return certain {@link Decision}s for
42+
* {@link AllocationDecider#canAllocate} and {@link AllocationDecider#canRemain}. Rebalancing in the integration tests
43+
* will be disabled because {@link AllocationDecider#canRebalance} is overridden: only the first two phases of the balancer,
44+
* allocating unassigned shards and moving shards that cannot remain, will run.
45+
*/
46+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
47+
public abstract class AbstractAllocationDecisionTestCase extends ESIntegTestCase {
48+
49+
protected static final Set<String> CAN_ALLOCATE_NOT_PREFERRED_NODE_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
50+
protected static final Set<String> CAN_ALLOCATE_THROTTLE_NODE_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
51+
protected static final Set<String> CAN_ALLOCATE_NO_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
52+
protected static final Set<String> CAN_REMAIN_NO_NODE_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
53+
protected static final Set<String> CAN_REMAIN_NOT_PREFERRED_NODE_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
54+
55+
@Before
56+
public final void clearDeciderState() {
57+
CAN_ALLOCATE_NOT_PREFERRED_NODE_IDS.clear();
58+
CAN_ALLOCATE_THROTTLE_NODE_IDS.clear();
59+
CAN_ALLOCATE_NO_IDS.clear();
60+
CAN_REMAIN_NO_NODE_IDS.clear();
61+
CAN_REMAIN_NOT_PREFERRED_NODE_IDS.clear();
62+
}
63+
64+
@Override
65+
protected Collection<Class<? extends Plugin>> nodePlugins() {
66+
return CollectionUtils.appendToCopy(super.nodePlugins(), TestAllocationPlugin.class);
67+
}
68+
69+
/**
70+
* Wait for the index shard to be allocated to a node.
71+
*
72+
* @param indexName The single-shard index to monitor
73+
* @return A {@link SubscribableListener} that will resolve with the node that the shard was allocated to
74+
*/
75+
protected SubscribableListener<DiscoveryNode> waitForAllocation(String indexName) {
76+
final var firstAllocationNode = new AtomicReference<DiscoveryNode>();
77+
return addTemporaryStateListener(internalCluster().clusterService(), state -> {
78+
final var index = state.routingTable(ProjectId.DEFAULT).index(indexName);
79+
if (index != null) {
80+
assert index.allShards().count() == 1 : "expected a single shard, got " + index.allShards().toList();
81+
final var shardRouting = index.shard(0).primaryShard();
82+
final var currentNodeId = shardRouting.currentNodeId();
83+
if (currentNodeId != null && shardRouting.started()) {
84+
final var node = state.nodes().get(currentNodeId);
85+
firstAllocationNode.set(node);
86+
return true;
87+
}
88+
}
89+
return false;
90+
}).andThenApply(v -> firstAllocationNode.get());
91+
}
92+
93+
/**
94+
* Ensure a shard is eventually allocated to one of a set of nodes
95+
*
96+
* @param indexName The single-shard index to monitor
97+
* @param expectedNodes The set of nodes that will constitute a successful allocation
98+
*/
99+
protected void ensureShardIsAllocatedToNodes(String indexName, Set<String> expectedNodes) {
100+
awaitClusterState(state -> {
101+
final var index = state.routingTable(ProjectId.DEFAULT).index(indexName);
102+
if (index != null) {
103+
assert index.allShards().count() == 1 : "expected a single shard, got " + index.allShards().toList();
104+
final var shardRouting = index.shard(0).primaryShard();
105+
final var currentNodeId = shardRouting.currentNodeId();
106+
if (currentNodeId != null && shardRouting.started()) {
107+
final var node = state.nodes().get(currentNodeId);
108+
return expectedNodes.contains(node.getName());
109+
}
110+
}
111+
return false;
112+
});
113+
}
114+
115+
/**
116+
* Data structure to hold the results of {@link #createNodes}, lists of which nodes return what {@link AllocationDecider#canAllocate}
117+
* {@link Decision}.
118+
*/
119+
protected record CreatedNodes(Set<String> noNodes, Set<String> notPreferredNodes, Set<String> throttleNodes, Set<String> yesNodes) {}
120+
121+
protected CreatedNodes createNodes(int noNodes, int notPreferredNodes, int throttleNodes, int yesNodes) {
122+
final var preExistingNodeCount = internalCluster().size();
123+
final var noNodeNames = new HashSet<String>(noNodes);
124+
final var notPreferredNodeNames = new HashSet<String>(notPreferredNodes);
125+
final var throttleNodeNames = new HashSet<String>(throttleNodes);
126+
final var yesNodeNames = new HashSet<String>(yesNodes);
127+
final int totalNodes = notPreferredNodes + noNodes + throttleNodes + yesNodes;
128+
final var allNodeNames = new HashSet<>(internalCluster().startNodes(totalNodes));
129+
allocateNodesAndUpdateSets(yesNodes, allNodeNames, null, yesNodeNames);
130+
allocateNodesAndUpdateSets(throttleNodes, allNodeNames, CAN_ALLOCATE_THROTTLE_NODE_IDS, throttleNodeNames);
131+
allocateNodesAndUpdateSets(notPreferredNodes, allNodeNames, CAN_ALLOCATE_NOT_PREFERRED_NODE_IDS, notPreferredNodeNames);
132+
allocateNodesAndUpdateSets(noNodes, allNodeNames, CAN_ALLOCATE_NO_IDS, noNodeNames);
133+
assert allNodeNames.isEmpty() : "all nodes should have been used: " + allNodeNames;
134+
ensureStableCluster(preExistingNodeCount + totalNodes);
135+
final var createdNodes = new CreatedNodes(noNodeNames, notPreferredNodeNames, throttleNodeNames, yesNodeNames);
136+
logger.info("--> created nodes {}", createdNodes);
137+
return createdNodes;
138+
}
139+
140+
private static void allocateNodesAndUpdateSets(
141+
int nodeCount,
142+
Set<String> allNodeNames,
143+
@Nullable Set<String> deciderSet,
144+
Set<String> nodeNameSet
145+
) {
146+
for (int i = 0; i < nodeCount; i++) {
147+
final var nodeName = randomFrom(allNodeNames);
148+
allNodeNames.remove(nodeName);
149+
if (deciderSet != null) {
150+
deciderSet.add(getNodeId(nodeName));
151+
}
152+
nodeNameSet.add(nodeName);
153+
}
154+
}
155+
156+
public static class TestAllocationPlugin extends Plugin implements ClusterPlugin {
157+
158+
@Override
159+
public Collection<AllocationDecider> createAllocationDeciders(Settings settings, ClusterSettings clusterSettings) {
160+
return List.of(new TestAllocationDecider());
161+
}
162+
}
163+
164+
public static class TestAllocationDecider extends AllocationDecider {
165+
166+
/**
167+
* These tests aren't about rebalancing, disable it so it doesn't interfere with the results
168+
*/
169+
@Override
170+
public Decision canRebalance(RoutingAllocation allocation) {
171+
return Decision.NO;
172+
}
173+
174+
@Override
175+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
176+
if (CAN_ALLOCATE_NO_IDS.contains(node.nodeId())) {
177+
return Decision.NO;
178+
}
179+
if (CAN_ALLOCATE_NOT_PREFERRED_NODE_IDS.contains(node.nodeId())) {
180+
return Decision.NOT_PREFERRED;
181+
}
182+
if (CAN_ALLOCATE_THROTTLE_NODE_IDS.contains(node.nodeId()) && allocation.isSimulating() == false) {
183+
return Decision.THROTTLE;
184+
}
185+
return Decision.YES;
186+
}
187+
188+
@Override
189+
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
190+
if (CAN_REMAIN_NO_NODE_IDS.contains(node.nodeId())) {
191+
return Decision.NO;
192+
} else if (CAN_REMAIN_NOT_PREFERRED_NODE_IDS.contains(node.nodeId())) {
193+
return Decision.NOT_PREFERRED;
194+
}
195+
return Decision.YES;
196+
}
197+
}
198+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
14+
15+
import java.util.Set;
16+
17+
import static org.hamcrest.Matchers.in;
18+
19+
public class AllocateUnassignedIT extends AbstractAllocationDecisionTestCase {
20+
21+
public void testNewShardsAreAllocatedToYesNodesWhenPresent() {
22+
final var nodes = createNodes(randomIntBetween(1, 3), randomIntBetween(1, 3), 0, randomIntBetween(1, 3));
23+
24+
createSingleShardAndAssertItIsAssignedToNodes(nodes.yesNodes());
25+
}
26+
27+
public void testNewShardsAreAllocatedToThrottleNodesWhenNoYesNodesArePresent() {
28+
final var nodes = createNodes(randomIntBetween(1, 3), randomIntBetween(1, 3), randomIntBetween(1, 3), 0);
29+
final var indexName = randomIdentifier();
30+
31+
final var createIndexFuture = prepareCreate(indexName).setSettings(indexSettings(1, 0)).execute();
32+
waitForIndexCreationThenReroute(indexName);
33+
34+
final var state = clusterService().state();
35+
final var index = state.routingTable(ProjectId.DEFAULT).index(indexName);
36+
37+
// No shards should be assigned (because we're waiting for the throttled nodes)
38+
assertEquals(0, index.shard(0).assignedShards().size());
39+
assertFalse(createIndexFuture.isDone());
40+
41+
final var firstAllocationListener = waitForAllocation(indexName);
42+
43+
// Un-throttle the nodes, re-route should see them allocated to one of the previously throttled nodes
44+
CAN_ALLOCATE_THROTTLE_NODE_IDS.clear();
45+
ClusterRerouteUtils.reroute(client());
46+
47+
final var firstAllocatedNode = safeAwait(firstAllocationListener);
48+
assertThat(firstAllocatedNode.getName(), in(nodes.throttleNodes()));
49+
safeGet(createIndexFuture);
50+
}
51+
52+
public void testNewShardsAreAllocatedToNotPreferredNodesWhenNoThrottleOrYesNodesArePresent() {
53+
final var nodes = createNodes(randomIntBetween(1, 3), randomIntBetween(1, 3), 0, 0);
54+
55+
createSingleShardAndAssertItIsAssignedToNodes(nodes.notPreferredNodes());
56+
}
57+
58+
public void testNewShardsAreNotAllocatedToNoNodes() {
59+
createNodes(randomIntBetween(1, 3), 0, 0, 0);
60+
61+
final var indexName = randomIdentifier();
62+
63+
final var createFuture = prepareCreate(indexName).setSettings(indexSettings(1, 0)).execute();
64+
waitForIndexCreationThenReroute(indexName);
65+
66+
final var state = clusterService().state();
67+
final var index = state.routingTable(ProjectId.DEFAULT).index(indexName);
68+
69+
// No shards should be assigned (because canAllocate is NO everywhere)
70+
assertEquals(0, index.shard(0).assignedShards().size());
71+
assertFalse(createFuture.isDone());
72+
}
73+
74+
/**
75+
* We want to know that the index has been created and a balancing round has been run. This method
76+
* waits to see the index appear in the cluster state, then calls re-route. The reason we don't just call
77+
* re-route is that it runs at a higher priority than the index creation and can jump the queue.
78+
* <p>
79+
* This method will block until the result of the re-route is published. Note that we don't use the cluster
80+
* state returned in the {@link org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse}
81+
* because that cluster state will be one prior to the re-route when the {@link DesiredBalanceShardsAllocator}
82+
* is in use.
83+
*/
84+
private void waitForIndexCreationThenReroute(String indexName) {
85+
awaitClusterState(clusterState -> clusterState.routingTable(ProjectId.DEFAULT).hasIndex(indexName));
86+
ClusterRerouteUtils.reroute(client());
87+
}
88+
89+
private void createSingleShardAndAssertItIsAssignedToNodes(Set<String> expectedNodeNames) {
90+
final var indexName = randomIdentifier();
91+
final var firstAllocationListener = waitForAllocation(indexName);
92+
93+
// The single-shard should be allocated to one of the expected nodes
94+
createIndex(indexName, 1, 0);
95+
96+
final var firstAllocatedNode = safeAwait(firstAllocationListener);
97+
assertThat(firstAllocatedNode.getName(), in(expectedNodeNames));
98+
}
99+
}

0 commit comments

Comments
 (0)