-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Create an end-to-end IT test for WriteLoadConstraintDecider#canAllocate #133500
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
d4d5a85
d165802
a02290f
699d6e2
e5b4842
5dfefcc
a4c5d47
abcc58e
aa30fdb
57312b2
bb30938
274a164
5d3dd54
352ddda
02d1de1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,340 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the "Elastic License | ||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
* Public License v 1"; you may not use this file except in compliance with, at | ||
* your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
* License v3.0 only", or the "Server Side Public License, v 1". | ||
*/ | ||
|
||
package org.elasticsearch.cluster.routing.allocation.decider; | ||
|
||
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction; | ||
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction; | ||
import org.elasticsearch.action.admin.indices.stats.CommonStats; | ||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; | ||
import org.elasticsearch.action.admin.indices.stats.ShardStats; | ||
import org.elasticsearch.action.admin.indices.stats.TransportIndicesStatsAction; | ||
import org.elasticsearch.cluster.ClusterInfoService; | ||
import org.elasticsearch.cluster.ClusterInfoServiceUtils; | ||
import org.elasticsearch.cluster.InternalClusterInfoService; | ||
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; | ||
import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
import org.elasticsearch.cluster.node.DiscoveryNode; | ||
import org.elasticsearch.cluster.routing.RecoverySource; | ||
import org.elasticsearch.cluster.routing.RoutingNodes; | ||
import org.elasticsearch.cluster.routing.ShardRouting; | ||
import org.elasticsearch.cluster.routing.UnassignedInfo; | ||
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings; | ||
import org.elasticsearch.cluster.service.ClusterService; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.index.Index; | ||
import org.elasticsearch.index.shard.DocsStats; | ||
import org.elasticsearch.index.shard.IndexingStats; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.shard.ShardPath; | ||
import org.elasticsearch.index.store.StoreStats; | ||
import org.elasticsearch.plugins.Plugin; | ||
import org.elasticsearch.test.ClusterServiceUtils; | ||
import org.elasticsearch.test.ESIntegTestCase; | ||
import org.elasticsearch.test.transport.MockTransportService; | ||
import org.elasticsearch.threadpool.ThreadPool; | ||
import org.elasticsearch.transport.TransportService; | ||
|
||
import java.nio.file.Path; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
|
||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; | ||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; | ||
|
||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) | ||
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase { | ||
@Override | ||
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { | ||
return Settings.builder() | ||
.put(super.nodeSettings(nodeOrdinal, otherSettings)) | ||
.put( | ||
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), | ||
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED | ||
) | ||
.build(); | ||
} | ||
|
||
@Override | ||
protected Collection<Class<? extends Plugin>> getMockPlugins() { | ||
final Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.getMockPlugins()); | ||
plugins.add(MockTransportService.TestPlugin.class); | ||
return plugins; | ||
|
||
} | ||
|
||
/** | ||
* Uses MockTransportService to set up write load stat responses from the data nodes and tests the allocation decisions made by the | ||
* balancer, specifically the effect of the {@link WriteLoadConstraintDecider}. | ||
* | ||
* Leverages the {@link FilterAllocationDecider} to first start all shards on a Node1, and then eventually force the shards off of | ||
* Node1 while Node3 is hot-spotting, resulting in reassignment of all shards to Node2. | ||
*/ | ||
public void testHighNodeWriteLoadPreventsNewShardAllocation() { | ||
final String masterName = internalCluster().startMasterOnlyNode(); | ||
final var dataNodes = internalCluster().startDataOnlyNodes(3); | ||
final String firstDataNodeName = dataNodes.get(0); | ||
final String secondDataNodeName = dataNodes.get(1); | ||
final String thirdDataNodeName = dataNodes.get(2); | ||
final String firstDataNodeId = getNodeId(firstDataNodeName); | ||
final String secondDataNodeId = getNodeId(secondDataNodeName); | ||
final String thirdDataNodeId = getNodeId(thirdDataNodeName); | ||
ensureStableCluster(4); | ||
|
||
logger.info( | ||
"---> first node name " | ||
+ firstDataNodeName | ||
+ " and ID " | ||
+ firstDataNodeId | ||
+ "; second node name " | ||
+ secondDataNodeName | ||
+ " and ID " | ||
+ secondDataNodeId | ||
+ "; third node name " | ||
+ thirdDataNodeName | ||
+ " and ID " | ||
+ thirdDataNodeId | ||
); | ||
|
||
/** | ||
* Exclude assignment of shards to the second and third data nodes via the {@link FilterAllocationDecider} settings. | ||
* Then create an index with many shards, which will all be assigned to the first data node. | ||
*/ | ||
|
||
logger.info("---> Limit shard assignment to node " + firstDataNodeName + " by excluding the other nodes"); | ||
updateClusterSettings( | ||
Settings.builder().put("cluster.routing.allocation.exclude._name", secondDataNodeName + "," + thirdDataNodeName) | ||
); | ||
|
||
String indexName = randomIdentifier(); | ||
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental. | ||
|
||
var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { | ||
var indexRoutingTable = clusterState.routingTable().index(indexName); | ||
if (indexRoutingTable == null) { | ||
return false; | ||
} | ||
return checkShardAssignment( | ||
clusterState.getRoutingNodes(), | ||
indexRoutingTable.getIndex(), | ||
firstDataNodeId, | ||
secondDataNodeId, | ||
thirdDataNodeId, | ||
randomNumberOfShards, | ||
0, | ||
0 | ||
); | ||
}); | ||
|
||
createIndex( | ||
indexName, | ||
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, randomNumberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 0).build() | ||
); | ||
index(indexName, Integer.toString(randomInt(10)), Collections.singletonMap("foo", "bar")); | ||
|
||
ensureGreen(indexName); | ||
|
||
logger.info("---> Waiting for all shards to be assigned to node " + firstDataNodeName); | ||
safeAwait(verifyAssignmentToFirstNodeListener); | ||
|
||
/** | ||
* Override the {@link TransportNodeUsageStatsForThreadPoolsAction} and {@link TransportIndicesStatsAction} actions on the data | ||
* nodes to supply artificial write load stats. The stats will show the third node hot-spotting, and that all shards have non-empty | ||
* write load stats (so that the WriteLoadDecider will evaluate assigning them to a node). | ||
*/ | ||
|
||
final DiscoveryNode firstDiscoveryNode = getDiscoveryNode(firstDataNodeName); | ||
final DiscoveryNode secondDiscoveryNode = getDiscoveryNode(secondDataNodeName); | ||
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName); | ||
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
firstDiscoveryNode, | ||
2, | ||
0.5f, | ||
0 | ||
); | ||
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
secondDiscoveryNode, | ||
2, | ||
0.5f, | ||
0 | ||
); | ||
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools( | ||
thirdDiscoveryNode, | ||
2, | ||
1.00f, | ||
0 | ||
); | ||
|
||
MockTransportService.getInstance(firstDataNodeName).<NodeUsageStatsForThreadPoolsAction.NodeRequest>addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(firstDiscoveryNode, firstNodeNonHotSpottingNodeStats) | ||
) | ||
); | ||
MockTransportService.getInstance(secondDataNodeName) | ||
.addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(secondDiscoveryNode, secondNodeNonHotSpottingNodeStats) | ||
) | ||
); | ||
MockTransportService.getInstance(thirdDataNodeName) | ||
.addRequestHandlingBehavior( | ||
TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", | ||
(handler, request, channel, task) -> channel.sendResponse( | ||
new NodeUsageStatsForThreadPoolsAction.NodeResponse(thirdDiscoveryNode, thirdNodeHotSpottingNodeStats) | ||
) | ||
); | ||
|
||
IndexMetadata indexMetadata = internalCluster().getCurrentMasterNodeInstance(ClusterService.class) | ||
.state() | ||
.getMetadata() | ||
.getProject() | ||
.index(indexName); | ||
double shardWriteLoadDefault = 0.2; | ||
MockTransportService.getInstance(firstDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards()); | ||
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) { | ||
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId)); | ||
} | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); | ||
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of())); | ||
}); | ||
MockTransportService.getInstance(secondDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
// Return no stats for the index because none are assigned to this node. | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); | ||
|
||
channel.sendResponse(instance.new NodeResponse(secondDataNodeId, 0, List.of(), List.of())); | ||
}); | ||
MockTransportService.getInstance(thirdDataNodeName) | ||
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> { | ||
// Return no stats for the index because none are assigned to this node. | ||
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName); | ||
channel.sendResponse(instance.new NodeResponse(thirdDataNodeId, 0, List.of(), List.of())); | ||
}); | ||
|
||
/** | ||
* Provoke a ClusterInfo stats refresh, update the cluster settings to make shard assignment to the first node undesired, and | ||
* initiate rebalancing via a reroute request. Then wait to see a cluster state update that has all the shards assigned to the | ||
* second node, since the third is reporting as hot-spotted and should not accept any shards. | ||
*/ | ||
|
||
logger.info("---> Refreshing the cluster info to pull in the dummy thread pool stats with a hot-spotting node"); | ||
final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) internalCluster().getInstance( | ||
|
||
ClusterInfoService.class, | ||
masterName | ||
); | ||
ClusterInfoServiceUtils.refresh(clusterInfoService); | ||
|
||
logger.info( | ||
"---> Update the filter to exclude " + firstDataNodeName + " so that shards will be reassigned away to the other nodes" | ||
); | ||
// Updating the cluster settings will trigger a reroute request, no need to explicitly request one in the test. | ||
updateClusterSettings(Settings.builder().put("cluster.routing.allocation.exclude._name", firstDataNodeName)); | ||
|
||
safeAwait(ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> { | ||
Index index = clusterState.routingTable().index(indexName).getIndex(); | ||
return checkShardAssignment( | ||
clusterState.getRoutingNodes(), | ||
index, | ||
firstDataNodeId, | ||
secondDataNodeId, | ||
thirdDataNodeId, | ||
0, | ||
randomNumberOfShards, | ||
0 | ||
); | ||
})); | ||
} | ||
|
||
/** | ||
* Verifies that the {@link RoutingNodes} shows that the expected portion of an index's shards are assigned to each node. | ||
*/ | ||
private boolean checkShardAssignment( | ||
RoutingNodes routingNodes, | ||
Index index, | ||
String firstDataNodeId, | ||
String secondDataNodeId, | ||
String thirdDataNodeId, | ||
int firstDataNodeExpectedNumShards, | ||
int secondDataNodeExpectedNumShards, | ||
int thirdDataNodeExpectedNumShards | ||
) { | ||
|
||
int firstDataNodeRealNumberOfShards = routingNodes.node(firstDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (firstDataNodeRealNumberOfShards != firstDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
int secondDataNodeRealNumberOfShards = routingNodes.node(secondDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (secondDataNodeRealNumberOfShards != secondDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
int thirdDataNodeRealNumberOfShards = routingNodes.node(thirdDataNodeId).numberOfOwningShardsForIndex(index); | ||
if (thirdDataNodeRealNumberOfShards != thirdDataNodeExpectedNumShards) { | ||
return false; | ||
} | ||
|
||
return true; | ||
} | ||
|
||
private DiscoveryNode getDiscoveryNode(String nodeName) { | ||
final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName); | ||
assertNotNull(transportService); | ||
return transportService.getLocalNode(); | ||
} | ||
|
||
/** | ||
* Helper to create a {@link NodeUsageStatsForThreadPools} for the given node with the given WRITE thread pool usage stats. | ||
*/ | ||
private NodeUsageStatsForThreadPools createNodeUsageStatsForThreadPools( | ||
DiscoveryNode discoveryNode, | ||
int totalWriteThreadPoolThreads, | ||
float averageWriteThreadPoolUtilization, | ||
long averageWriteThreadPoolQueueLatencyMillis | ||
|
||
) { | ||
|
||
// Create thread pool usage stats map for node1. | ||
var writeThreadPoolUsageStats = new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( | ||
totalWriteThreadPoolThreads, | ||
averageWriteThreadPoolUtilization, | ||
averageWriteThreadPoolQueueLatencyMillis | ||
); | ||
var threadPoolUsageMap = new HashMap<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats>(); | ||
threadPoolUsageMap.put(ThreadPool.Names.WRITE, writeThreadPoolUsageStats); | ||
|
||
|
||
// Create the node's thread pool usage map | ||
return new NodeUsageStatsForThreadPools(discoveryNode.getId(), threadPoolUsageMap); | ||
} | ||
|
||
/** | ||
* Helper to create a dummy {@link ShardStats} for the given index shard with the supplied {@code peekWriteLoad} value. | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed 👍 |
||
private static ShardStats createShardStats(IndexMetadata indexMeta, int shardIndex, double peekWriteLoad, String assignedShardNodeId) { | ||
ShardId shardId = new ShardId(indexMeta.getIndex(), shardIndex); | ||
Path path = createTempDir().resolve("indices").resolve(indexMeta.getIndexUUID()).resolve(String.valueOf(shardIndex)); | ||
ShardRouting shardRouting = ShardRouting.newUnassigned( | ||
shardId, | ||
true, | ||
RecoverySource.EmptyStoreRecoverySource.INSTANCE, | ||
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null), | ||
ShardRouting.Role.DEFAULT | ||
); | ||
shardRouting = shardRouting.initialize(assignedShardNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); | ||
shardRouting = shardRouting.moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); | ||
CommonStats stats = new CommonStats(); | ||
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes()); | ||
stats.store = new StoreStats(); | ||
stats.indexing = new IndexingStats( | ||
new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, 234, 234, 1000, 0.123, peekWriteLoad) | ||
); | ||
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
public record NodeUsageStatsForThreadPools(String nodeId, Map<String, ThreadPoolUsageStats> threadPoolUsageStatsMap) implements Writeable { | ||
|
||
public NodeUsageStatsForThreadPools(StreamInput in) throws IOException { | ||
this(in.readString(), in.readMap(ThreadPoolUsageStats::new)); | ||
this(in.readString(), in.readImmutableMap(ThreadPoolUsageStats::new)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to change the NodeUsageStatsForThreadPool constructor to read into an immutable map because the ShardMovements*Simulator expects an immutable map. |
||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we set the write load threshold explicitly (and randomly) and calculate the above/below thresholds based on that below? We're relying on defaults as it stands and they might change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure. IIUC, you're also proposing randomizing the node's reported thread pool utilization to somewhere above the random threshold? Updated as such. 5d3dd54