Merged

33 commits
2dd9eac  First pass on WriteLoadConstraintMonitor (nicktindall, Aug 14, 2025)
471bc19  Fix disabled condition (nicktindall, Aug 15, 2025)
0e4db55  Merge remote-tracking branch 'origin/main' into ES-119922_implement_W… (nicktindall, Aug 15, 2025)
7ab66a7  Add tests (nicktindall, Aug 18, 2025)
018e100  Merge remote-tracking branch 'origin/main' into ES-119922_implement_W… (nicktindall, Aug 18, 2025)
6bd054f  equal to threshold is OK (nicktindall, Aug 18, 2025)
2484a77  More random (nicktindall, Aug 18, 2025)
899ba00  Merge remote-tracking branch 'origin/main' into ES-119922_implement_W… (nicktindall, Aug 19, 2025)
7049a5e  Store high utilization threshold as ratio (nicktindall, Aug 19, 2025)
c57935e  Change default queue latency threshold to 5s (nicktindall, Aug 19, 2025)
6971468  Update server/src/main/java/org/elasticsearch/cluster/routing/allocat… (nicktindall, Aug 19, 2025)
d3308d2  Use notFullyEnabled (nicktindall, Aug 19, 2025)
b816a15  Include hot-spotted and under-threshold node IDs in reason, in-line c… (nicktindall, Aug 19, 2025)
8ad414b  Fix constraints (nicktindall, Aug 19, 2025)
62bdc83  Assert write thread pool stats are present (nicktindall, Aug 19, 2025)
a326745  Change debug message when no nodes exceeding threshold (nicktindall, Aug 19, 2025)
ee54418  Make over-latency-threshold and under-utilization threshold mutually … (nicktindall, Aug 19, 2025)
d8ff9e5  Tidy (nicktindall, Aug 21, 2025)
996ac06  Simplify reason, write detailed log message when debug enabled (nicktindall, Aug 21, 2025)
2f7123f  Merge remote-tracking branch 'origin/main' into ES-119922_implement_W… (nicktindall, Aug 21, 2025)
1789a51  Don't ignore hot-spotted nodes with shard movement in progress (nicktindall, Aug 27, 2025)
6e5d393  latencyThresholdMillis -> queueLatencyThresholdMillis (nicktindall, Aug 27, 2025)
8a66257  Use system time by default (nicktindall, Aug 27, 2025)
8a2b43b  Remove dead code (nicktindall, Aug 27, 2025)
8b23b03  rerouteService -> mockRerouteService (nicktindall, Aug 27, 2025)
1cd4154  createClusterInfoWithHotSpots, add better javadoc (nicktindall, Aug 27, 2025)
1a4979a  Fix test names (nicktindall, Aug 27, 2025)
98ba24d  Improve comment (nicktindall, Aug 27, 2025)
1d63962  Leave default as 30 for this PR (nicktindall, Aug 27, 2025)
3510567  Merge branch 'main' into ES-119922_implement_WriteLoadConstraintMontitor (nicktindall, Aug 27, 2025)
2abd946  Explicitly configure a non-zero reroute interval (nicktindall, Aug 28, 2025)
06beb49  Merge remote-tracking branch 'origin/main' into ES-119922_implement_W… (nicktindall, Aug 28, 2025)
81eb6e5  Call reroute even if there are no nodes under utilisation/latency thr… (nicktindall, Aug 29, 2025)
@@ -89,33 +89,5 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeFloat(this.averageThreadPoolUtilization);
out.writeVLong(this.maxThreadPoolQueueLatencyMillis);
}

@Override
public int hashCode() {
return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis);
}

@Override
public String toString() {
return "[totalThreadPoolThreads="
+ totalThreadPoolThreads
+ ", averageThreadPoolUtilization="
+ averageThreadPoolUtilization
+ ", maxThreadPoolQueueLatencyMillis="
+ maxThreadPoolQueueLatencyMillis
+ "]";
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ThreadPoolUsageStats other = (ThreadPoolUsageStats) o;
return totalThreadPoolThreads == other.totalThreadPoolThreads
&& averageThreadPoolUtilization == other.averageThreadPoolUtilization
&& maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis;
}

} // ThreadPoolUsageStats
PR author's comment on the removed methods:

This is a record and these looked like the default equals/hashCode/toString
}
}
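To illustrate the comment above: a Java record already generates component-based equals, hashCode, and toString, so hand-written versions that only mirror the components are redundant. A minimal, self-contained sketch (hypothetical stand-in type, not code from this PR):

```java
public class RecordDefaultsDemo {
    // Hypothetical stand-in for a stats record; the real ThreadPoolUsageStats carries the same components.
    record ThreadPoolUsageStats(int totalThreadPoolThreads, float averageThreadPoolUtilization, long maxThreadPoolQueueLatencyMillis) {}

    public static void main(String[] args) {
        var a = new ThreadPoolUsageStats(8, 0.75f, 1200L);
        var b = new ThreadPoolUsageStats(8, 0.75f, 1200L);

        // Records generate equals/hashCode over their components...
        System.out.println(a.equals(b));                   // true
        System.out.println(a.hashCode() == b.hashCode());  // true
        // ...and a toString that lists each component by name.
        System.out.println(a);
    }
}
```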
@@ -15,26 +15,32 @@
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
* Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
* Monitors the node-level write thread pool usage across the cluster and initiates a rebalancing round (via
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
*
* TODO (ES-11992): implement
*/
public class WriteLoadConstraintMonitor {
private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
private static final int MAX_NODE_IDS_IN_MESSAGE = 3;
private final WriteLoadConstraintSettings writeLoadConstraintSettings;
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
private volatile long lastRerouteTimeMillis = 0;
private volatile Set<String> lastSetOfHotSpottedNodes = Set.of();

public WriteLoadConstraintMonitor(
ClusterSettings clusterSettings,
@@ -60,29 +66,64 @@ public void onNewInfo(ClusterInfo clusterInfo) {
return;
}

if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) {
logger.trace("skipping monitor because the write load decider is disabled");
if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
logger.debug("skipping monitor because the write load decider is not fully enabled");
return;
}

logger.trace("processing new cluster info");

boolean reroute = false;
String explanation = "";
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
final Set<String> nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
.get(ThreadPool.Names.WRITE);
assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
nodeIdsExceedingLatencyThreshold.add(nodeId);
}
});

// TODO (ES-11992): implement
if (nodeIdsExceedingLatencyThreshold.isEmpty()) {
logger.debug("No hot-spotting nodes detected");
return;
}

if (reroute) {
logger.debug("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> {
final var reroutedClusterState = clusterStateSupplier.get();
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis;
final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval()
.millis();

// TODO (ES-11992): implement
if (haveCalledRerouteRecently == false
|| Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) {
if (logger.isDebugEnabled()) {
logger.debug(
"Found {} exceeding the write thread pool queue latency threshold ({} total), triggering reroute",
nodeSummary(nodeIdsExceedingLatencyThreshold),
state.nodes().size()
);
}
final String reason = "hot-spotting detected by write load constraint monitor";
rerouteService.reroute(
reason,
Priority.NORMAL,
ActionListener.wrap(
ignored -> logger.trace("{} reroute successful", reason),
e -> logger.debug(() -> Strings.format("reroute failed, reason: %s", reason), e)
)
);
lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong();
lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold;
} else {
logger.debug("Not calling reroute because we called reroute recently and there are no new hot spots");
}
}

}, e -> logger.debug("reroute failed", e)));
private static String nodeSummary(Set<String> nodeIds) {
if (nodeIds.isEmpty() == false && nodeIds.size() <= MAX_NODE_IDS_IN_MESSAGE) {
return "[" + String.join(", ", nodeIds) + "]";
} else {
logger.trace("no reroute required");
return nodeIds.size() + " nodes";
}
}
}
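The core decision in onNewInfo above is a throttle: once hot-spotted nodes are found, a reroute is requested unless one was already requested within the minimum interval and no new nodes have become hot-spotted since. A dependency-free sketch of that rule (names are illustrative, not taken from the PR):

```java
import java.util.Set;

final class RerouteThrottleSketch {
    /**
     * Reroute if we have not rerouted within the minimum interval,
     * or if the current hot spots include a node we have not yet rerouted for.
     */
    static boolean shouldReroute(
        long nowMillis,
        long lastRerouteTimeMillis,
        long minimumRerouteIntervalMillis,
        Set<String> currentHotSpots,
        Set<String> previousHotSpots
    ) {
        final boolean calledRerouteRecently = (nowMillis - lastRerouteTimeMillis) < minimumRerouteIntervalMillis;
        final boolean hasNewHotSpot = currentHotSpots.stream().anyMatch(nodeId -> previousHotSpots.contains(nodeId) == false);
        return calledRerouteRecently == false || hasNewHotSpot;
    }
}
```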
@@ -107,41 +107,40 @@ public boolean disabled() {
Setting.Property.NodeScope
);

WriteLoadDeciderStatus writeLoadDeciderStatus;
TimeValue writeLoadDeciderRerouteIntervalSetting;
double writeThreadPoolHighUtilizationThresholdSetting;
private volatile WriteLoadDeciderStatus writeLoadDeciderStatus;
private volatile TimeValue minimumRerouteInterval;
private volatile double highUtilizationThreshold;
private volatile TimeValue queueLatencyThreshold;

public WriteLoadConstraintSettings(ClusterSettings clusterSettings) {
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting);
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, status -> this.writeLoadDeciderStatus = status);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING,
timeValue -> this.minimumRerouteInterval = timeValue
);
clusterSettings.initializeAndWatch(
WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
this::setWriteThreadPoolHighUtilizationThresholdSetting
value -> highUtilizationThreshold = value.getAsRatio()
);

};

private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) {
this.writeLoadDeciderStatus = status;
clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING, value -> queueLatencyThreshold = value);
}

public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() {
return this.writeLoadDeciderStatus;
}

public TimeValue getWriteLoadDeciderRerouteIntervalSetting() {
return this.writeLoadDeciderRerouteIntervalSetting;
public TimeValue getMinimumRerouteInterval() {
return this.minimumRerouteInterval;
}

public double getWriteThreadPoolHighUtilizationThresholdSetting() {
return this.writeThreadPoolHighUtilizationThresholdSetting;
public TimeValue getQueueLatencyThreshold() {
return this.queueLatencyThreshold;
}

private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) {
this.writeLoadDeciderRerouteIntervalSetting = timeValue;
}

private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) {
this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio();
/**
* @return The threshold as a ratio - i.e. in [0, 1]
*/
public double getHighUtilizationThreshold() {
return this.highUtilizationThreshold;
}
}
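The settings class above follows the usual Elasticsearch pattern of binding each dynamic cluster setting to a volatile field via ClusterSettings#initializeAndWatch, which applies the current value immediately and again on every update. A simplified sketch of that pattern, assuming recent package locations and using an illustrative setting key that is not from this PR:

```java
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.TimeValue;

class ExampleDynamicSettings {
    // Illustrative setting key and default; the real PR defines its own write-load decider settings.
    static final Setting<TimeValue> EXAMPLE_REROUTE_INTERVAL_SETTING = Setting.timeSetting(
        "example.write_load_decider.reroute_interval",
        TimeValue.timeValueSeconds(30),
        Setting.Property.Dynamic,
        Setting.Property.NodeScope
    );

    // Volatile so readers on other threads always see the most recently applied value.
    private volatile TimeValue minimumRerouteInterval;

    ExampleDynamicSettings(ClusterSettings clusterSettings) {
        // Applies the current value now and re-applies it whenever the setting is updated dynamically.
        clusterSettings.initializeAndWatch(EXAMPLE_REROUTE_INTERVAL_SETTING, value -> this.minimumRerouteInterval = value);
    }

    TimeValue getMinimumRerouteInterval() {
        return minimumRerouteInterval;
    }
}
```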
@@ -61,7 +61,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false;
assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null;
var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting();
var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getHighUtilizationThreshold();
if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) {
// The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards.
String explain = Strings.format(