Merged

Changes from 20 commits

33 commits
2dd9eac
First pass on WriteLoadConstraintMonitor
nicktindall Aug 14, 2025
471bc19
Fix disabled condition
nicktindall Aug 15, 2025
0e4db55
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 15, 2025
7ab66a7
Add tests
nicktindall Aug 18, 2025
018e100
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 18, 2025
6bd054f
equal to threshold is OK
nicktindall Aug 18, 2025
2484a77
More random
nicktindall Aug 18, 2025
899ba00
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 19, 2025
7049a5e
Store high utilization threshold as ratio
nicktindall Aug 19, 2025
c57935e
Change default queue latency threshold to 5s
nicktindall Aug 19, 2025
6971468
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Aug 19, 2025
d3308d2
Use notFullyEnabled
nicktindall Aug 19, 2025
b816a15
Include hot-spotted and under-threshold node IDs in reason, in-line c…
nicktindall Aug 19, 2025
8ad414b
Fix constraints
nicktindall Aug 19, 2025
62bdc83
Assert write thread pool stats are present
nicktindall Aug 19, 2025
a326745
Change debug message when no nodes exceeding threshold
nicktindall Aug 19, 2025
ee54418
Make over-latency-threshold and under-utilization threshold mutually …
nicktindall Aug 19, 2025
d8ff9e5
Tidy
nicktindall Aug 21, 2025
996ac06
Simplify reason, write detailed log message when debug enabled
nicktindall Aug 21, 2025
2f7123f
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 21, 2025
1789a51
Don't ignore hot-spotted nodes with shard movement in progress
nicktindall Aug 27, 2025
6e5d393
latencyThresholdMillis -> queueLatencyThresholdMillis
nicktindall Aug 27, 2025
8a66257
Use system time by default
nicktindall Aug 27, 2025
8a2b43b
Remove dead code
nicktindall Aug 27, 2025
8b23b03
rerouteService -> mockRerouteService
nicktindall Aug 27, 2025
1cd4154
createClusterInfoWithHotSpots, add better javadoc
nicktindall Aug 27, 2025
1a4979a
Fix test names
nicktindall Aug 27, 2025
98ba24d
Improve comment
nicktindall Aug 27, 2025
1d63962
Leave default as 30 for this PR
nicktindall Aug 27, 2025
3510567
Merge branch 'main' into ES-119922_implement_WriteLoadConstraintMontitor
nicktindall Aug 27, 2025
2abd946
Explicitly configure a non-zero reroute interval
nicktindall Aug 28, 2025
06beb49
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 28, 2025
81eb6e5
Call reroute even if there are no nodes under utilisation/latency thr…
nicktindall Aug 29, 2025
@@ -89,33 +89,5 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeFloat(this.averageThreadPoolUtilization);
out.writeVLong(this.maxThreadPoolQueueLatencyMillis);
}

@Override
public int hashCode() {
return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis);
}

@Override
public String toString() {
return "[totalThreadPoolThreads="
+ totalThreadPoolThreads
+ ", averageThreadPoolUtilization="
+ averageThreadPoolUtilization
+ ", maxThreadPoolQueueLatencyMillis="
+ maxThreadPoolQueueLatencyMillis
+ "]";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadPoolUsageStats other = (ThreadPoolUsageStats) o;
return totalThreadPoolThreads == other.totalThreadPoolThreads
&& averageThreadPoolUtilization == other.averageThreadPoolUtilization
&& maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis;
}

} // ThreadPoolUsageStats
Contributor Author
This is a record and these looked like the default equals/hashCode/toString


}
}
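As an aside on the comment above: a Java record generates component-based equals, hashCode, and toString automatically, which is why the explicit overrides could be dropped. A minimal standalone illustration, with a hypothetical class name rather than the PR's actual one:

// Illustration only: the compiler derives equals/hashCode/toString from the record components,
// so hand-written overrides that just mirror that behaviour are redundant.
public record ThreadPoolUsageStatsExample(
    int totalThreadPoolThreads,
    float averageThreadPoolUtilization,
    long maxThreadPoolQueueLatencyMillis
) {}

class RecordDefaultsDemo {
    public static void main(String[] args) {
        var a = new ThreadPoolUsageStatsExample(8, 0.75f, 120L);
        var b = new ThreadPoolUsageStatsExample(8, 0.75f, 120L);
        System.out.println(a.equals(b));                  // true: component-wise comparison
        System.out.println(a.hashCode() == b.hashCode()); // true: derived from the same components
        System.out.println(a);                            // ThreadPoolUsageStatsExample[totalThreadPoolThreads=8, ...]
    }
}

Note that the generated toString prefixes the class name, so it is close to, but not byte-for-byte identical with, the removed override.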
@@ -15,26 +15,34 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
* Monitors the node-level write thread pool usage across the cluster and initiates a rebalancing round (via
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
*
* TODO (ES-11992): implement
*/
public class WriteLoadConstraintMonitor {
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
private static final int MAX_NODE_IDS_IN_MESSAGE = 3;
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private volatile long lastRerouteTimeMillis = 0;
private volatile Set<String> lastSetOfHotSpottedNodes = Set.of();

public WriteLoadConstraintMonitor(
ClusterSettings clusterSettings,
@@ -60,29 +68,86 @@ public void onNewInfo(ClusterInfo clusterInfo) {
return;
}

if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) {
logger.trace("skipping monitor because the write load decider is disabled");
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
logger.debug("skipping monitor because the write load decider is not fully enabled");
return;
}

logger.trace("processing new cluster info");

boolean reroute = false;
String explanation = "";
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
final Set<String> nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
final Set<String> potentialRelocationTargets = Sets.newHashSetWithExpectedSize(numberOfNodes);
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
nodeIdsExceedingLatencyThreshold.add(nodeId);
} else if (writeThreadPoolStats.averageThreadPoolUtilization() <= writeLoadConstraintSettings.getHighUtilizationThreshold()) {
potentialRelocationTargets.add(nodeId);
Contributor
I wonder if we need this. I would be fine with calling reroute once for every onNewInfo call with a node having a queue latency above the threshold. Then leave it to the deciders to figure out if anything can move rather than be too smart about it here.

Contributor Author
I'm happy to remove it if we don't mind calling reroute potentially more frequently. I thought we were trying to identify when we're hot-spotting in this logic; our working definition of hot-spotting includes that there are nodes with capacity, if I'm not mistaken. But I don't have strong feelings about it. I believe there's work scheduled to do the determination of "hot-spotting" elsewhere, which I'm not 100% clear on.

Contributor Author
I removed this additional check; we can put it back in if it's needed later.

}
});
assert Sets.intersection(nodeIdsExceedingLatencyThreshold, potentialRelocationTargets).isEmpty()
: "We assume any nodes exceeding the latency threshold are not viable targets for relocation";

if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.debug("No hot-spotting nodes detected");
return;
}

// Remove any over-threshold nodes that already have shards relocating away
Contributor
This seems okay to me because the shard started/failed cluster state update provokes a reroute() call. Not sure if that's what you were aiming at? Otherwise, I'd be worried that a cluster that is doing a lot of rebalancing for a significant amount of time may not reconsider shard allocation decisions when there's a hot-spot. This and this bit of code are responsible for the reroute on cluster state update post shard state change, IIUC. Could you add that argument to the comment, if you agree? I think there should be an explanation of why here.

Contributor
Additionally, I was originally thinking that the monitor would maintain both that a node is hot-spotting and the timestamp when the node's hot-spot began (and then update the timestamp if/when reroute is called again for the same node). That way, after a reasonable amount of time in which we'd expect the hot-spot to have been addressed, the monitor can instigate reroute again for the same hot-spotting node.

final RoutingNodes routingNodes = state.getRoutingNodes();
nodeIdsExceedingLatencyThreshold.removeIf(
nodeId -> routingNodes.node(nodeId).numberOfShardsWithState(ShardRoutingState.RELOCATING) > 0
Contributor
Is this reliable? I haven't followed around how RELOCATING is used, but my understanding is that reconciliation will select a small subset of shard moves from the DesiredBalance and update the cluster state to start those moves. So there could be 100 shard moves queued for nodeA in the DesiredBalance, but maybe reconciliation fulfilled the shard move quota with other nodes. Or nodeA can't move shards until some other target node moves some shards off first. Etc.

Contributor
If I do understand correctly, I'd be inclined to move away from this solution, since it'd be difficult to get right, and instead track the timestamp start of a node hot-spot, or the last reroute call because of that node hot-spot, and call reroute again if enough time passes for a particular node hot-spot.

Probably still obey haveCalledRerouteRecently first, but if we haven't called recently, allow reroute for the same hot-spot.

Contributor Author
nicktindall Aug 19, 2025
This was just to address the following from the ticket

Wait to call reroute if shards are already scheduled to move away from the hot node, until fresh node write load data is received after those moves have completed. The balancer may already be resolving the hot-spot.

As long as the hot-spotted node has some shards RELOCATING (this is the status for a shard that's moving on the source side, the target side will be INITIALIZING), we won't call reroute for that node/shard. If there are other hot-spotted nodes with no relocations ongoing this won't prevent reroute being called.

You're right that this won't take into account undesired allocations. Perhaps a better solution would expand the condition to node has shards with state = RELOCATING || node-has-undesired-allocations?

Contributor Author
I don't think we currently have information about undesired allocations. So we'd need to do some additional work to get that into the cluster info if we wanted to implement the above.

Contributor
DiannaHohensee Aug 19, 2025
Wait to call reroute if shards are already scheduled to move away from the hot node, until fresh node write load data is received after those moves have completed. The balancer may already be resolving the hot-spot.

Ah yes, my mistake, I didn't consider all the interpretations.

Right, I don't think we have an undesired count saved any place. Even the DesiredBalance is a list of final assignments, and reconciliation looks for nodes missing a shard assignment. There's no running count. Thus the timestamp idea, keeping track of when reroute was last called for a hot-spot, and recalling reroute if X (5 mins?) time has passed and the hot-spot hasn't been resolved. I don't know of much harm in re-calling reroute, as opposed to the risk of not calling it.

Contributor Author
Thus the timestamp idea, keeping track of when reroute was last called for a hot-spot, and recalling reroute if X (5 mins?) time has passed and the hot-spot hasn't been resolved. I don't know of much harm in re-calling reroute, as opposed to the risk of not calling it.

I think that's how this will already work. If the hot-spot is not resolved and haveCalledRerouteRecently == false then we'll call reroute again.

I think it's probably worthwhile to wait until we see no movement away from a hot-spotted node before we decide to intervene. As you pointed out there will potentially be queued moves in the desired balancer that we're not privy to, but this condition seems better than nothing.

Contributor
I'd rather we removed this check. We don't know how long ago the DesiredBalance being executed was calculated, and on what data it made allocation choices, which could delay addressing a new hot-spot. We'll wait 30 seconds before calling reroute again, and there is no harm in calling reroute again: if nothing has changed, then the new DesiredBalance will be the same.

I think that's how this will already work. If the hot-spot is not resolved and haveCalledRerouteRecently == false then we'll call reroute again.

I meant a timestamp per node hot-spot, as opposed to a single timestamp for all nodes (current implementation). But the timestamp handling works as is.

Contributor
I think I'd also prefer removing this check and leave it to the deciders/simulator. If simulation says that a shard leaves the node then that will already handle it.

I would prefer not to add further delays though. I think the ClusterInfo poll is enough. We can keep the reroute_interval, but as an operational tool (defaulting to 0s).

Contributor Author
Removed this check in 1789a51

);
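For illustration, here is a rough sketch of the per-node timestamp idea discussed in the thread above. All names are hypothetical, and this is not the merged logic; the merged code keeps a single lastRerouteTimeMillis plus the last set of hot-spotted node IDs instead.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical per-node throttle: reroute again for a node only after the minimum interval
// has elapsed since the last reroute triggered on that node's behalf.
class PerNodeRerouteThrottle {
    private final Map<String, Long> lastRerouteTimeByNodeId = new ConcurrentHashMap<>();
    private final long minimumRerouteIntervalMillis;
    private final LongSupplier currentTimeMillisSupplier;

    PerNodeRerouteThrottle(long minimumRerouteIntervalMillis, LongSupplier currentTimeMillisSupplier) {
        this.minimumRerouteIntervalMillis = minimumRerouteIntervalMillis;
        this.currentTimeMillisSupplier = currentTimeMillisSupplier;
    }

    // True if any hot-spotted node has not triggered a reroute within the minimum interval.
    boolean shouldReroute(Set<String> hotSpottedNodeIds) {
        final long now = currentTimeMillisSupplier.getAsLong();
        return hotSpottedNodeIds.stream()
            .anyMatch(nodeId -> now - lastRerouteTimeByNodeId.getOrDefault(nodeId, 0L) >= minimumRerouteIntervalMillis);
    }

    // Record that a reroute was just requested on behalf of these hot-spotted nodes.
    void markRerouted(Set<String> hotSpottedNodeIds) {
        final long now = currentTimeMillisSupplier.getAsLong();
        hotSpottedNodeIds.forEach(nodeId -> lastRerouteTimeByNodeId.put(nodeId, now));
    }
}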

if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.debug("All nodes over threshold have relocation in progress");
return;
}

// TODO (ES-11992): implement
if (potentialRelocationTargets.isEmpty()) {
logger.debug("No nodes are suitable as relocation targets");
return;
}

if (reroute) {
logger.debug("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> {
final var reroutedClusterState = clusterStateSupplier.get();
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis;
final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval()
.millis();

// TODO (ES-11992): implement
if (haveCalledRerouteRecently == false
|| Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
if (logger.isDebugEnabled()) {
logger.debug(
"Found {} exceeding the write thread pool queue latency threshold ({} with capacity, {} total), triggering reroute",
nodeSummary(nodeIdsExceedingLatencyThreshold),
nodeSummary(potentialRelocationTargets),
state.nodes().size()
);
}
final String reason = "hot-spotting detected by write load constraint monitor";
rerouteService.reroute(
reason,
Priority.NORMAL,
ActionListener.wrap(
ignored -> logger.trace("{} reroute successful", reason),
e -> logger.debug(() -> Strings.format("reroute failed, reason: %s", reason), e)
)
);
lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold;
} else {
logger.debug("Not calling reroute because we called reroute recently and there are no new hot spots");
}
}

}, e -> logger.debug("reroute failed", e)));
private static String nodeSummary(Set<String> nodeIds) {
if (nodeIds.isEmpty() == false && nodeIds.size() <= MAX_NODE_IDS_IN_MESSAGE) {
return "[" + String.join(", ", nodeIds) + "]";
} else {
logger.trace("no reroute required");
return nodeIds.size() + " nodes";
}
}
}
@@ -92,7 +92,7 @@ public boolean disabled() {
*/
public static final Setting<TimeValue> WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING = Setting.timeSetting(
SETTING_PREFIX + "queue_latency_threshold",
TimeValue.timeValueSeconds(30),
TimeValue.timeValueSeconds(5),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
@@ -117,41 +117,40 @@ public boolean disabled() {
Setting.Property.NodeScope
);

WriteLoadDeciderStatus writeLoadDeciderStatus;
TimeValue writeLoadDeciderRerouteIntervalSetting;
double writeThreadPoolHighUtilizationThresholdSetting;
private volatile WriteLoadDeciderStatus writeLoadDeciderStatus;
private volatile TimeValue minimumRerouteInterval;
private volatile double highUtilizationThreshold;
private volatile TimeValue queueLatencyThreshold;

public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, status -> this.writeLoadDeciderStatus = status);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
timeValue -> this.minimumRerouteInterval = timeValue
);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
this::setWriteThreadPoolHighUtilizationThresholdSetting
value -> highUtilizationThreshold = value.getAsRatio()
);

};

private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
this.writeLoadDeciderStatus = status;
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING, value -> queueLatencyThreshold = value);
}

public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() {
return this.writeLoadDeciderStatus;
}

public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
return this.writeLoadDeciderRerouteIntervalSetting;
public TimeValue getMinimumRerouteInterval() {
return this.minimumRerouteInterval;
}

public double getWriteThreadPoolHighUtilizationThresholdSetting() {
return this.writeThreadPoolHighUtilizationThresholdSetting;
public TimeValue getQueueLatencyThreshold() {
return this.queueLatencyThreshold;
}

private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
this.writeLoadDeciderRerouteIntervalSetting = timeValue;
}

private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio();
/**
* @return The threshold as a ratio - i.e. in [0, 1]
*/
public double getHighUtilizationThreshold() {
return this.highUtilizationThreshold;
}
}
@@ -61,7 +61,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getHighUtilizationThreshold();
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
String explain = Strings.format(