Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
e508206
WriteLoadConstraintDecider PoC
DiannaHohensee Jul 28, 2025
72471c9
add test extension
DiannaHohensee Jul 28, 2025
4fdc73f
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Jul 29, 2025
121d4de
WIP testing -- can't run until internet returns
DiannaHohensee Jul 31, 2025
3dcb94d
wip
DiannaHohensee Jul 31, 2025
9102349
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 1, 2025
6d85655
log msg
DiannaHohensee Aug 1, 2025
64d6344
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 1, 2025
606cadc
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 11, 2025
68be3df
check that new shard assignment won't exceed threshold
DiannaHohensee Aug 11, 2025
6244ef8
cleanup
DiannaHohensee Aug 11, 2025
da441c3
[CI] Auto commit changes from spotless
Aug 11, 2025
7ce1e59
clear IT test for later
DiannaHohensee Aug 11, 2025
a6783a2
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 11, 2025
74b38e1
fix allocation decider call order
DiannaHohensee Aug 13, 2025
8b955e6
update WriteLoadDeciderStatus callers with enum helper methods
DiannaHohensee Aug 13, 2025
fc934dc
change debugMsg to explain, to reflect what that the output goes into…
DiannaHohensee Aug 13, 2025
a0e61e0
randomize testing to select ENABLED or LOW_THRESHOLD_ONLY
DiannaHohensee Aug 13, 2025
3014d88
modularize test set up
DiannaHohensee Aug 13, 2025
6197bcb
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 13, 2025
b896291
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
0824b59
improve explanation messages: limit decimal points and improve wording
DiannaHohensee Aug 14, 2025
2e6824f
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
85f4448
add explanation to canRemain method to fix testing
DiannaHohensee Aug 14, 2025
1e53258
Merge branch 'main' into 2025/07/28/write-load-decider
DiannaHohensee Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ public void testClusterInfoIncludesNodeUsageStatsForThreadPools() {
var settings = Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
randomBoolean()
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
)
// Manually control cluster info refreshes
.put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), "60m")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,9 @@ public void testNodeWriteLoadsArePresent() {
Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
randomBoolean()
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
)
.build()
);
Expand Down Expand Up @@ -376,7 +378,9 @@ public void testShardWriteLoadsArePresent() {
Settings.builder()
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
randomBoolean()
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
)
.build()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
Expand Down Expand Up @@ -446,6 +447,7 @@ public static Collection<AllocationDecider> createAllocationDeciders(
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());
addAllocationDecider(deciders, new RestoreInProgressAllocationDecider());
addAllocationDecider(deciders, new NodeShutdownAllocationDecider());
addAllocationDecider(deciders, new WriteLoadConstraintDecider(clusterSettings));
addAllocationDecider(deciders, new NodeReplacementAllocationDecider());
addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new SameShardAllocationDecider(clusterSettings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void execute() {
logger.trace("starting async refresh");

try (var ignoredRefs = fetchRefs) {
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED);
maybeFetchIndicesStats(diskThresholdEnabled || writeLoadConstraintEnabled.atLeastLowThresholdEnabled());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: alternatively we could have a method WriteLoadDeciderStatus#requiresShardLevelWriteLoads() (and requiresNodeLevelWriteLoads()) which would return true for LOW_THRESHOLD_ONLY and ENABLED but false for DISABLED.

It would read nicer if the writeLoadConstraintEnabled field was called writeLoadDeciderStatus if we went that way.

Don't feel strongly about this naming thing though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requiresShardLevelWriteLoads and requiresNodeLevelWriteLoads don't seem like the right split, as I understand it. I was imagining LOW as the best-effort hot-spot prevention (canAllocate) without hot-spot correction (canRemain), and fully enabled as including hot-spot correction. Both node-level and shard-level stats are needed for prevention, to compare the write load change from the shard move with the node's overall write load.

I'll leave this as is until some follow up.

maybeFetchNodeStats(diskThresholdEnabled || estimatedHeapThresholdEnabled);
maybeFetchNodesEstimatedHeapUsage(estimatedHeapThresholdEnabled);
maybeFetchNodesUsageStatsForThreadPools(writeLoadConstraintEnabled);
Expand Down Expand Up @@ -262,7 +262,7 @@ private void maybeFetchNodesEstimatedHeapUsage(boolean shouldFetch) {
}

private void maybeFetchNodesUsageStatsForThreadPools(WriteLoadDeciderStatus writeLoadConstraintEnabled) {
if (writeLoadConstraintEnabled != WriteLoadDeciderStatus.DISABLED) {
if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodesUsageStatsForThreadPools();
}
Expand Down Expand Up @@ -313,7 +313,7 @@ private void fetchIndicesStats() {
// This returns the shard sizes on disk
indicesStatsRequest.store(true);
}
if (writeLoadConstraintEnabled == WriteLoadDeciderStatus.ENABLED) {
if (writeLoadConstraintEnabled.atLeastLowThresholdEnabled()) {
// This returns the shard write-loads
indicesStatsRequest.indexing(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,32 @@ private static NodeUsageStatsForThreadPools.ThreadPoolUsageStats replaceWritePoo
.get(ThreadPool.Names.WRITE);
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
writeThreadPoolStats.totalThreadPoolThreads(),
(float) Math.max(
(writeThreadPoolStats.averageThreadPoolUtilization() + (writeLoadDelta / writeThreadPoolStats.totalThreadPoolThreads())),
0.0
updateNodeUtilizationWithShardMovements(
writeThreadPoolStats.averageThreadPoolUtilization(),
(float) writeLoadDelta,
writeThreadPoolStats.totalThreadPoolThreads()
),
writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()
);
}

/**
 * Applies a change in shard write load to a node's average per-thread write utilization.
 * <p>
 * The {@code shardWriteLoadDelta} is a total across all write threads, so it is first divided by {@code numberOfWriteThreads} to obtain
 * the per-thread share, which is then added to {@code nodeUtilization}. The result is never allowed to drop below zero.
 *
 * @param nodeUtilization The node's current average utilization per write thread.
 * @param shardWriteLoadDelta The summed change in shard execution time across all threads: positive when shards move onto the node,
 *                            negative when shards move off of it.
 * @param numberOfWriteThreads The size of the node's write thread pool.
 * @return The node's utilization after the delta has been applied, floored at zero.
 */
public static float updateNodeUtilizationWithShardMovements(
    float nodeUtilization,
    float shardWriteLoadDelta,
    int numberOfWriteThreads
) {
    final float perThreadDelta = shardWriteLoadDelta / numberOfWriteThreads;
    return Math.max(nodeUtilization + perThreadDelta, 0.0f);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,30 @@ public enum WriteLoadDeciderStatus {
*/
DISABLED,
/**
* Only the low-threshold is enabled (write-load will not trigger rebalance)
* Only the low write-load threshold is enabled, to try to avoid allocating to a node exceeding
* {@link #WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING}. Write-load hot-spots will not trigger rebalancing.
*/
LOW_ONLY,
LOW_THRESHOLD_ONLY,
/**
* The decider is enabled
* All write load decider development work is turned on.
*/
ENABLED
ENABLED;

/** Returns {@code true} only when this status is {@link #ENABLED}. */
public boolean fullyEnabled() {
    return ENABLED == this;
}

/** Returns {@code true} for every status other than {@link #ENABLED}. */
public boolean notFullyEnabled() {
    return fullyEnabled() == false;
}

/** Returns {@code true} for every status other than {@link #DISABLED}. */
public boolean atLeastLowThresholdEnabled() {
    return disabled() == false;
}

/** Returns {@code true} only when this status is {@link #DISABLED}. */
public boolean disabled() {
    return DISABLED == this;
}
}

public static final Setting<WriteLoadDeciderStatus> WRITE_LOAD_DECIDER_ENABLED_SETTING = Setting.enumSetting(
Expand Down Expand Up @@ -102,10 +119,16 @@ public enum WriteLoadDeciderStatus {

WriteLoadDeciderStatus writeLoadDeciderStatus;
TimeValue writeLoadDeciderRerouteIntervalSetting;
double writeThreadPoolHighUtilizationThresholdSetting;

WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
/**
 * Creates the settings holder and registers an update consumer with {@code clusterSettings} for each write-load decider setting:
 * the decider status, the reroute interval, and the write thread pool high utilization threshold.
 *
 * @param clusterSettings the cluster settings on which {@code initializeAndWatch} is called for each setting
 */
public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
    clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
    clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
    clusterSettings.initializeAndWatch(
        WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
        this::setWriteThreadPoolHighUtilizationThresholdSetting
    );
}

private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
Expand All @@ -120,7 +143,15 @@ public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
return this.writeLoadDeciderRerouteIntervalSetting;
}

/**
 * Returns the currently stored high utilization threshold for the write thread pool. The stored value is the ratio form of the
 * configured setting value.
 */
public double getWriteThreadPoolHighUtilizationThresholdSetting() {
    return writeThreadPoolHighUtilizationThresholdSetting;
}

/** Stores the given reroute interval; used as the update consumer for the reroute interval setting. */
private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
    writeLoadDeciderRerouteIntervalSetting = timeValue;
}

/** Converts the given setting value to a ratio and stores it as the write thread pool high utilization threshold. */
private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
    final double thresholdAsRatio = percent.getAsRatio();
    writeThreadPoolHighUtilizationThresholdSetting = thresholdAsRatio;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.routing.allocation.decider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools.ThreadPoolUsageStats;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardMovementWriteLoadSimulator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.threadpool.ThreadPool;

/**
* Decides whether shards can be allocated to cluster nodes, or can remain on cluster nodes, based on the target node's current write thread
* pool usage stats and any candidate shard's write load estimate.
*/
public class WriteLoadConstraintDecider extends AllocationDecider {
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintDecider.class);

public static final String NAME = "write_load";

private final WriteLoadConstraintSettings writeLoadConstraintSettings;

/**
 * Creates the decider with its own {@link WriteLoadConstraintSettings} instance built from the given cluster settings.
 *
 * @param clusterSettings the cluster settings handed to the {@link WriteLoadConstraintSettings} constructor
 */
public WriteLoadConstraintDecider(ClusterSettings clusterSettings) {
    writeLoadConstraintSettings = new WriteLoadConstraintSettings(clusterSettings);
}

@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().disabled()) {
return Decision.single(Decision.Type.YES, NAME, "Decider is disabled");
}

// Check whether the shard being relocated has any write load estimate. If it does not, then this decider has no opinion.
var allShardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
var shardWriteLoad = allShardWriteLoads.get(shardRouting.shardId());
if (shardWriteLoad == null || shardWriteLoad == 0) {
return Decision.single(Decision.Type.YES, NAME, "Shard has no estimated write load. Decider takes no action.");
}

var allNodeUsageStats = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
var nodeUsageStatsForThreadPools = allNodeUsageStats.get(node.nodeId());
if (nodeUsageStatsForThreadPools == null) {
// No node-level thread pool usage stats were reported for this node. Let's assume this is OK and that the simulator will handle
// setting a node-level write load for this node after this shard is assigned.
return Decision.single(Decision.Type.YES, NAME, "The node has no write load estimate. Decider takes no action.");
}

assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
This one looks redundant after calculateShardMovementChange. If simulation fails this one will fail too. I don't think "overhead" of calculateShardMovementChange will be noticeable anywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the next check would also catch this. It's a matter of explanation message for the NO decision, really, at this point. They could be combined; separately the messages can be clearer for the user to understand, I think.

Copy link
Contributor

@mhl-b mhl-b Aug 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculateShardMovementChange already contains information about the current thread pool utilization, so it's not hard to see that the node is already at the high threshold before the movement attempt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you're suggesting. Are you proposing to add logic to calculateShardMovementChange? If the threshold is already exceeded, the calculation adds on top of the value (exceeding threshold more), nothing need change there.

I like the clarity of the separate explain messages. Do you feel strongly about merging the two if statements?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly

// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
String explain = Strings.format(
"Node [%s] with write thread pool utilization [%.2f] already exceeds the high utilization threshold of [%f]. Cannot "
+ "allocate shard [%s] to node without risking increased write latencies.",
node.nodeId(),
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
nodeWriteThreadPoolLoadThreshold,
shardRouting.shardId()
);
logger.debug(explain);
return Decision.single(Decision.Type.NO, NAME, explain);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should respond "not-preferred" instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was disagreement on implementing Decision#NOT_PREFERRED. So we're getting the basics in with NO, and I plan to explore the balancer and decision code in ES-11998 this sprint.

}

if (calculateShardMovementChange(nodeWriteThreadPoolStats, shardWriteLoad) >= nodeWriteThreadPoolLoadThreshold) {
// The node's write thread pool usage would be raised above the high utilization threshold with assignment of the new shard.
// This could lead to a hot spot on this node and is undesirable.
String explain = Strings.format(
"The high utilization threshold of [%f] would be exceeded on node [%s] with utilization [%.2f] if shard [%s] with "
+ "estimated additional utilisation [%.5f] (write load [%.5f] / threads [%d]) were assigned to it. Cannot allocate "
+ "shard to node without risking increased write latencies.",
nodeWriteThreadPoolLoadThreshold,
node.nodeId(),
nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
shardRouting.shardId(),
shardWriteLoad / nodeWriteThreadPoolStats.totalThreadPoolThreads(),
shardWriteLoad,
nodeWriteThreadPoolStats.totalThreadPoolThreads()
);
logger.debug(explain);
return Decision.single(Decision.Type.NO, NAME, explain);
}

return Decision.YES;
}

/**
 * Currently always answers YES. The explanation attached to the decision differs depending on whether the decider is fully enabled:
 * when it is not, the check is reported as not enabled; when it is, the check is reported as not yet implemented.
 */
@Override
public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
    // TODO: implement
    final String explanation = writeLoadConstraintSettings.getWriteLoadConstraintEnabled().fullyEnabled()
        ? "canRemain() is not yet implemented"
        : "canRemain() is not enabled";
    return Decision.single(Decision.Type.YES, NAME, explanation);
}

/**
 * Simulates adding the shard to the node and returns the node's resulting write thread pool utilization.
 * <p>
 * Note that, despite the method name, the returned value is not only the change: it is the node's current average utilization plus
 * the shard's write load divided across the node's write threads, as computed by
 * {@link ShardMovementWriteLoadSimulator#updateNodeUtilizationWithShardMovements}. This makes it directly comparable to a utilization
 * threshold.
 *
 * @param nodeWriteThreadPoolStats the node's current write thread pool usage stats
 * @param shardWriteLoad the write load estimate of the shard being added; asserted to be positive
 * @return the node's write thread pool utilization after the shard's write load has been added
 */
private float calculateShardMovementChange(ThreadPoolUsageStats nodeWriteThreadPoolStats, double shardWriteLoad) {
    assert shardWriteLoad > 0;
    return ShardMovementWriteLoadSimulator.updateNodeUtilizationWithShardMovements(
        nodeWriteThreadPoolStats.averageThreadPoolUtilization(),
        (float) shardWriteLoad,
        nodeWriteThreadPoolStats.totalThreadPoolThreads()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.WriteLoadConstraintDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
Expand Down Expand Up @@ -278,6 +279,7 @@ public void testAllocationDeciderOrder() {
SnapshotInProgressAllocationDecider.class,
RestoreInProgressAllocationDecider.class,
NodeShutdownAllocationDecider.class,
WriteLoadConstraintDecider.class,
NodeReplacementAllocationDecider.class,
FilterAllocationDecider.class,
SameShardAllocationDecider.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ public void testScheduling() {
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
randomBoolean()
? WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
: WriteLoadConstraintSettings.WriteLoadDeciderStatus.LOW_THRESHOLD_ONLY
);
if (randomBoolean()) {
settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms");
Expand Down
Loading