1515import org .elasticsearch .cluster .ClusterInfo ;
1616import org .elasticsearch .cluster .ClusterInfoService ;
1717import org .elasticsearch .cluster .ClusterState ;
18+ import org .elasticsearch .cluster .NodeUsageStatsForThreadPools ;
1819import org .elasticsearch .cluster .routing .RerouteService ;
1920import org .elasticsearch .common .Priority ;
21+ import org .elasticsearch .common .Strings ;
2022import org .elasticsearch .common .settings .ClusterSettings ;
23+ import org .elasticsearch .common .util .set .Sets ;
2124import org .elasticsearch .gateway .GatewayService ;
25+ import org .elasticsearch .threadpool .ThreadPool ;
2226
27+ import java .util .Set ;
2328import java .util .function .LongSupplier ;
2429import java .util .function .Supplier ;
2530
2631/**
27- * Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
32+ * Monitors the node-level write thread pool usage across the cluster and initiates a rebalancing round (via
2833 * {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
29- *
30- * TODO (ES-11992): implement
3134 */
3235public class WriteLoadConstraintMonitor {
3336 private static final Logger logger = LogManager .getLogger (WriteLoadConstraintMonitor .class );
37+ private static final int MAX_NODE_IDS_IN_MESSAGE = 3 ;
3438 private final WriteLoadConstraintSettings writeLoadConstraintSettings ;
3539 private final Supplier <ClusterState > clusterStateSupplier ;
3640 private final LongSupplier currentTimeMillisSupplier ;
3741 private final RerouteService rerouteService ;
42+ private volatile long lastRerouteTimeMillis = 0 ;
43+ private volatile Set <String > lastSetOfHotSpottedNodes = Set .of ();
3844
3945 public WriteLoadConstraintMonitor (
4046 ClusterSettings clusterSettings ,
@@ -60,29 +66,64 @@ public void onNewInfo(ClusterInfo clusterInfo) {
6066 return ;
6167 }
6268
63- if (writeLoadConstraintSettings .getWriteLoadConstraintEnabled () == WriteLoadConstraintSettings . WriteLoadDeciderStatus . DISABLED ) {
64- logger .trace ("skipping monitor because the write load decider is disabled " );
69+ if (writeLoadConstraintSettings .getWriteLoadConstraintEnabled (). notFullyEnabled () ) {
70+ logger .debug ("skipping monitor because the write load decider is not fully enabled " );
6571 return ;
6672 }
6773
6874 logger .trace ("processing new cluster info" );
6975
70- boolean reroute = false ;
71- String explanation = "" ;
72- final long currentTimeMillis = currentTimeMillisSupplier .getAsLong ();
76+ final int numberOfNodes = clusterInfo .getNodeUsageStatsForThreadPools ().size ();
77+ final Set <String > nodeIdsExceedingLatencyThreshold = Sets .newHashSetWithExpectedSize (numberOfNodes );
78+ clusterInfo .getNodeUsageStatsForThreadPools ().forEach ((nodeId , usageStats ) -> {
79+ final NodeUsageStatsForThreadPools .ThreadPoolUsageStats writeThreadPoolStats = usageStats .threadPoolUsageStatsMap ()
80+ .get (ThreadPool .Names .WRITE );
81+ assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]" ;
82+ if (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis () > writeLoadConstraintSettings .getQueueLatencyThreshold ().millis ()) {
83+ nodeIdsExceedingLatencyThreshold .add (nodeId );
84+ }
85+ });
7386
74- // TODO (ES-11992): implement
87+ if (nodeIdsExceedingLatencyThreshold .isEmpty ()) {
88+ logger .debug ("No hot-spotting nodes detected" );
89+ return ;
90+ }
7591
76- if ( reroute ) {
77- logger . debug ( "rerouting shards: [{}]" , explanation ) ;
78- rerouteService . reroute ( "disk threshold monitor" , Priority . NORMAL , ActionListener . wrap ( ignored -> {
79- final var reroutedClusterState = clusterStateSupplier . get ();
92+ final long currentTimeMillis = currentTimeMillisSupplier . getAsLong ();
93+ final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis ;
94+ final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings . getMinimumRerouteInterval ()
95+ . millis ();
8096
81- // TODO (ES-11992): implement
97+ if (haveCalledRerouteRecently == false
98+ || Sets .difference (nodeIdsExceedingLatencyThreshold , lastSetOfHotSpottedNodes ).isEmpty () == false ) {
99+ if (logger .isDebugEnabled ()) {
100+ logger .debug (
101+ "Found {} exceeding the write thread pool queue latency threshold ({} total), triggering reroute" ,
102+ nodeSummary (nodeIdsExceedingLatencyThreshold ),
103+ state .nodes ().size ()
104+ );
105+ }
106+ final String reason = "hot-spotting detected by write load constraint monitor" ;
107+ rerouteService .reroute (
108+ reason ,
109+ Priority .NORMAL ,
110+ ActionListener .wrap (
111+ ignored -> logger .trace ("{} reroute successful" , reason ),
112+ e -> logger .debug (() -> Strings .format ("reroute failed, reason: %s" , reason ), e )
113+ )
114+ );
115+ lastRerouteTimeMillis = currentTimeMillisSupplier .getAsLong ();
116+ lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold ;
117+ } else {
118+ logger .debug ("Not calling reroute because we called reroute recently and there are no new hot spots" );
119+ }
120+ }
82121
83- }, e -> logger .debug ("reroute failed" , e )));
122+ private static String nodeSummary (Set <String > nodeIds ) {
123+ if (nodeIds .isEmpty () == false && nodeIds .size () <= MAX_NODE_IDS_IN_MESSAGE ) {
124+ return "[" + String .join (", " , nodeIds ) + "]" ;
84125 } else {
85- logger . trace ( "no reroute required" ) ;
126+ return nodeIds . size () + " nodes" ;
86127 }
87128 }
88129}
0 commit comments