 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.telemetry.metric.DoubleHistogram;
+import org.elasticsearch.telemetry.metric.LongGauge;
 import org.elasticsearch.telemetry.metric.LongWithAttributes;
 import org.elasticsearch.telemetry.metric.MeterRegistry;
 import org.elasticsearch.threadpool.ThreadPool;

+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +49,7 @@
 public class WriteLoadConstraintMonitor {
     public static final String HOTSPOT_NODES_COUNT_METRIC_NAME = "es.allocator.allocations.node.write_load_hotspot.current";
     public static final String HOTSPOT_DURATION_METRIC_NAME = "es.allocator.allocations.node.write_load_hotspot.duration.histogram";
+    public static final String HOTSPOT_NODES_FLAG_METRIC_NAME = "es.allocator.allocations.node.write_load_hotspot.node_flag.current";

     private static final Logger logger = LogManager.getLogger(WriteLoadConstraintMonitor.class);
     private static final int MAX_NODE_IDS_IN_MESSAGE = 3;
@@ -53,11 +58,12 @@ public class WriteLoadConstraintMonitor {
     private final LongSupplier currentTimeMillisSupplier;
     private final RerouteService rerouteService;
     private volatile long lastRerouteTimeMillis = 0;
-    private final Map<String, Long> hotspotNodeStartTimes = new HashMap<>();
+    private volatile Map<NodeIdName, Long> hotspotNodeStartTimes = Map.of();
     private long hotspotNodeStartTimesLastTerm = -1L;

     private final AtomicLong hotspotNodesCount = new AtomicLong(-1L); // metrics source of hotspotting node count
     private final DoubleHistogram hotspotDurationHistogram;
+    private final LongGauge hotspotNodeFlagGauge; // each hot-spotting node is reported with a value of 1, tagged with its node id/name

     protected WriteLoadConstraintMonitor(
         WriteLoadConstraintSettings writeLoadConstraintSettings,
@@ -88,6 +94,12 @@ public WriteLoadConstraintMonitor(
             this::getHotspotNodesCount
         );
         hotspotDurationHistogram = meterRegistry.registerDoubleHistogram(HOTSPOT_DURATION_METRIC_NAME, "hotspot duration", "s");
+        hotspotNodeFlagGauge = meterRegistry.registerLongsGauge(
+            HOTSPOT_NODES_FLAG_METRIC_NAME,
+            "hotspot node flag",
+            "flag",
+            this::getHotspottingNodeFlags
+        );
     }

     /**
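The metrics above are registered in the callback style: the monitor hands the registry a supplier and the telemetry subsystem invokes it whenever measurements are collected, so nothing is published eagerly on the hot path. Below is a minimal, self-contained sketch of that pattern; Measurement and PollingGauge are hypothetical stand-ins for illustration only, not Elasticsearch telemetry classes (the real registration goes through MeterRegistry#registerLongsGauge and LongWithAttributes, as the diff shows).

// Illustration of a poll-on-collect gauge; names here are hypothetical, not ES APIs.
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class GaugeCallbackSketch {
    // One measured value plus its attributes, analogous to LongWithAttributes.
    record Measurement(long value, Map<String, Object> attributes) {}

    // The registry keeps the supplier and calls it at collection time, so the gauge
    // always reflects the current state of the monitor.
    static final class PollingGauge {
        private final Supplier<List<Measurement>> callback;

        PollingGauge(Supplier<List<Measurement>> callback) {
            this.callback = callback;
        }

        List<Measurement> collect() {
            return callback.get();
        }
    }

    public static void main(String[] args) {
        // One measurement of value 1 per hot-spotting node, tagged with its id and name.
        Supplier<List<Measurement>> hotspotFlags = () -> List.of(
            new Measurement(1L, Map.of("es_node_id", "node-a-id", "es_node_name", "node-a"))
        );
        PollingGauge gauge = new PollingGauge(hotspotFlags);
        System.out.println(gauge.collect());
    }
}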
@@ -110,7 +122,7 @@ public void onNewInfo(ClusterInfo clusterInfo) {
         logger.trace("processing new cluster info");

         final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
-        final Set<String> writeNodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
+        final Set<NodeIdName> writeNodesExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
         final Map<String, NodeUsageStatsForThreadPools> nodeUsageStats = clusterInfo.getNodeUsageStatsForThreadPools();
         var haveWriteNodesBelowQueueLatencyThreshold = false;
         var totalIngestNodes = 0;
@@ -132,23 +144,23 @@ public void onNewInfo(ClusterInfo clusterInfo) {
                 .get(ThreadPool.Names.WRITE);
             assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
             if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
-                writeNodeIdsExceedingQueueLatencyThreshold.add(nodeId);
+                writeNodesExceedingQueueLatencyThreshold.add(NodeIdName.nodeIdName(node));
             } else {
                 haveWriteNodesBelowQueueLatencyThreshold = true;
             }
         }

         final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
-        Set<String> lastHotspotNodes = recordHotspotDurations(state, writeNodeIdsExceedingQueueLatencyThreshold, currentTimeMillis);
+        Set<NodeIdName> lastHotspotNodes = recordHotspotDurations(state, writeNodesExceedingQueueLatencyThreshold, currentTimeMillis);

-        if (writeNodeIdsExceedingQueueLatencyThreshold.isEmpty()) {
+        if (writeNodesExceedingQueueLatencyThreshold.isEmpty()) {
             logger.trace("No hot-spotting write nodes detected");
             return;
         }
         if (haveWriteNodesBelowQueueLatencyThreshold == false) {
             logger.debug("""
                 Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \
-                Cannot rebalance shards.""", nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold, state));
+                Cannot rebalance shards.""", nodeSummary(writeNodesExceedingQueueLatencyThreshold));
             return;
         }

@@ -158,20 +170,20 @@ public void onNewInfo(ClusterInfo clusterInfo) {

         // We know that there is at least one hot-spotting node if we've reached this code. Now check whether there are any new hot-spots
         // or hot-spots that are persisting and need further balancing work.
-        Set<String> newHotspotNodes = Sets.difference(writeNodeIdsExceedingQueueLatencyThreshold, lastHotspotNodes);
+        Set<NodeIdName> newHotspotNodes = Sets.difference(writeNodesExceedingQueueLatencyThreshold, lastHotspotNodes);
         if (haveCalledRerouteRecently == false || newHotspotNodes.isEmpty() == false) {
             if (logger.isDebugEnabled()) {
                 logger.debug(
                     """
                         Nodes [{}] are hot-spotting, of {} total ingest nodes. Reroute for hot-spotting {}. \
                         Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute.
                         """,
-                    nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold, state),
+                    nodeSummary(writeNodesExceedingQueueLatencyThreshold),
                     totalIngestNodes,
                     lastRerouteTimeMillis == 0
                         ? "has never previously been called"
                         : "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago",
-                    nodeSummary(lastHotspotNodes, state),
+                    nodeSummary(lastHotspotNodes),
                     writeLoadConstraintSettings.getQueueLatencyThreshold()
                 );
             }
@@ -195,30 +207,41 @@ public void onNewInfo(ClusterInfo clusterInfo) {
         }
     }

-    private void recordHotspotStartTimes(Set<String> nodeIds, long startTimestamp) {
-        for (String nodeId : nodeIds) {
-            hotspotNodeStartTimes.put(nodeId, startTimestamp);
+    private void recordHotspotStartTimes(Set<NodeIdName> newHotspotNodes, long startTimestamp) {
+        if (newHotspotNodes.size() > 0) {
+            Map<NodeIdName, Long> hotspotNodeStartTimesUpdate = new HashMap<>(hotspotNodeStartTimes);
+            for (NodeIdName nodeIdName : newHotspotNodes) {
+                hotspotNodeStartTimesUpdate.put(nodeIdName, startTimestamp);
+            }
+            hotspotNodeStartTimes = Collections.unmodifiableMap(hotspotNodeStartTimesUpdate);
         }
         hotspotNodesCount.set(hotspotNodeStartTimes.size());
     }

-    private Set<String> recordHotspotDurations(ClusterState state, Set<String> currentHotspotNodes, long hotspotEndTime) {
+    private Set<NodeIdName> recordHotspotDurations(ClusterState state, Set<NodeIdName> currentHotspotNodes, long hotspotEndTimeMs) {
         // reset hotspotNodeStartTimes if the term has changed
         if (state.term() != hotspotNodeStartTimesLastTerm || state.nodes().isLocalNodeElectedMaster() == false) {
             hotspotNodeStartTimesLastTerm = state.term();
-            hotspotNodeStartTimes.clear();
+            hotspotNodeStartTimes = Map.of();
         }

-        Set<String> lastHotspotNodes = hotspotNodeStartTimes.keySet();
-        Set<String> staleHotspotNodes = Sets.difference(lastHotspotNodes, currentHotspotNodes);
-
-        for (String nodeId : staleHotspotNodes) {
-            assert hotspotNodeStartTimes.containsKey(nodeId) : "Map should contain key from its own subset";
-            long hotspotStartTime = hotspotNodeStartTimes.remove(nodeId);
-            long hotspotDuration = hotspotEndTime - hotspotStartTime;
-            assert hotspotDuration >= 0 : "hotspot duration should always be non-negative";
-            hotspotDurationHistogram.record(hotspotDuration / 1000.0);
+        Set<NodeIdName> lastHotspotNodes = hotspotNodeStartTimes.keySet();
+        Set<NodeIdName> staleHotspotNodes = Sets.difference(lastHotspotNodes, currentHotspotNodes);
+        if (staleHotspotNodes.size() > 0) {
+            Map<NodeIdName, Long> hotspotNodeStartTimesUpdate = new HashMap<>(hotspotNodeStartTimes);
+            for (NodeIdName nodeIdName : staleHotspotNodes) {
+                assert hotspotNodeStartTimesUpdate.containsKey(nodeIdName) : "Map should contain key from its own subset";
+                long hotspotStartTimeMs = hotspotNodeStartTimesUpdate.remove(nodeIdName);
+                long hotspotDurationMs = hotspotEndTimeMs - hotspotStartTimeMs;
+                assert hotspotDurationMs >= 0 : "hotspot duration should always be non-negative";
+                hotspotDurationHistogram.record(
+                    hotspotDurationMs / 1000.0,
+                    Map.of("es_node_id", nodeIdName.nodeId(), "es_node_name", nodeIdName.nodeName())
+                );
+            }
+            hotspotNodeStartTimes = Collections.unmodifiableMap(hotspotNodeStartTimesUpdate);
         }
+
         hotspotNodesCount.set(hotspotNodeStartTimes.size());

         return lastHotspotNodes;
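The hunks above also switch hotspotNodeStartTimes from an in-place HashMap to a copy-on-write map behind a volatile field: each update copies the current map, mutates the copy, and republishes it as an unmodifiable map, so the gauge callbacks can read a consistent snapshot without locking (this relies on updates happening from a single thread, as they appear to here). A rough sketch of the pattern, with illustrative names only:

// Copy-on-write map behind a volatile field; names are illustrative, not from the PR.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteMapSketch {
    private volatile Map<String, Long> snapshot = Map.of();

    // Single-writer update: copy the current map, apply the change, publish the new map.
    public void put(String key, long value) {
        Map<String, Long> updated = new HashMap<>(snapshot);
        updated.put(key, value);
        snapshot = Collections.unmodifiableMap(updated);
    }

    // Readers dereference the volatile field once and work on that immutable snapshot.
    public Map<String, Long> read() {
        return snapshot;
    }

    public static void main(String[] args) {
        CopyOnWriteMapSketch startTimes = new CopyOnWriteMapSketch();
        startTimes.put("node-a", 123L);
        System.out.println(startTimes.read());
    }
}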
@@ -233,21 +256,32 @@ private List<LongWithAttributes> getHotspotNodesCount() {
         }
     }

-    private static String nodeSummary(Set<String> nodeIds, ClusterState state) {
-        final var nodes = state.nodes();
-        if (nodeIds.isEmpty() == false && nodeIds.size() <= MAX_NODE_IDS_IN_MESSAGE) {
-            return nodeIds.stream().map(nodeId -> nodeShortDescription(nodeId, nodes)).collect(Collectors.joining(", "));
-        } else {
-            return nodeIds.size() + " nodes";
+    private Collection<LongWithAttributes> getHotspottingNodeFlags() {
+        final Map<NodeIdName, Long> hotspotNodeStartTimesView = hotspotNodeStartTimes;
+        List<LongWithAttributes> hotspottingNodeFlags = new ArrayList<>(hotspotNodeStartTimesView.size());
+        for (NodeIdName nodeIdName : hotspotNodeStartTimesView.keySet()) {
+            hotspottingNodeFlags.add(
+                new LongWithAttributes(1L, Map.of("es_node_id", nodeIdName.nodeId(), "es_node_name", nodeIdName.nodeName()))
+            );
         }
+        return hotspottingNodeFlags;
     }

-    /**
-     * @return "{nodeId}/{nodeName}" if available, or just "{nodeId}" otherwise
-     */
-    private static String nodeShortDescription(String nodeId, DiscoveryNodes nodes) {
-        final var discoveryNode = nodes.get(nodeId);
-        // It's possible a node might have left the cluster since the ClusterInfo was published
-        return discoveryNode != null ? discoveryNode.getShortNodeDescription() : nodeId;
+    public record NodeIdName(String nodeId, String nodeName) {
+        public static NodeIdName nodeIdName(DiscoveryNode node) {
+            return new NodeIdName(node.getId(), node.getName());
+        }
+
+        public String shortDescription() {
+            return nodeId + "/" + nodeName;
+        }
+    }
+
+    private static String nodeSummary(Set<NodeIdName> nodeIdNames) {
+        if (nodeIdNames.isEmpty() == false && nodeIdNames.size() <= MAX_NODE_IDS_IN_MESSAGE) {
+            return nodeIdNames.stream().map(nodeIdName -> nodeIdName.shortDescription()).collect(Collectors.joining(", "));
+        } else {
+            return nodeIdNames.size() + " nodes";
+        }
     }
 }
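The new NodeIdName record replaces raw node-id strings as the hotspot key, so both the id and the human-readable name stay available for metric attributes and log summaries even after a node has left the cluster (the removed nodeShortDescription had to look the node up in DiscoveryNodes and could only fall back to the bare id). A standalone sketch of how a record key and the summary formatting behave; the class and summary helper below are illustrative, mirroring NodeIdName and nodeSummary in the diff:

// Record keys give value-based equals/hashCode, so the same node observed across
// ClusterInfo updates maps to the same hotspot entry. Names here are illustrative.
import java.util.Set;
import java.util.stream.Collectors;

public class NodeIdNameSketch {
    record NodeIdName(String nodeId, String nodeName) {
        String shortDescription() {
            return nodeId + "/" + nodeName;
        }
    }

    // Mirrors nodeSummary(): list up to a few nodes explicitly, otherwise just a count.
    static String summary(Set<NodeIdName> nodes, int maxListed) {
        if (nodes.isEmpty() == false && nodes.size() <= maxListed) {
            return nodes.stream().map(NodeIdName::shortDescription).collect(Collectors.joining(", "));
        }
        return nodes.size() + " nodes";
    }

    public static void main(String[] args) {
        Set<NodeIdName> hotspots = Set.of(new NodeIdName("abc123", "data-node-1"), new NodeIdName("def456", "data-node-2"));
        System.out.println(summary(hotspots, 3)); // e.g. "abc123/data-node-1, def456/data-node-2"
        System.out.println(summary(hotspots, 1)); // "2 nodes"
    }
}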