Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e508206
WriteLoadConstraintDecider PoC
DiannaHohensee Jul 28, 2025
72471c9
add test extension
DiannaHohensee Jul 28, 2025
4fdc73f
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Jul 29, 2025
121d4de
WIP testing -- can't run until internet returns
DiannaHohensee Jul 31, 2025
3dcb94d
wip
DiannaHohensee Jul 31, 2025
9102349
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 1, 2025
6d85655
log msg
DiannaHohensee Aug 1, 2025
64d6344
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 1, 2025
606cadc
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 11, 2025
68be3df
check that new shard assignment won't exceed threshold
DiannaHohensee Aug 11, 2025
6244ef8
cleanup
DiannaHohensee Aug 11, 2025
da441c3
[CI] Auto commit changes from spotless
Aug 11, 2025
7ce1e59
clear IT test for later
DiannaHohensee Aug 11, 2025
a6783a2
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 11, 2025
74b38e1
fix allocation decider call order
DiannaHohensee Aug 13, 2025
8b955e6
update WriteLoadDeciderStatus callers with enum helper methods
DiannaHohensee Aug 13, 2025
fc934dc
change debugMsg to explain, to reflect what that the output goes into…
DiannaHohensee Aug 13, 2025
a0e61e0
randomize testing to select ENABLED or LOW_THRESHOLD_ONLY
DiannaHohensee Aug 13, 2025
3014d88
modularize test set up
DiannaHohensee Aug 13, 2025
6197bcb
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 13, 2025
b896291
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
0824b59
improve explanation messages: limit decimal points and improve wording
DiannaHohensee Aug 14, 2025
2e6824f
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
85f4448
add explanation to canRemain method to fix testing
DiannaHohensee Aug 14, 2025
1e53258
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

public class WriteLoadConstraintDeciderIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
}

// TODO: integration testing to see that the components work together.
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
Expand Down Expand Up @@ -453,6 +454,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new ThrottlingAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new ShardsLimitAllocationDecider(clusterSettings));
addAllocationDecider(deciders, new AwarenessAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings));

clusterPlugins.stream()
.flatMap(p -> p.createAllocationDeciders(settings, clusterSettings).stream())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,16 @@ public enum WriteLoadDeciderStatus {

WriteLoadDeciderStatus writeLoadDeciderStatus;
TimeValue writeLoadDeciderRerouteIntervalSetting;
double writeThreadPoolHighUtilizationThresholdSetting;

/**
 * Creates the settings holder and hands each write load setting, paired with the setter that stores its value, to
 * {@code clusterSettings.initializeAndWatch}. Three settings are registered: the decider status, the reroute interval and the
 * write thread pool high utilization threshold.
 * <p>
 * The constructor is public so that classes outside this package (such as the allocation decider) can create an instance.
 *
 * @param clusterSettings the cluster settings registry the three settings are registered with
 */
public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
    clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
    clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
    clusterSettings.initializeAndWatch(
        WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
        this::setWriteThreadPoolHighUtilizationThresholdSetting
    );
}

private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
Expand All @@ -120,7 +126,15 @@ public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
return this.writeLoadDeciderRerouteIntervalSetting;
}

/**
 * @return the currently stored high utilization threshold for the write thread pool
 */
public double getWriteThreadPoolHighUtilizationThresholdSetting() {
    return writeThreadPoolHighUtilizationThresholdSetting;
}

/**
 * Stores the given reroute interval in this settings holder.
 *
 * @param timeValue the new reroute interval
 */
private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
    writeLoadDeciderRerouteIntervalSetting = timeValue;
}

/**
 * Stores the given threshold, converted to its ratio form, as the write thread pool high utilization threshold.
 *
 * @param percent the new threshold value
 */
private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
    final double thresholdAsRatio = percent.getAsRatio();
    writeThreadPoolHighUtilizationThresholdSetting = thresholdAsRatio;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.threadpool.ThreadPool;

/**
* Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
* pool usage stats and any candidate shard's write load estimate.
*/
public class WriteLoadConstraintDecider extends AllocationDecider {
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class);

public static final String NAME = "write_load";

private final WriteLoadConstraintSettings writeLoadConstraintSettings;

