Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -519,9 +519,6 @@ tests:
- class: org.elasticsearch.xpack.esql.action.RandomizedTimeSeriesIT
method: testGroupBySubset
issue: https://github.com/elastic/elasticsearch/issues/133220
- class: org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDeciderIT
method: testHighNodeWriteLoadPreventsNewShardAllocation
issue: https://github.com/elastic/elasticsearch/issues/133857
- class: org.elasticsearch.xpack.kql.parser.KqlParserBooleanQueryTests
method: testParseOrQuery
issue: https://github.com/elastic/elasticsearch/issues/133863
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator.calculateUtilizationForWriteLoad;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
Expand All @@ -68,6 +69,8 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
*/
public void testHighNodeWriteLoadPreventsNewShardAllocation() {
int randomUtilizationThresholdPercent = randomIntBetween(50, 100);
int numberOfWritePoolThreads = randomIntBetween(2, 20);
float shardWriteLoad = randomFloatBetween(0.0f, 0.01f, false);
Settings settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
Expand Down Expand Up @@ -115,7 +118,14 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
);

String indexName = randomIdentifier();
int randomNumberOfShards = randomIntBetween(15, 40); // Pick a high number of shards, so it is clear assignment is not accidental.
int randomNumberOfShards = randomIntBetween(10, 20); // Pick a high number of shards, so it is clear assignment is not accidental.

// Calculate the maximum utilization a node can report while still being able to accept all relocating shards
double additionalLoadFromAllShards = calculateUtilizationForWriteLoad(
shardWriteLoad * randomNumberOfShards,
numberOfWritePoolThreads
);
int maxUtilizationPercent = randomUtilizationThresholdPercent - (int) (additionalLoadFromAllShards * 100) - 1;

var verifyAssignmentToFirstNodeListener = ClusterServiceUtils.addMasterTemporaryStateListener(clusterState -> {
var indexRoutingTable = clusterState.routingTable().index(indexName);
Expand Down Expand Up @@ -154,20 +164,20 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
final DiscoveryNode thirdDiscoveryNode = getDiscoveryNode(thirdDataNodeName);
final NodeUsageStatsForThreadPools firstNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
firstDiscoveryNode,
2,
0.5f,
numberOfWritePoolThreads,
randomIntBetween(0, maxUtilizationPercent) / 100f,
0
);
final NodeUsageStatsForThreadPools secondNodeNonHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
secondDiscoveryNode,
2,
0.5f,
numberOfWritePoolThreads,
randomIntBetween(0, maxUtilizationPercent) / 100f,
0
);
final NodeUsageStatsForThreadPools thirdNodeHotSpottingNodeStats = createNodeUsageStatsForThreadPools(
thirdDiscoveryNode,
2,
randomUtilizationThresholdPercent + 1 / 100,
numberOfWritePoolThreads,
(randomUtilizationThresholdPercent + 1) / 100f,
0
);

Expand Down Expand Up @@ -197,12 +207,11 @@ public void testHighNodeWriteLoadPreventsNewShardAllocation() {
.getMetadata()
.getProject()
.index(indexName);
double shardWriteLoadDefault = 0.2;
MockTransportService.getInstance(firstDataNodeName)
.addRequestHandlingBehavior(IndicesStatsAction.NAME + "[n]", (handler, request, channel, task) -> {
List<ShardStats> shardStats = new ArrayList<>(indexMetadata.getNumberOfShards());
for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoadDefault, firstDataNodeId));
shardStats.add(createShardStats(indexMetadata, i, shardWriteLoad, firstDataNodeId));
}
TransportIndicesStatsAction instance = internalCluster().getInstance(TransportIndicesStatsAction.class, firstDataNodeName);
channel.sendResponse(instance.new NodeResponse(firstDataNodeId, indexMetadata.getNumberOfShards(), shardStats, List.of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,21 @@ public static float updateNodeUtilizationWithShardMovements(
float shardWriteLoadDelta,
int numberOfWriteThreads
) {
float newNodeUtilization = nodeUtilization + (shardWriteLoadDelta / numberOfWriteThreads);
float newNodeUtilization = nodeUtilization + calculateUtilizationForWriteLoad(shardWriteLoadDelta, numberOfWriteThreads);
return (float) Math.max(newNodeUtilization, 0.0);
}

/**
 * Converts an amount of write-load into the change in thread pool utilization it corresponds to.
 * <p>
 * The result is simply the write-load divided by the thread count; it is not scaled by 100, so callers
 * that want a percent value must multiply the result themselves.
 *
 * @param totalShardWriteLoad The write-load being added or removed
 * @param numberOfThreads The number of threads in the write thread pool of the node the write-load applies to
 * @return The change in utilization, computed as {@code totalShardWriteLoad / numberOfThreads}
 */
public static float calculateUtilizationForWriteLoad(float totalShardWriteLoad, int numberOfThreads) {
    final float utilizationDelta = totalShardWriteLoad / numberOfThreads;
    return utilizationDelta;
}

/**
* Adjust the max thread pool queue latency by accounting for whether shard has moved away from the node.
* @param maxThreadPoolQueueLatencyMillis The current max thread pool queue latency.
Expand Down