-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Create an end-to-end IT test for WriteLoadConstraintDecider#canAllocate #133500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
elasticsearchmachine
merged 15 commits into
elastic:main
from
DiannaHohensee:2025/08/21/ES-12620-IT-Test
Aug 29, 2025
Merged
Changes from all commits
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
d4d5a85
Testing still needs to set up shard write load dummy values
DiannaHohensee d165802
finished test, not run yet
DiannaHohensee a02290f
debugging test
DiannaHohensee 699d6e2
test runs successfully, needs lots of cleanup
DiannaHohensee e5b4842
test fixed up
DiannaHohensee 5dfefcc
make map immutable so that the simulator copy-and-replace call doesn'…
DiannaHohensee a4c5d47
improve comment
DiannaHohensee abcc58e
undo public constructor
DiannaHohensee aa30fdb
remove empty line
DiannaHohensee 57312b2
Merge branch 'main' into 2025/08/21/ES-12620-IT-Test
DiannaHohensee bb30938
use helper for plugin addition
DiannaHohensee 274a164
most of the review changes
DiannaHohensee 5d3dd54
randomize utilization threshold setting
DiannaHohensee 352ddda
Merge branch 'main' into 2025/08/21/ES-12620-IT-Test
DiannaHohensee 02d1de1
Merge branch 'main' into 2025/08/21/ES-12620-IT-Test
DiannaHohensee File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
337 changes: 337 additions & 0 deletions
337
...va/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDeciderIT.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,337 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing.allocation.decider; | ||
|
||
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; | ||
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; | ||
import org.elasticsearch.action.admin.indices.stats.CommonStats; | ||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; | ||
import org.elasticsearch.action.admin.indices.stats.ShardStats; | ||
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; | ||
import org.elasticsearch.cluster.ClusterInfoService; | ||
import org.elasticsearch.cluster.ClusterInfoServiceUtils; | ||
import org.elasticsearch.cluster.InternalClusterInfoService; | ||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.routing.RecoverySource; | ||
import org.elasticsearch.cluster.routing.RoutingNodes; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.cluster.routing.UnassignedInfo; | ||
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.util.CollectionUtils; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.shard.DocsStats; | ||
import org.elasticsearch.index.shard.IndexingStats; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.shard.ShardPath; | ||
import org.elasticsearch.index.store.StoreStats; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.test.ClusterServiceUtils; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.elasticsearch.test.transport.MockTransportService; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; | ||
|
||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> getMockPlugins() { | ||
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); | ||
} | ||
|
||
/** | ||
* Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the | ||
* balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. | ||
* | ||
* Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of | ||
* Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. | ||
*/ | ||
public void testHighNodeWriteLoadPreventsNewShardAllocation() { | ||
int randomUtilizationThresholdPercent = randomIntBetween(50, 100); | ||
Settings settings = Settings.builder() | ||
.put( | ||
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), | ||
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED | ||
) | ||
.put( | ||
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), | ||
randomUtilizationThresholdPercent + "%" | ||
) | ||
.build(); | ||
|
||
final String masterName = internalCluster().startMasterOnlyNode(settings); | ||
final var dataNodes = internalCluster().startDataOnlyNodes(3, settings); | ||
final String firstDataNodeName = dataNodes.get(0); | ||
final String secondDataNodeName = dataNodes.get(1); | ||
final String thirdDataNodeName = dataNodes.get(2); | ||
final String firstDataNodeId = getNodeId(firstDataNodeName); | ||
final String secondDataNodeId = getNodeId(secondDataNodeName); | ||
final String thirdDataNodeId = getNodeId(thirdDataNodeName); | ||
ensureStableCluster(4); | ||
|
||
logger.info( | ||
"---> first node name " | ||
+ firstDataNodeName | ||
+ " and ID " | ||
+ firstDataNodeId | ||
+ "; second node name " | ||
+ secondDataNodeName | ||
+ " and ID " | ||
+ secondDataNodeId | ||
+ "; third node name " | ||
+ thirdDataNodeName | ||
+ " and ID " | ||
+ thirdDataNodeId | ||
); | ||
|
||
/** | ||
* Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. | ||
* Then create an index with many shards, which will all be assigned to the first data node. | ||
*/ | ||
|
||
logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); | ||
updateClusterSettings( | ||
Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) | ||
); | ||
|
||
String indexName = randomIdentifier(); | ||
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental. | ||
|
||
var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { | ||
var indexRoutingTable = clusterState.routingTable().index(indexName); | ||
if (indexRoutingTable == null) { | ||
return false; | ||
} | ||
return checkShardAssignment( | ||
clusterState.getRoutingNodes(), | ||
indexRoutingTable.getIndex(), | ||
firstDataNodeId, | ||
secondDataNodeId, | ||
thirdDataNodeId, | ||
randomNumberOfShards, | ||
0, | ||
0 | ||
); | ||
}); | ||
|
||
createIndex( | ||
indexName, | ||
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() | ||
); | ||
ensureGreen(indexName); | ||
|
||
logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName); | ||
safeAwait(verifyAssignmentToFirstNodeListener); | ||
|
||
/** | ||
* Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data | ||
* nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty | ||
* write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). | ||
*/ | ||
|
||
final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); | ||
final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); | ||
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); | ||
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
firstDiscoveryNode, | ||
2, | ||
0.5f, | ||
0 | ||
); | ||
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
secondDiscoveryNode, | ||
2, | ||
0.5f, | ||
0 | ||
); | ||
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
thirdDiscoveryNode, | ||
2, | ||
randomUtilizationThresholdPercent + 1 / 100, | ||
0 | ||
); | ||
|
||
MockTransportService.getInstance(firstDataNodeName).<NodeUsageStatsForThreadPoolsAction.NodeRequest>addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) | ||
) | ||
); | ||
MockTransportService.getInstance(secondDataNodeName) | ||
.addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) | ||
) | ||
); | ||
MockTransportService.getInstance(thirdDataNodeName) | ||
.addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) | ||
) | ||
); | ||
|
||
IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) | ||
.state() | ||
.getMetadata() | ||
.getProject() | ||
.index(indexName); | ||
double shardWriteLoadDefault = 0.2; | ||
MockTransportService.getInstance(firstDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); | ||
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { | ||
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId)); | ||
} | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); | ||
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of())); | ||
}); | ||
MockTransportService.getInstance(secondDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
// Return no stats for the index because none are assigned to this node. | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, secondDataNodeName); | ||
channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of())); | ||
}); | ||
MockTransportService.getInstance(thirdDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
// Return no stats for the index because none are assigned to this node. | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, thirdDataNodeName); | ||
channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of())); | ||
}); | ||
|
||
/** | ||
* Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and | ||
* initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the | ||
* second node, since the third is reporting as hot-spotted and should not accept any shards. | ||
*/ | ||
|
||
logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); | ||
final InternalClusterInfoService clusterInfoService = asInstanceOf( | ||
InternalClusterInfoService.class, | ||
internalCluster().getInstance(ClusterInfoService.class, masterName) | ||
); | ||
ClusterInfoServiceUtils.refresh(clusterInfoService); | ||
|
||
logger.info( | ||
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes" | ||
); | ||
// Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. | ||
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName)); | ||
|
||
safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { | ||
Index index = clusterState.routingTable().index(indexName).getIndex(); | ||
return checkShardAssignment( | ||
clusterState.getRoutingNodes(), | ||
index, | ||
firstDataNodeId, | ||
secondDataNodeId, | ||
thirdDataNodeId, | ||
0, | ||
randomNumberOfShards, | ||
0 | ||
); | ||
})); | ||
} | ||
|
||
/** | ||
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node. | ||
*/ | ||
private boolean checkShardAssignment( | ||
RoutingNodes routingNodes, | ||
Index index, | ||
String firstDataNodeId, | ||
String secondDataNodeId, | ||
String thirdDataNodeId, | ||
int firstDataNodeExpectedNumShards, | ||
int secondDataNodeExpectedNumShards, | ||
int thirdDataNodeExpectedNumShards | ||
) { | ||
|
||
int firstDataNodeRealNumberOfShards = routingNodes.node(firstDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (firstDataNodeRealNumberOfShards != firstDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
int secondDataNodeRealNumberOfShards = routingNodes.node(secondDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (secondDataNodeRealNumberOfShards != secondDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
int thirdDataNodeRealNumberOfShards = routingNodes.node(thirdDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (thirdDataNodeRealNumberOfShards != thirdDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
private DiscoveryNode getDiscoveryNode(String nodeName) { | ||
final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName); | ||
assertNotNull(transportService); | ||
return transportService.getLocalNode(); | ||
} | ||
|
||
/** | ||
* Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats. | ||
*/ | ||
private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( | ||
DiscoveryNode discoveryNode, | ||
int totalWriteThreadPoolThreads, | ||
float averageWriteThreadPoolUtilization, | ||
long maxWriteThreadPoolQueueLatencyMillis | ||
) { | ||
var threadPoolUsageMap = Map.of( | ||
ThreadPool.Names.WRITE, | ||
new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
totalWriteThreadPoolThreads, | ||
averageWriteThreadPoolUtilization, | ||
maxWriteThreadPoolQueueLatencyMillis | ||
) | ||
); | ||
|
||
return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); | ||
} | ||
|
||
/** | ||
* Helper to create a dummy {@link ShardStats} for the given index shard with the supplied {@code peakWriteLoad} value. | ||
*/ | ||
private static ShardStats createShardStats(IndexMetadata indexMeta, int shardIndex, double peakWriteLoad, String assignedShardNodeId) { | ||
ShardId shardId = new ShardId(indexMeta.getIndex(), shardIndex); | ||
Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(shardIndex)); | ||
ShardRouting shardRouting = ShardRouting.newUnassigned( | ||
shardId, | ||
true, | ||
RecoverySource.EmptyStoreRecoverySource.INSTANCE, | ||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null), | ||
ShardRouting.Role.DEFAULT | ||
); | ||
shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); | ||
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); | ||
CommonStats stats = new CommonStats(); | ||
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); | ||
stats.store = new StoreStats(); | ||
stats.indexing = new IndexingStats( | ||
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, peakWriteLoad) | ||
); | ||
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable { | ||
|
||
public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { | ||
this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); | ||
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new)); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I had to change the NodeUsageStatsForThreadPools constructor to read into an immutable map because the ShardMovements*Simulator expects an immutable map. |
||
} | ||
|
||
@Override | ||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: `peak` rather than `peek`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed 👍