Merged

33 commits
2dd9eac
First pass on WriteLoadConstraintMonitor
nicktindall Aug 14, 2025
471bc19
Fix disabled condition
nicktindall Aug 15, 2025
0e4db55
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 15, 2025
7ab66a7
Add tests
nicktindall Aug 18, 2025
018e100
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 18, 2025
6bd054f
equal to threshold is OK
nicktindall Aug 18, 2025
2484a77
More random
nicktindall Aug 18, 2025
899ba00
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 19, 2025
7049a5e
Store high utilization threshold as ratio
nicktindall Aug 19, 2025
c57935e
Change default queue latency threshold to 5s
nicktindall Aug 19, 2025
6971468
Update server/src/main/java/org/elasticsearch/cluster/routing/allocat…
nicktindall Aug 19, 2025
d3308d2
Use notFullyEnabled
nicktindall Aug 19, 2025
b816a15
Include hot-spotted and under-threshold node IDs in reason, in-line c…
nicktindall Aug 19, 2025
8ad414b
Fix constraints
nicktindall Aug 19, 2025
62bdc83
Assert write thread pool stats are present
nicktindall Aug 19, 2025
a326745
Change debug message when no nodes exceeding threshold
nicktindall Aug 19, 2025
ee54418
Make over-latency-threshold and under-utilization threshold mutually …
nicktindall Aug 19, 2025
d8ff9e5
Tidy
nicktindall Aug 21, 2025
996ac06
Simplify reason, write detailed log message when debug enabled
nicktindall Aug 21, 2025
2f7123f
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 21, 2025
1789a51
Don't ignore hot-spotted nodes with shard movement in progress
nicktindall Aug 27, 2025
6e5d393
latencyThresholdMillis -> queueLatencyThresholdMillis
nicktindall Aug 27, 2025
8a66257
Use system time by default
nicktindall Aug 27, 2025
8a2b43b
Remove dead code
nicktindall Aug 27, 2025
8b23b03
rerouteService -> mockRerouteService
nicktindall Aug 27, 2025
1cd4154
createClusterInfoWithHotSpots, add better javadoc
nicktindall Aug 27, 2025
1a4979a
Fix test names
nicktindall Aug 27, 2025
98ba24d
Improve comment
nicktindall Aug 27, 2025
1d63962
Leave default as 30 for this PR
nicktindall Aug 27, 2025
3510567
Merge branch 'main' into ES-119922_implement_WriteLoadConstraintMontitor
nicktindall Aug 27, 2025
2abd946
Explicitly configure a non-zero reroute interval
nicktindall Aug 28, 2025
06beb49
Merge remote-tracking branch 'origin/main' into ES-119922_implement_W…
nicktindall Aug 28, 2025
81eb6e5
Call reroute even if there are no nodes under utilisation/latency thr…
nicktindall Aug 29, 2025

@@ -89,33 +89,5 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeFloat(this.averageThreadPoolUtilization);
out.writeVLong(this.maxThreadPoolQueueLatencyMillis);
}

@Override
public int hashCode() {
return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis);
}

@Override
public String toString() {
return "[totalThreadPoolThreads="
+ totalThreadPoolThreads
+ ", averageThreadPoolUtilization="
+ averageThreadPoolUtilization
+ ", maxThreadPoolQueueLatencyMillis="
+ maxThreadPoolQueueLatencyMillis
+ "]";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadPoolUsageStats other = (ThreadPoolUsageStats) o;
return totalThreadPoolThreads == other.totalThreadPoolThreads
&& averageThreadPoolUtilization == other.averageThreadPoolUtilization
&& maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis;
}

} // ThreadPoolUsageStats
Contributor Author

This is a record and these looked like the default equals/hashCode/toString
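
For illustration only, a minimal standalone sketch of that point (the real record is nested in NodeUsageStatsForThreadPools and its component types are assumed here): a record derives equals, hashCode, and toString from its components, so hand-written versions that merely mirror the components add nothing.

// Hypothetical standalone version of the record; component types assumed.
record ThreadPoolUsageStats(
    int totalThreadPoolThreads,
    float averageThreadPoolUtilization,
    long maxThreadPoolQueueLatencyMillis
) {}

// The compiler-generated members compare and print component-wise, e.g.:
// new ThreadPoolUsageStats(8, 0.75f, 120).equals(new ThreadPoolUsageStats(8, 0.75f, 120)) -> true
// toString() -> "ThreadPoolUsageStats[totalThreadPoolThreads=8, averageThreadPoolUtilization=0.75, maxThreadPoolQueueLatencyMillis=120]"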


}
}

@@ -15,26 +15,33 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
*
* TODO (ES-11992): implement
*/
public class WriteLoadConstraintMonitor {
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private volatile long lastRerouteTimeMillis = 0;
private volatile Set<String> lastSetOfHotSpottedNodes = Set.of();

public WriteLoadConstraintMonitor(
ClusterSettings clusterSettings,
@@ -60,29 +67,76 @@ public void onNewInfo(ClusterInfo clusterInfo) {
return;
}

if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) {
logger.trace("skipping monitor because the write load decider is disabled");
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() != WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED) {
logger.debug("skipping monitor because the write load decider is disabled");
return;
}

logger.trace("processing new cluster info");

boolean reroute = false;
String explanation = "";
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
final Set<String> nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
final Set<String> nodeIdsBelowUtilizationThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
nodeIdsExceedingLatencyThreshold.add(nodeId);
}
if (writeThreadPoolStats.averageThreadPoolUtilization() <= writeLoadConstraintSettings.getHighUtilizationThreshold()
.getAsRatio()) {
nodeIdsBelowUtilizationThreshold.add(nodeId);
}
});

if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.debug("No nodes exceeding latency threshold");
return;
}

// Remove any over-threshold nodes that already have shards relocating away
Contributor

This seems okay to me because the shard started/failed cluster state update provokes a reroute() call. Not sure if that's what you were aiming at? Otherwise, I'd be worried that a cluster that is doing a lot of rebalancing for a significant amount of time may not reconsider shard allocation decisions when there's a hot-spot. This and this bit of code are responsible for the reroute on cluster state update post shard state change, IIUC. Could you add that argument to the comment, if you agree? I think there should be an explanation of why here.

Contributor

Additionally, I was originally thinking that the monitor would maintain both that a node is hot-spotting and the timestamp when the node's hot-spot began (and then update the timestamp if/when reroute is called again for the same node). That way, after a reasonable amount of time in which we'd expect the hot-spot to have been addressed, the monitor can instigate reroute again for the same hot-spotting node.

final RoutingNodes routingNodes = state.getRoutingNodes();
nodeIdsExceedingLatencyThreshold.removeIf(
nodeId -> routingNodes.node(nodeId).numberOfShardsWithState(ShardRoutingState.RELOCATING) > 0
Contributor

Is this reliable? I haven't followed closely how RELOCATING is used, but my understanding is that reconciliation will select a small subset of shard moves from the DesiredBalance and update the cluster state to start those moves. So there could be 100 shard moves queued for nodeA in the DesiredBalance, but maybe reconciliation fulfilled the shard move quota with other nodes. Or nodeA can't move shards until some other target node moves some shards off first. Etc.

Contributor

If I do understand correctly, I'd be inclined to move away from this solution, since it'd be difficult to get right, and instead track the timestamp start of a node hot-spot, or the last reroute call because of that node hot-spot, and call reroute again if enough time passes for a particular node hot-spot.

Probably still obey haveCalledRerouteRecently first, but if we haven't called recently, allow reroute for the same hot-spot.

Contributor Author
@nicktindall Aug 19, 2025

This was just to address the following from the ticket

Wait to call reroute if shards are already scheduled to move away from the hot node, until fresh node write load data is received after those moves have completed. The balancer may already be resolving the hot-spot.

As long as the hot-spotted node has some shards RELOCATING (this is the status for a shard that's moving on the source side, the target side will be INITIALIZING), we won't call reroute for that node/shard. If there are other hot-spotted nodes with no relocations ongoing this won't prevent reroute being called.

You're right this won't take into account undesired allocations. Perhaps a better solution would expand the condition to node has shards with state = RELOCATING || node-has-undesired-allocations?

Contributor Author

I don't think we currently have information about undesired allocations. So we'd need to do some additional work to get that into the cluster info if we wanted to implement the above.

Contributor
@DiannaHohensee Aug 19, 2025

Wait to call reroute if shards are already scheduled to move away from the hot node, until fresh node write load data is received after those moves have completed. The balancer may already be resolving the hot-spot.

Ah yes, my mistake, I didn't consider all the interpretations.

Right, I don't think we have an undesired count saved any place. Even the DesiredBalance is a list of final assignments, and reconciliation looks for nodes missing a shard assignment. There's no running count. Thus the timestamp idea, keeping track of when reroute was last called for a hot-spot, and recalling reroute if X (5 mins?) time has passed and the hot-spot hasn't been resolved. I don't know of much harm in re-calling reroute, as opposed to the risk of not calling it.

Contributor Author

Thus the timestamp idea, keeping track of when reroute was last called for a hot-spot, and recalling reroute if X (5 mins?) time has passed and the hot-spot hasn't been resolved. I don't know of much harm in re-calling reroute, as opposed to the risk of not calling it.

I think that's how this will already work. If the hot-spot is not resolved and haveCalledRerouteRecently == false then we'll call reroute again.

I think it's probably worthwhile to wait until we see no movement away from a hot-spotted node before we decide to intervene. As you pointed out there will potentially be queued moves in the desired balancer that we're not privy to, but this condition seems better than nothing.

Contributor

I'd rather we removed this check. We don't know how long ago the DesiredBalance being executed was calculated, and on what data it made allocation choices, which could delay addressing a new hot-spot. We'll wait 30 seconds before calling reroute again, and there is no harm in calling reroute again: if nothing has changed, then the new DesiredBalance will be the same.

I think that's how this will already work. If the hot-spot is not resolved and haveCalledRerouteRecently == false then we'll call reroute again.

I meant a timestamp per node hot-spot, as opposed to a single timestamp for all nodes (current implementation). But the timestamp handling works as is.
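
A hedged sketch of that per-node alternative (purely illustrative, not what this PR implements; the class and method names are invented): track the last reroute time for each hot-spotting node instead of a single shared lastRerouteTimeMillis, so a hot-spot that persists on one node can trigger another reroute once the minimum interval elapses, independently of activity on other nodes.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class PerNodeRerouteThrottle {
    private final Map<String, Long> lastRerouteTimeByNodeId = new ConcurrentHashMap<>();

    // True if this node's hot-spot has not triggered a reroute within the minimum interval.
    boolean shouldReroute(String nodeId, long nowMillis, long minIntervalMillis) {
        Long last = lastRerouteTimeByNodeId.get(nodeId);
        return last == null || nowMillis - last >= minIntervalMillis;
    }

    // Record that a reroute was just triggered on behalf of these hot-spotting nodes.
    void onReroute(Set<String> hotSpottedNodeIds, long nowMillis) {
        hotSpottedNodeIds.forEach(nodeId -> lastRerouteTimeByNodeId.put(nodeId, nowMillis));
    }
}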

Contributor

I think I'd also prefer removing this check and leaving it to the deciders/simulator. If simulation says that a shard leaves the node then that will already handle it.

I would prefer not to add further delays though. I think the ClusterInfo poll is enough. We can keep the reroute_interval, but as an operational tool (defaulting to 0s).

Contributor Author

Removed this check in 1789a51

);

// TODO (ES-11992): implement
if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.debug("All nodes over threshold have relocation in progress");
return;
}

if (reroute) {
logger.debug("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> {
final var reroutedClusterState = clusterStateSupplier.get();
if (Sets.difference(nodeIdsBelowUtilizationThreshold, nodeIdsExceedingLatencyThreshold).isEmpty()) {
Contributor

Could we make this have no overlap? I'd assume, in the filtering code above that populates these two sets, that a hot-spotting node is at 100% utilization. It doesn't make a lot of sense to have a hot spot while the node is below 90% utilization (or whatever we set the default to).

Then we'd have a check such as

if (nodeIdsBelowUtilizationThreshold.isEmpty() || nodeIdsExceedingLatencyThreshold.isEmpty()) {
    // Do nothing, because either there aren't any target nodes or there aren't any source hot-spotting nodes.
}

I think we'll have to do some magic in ES-12623 and ES-12634 to always supply 100% node utilization in the ClusterInfo when a node is hot spotted (in case of strange stat number reports), but that's different work.

Contributor Author

Done in ee54418

I'd assume in the filtering code above to populate these two sets that a hot-spotting node is at 100% utilization.

That's not a safe assumption. It'll probably be very high, but because it's an average there's a good chance it won't be 100%. I don't think we should necessarily fudge the numbers either, especially if we are going to use those numbers for simulation, we'd just be throwing information away.

Contributor

Ah I see. I was implicitly thinking any node without the queue latency -- so nodes between the low and high thresholds -- would be eligible to receive more shards. But the current implementation is not to do shard movements unless there are nodes below the low threshold (90% cpu usage).

Would we want that behavior? Suppose one node is queueing, and 5 other nodes are at 92% CPU utilization. It seems like it would still be better to initiate rebalancing.

Contributor Author

I think the above falls outside our definition of a hot-spot. If we find ourselves in that situation for a long period then I would argue autoscaling is broken.

logger.debug("No nodes below utilization threshold that are not exceeding latency threshold");
return;
}

// TODO (ES-11992): implement
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis;
final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval()
.millis();

}, e -> logger.debug("reroute failed", e)));
if (haveCalledRerouteRecently == false
|| Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
callReroute(nodeIdsExceedingLatencyThreshold);
} else {
logger.trace("no reroute required");
logger.debug("Not calling reroute because we called reroute recently and there are no new hot spots");
}
}

private void callReroute(Set<String> hotSpottedNodes) {
final String reason = Strings.format(
"write load constraint monitor: Found %d node(s) exceeding the write thread pool queue latency threshold",
Contributor

nit: can you add total and below threshold nodes, please? maybe inline callReroute.

Contributor

Would it be reasonable to list all of the node IDs for actively hot-spotting nodes? That'd make it quite clear which nodes caused the rebalancing work, giving a lead where to investigate further.

The only risk I can think of is that a very large cluster could end up listing a lot of nodes. That'd be in a very unhappy large cluster, but we could put an upper limit on how many nodes we'll list.
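
One possible shape for such a cap, as a hypothetical sketch only (the helper name, the limit handling, and the output format are invented here, not taken from the PR): list at most a fixed number of node IDs and note how many were omitted.

import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: list at most maxListed node IDs and say how many were omitted.
final class NodeIdSummary {
    static String summarize(Set<String> nodeIds, int maxListed) {
        String listed = nodeIds.stream().sorted().limit(maxListed).collect(Collectors.joining(", "));
        int omitted = Math.max(0, nodeIds.size() - maxListed);
        return omitted > 0 ? listed + " ... (" + omitted + " more)" : listed;
    }
}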

Contributor Author

Updated to include a limited number of hot-spotting and under-threshold node IDs, and added the total number of nodes. Also in-lined callReroute.

See b816a15

Contributor

Now I'm thinking maybe the reroute message does not need this, but a debug log could include all of the above.

Contributor Author

Fixed that method to handle no nodes and the limit properly in 8ad414b

Contributor Author

I made the reason string simpler and added the more detailed string as a debug log in 996ac06

hotSpottedNodes.size()
);
rerouteService.reroute(
reason,
Priority.NORMAL,
ActionListener.wrap(
ignored -> logger.trace("{} reroute successful", reason),
e -> logger.debug(() -> Strings.format("reroute failed, reason: %s", reason), e)
)
);
lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
lastSetOfHotSpottedNodes = hotSpottedNodes;
}
}

@@ -117,41 +117,37 @@ public boolean disabled() {
Setting.Property.NodeScope
);

WriteLoadDeciderStatus writeLoadDeciderStatus;
TimeValue writeLoadDeciderRerouteIntervalSetting;
double writeThreadPoolHighUtilizationThresholdSetting;
private volatile WriteLoadDeciderStatus writeLoadDeciderStatus;
private volatile TimeValue minimumRerouteInterval;
private volatile RatioValue highUtilizationThreshold;
private volatile TimeValue queueLatencyThreshold;

public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, status -> this.writeLoadDeciderStatus = status);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
timeValue -> this.minimumRerouteInterval = timeValue
);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
this::setWriteThreadPoolHighUtilizationThresholdSetting
value -> highUtilizationThreshold = value
);

};

private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
this.writeLoadDeciderStatus = status;
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING, value -> queueLatencyThreshold = value);
}

public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() {
return this.writeLoadDeciderStatus;
}

public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
return this.writeLoadDeciderRerouteIntervalSetting;
}

public double getWriteThreadPoolHighUtilizationThresholdSetting() {
return this.writeThreadPoolHighUtilizationThresholdSetting;
public TimeValue getMinimumRerouteInterval() {
return this.minimumRerouteInterval;
}

private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
this.writeLoadDeciderRerouteIntervalSetting = timeValue;
public TimeValue getQueueLatencyThreshold() {
return this.queueLatencyThreshold;
}

private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio();
public RatioValue getHighUtilizationThreshold() {
return this.highUtilizationThreshold;
}
}

@@ -61,7 +61,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getHighUtilizationThreshold().getAsRatio();
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
String explain = Strings.format(