diff --git a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java index 9b0297cd73abd..64112c1aadbcf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java +++ b/server/src/main/java/org/elasticsearch/cluster/NodeUsageStatsForThreadPools.java @@ -89,33 +89,5 @@ public void writeTo(StreamOutput out) throws IOException { out.writeFloat(this.averageThreadPoolUtilization); out.writeVLong(this.maxThreadPoolQueueLatencyMillis); } - - @Override - public int hashCode() { - return Objects.hash(totalThreadPoolThreads, averageThreadPoolUtilization, maxThreadPoolQueueLatencyMillis); - } - - @Override - public String toString() { - return "[totalThreadPoolThreads=" - + totalThreadPoolThreads - + ", averageThreadPoolUtilization=" - + averageThreadPoolUtilization - + ", maxThreadPoolQueueLatencyMillis=" - + maxThreadPoolQueueLatencyMillis - + "]"; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ThreadPoolUsageStats other = (ThreadPoolUsageStats) o; - return totalThreadPoolThreads == other.totalThreadPoolThreads - && averageThreadPoolUtilization == other.averageThreadPoolUtilization - && maxThreadPoolQueueLatencyMillis == other.maxThreadPoolQueueLatencyMillis; - } - - } // ThreadPoolUsageStats - + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java index 4826aeda574de..d497270950224 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java @@ -15,26 +15,32 @@ import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.threadpool.ThreadPool; +import java.util.Set; import java.util.function.LongSupplier; import java.util.function.Supplier; /** - * Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via + * Monitors the node-level write thread pool usage across the cluster and initiates a rebalancing round (via * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds. - * - * TODO (ES-11992): implement */ public class WriteLoadConstraintMonitor { private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class); + private static final int MAX_NODE_IDS_IN_MESSAGE = 3; private final WriteLoadConstraintSettings writeLoadConstraintSettings; private final Supplier clusterStateSupplier; private final LongSupplier currentTimeMillisSupplier; private final RerouteService rerouteService; + private volatile long lastRerouteTimeMillis = 0; + private volatile Set lastSetOfHotSpottedNodes = Set.of(); public WriteLoadConstraintMonitor( ClusterSettings clusterSettings, @@ -60,29 +66,64 @@ public void onNewInfo(ClusterInfo clusterInfo) { return; } - if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled() == WriteLoadConstraintSettings.WriteLoadDeciderStatus.DISABLED) { - logger.trace("skipping monitor because the write load decider is disabled"); + if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) { + logger.debug("skipping monitor because the write load decider is not fully enabled"); return; } logger.trace("processing new cluster info"); - boolean reroute = false; - String explanation = ""; - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size(); + final Set nodeIdsExceedingLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes); + clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> { + final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE); + assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]"; + if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() > writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) { + nodeIdsExceedingLatencyThreshold.add(nodeId); + } + }); - // TODO (ES-11992): implement + if (nodeIdsExceedingLatencyThreshold.isEmpty()) { + logger.debug("No hot-spotting nodes detected"); + return; + } - if (reroute) { - logger.debug("rerouting shards: [{}]", explanation); - rerouteService.reroute("disk threshold monitor", Priority.NORMAL, ActionListener.wrap(ignored -> { - final var reroutedClusterState = clusterStateSupplier.get(); + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis; + final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings.getMinimumRerouteInterval() + .millis(); - // TODO (ES-11992): implement + if (haveCalledRerouteRecently == false + || Sets.difference(nodeIdsExceedingLatencyThreshold, lastSetOfHotSpottedNodes).isEmpty() == false) { + if (logger.isDebugEnabled()) { + logger.debug( + "Found {} exceeding the write thread pool queue latency threshold ({} total), triggering reroute", + nodeSummary(nodeIdsExceedingLatencyThreshold), + state.nodes().size() + ); + } + final String reason = "hot-spotting detected by write load constraint monitor"; + rerouteService.reroute( + reason, + Priority.NORMAL, + ActionListener.wrap( + ignored -> logger.trace("{} reroute successful", reason), + e -> logger.debug(() -> Strings.format("reroute failed, reason: %s", reason), e) + ) + ); + lastRerouteTimeMillis = currentTimeMillisSupplier.getAsLong(); + lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold; + } else { + logger.debug("Not calling reroute because we called reroute recently and there are no new hot spots"); + } + } - }, e -> logger.debug("reroute failed", e))); + private static String nodeSummary(Set nodeIds) { + if (nodeIds.isEmpty() == false && nodeIds.size() <= MAX_NODE_IDS_IN_MESSAGE) { + return "[" + String.join(", ", nodeIds) + "]"; } else { - logger.trace("no reroute required"); + return nodeIds.size() + " nodes"; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java index c6654c7fced19..21c1a7ba04f0f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintSettings.java @@ -107,41 +107,40 @@ public boolean disabled() { Setting.Property.NodeScope ); - WriteLoadDeciderStatus writeLoadDeciderStatus; - TimeValue writeLoadDeciderRerouteIntervalSetting; - double writeThreadPoolHighUtilizationThresholdSetting; + private volatile WriteLoadDeciderStatus writeLoadDeciderStatus; + private volatile TimeValue minimumRerouteInterval; + private volatile double highUtilizationThreshold; + private volatile TimeValue queueLatencyThreshold; public WriteLoadConstraintSettings(ClusterSettings clusterSettings) { - clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, this::setWriteLoadConstraintEnabled); - clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, this::setWriteLoadDeciderRerouteIntervalSetting); + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_ENABLED_SETTING, status -> this.writeLoadDeciderStatus = status); + clusterSettings.initializeAndWatch( + WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING, + timeValue -> this.minimumRerouteInterval = timeValue + ); clusterSettings.initializeAndWatch( WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING, - this::setWriteThreadPoolHighUtilizationThresholdSetting + value -> highUtilizationThreshold = value.getAsRatio() ); - - }; - - private void setWriteLoadConstraintEnabled(WriteLoadDeciderStatus status) { - this.writeLoadDeciderStatus = status; + clusterSettings.initializeAndWatch(WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING, value -> queueLatencyThreshold = value); } public WriteLoadDeciderStatus getWriteLoadConstraintEnabled() { return this.writeLoadDeciderStatus; } - public TimeValue getWriteLoadDeciderRerouteIntervalSetting() { - return this.writeLoadDeciderRerouteIntervalSetting; + public TimeValue getMinimumRerouteInterval() { + return this.minimumRerouteInterval; } - public double getWriteThreadPoolHighUtilizationThresholdSetting() { - return this.writeThreadPoolHighUtilizationThresholdSetting; + public TimeValue getQueueLatencyThreshold() { + return this.queueLatencyThreshold; } - private void setWriteLoadDeciderRerouteIntervalSetting(TimeValue timeValue) { - this.writeLoadDeciderRerouteIntervalSetting = timeValue; - } - - private void setWriteThreadPoolHighUtilizationThresholdSetting(RatioValue percent) { - this.writeThreadPoolHighUtilizationThresholdSetting = percent.getAsRatio(); + /** + * @return The threshold as a ratio - i.e. in [0, 1] + */ + public double getHighUtilizationThreshold() { + return this.highUtilizationThreshold; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java index ef24760f02a6b..e814f570a67bb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/WriteLoadConstraintDecider.java @@ -61,7 +61,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().isEmpty() == false; assert nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE) != null; var nodeWriteThreadPoolStats = nodeUsageStatsForThreadPools.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE); - var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getWriteThreadPoolHighUtilizationThresholdSetting(); + var nodeWriteThreadPoolLoadThreshold = writeLoadConstraintSettings.getHighUtilizationThreshold(); if (nodeWriteThreadPoolStats.averageThreadPoolUtilization() >= nodeWriteThreadPoolLoadThreshold) { // The node's write thread pool usage stats already show high utilization above the threshold for accepting new shards. String explain = Strings.format( diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java new file mode 100644 index 0000000000000..fea79d92c9fbf --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java @@ -0,0 +1,402 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.apache.logging.log4j.Level; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterInfo; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.NodeUsageStatsForThreadPools; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RerouteService; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.MockLog; +import org.elasticsearch.test.junit.annotations.TestLogging; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongSupplier; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +public class WriteLoadConstraintMonitorTests extends ESTestCase { + + public void testRerouteIsCalledWhenAHotSpotIsDetected() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + testState.currentTimeSupplier, + () -> testState.clusterState, + testState.mockRerouteService + ); + + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsNotCalledWhenStateIsNotRecovered() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + testState.currentTimeSupplier, + () -> ClusterState.builder(testState.clusterState) + .blocks(ClusterBlocks.builder().addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build()) + .build(), + testState.mockRerouteService + ); + + try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "don't reroute due to global block", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + "skipping monitor as the cluster state is not recovered yet" + ) + ); + + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + mockLog.assertAllExpectationsMatched(); + verifyNoInteractions(testState.mockRerouteService); + } + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsNotCalledWhenDeciderIsNotEnabled() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + createClusterSettings( + randomValueOtherThan( + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED, + () -> randomFrom(WriteLoadConstraintSettings.WriteLoadDeciderStatus.values()) + ), + testState.latencyThresholdMillis, + testState.highUtilizationThresholdPercent + ), + testState.currentTimeSupplier, + () -> testState.clusterState, + testState.mockRerouteService + ); + + try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "don't reroute due to decider being disabled", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + "skipping monitor because the write load decider is not fully enabled" + ) + ); + + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + mockLog.assertAllExpectationsMatched(); + verifyNoInteractions(testState.mockRerouteService); + } + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsNotCalledWhenNoNodesAreHotSpotting() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + testState.currentTimeSupplier, + () -> testState.clusterState, + testState.mockRerouteService + ); + + try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "don't reroute due to no nodes hot-spotting", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + "No hot-spotting nodes detected" + ) + ); + + writeLoadConstraintMonitor.onNewInfo( + createClusterInfoWithHotSpots( + testState.clusterState, + 0, + testState.latencyThresholdMillis, + testState.highUtilizationThresholdPercent + ) + ); + mockLog.assertAllExpectationsMatched(); + verifyNoInteractions(testState.mockRerouteService); + } + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsNotCalledAgainBeforeMinimumIntervalHasPassed() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final TimeValue minimumInterval = testState.clusterSettings.get( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING + ); + assertThat(minimumInterval, greaterThan(TimeValue.ZERO)); + final long nowMillis = System.currentTimeMillis(); + final AtomicLong currentTimeMillis = new AtomicLong(nowMillis); + + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + currentTimeMillis::get, + () -> testState.clusterState, + testState.mockRerouteService + ); + + // We should trigger a re-route @ nowMillis + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); + reset(testState.mockRerouteService); + + while (currentTimeMillis.get() < nowMillis + minimumInterval.millis()) { + try (MockLog mockLog = MockLog.capture(WriteLoadConstraintMonitor.class)) { + mockLog.addExpectation( + new MockLog.SeenEventExpectation( + "don't reroute due to reroute being called recently", + WriteLoadConstraintMonitor.class.getCanonicalName(), + Level.DEBUG, + "Not calling reroute because we called reroute recently and there are no new hot spots" + ) + ); + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + mockLog.assertAllExpectationsMatched(); + verifyNoInteractions(testState.mockRerouteService); + } + + currentTimeMillis.addAndGet(randomLongBetween(500, 1_000)); + } + + // We're now passed the minimum interval + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); + } + + @TestLogging( + value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:DEBUG", + reason = "ensure we're skipping reroute for the right reason" + ) + public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHotSpotted() { + final TestState testState = createRandomTestStateThatWillTriggerReroute(); + final AtomicLong currentTimeMillis = new AtomicLong(System.currentTimeMillis()); + final TimeValue minimumInterval = testState.clusterSettings.get( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING + ); + assertThat(minimumInterval, greaterThan(TimeValue.ZERO)); + + final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor( + testState.clusterSettings, + currentTimeMillis::get, + () -> testState.clusterState, + testState.mockRerouteService + ); + + // We should trigger a re-route @ currentTime + writeLoadConstraintMonitor.onNewInfo(testState.clusterInfo); + verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); + reset(testState.mockRerouteService); + + assertThat( + "Test setup should leave at least two nodes not hot-spotted", + testState.clusterInfo.getNodeUsageStatsForThreadPools().size() - testState.clusterInfo.getNodeUsageStatsForThreadPools() + .values() + .stream() + .filter(stats -> nodeExceedsQueueLatencyThreshold(stats, testState.latencyThresholdMillis)) + .count(), + greaterThanOrEqualTo(2L) + ); + + // Now update cluster info to add another hot-spotted node + final AtomicBoolean thresholdIncreased = new AtomicBoolean(false); + var nodeUsageStatsWithExtraHotSpot = Maps.transformValues(testState.clusterInfo.getNodeUsageStatsForThreadPools(), stats -> { + if (thresholdIncreased.get() == false && nodeExceedsQueueLatencyThreshold(stats, testState.latencyThresholdMillis) == false) { + thresholdIncreased.set(true); + return new NodeUsageStatsForThreadPools( + stats.nodeId(), + Maps.transformValues( + stats.threadPoolUsageStatsMap(), + tpStats -> new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + tpStats.totalThreadPoolThreads(), + tpStats.averageThreadPoolUtilization(), + testState.latencyThresholdMillis + randomLongBetween(1, 100_000) + ) + ) + ); + } + return stats; + }); + + // Advance the clock by less than the re-route interval + currentTimeMillis.addAndGet(randomLongBetween(0, minimumInterval.millis() - 1)); + + // We should reroute again despite the minimum interval not having passed + writeLoadConstraintMonitor.onNewInfo(ClusterInfo.builder().nodeUsageStatsForThreadPools(nodeUsageStatsWithExtraHotSpot).build()); + verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any()); + } + + private boolean nodeExceedsQueueLatencyThreshold(NodeUsageStatsForThreadPools nodeUsageStats, long latencyThresholdMillis) { + return nodeUsageStats.threadPoolUsageStatsMap() + .get(ThreadPool.Names.WRITE) + .maxThreadPoolQueueLatencyMillis() > latencyThresholdMillis; + } + + private TestState createRandomTestStateThatWillTriggerReroute() { + final long queueLatencyThresholdMillis = randomLongBetween(1000, 5000); + final int highUtilizationThresholdPercent = randomIntBetween(70, 100); + final int numberOfNodes = randomIntBetween(3, 10); + final ClusterSettings clusterSettings = createClusterSettings( + WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED, + queueLatencyThresholdMillis, + highUtilizationThresholdPercent + ); + final ClusterState state = ClusterStateCreationUtils.state( + numberOfNodes, + new String[] { randomIdentifier() }, + randomIntBetween(1, numberOfNodes) + ); + final RerouteService rerouteService = mock(RerouteService.class); + final ClusterInfo clusterInfo = createClusterInfoWithHotSpots( + state, + randomIntBetween(1, numberOfNodes - 2), + queueLatencyThresholdMillis, + highUtilizationThresholdPercent + ); + return new TestState( + queueLatencyThresholdMillis, + highUtilizationThresholdPercent, + numberOfNodes, + clusterSettings, + System::currentTimeMillis, + state, + rerouteService, + clusterInfo + ); + } + + private static ClusterSettings createClusterSettings( + WriteLoadConstraintSettings.WriteLoadDeciderStatus status, + long queueLatencyThresholdMillis, + int highUtilizationThresholdPercent + ) { + return ClusterSettings.createBuiltInClusterSettings( + Settings.builder() + .put(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(), status.name()) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(), + TimeValue.timeValueMillis(queueLatencyThresholdMillis) + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.getKey(), + highUtilizationThresholdPercent + "%" + ) + .put( + WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING.getKey(), + randomTimeValue(1, 30, TimeUnit.SECONDS) + ) + .build() + ); + } + + /** + * Create a {@link ClusterInfo} with the specified number of hot spotting nodes, + * all other nodes will have no queue latency and have utilization below the specified + * high-utilization threshold. + * + * @param state The cluster state + * @param numberOfNodesHotSpotting The number of nodes that should be hot-spotting + * @param queueLatencyThresholdMillis The latency threshold in milliseconds + * @param highUtilizationThresholdPercent The high utilization threshold as a percentage + * @return a ClusterInfo with the given parameters + */ + private static ClusterInfo createClusterInfoWithHotSpots( + ClusterState state, + int numberOfNodesHotSpotting, + long queueLatencyThresholdMillis, + int highUtilizationThresholdPercent + ) { + final float maxRatioForUnderUtilised = (highUtilizationThresholdPercent - 1) / 100.0f; + final AtomicInteger hotSpottingNodes = new AtomicInteger(numberOfNodesHotSpotting); + return ClusterInfo.builder() + .nodeUsageStatsForThreadPools(state.nodes().stream().collect(Collectors.toMap(DiscoveryNode::getId, node -> { + if (hotSpottingNodes.getAndDecrement() > 0) { + // hot-spotting node + return new NodeUsageStatsForThreadPools( + node.getId(), + Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomNonNegativeInt(), + randomFloatBetween(0f, 1f, true), + randomLongBetween(queueLatencyThresholdMillis + 1, queueLatencyThresholdMillis * 2) + ) + ) + ); + } else { + // not-hot-spotting node + return new NodeUsageStatsForThreadPools( + node.getId(), + Map.of( + ThreadPool.Names.WRITE, + new NodeUsageStatsForThreadPools.ThreadPoolUsageStats( + randomNonNegativeInt(), + randomFloatBetween(0f, maxRatioForUnderUtilised, true), + randomLongBetween(0, queueLatencyThresholdMillis) + ) + ) + ); + } + }))) + .build(); + } + + private record TestState( + long latencyThresholdMillis, + int highUtilizationThresholdPercent, + int numberOfNodes, + ClusterSettings clusterSettings, + LongSupplier currentTimeSupplier, + ClusterState clusterState, + RerouteService mockRerouteService, + ClusterInfo clusterInfo + ) {} +}