1616import org .elasticsearch .cluster .ClusterInfoService ;
1717import org .elasticsearch .cluster .ClusterState ;
1818import org .elasticsearch .cluster .NodeUsageStatsForThreadPools ;
19+ import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
1920import org .elasticsearch .cluster .routing .RerouteService ;
2021import org .elasticsearch .common .Priority ;
2122import org .elasticsearch .common .Strings ;
2627import org .elasticsearch .threadpool .ThreadPool ;
2728
2829import java .util .Set ;
30+ import java .util .concurrent .atomic .AtomicBoolean ;
2931import java .util .function .LongSupplier ;
3032import java .util .function .Supplier ;
3133
@@ -75,18 +77,32 @@ public void onNewInfo(ClusterInfo clusterInfo) {
7577 logger .trace ("processing new cluster info" );
7678
7779 final int numberOfNodes = clusterInfo .getNodeUsageStatsForThreadPools ().size ();
78- final Set <String > nodeIdsExceedingLatencyThreshold = Sets .newHashSetWithExpectedSize (numberOfNodes );
80+ final Set <String > writeNodeIdsExceedingQueueLatencyThreshold = Sets .newHashSetWithExpectedSize (numberOfNodes );
81+ AtomicBoolean haveWriteNodesBelowQueueLatencyThreshold = new AtomicBoolean (false );
7982 clusterInfo .getNodeUsageStatsForThreadPools ().forEach ((nodeId , usageStats ) -> {
83+ if (state .getNodes ().get (nodeId ).getRoles ().contains (DiscoveryNodeRole .SEARCH_ROLE )) {
84+ // Search nodes are not expected to have write load hot-spots and are not considered for shard relocation.
85+ // TODO (ES-13314): consider stateful data tiers
86+ return ;
87+ }
8088 final NodeUsageStatsForThreadPools .ThreadPoolUsageStats writeThreadPoolStats = usageStats .threadPoolUsageStatsMap ()
8189 .get (ThreadPool .Names .WRITE );
8290 assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]" ;
83- if (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis () > writeLoadConstraintSettings .getQueueLatencyThreshold ().millis ()) {
84- nodeIdsExceedingLatencyThreshold .add (nodeId );
91+ if (writeThreadPoolStats .maxThreadPoolQueueLatencyMillis () >= writeLoadConstraintSettings .getQueueLatencyThreshold ().millis ()) {
92+ writeNodeIdsExceedingQueueLatencyThreshold .add (nodeId );
93+ } else {
94+ haveWriteNodesBelowQueueLatencyThreshold .set (true );
8595 }
8696 });
8797
88- if (nodeIdsExceedingLatencyThreshold .isEmpty ()) {
89- logger .trace ("No hot-spotting nodes detected" );
98+ if (writeNodeIdsExceedingQueueLatencyThreshold .isEmpty ()) {
99+ logger .trace ("No hot-spotting write nodes detected" );
100+ return ;
101+ }
102+ if (haveWriteNodesBelowQueueLatencyThreshold .get () == false ) {
103+ logger .debug ("""
104+ Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \
105+ Cannot rebalance shards.""" , nodeSummary (writeNodeIdsExceedingQueueLatencyThreshold ));
90106 return ;
91107 }
92108
@@ -98,14 +114,14 @@ public void onNewInfo(ClusterInfo clusterInfo) {
98114 // We know that there is at least one hot-spotting node if we've reached this code. Now check whether there are any new hot-spots
99115 // or hot-spots that are persisting and need further balancing work.
100116 if (haveCalledRerouteRecently == false
101- || Sets .difference (nodeIdsExceedingLatencyThreshold , lastSetOfHotSpottedNodes ).isEmpty () == false ) {
117+ || Sets .difference (writeNodeIdsExceedingQueueLatencyThreshold , lastSetOfHotSpottedNodes ).isEmpty () == false ) {
102118 if (logger .isDebugEnabled ()) {
103119 logger .debug (
104120 """
105121 Nodes [{}] are hot-spotting, of {} total cluster nodes. Reroute for hot-spotting {}. \
106122 Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute.
107123 """ ,
108- nodeSummary (nodeIdsExceedingLatencyThreshold ),
124+ nodeSummary (writeNodeIdsExceedingQueueLatencyThreshold ),
109125 state .nodes ().size (),
110126 lastRerouteTimeMillis == 0
111127 ? "has never previously been called"
@@ -124,7 +140,7 @@ public void onNewInfo(ClusterInfo clusterInfo) {
124140 )
125141 );
126142 lastRerouteTimeMillis = currentTimeMillisSupplier .getAsLong ();
127- lastSetOfHotSpottedNodes = nodeIdsExceedingLatencyThreshold ;
143+ lastSetOfHotSpottedNodes = writeNodeIdsExceedingQueueLatencyThreshold ;
128144 } else {
129145 logger .debug (
130146 "Not calling reroute because we called reroute [{}] ago and there are no new hot spots" ,
0 commit comments