Skip to content

Commit ae3134d

Browse files
Create an end-to-end IT test for WriteLoadConstraintDecider#canAllocate (#133500)
Closes ES-12620 ------
1 parent 36de99b commit ae3134d

File tree

2 files changed

+338
-1
lines changed

2 files changed

+338
-1
lines changed
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.decider;
11+
12+
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
13+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
14+
import org.elasticsearch.action.admin.indices.stats.CommonStats;
15+
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
16+
import org.elasticsearch.action.admin.indices.stats.ShardStats;
17+
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction;
18+
import org.elasticsearch.cluster.ClusterInfoService;
19+
import org.elasticsearch.cluster.ClusterInfoServiceUtils;
20+
import org.elasticsearch.cluster.InternalClusterInfoService;
21+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
22+
import org.elasticsearch.cluster.metadata.IndexMetadata;
23+
import org.elasticsearch.cluster.node.DiscoveryNode;
24+
import org.elasticsearch.cluster.routing.RecoverySource;
25+
import org.elasticsearch.cluster.routing.RoutingNodes;
26+
import org.elasticsearch.cluster.routing.ShardRouting;
27+
import org.elasticsearch.cluster.routing.UnassignedInfo;
28+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
29+
import org.elasticsearch.cluster.service.ClusterService;
30+
import org.elasticsearch.common.settings.Settings;
31+
import org.elasticsearch.common.util.CollectionUtils;
32+
import org.elasticsearch.index.Index;
33+
import org.elasticsearch.index.shard.DocsStats;
34+
import org.elasticsearch.index.shard.IndexingStats;
35+
import org.elasticsearch.index.shard.ShardId;
36+
import org.elasticsearch.index.shard.ShardPath;
37+
import org.elasticsearch.index.store.StoreStats;
38+
import org.elasticsearch.plugins.Plugin;
39+
import org.elasticsearch.test.ClusterServiceUtils;
40+
import org.elasticsearch.test.ESIntegTestCase;
41+
import org.elasticsearch.test.transport.MockTransportService;
42+
import org.elasticsearch.threadpool.ThreadPool;
43+
import org.elasticsearch.transport.TransportService;
44+
45+
import java.nio.file.Path;
46+
import java.util.ArrayList;
47+
import java.util.Collection;
48+
import java.util.List;
49+
import java.util.Map;
50+
51+
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
52+
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
53+
54+
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
55+
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
56+
57+
@Override
58+
protected Collection<Class<? extends Plugin>> getMockPlugins() {
59+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
60+
}
61+
62+
/**
63+
* Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the
64+
* balancer, specifically the effect of the {@link WriteLoadConstraintDecider}.
65+
*
66+
* Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of
67+
* Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2.
68+
*/
69+
public void testHighNodeWriteLoadPreventsNewShardAllocation() {
70+
int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
71+
Settings settings = Settings.builder()
72+
.put(
73+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
74+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
75+
)
76+
.put(
77+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(),
78+
randomUtilizationThresholdPercent + "%"
79+
)
80+
.build();
81+
82+
final String masterName = internalCluster().startMasterOnlyNode(settings);
83+
final var dataNodes = internalCluster().startDataOnlyNodes(3, settings);
84+
final String firstDataNodeName = dataNodes.get(0);
85+
final String secondDataNodeName = dataNodes.get(1);
86+
final String thirdDataNodeName = dataNodes.get(2);
87+
final String firstDataNodeId = getNodeId(firstDataNodeName);
88+
final String secondDataNodeId = getNodeId(secondDataNodeName);
89+
final String thirdDataNodeId = getNodeId(thirdDataNodeName);
90+
ensureStableCluster(4);
91+
92+
logger.info(
93+
"---> first node name "
94+
+ firstDataNodeName
95+
+ " and ID "
96+
+ firstDataNodeId
97+
+ "; second node name "
98+
+ secondDataNodeName
99+
+ " and ID "
100+
+ secondDataNodeId
101+
+ "; third node name "
102+
+ thirdDataNodeName
103+
+ " and ID "
104+
+ thirdDataNodeId
105+
);
106+
107+
/**
108+
* Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings.
109+
* Then create an index with many shards, which will all be assigned to the first data node.
110+
*/
111+
112+
logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes");
113+
updateClusterSettings(
114+
Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName)
115+
);
116+
117+
String indexName = randomIdentifier();
118+
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental.
119+
120+
var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
121+
var indexRoutingTable = clusterState.routingTable().index(indexName);
122+
if (indexRoutingTable == null) {
123+
return false;
124+
}
125+
return checkShardAssignment(
126+
clusterState.getRoutingNodes(),
127+
indexRoutingTable.getIndex(),
128+
firstDataNodeId,
129+
secondDataNodeId,
130+
thirdDataNodeId,
131+
randomNumberOfShards,
132+
0,
133+
0
134+
);
135+
});
136+
137+
createIndex(
138+
indexName,
139+
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build()
140+
);
141+
ensureGreen(indexName);
142+
143+
logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName);
144+
safeAwait(verifyAssignmentToFirstNodeListener);
145+
146+
/**
147+
* Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data
148+
* nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty
149+
* write load stats (so that the WriteLoadDecider will evaluate assigning them to a node).
150+
*/
151+
152+
final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName);
153+
final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName);
154+
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName);
155+
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
156+
firstDiscoveryNode,
157+
2,
158+
0.5f,
159+
0
160+
);
161+
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
162+
secondDiscoveryNode,
163+
2,
164+
0.5f,
165+
0
166+
);
167+
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
168+
thirdDiscoveryNode,
169+
2,
170+
randomUtilizationThresholdPercent + 1 / 100,
171+
0
172+
);
173+
174+
MockTransportService.getInstance(firstDataNodeName).<NodeUsageStatsForThreadPoolsAction.NodeRequest>addRequestHandlingBehavior(
175+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
176+
(handler, request, channel, task) -> channel.sendResponse(
177+
new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats)
178+
)
179+
);
180+
MockTransportService.getInstance(secondDataNodeName)
181+
.addRequestHandlingBehavior(
182+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
183+
(handler, request, channel, task) -> channel.sendResponse(
184+
new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats)
185+
)
186+
);
187+
MockTransportService.getInstance(thirdDataNodeName)
188+
.addRequestHandlingBehavior(
189+
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]",
190+
(handler, request, channel, task) -> channel.sendResponse(
191+
new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats)
192+
)
193+
);
194+
195+
IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
196+
.state()
197+
.getMetadata()
198+
.getProject()
199+
.index(indexName);
200+
double shardWriteLoadDefault = 0.2;
201+
MockTransportService.getInstance(firstDataNodeName)
202+
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
203+
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
204+
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
205+
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId));
206+
}
207+
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
208+
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()));
209+
});
210+
MockTransportService.getInstance(secondDataNodeName)
211+
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
212+
// Return no stats for the index because none are assigned to this node.
213+
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, secondDataNodeName);
214+
channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of()));
215+
});
216+
MockTransportService.getInstance(thirdDataNodeName)
217+
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
218+
// Return no stats for the index because none are assigned to this node.
219+
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, thirdDataNodeName);
220+
channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of()));
221+
});
222+
223+
/**
224+
* Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and
225+
* initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the
226+
* second node, since the third is reporting as hot-spotted and should not accept any shards.
227+
*/
228+
229+
logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node");
230+
final InternalClusterInfoService clusterInfoService = asInstanceOf(
231+
InternalClusterInfoService.class,
232+
internalCluster().getInstance(ClusterInfoService.class, masterName)
233+
);
234+
ClusterInfoServiceUtils.refresh(clusterInfoService);
235+
236+
logger.info(
237+
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes"
238+
);
239+
// Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test.
240+
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName));
241+
242+
safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
243+
Index index = clusterState.routingTable().index(indexName).getIndex();
244+
return checkShardAssignment(
245+
clusterState.getRoutingNodes(),
246+
index,
247+
firstDataNodeId,
248+
secondDataNodeId,
249+
thirdDataNodeId,
250+
0,
251+
randomNumberOfShards,
252+
0
253+
);
254+
}));
255+
}
256+
257+
/**
258+
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node.
259+
*/
260+
private boolean checkShardAssignment(
261+
RoutingNodes routingNodes,
262+
Index index,
263+
String firstDataNodeId,
264+
String secondDataNodeId,
265+
String thirdDataNodeId,
266+
int firstDataNodeExpectedNumShards,
267+
int secondDataNodeExpectedNumShards,
268+
int thirdDataNodeExpectedNumShards
269+
) {
270+
271+
int firstDataNodeRealNumberOfShards = routingNodes.node(firstDataNodeId).numberOfOwningShardsForIndex(index);
272+
if (firstDataNodeRealNumberOfShards != firstDataNodeExpectedNumShards) {
273+
return false;
274+
}
275+
int secondDataNodeRealNumberOfShards = routingNodes.node(secondDataNodeId).numberOfOwningShardsForIndex(index);
276+
if (secondDataNodeRealNumberOfShards != secondDataNodeExpectedNumShards) {
277+
return false;
278+
}
279+
int thirdDataNodeRealNumberOfShards = routingNodes.node(thirdDataNodeId).numberOfOwningShardsForIndex(index);
280+
if (thirdDataNodeRealNumberOfShards != thirdDataNodeExpectedNumShards) {
281+
return false;
282+
}
283+
284+
return true;
285+
}
286+
287+
private DiscoveryNode getDiscoveryNode(String nodeName) {
288+
final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName);
289+
assertNotNull(transportService);
290+
return transportService.getLocalNode();
291+
}
292+
293+
/**
294+
* Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats.
295+
*/
296+
private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools(
297+
DiscoveryNode discoveryNode,
298+
int totalWriteThreadPoolThreads,
299+
float averageWriteThreadPoolUtilization,
300+
long maxWriteThreadPoolQueueLatencyMillis
301+
) {
302+
var threadPoolUsageMap = Map.of(
303+
ThreadPool.Names.WRITE,
304+
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
305+
totalWriteThreadPoolThreads,
306+
averageWriteThreadPoolUtilization,
307+
maxWriteThreadPoolQueueLatencyMillis
308+
)
309+
);
310+
311+
return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap);
312+
}
313+
314+
/**
315+
* Helper to create a dummy {@link ShardStats} for the given index shard with the supplied {@code peakWriteLoad} value.
316+
*/
317+
private static ShardStats createShardStats(IndexMetadata indexMeta, int shardIndex, double peakWriteLoad, String assignedShardNodeId) {
318+
ShardId shardId = new ShardId(indexMeta.getIndex(), shardIndex);
319+
Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(shardIndex));
320+
ShardRouting shardRouting = ShardRouting.newUnassigned(
321+
shardId,
322+
true,
323+
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
324+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null),
325+
ShardRouting.Role.DEFAULT
326+
);
327+
shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
328+
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
329+
CommonStats stats = new CommonStats();
330+
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
331+
stats.store = new StoreStats();
332+
stats.indexing = new IndexingStats(
333+
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, peakWriteLoad)
334+
);
335+
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
336+
}
337+
}

server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable {
2828

2929
public NodeUsageStatsForThreadPools(StreamInput in) throws IOException {
30-
this(in.readString(), in.readMap(ThreadPoolUsageStats::new));
30+
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new));
3131
}
3232

3333
@Override

0 commit comments

Comments
 (0)