Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT
method: testGroupBySubset
issue: https://github.com/elastic/elasticsearch/issues/133220
- class: org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDeciderIT
method: testHighNodeWriteLoadPreventsNewShardAllocation
issue: https://github.com/elastic/elasticsearch/issues/133857
- class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests
method: testParseOrQuery
issue: https://github.com/elastic/elasticsearch/issues/133863
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
Expand All @@ -68,6 +69,8 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
*/
public void testHighNodeWriteLoadPreventsNewShardAllocation() {
int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
int numberOfWritePoolThreads = randomIntBetween(2, 20);
float shardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
Settings settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
Expand Down Expand Up @@ -115,7 +118,14 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
);

String indexName = randomIdentifier();
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental.
int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental.

// Calculate the maximum utilization a node can report while still being able to accept all relocating shards
double additionalLoadFromAllShards = calculateUtilizationForWriteLoad(
shardWriteLoad * randomNumberOfShards,
numberOfWritePoolThreads
);
int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1;

var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
var indexRoutingTable = clusterState.routingTable().index(indexName);
Expand Down Expand Up @@ -154,19 +164,19 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName);
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
firstDiscoveryNode,
2,
0.5f,
numberOfWritePoolThreads,
randomIntBetween(0, maxUtilizationPercent) / 100f,
0
);
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
secondDiscoveryNode,
2,
0.5f,
numberOfWritePoolThreads,
randomIntBetween(0, maxUtilizationPercent) / 100f,
0
);
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
thirdDiscoveryNode,
2,
numberOfWritePoolThreads,
// The third node must report a utilization just above the threshold, expressed as a fraction like the other nodes' stats.
// Parenthesize and divide by a float: `x + 1 / 100` parses as `x + (1 / 100)`, and the integer division yields 0.
(randomUtilizationThresholdPercent + 1) / 100f,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be randomIntBetween(0, maxUtilizationPercent) / 100f, as well?

Copy link
Contributor Author

@nicktindall nicktindall Sep 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, the test works by

  1. Create an index while cluster.routing.allocation.exclude._name = "secondDataNode, thirdDataNode" (all shards are assigned to firstDataNode)
  2. Mock the TransportNodeUsageStatsForThreadPoolsAction on the three data nodes such that the first and second nodes report being under the utilisation threshold, and the third node reports being over the utilisation threshold.
    • Also mock the shard write load responses (random between 0 and something, I don't think this is critical to the test)
  3. Refresh cluster info
  4. Update cluster.routing.allocation.exclude._name = "firstDataNode"
  5. Wait to see all the shards relocated to secondDataNode because first is excluded, and third is over the utilisation threshold.

So we have to have it over the threshold here.

0
);
Expand Down Expand Up @@ -197,12 +207,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
.getMetadata()
.getProject()
.index(indexName);
double shardWriteLoadDefault = 0.2;
MockTransportService.getInstance(firstDataNodeName)
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId));
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId));
}
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,21 @@ public static float updateNodeUtilizationWithShardMovements(
float shardWriteLoadDelta,
int numberOfWriteThreads
) {
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
float newNodeUtilization = nodeUtilization + calculateUtilizationForWriteLoad(shardWriteLoadDelta, numberOfWriteThreads);
return (float) Math.max(newNodeUtilization, 0.0);
}

/**
 * Converts an amount of write load into the corresponding change in thread pool utilization.
 * The result is the write load divided by the thread count, i.e. a fraction of the pool's
 * capacity (multiply by 100 to express it as a percentage).
 *
 * @param totalShardWriteLoad The write load being added or removed
 * @param numberOfThreads The number of threads in the write thread pool of the node the load is being added to
 * @return The change in utilization, as a fraction of the thread pool's capacity
 */
public static float calculateUtilizationForWriteLoad(float totalShardWriteLoad, int numberOfThreads) {
    final float utilizationDelta = totalShardWriteLoad / numberOfThreads;
    return utilizationDelta;
}

/**
* Adjust the max thread pool queue latency by accounting for whether shard has moved away from the node.
* @param maxThreadPoolQueueLatencyMillis The current max thread pool queue latency.
Expand Down