15
15
import org .elasticsearch .cluster .ClusterInfo ;
16
16
import org .elasticsearch .cluster .ClusterInfoService ;
17
17
import org .elasticsearch .cluster .ClusterState ;
18
+ import org .elasticsearch .cluster .NodeUsageStatsForThreadPools ;
18
19
import org .elasticsearch .cluster .routing .RerouteService ;
19
20
import org .elasticsearch .common .Priority ;
21
+ import org .elasticsearch .common .Strings ;
20
22
import org .elasticsearch .common .settings .ClusterSettings ;
23
+ import org .elasticsearch .common .util .set .Sets ;
21
24
import org .elasticsearch .gateway .GatewayService ;
25
+ import org .elasticsearch .threadpool .ThreadPool ;
22
26
27
+ import java .util .Set ;
23
28
import java .util .function .LongSupplier ;
24
29
import java .util .function .Supplier ;
25
30
26
31
/**
27
- * Monitors the node-level write thread pool usage across the cluster and initiates (coming soon) a rebalancing round (via
32
+ * Monitors the node-level write thread pool usage across the cluster and initiates a rebalancing round (via
28
33
* {@link RerouteService#reroute}) whenever a node crosses the node-level write load thresholds.
29
- *
30
- * TODO (ES-11992): implement
31
34
*/
32
35
public class WriteLoadConstraintMonitor {
33
36
private static final Logger logger = LogManager .getLogger (WriteLoadConstraintMonitor .class );
37
+ private static final int MAX_NODE_IDS_IN_MESSAGE = 3 ;
34
38
private final WriteLoadConstraintSettings writeLoadConstraintSettings ;
35
39
private final Supplier <ClusterState > clusterStateSupplier ;
36
40
private final LongSupplier currentTimeMillisSupplier ;
37
41
private final RerouteService rerouteService ;
42
+ private volatile long lastRerouteTimeMillis = 0 ;
43
+ private volatile Set <String > lastSetOfHotSpottedNodes = Set .of ();
38
44
39
45
public WriteLoadConstraintMonitor (
40
46
ClusterSettings clusterSettings ,
@@ -60,29 +66,64 @@ public void onNewInfo(ClusterInfo clusterInfo) {
60
66
return ;
61
67
}
62
68
63
- if (writeLoadConstraintSettings .getWriteLoadConstraintEnabled () == WriteLoadConstraintSettings . WriteLoadDeciderStatus . DISABLED ) {
64
- logger .trace ("skipping monitor because the write load decider is disabled " );
69
+ if (writeLoadConstraintSettings .getWriteLoadConstraintEnabled (). notFullyEnabled () ) {
70
+ logger .debug ("skipping monitor because the write load decider is not fully enabled " );
65
71
return ;
66
72
}
67
73
68
74
logger .trace ("processing new cluster info" );
69
75
70
- boolean reroute = false ;
71
- String explanation = "" ;
72
- final long currentTimeMillis = currentTimeMillisSupplier .getAsLong ();
76
+ final int numberOfNodes = clusterInfo .getNodeUsageStatsForThreadPools ().size ();
77
+ final Set <String > nodeIdsExceedingLatencyThreshold = Sets .newHashSetWithExpectedSize (numberOfNodes );
78
+ clusterInfo .getNodeUsageStatsForThreadPools ().forEach ((nodeId , usageStats ) -> {
79
+ final NodeUsageStatsForThreadPools .ThreadPoolUsageStats writeThreadPoolStats = usageStats .threadPoolUsageStatsMap ()
80
+ .get (ThreadPool .Names .WRITE );
81
+ assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]" ;
82
+ if (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis () > writeLoadConstraintSettings .getQueueLatencyThreshold ().millis ()) {
83
+ nodeIdsExceedingLatencyThreshold .add (nodeId );
84
+ }
85
+ });
73
86
74
- // TODO (ES-11992): implement
87
+ if (nodeIdsExceedingLatencyThreshold .isEmpty ()) {
88
+ logger .debug ("No hot-spotting nodes detected" );
89
+ return ;
90
+ }
75
91
76
- if ( reroute ) {
77
- logger . debug ( "rerouting shards: [{}]" , explanation ) ;
78
- rerouteService . reroute ( "disk threshold monitor" , Priority . NORMAL , ActionListener . wrap ( ignored -> {
79
- final var reroutedClusterState = clusterStateSupplier . get ();
92
+ final long currentTimeMillis = currentTimeMillisSupplier . getAsLong ();
93
+ final long timeSinceLastRerouteMillis = currentTimeMillis - lastRerouteTimeMillis ;
94
+ final boolean haveCalledRerouteRecently = timeSinceLastRerouteMillis < writeLoadConstraintSettings . getMinimumRerouteInterval ()
95
+ . millis ();
80
96
81
- // TODO (ES-11992): implement
97
+ if (haveCalledRerouteRecently == false
98
+ || Sets .difference (nodeIdsExceedingLatencyThreshold , lastSetOfHotSpottedNodes ).isEmpty () == false ) {
99
+ if (logger .isDebugEnabled ()) {
100
+ logger .debug (
101
+ "Found {} exceeding the write thread pool queue latency threshold ({} total), triggering reroute" ,
102
+ nodeSummary (nodeIdsExceedingLatencyThreshold ),
103
+ state .nodes ().size ()
104
+ );
105
+ }
106
+ final String reason = "hot-spotting detected by write load constraint monitor" ;
107
+ rerouteService .reroute (
108
+ reason ,
109
+ Priority .NORMAL ,
110
+ ActionListener .wrap (
111
+ ignored -> logger .trace ("{} reroute successful" , reason ),
112
+ e -> logger .debug (() -> Strings .format ("reroute failed, reason: %s" , reason ), e )
113
+ )
114
+ );
115
+ lastRerouteTimeMillis = currentTimeMillisSupplier .getAsLong ();
116
+ lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold ;
117
+ } else {
118
+ logger .debug ("Not calling reroute because we called reroute recently and there are no new hot spots" );
119
+ }
120
+ }
82
121
83
- }, e -> logger .debug ("reroute failed" , e )));
122
+ private static String nodeSummary (Set <String > nodeIds ) {
123
+ if (nodeIds .isEmpty () == false && nodeIds .size () <= MAX_NODE_IDS_IN_MESSAGE ) {
124
+ return "[" + String .join (", " , nodeIds ) + "]" ;
84
125
} else {
85
- logger . trace ( "no reroute required" ) ;
126
+ return nodeIds . size () + " nodes" ;
86
127
}
87
128
}
88
129
}
0 commit comments