/**
 * Builds the decider together with its own {@link WriteLoadConstraintSettings} instance.
 *
 * @param clusterSettings passed straight through to the {@link WriteLoadConstraintSettings} constructor
 */
public WriteLoadConstraintDecider(ClusterSettings clusterSettings) {
    writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
}

/**
 * Decides whether {@code shardRouting} may be allocated to {@code node}, based on the node's reported write thread pool utilization.
 * <p>
 * The answer is {@link Decision#YES} when any of the following holds:
 * <ul>
 * <li>the decider status is anything other than {@code ENABLED};</li>
 * <li>the cluster info holds no write load for the shard, or a write load of zero;</li>
 * <li>the cluster info holds no thread pool usage stats for the node;</li>
 * <li>the node's average write thread pool utilization is below the configured high utilization threshold.</li>
 * </ul>
 * The answer is {@link Decision#NO} when the node's average write thread pool utilization is at or above that threshold.
 *
 * @param shardRouting the shard being considered for allocation
 * @param node the candidate target node
 * @param allocation supplies the cluster info with per-shard write loads and per-node thread pool usage stats
 * @return {@link Decision#YES} or {@link Decision#NO} as described above
 */
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// Any status other than ENABLED means this decider places no restriction on the allocation.
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) {
return Decision.YES;
}

// Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
if (shardWriteLoad == null || shardWriteLoad == 0) {
return Decision.YES;
}

// Look up the thread pool usage stats reported for the candidate node, keyed by node ID.
var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
if (nodeUsageStatsForThreadPools == null) {
// No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle
// setting a node-level write load for this node after this shard is assigned.
return Decision.YES;
}

// Stats reported for a node are expected to contain an entry for the write thread pool. This is only verified when assertions
// are enabled; otherwise a missing entry surfaces as a null value below.
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
This one looks redundant after calculateShardMovementChange. If simulation fails this one will fail too. I don't think "overhead" of calculateShardMovementChange will be noticeable anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the next check would also catch this. It's a matter of explanation message for the NO decision, really, at this point. They could be combined; separately the messages can be clearer for the user to understand, I think.

Copy link
Contributor

@mhl-b mhl-b Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculateShardMovementChange already contains information about current thread pool utilization, so it's not hard to read that node is at high threshold before movement attempt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you're suggesting. Are you proposing to add logic to calculateShardMovementChange? If the threshold is already exceeded, the calculation adds on top of the value (exceeding threshold more), nothing need change there.

I like the clarity of the separate explain messages. Do you feel strongly about merging the two if statements?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly

// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
logger.debug(
"The high utilization threshold of {} has already been reached on node {}. Cannot allocate shard {} to node {} "
+ "without risking increased write latencies.",
nodeWriteThreadPoolLoadThreshold,
node.nodeId(),
shardRouting.shardId(),
node.nodeId()
);
// NOTE(review): the reason for this NO is only written to the debug log here; the returned decision itself carries no message.
return Decision.NO;
}

// Utilization is below the threshold, so this decider does not object to the allocation.
return Decision.YES;
}

/**
 * Decides whether {@code shardRouting} may stay on {@code node}.
 * <p>
 * This decider does not evaluate anything for shards that are already assigned, so the answer is {@link Decision#YES} regardless of
 * the decider status. The earlier status check was dropped because both of its branches returned the same decision.
 *
 * @param indexMetadata metadata of the index the shard belongs to (unused)
 * @param shardRouting the shard currently assigned to {@code node} (unused)
 * @param node the node currently holding the shard (unused)
 * @param allocation the current routing allocation (unused)
 * @return always {@link Decision#YES}
 */
@Override
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    return Decision.YES;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.settings.Settings;

public class WriteLoadConstraintDeciderTests extends ESAllocationTestCase {

public void testWriteLoadDeciderIsDisabled() {
// TODO
}

public void testShardWithNoWriteLoadEstimateIsAlwaysYES() {
Settings writeLoadConstraintSettings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
)
.build();
// TODO
}

public void testShardWithWriteLoadEstimate() {
// TODO: test successful re-assignment and rejected re-assignment due to threshold
}

}