1111
1212import org .apache .logging .log4j .LogManager ;
1313import org .apache .logging .log4j .Logger ;
14+ import org .elasticsearch .cluster .NodeUsageStatsForThreadPools .ThreadPoolUsageStats ;
1415import org .elasticsearch .cluster .metadata .IndexMetadata ;
1516import org .elasticsearch .cluster .routing .RoutingNode ;
1617import org .elasticsearch .cluster .routing .ShardRouting ;
@@ -74,6 +75,22 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
7475 return Decision .NO ;
7576 }
7677
78+ if (nodeWriteThreadPoolStats .averageThreadPoolUtilization () + calculateShardMovementChange (nodeWriteThreadPoolStats , shardWriteLoad ) >= nodeWriteThreadPoolLoadThreshold ) {
79+ // The node's write thread pool usage would be raised above the high utilization threshold. This could lead to a hot spot on
80+ // this node and is undesirable.
81+ logger .debug (
82+ "The high utilization threshold of {} would be exceeded on node {} if shard {} with estimated write load {} were "
83+ + "assigned to it. Cannot allocate shard {} to node {} without risking increased write latencies." ,
84+ nodeWriteThreadPoolLoadThreshold ,
85+ node .nodeId (),
86+ shardRouting .shardId (),
87+ shardWriteLoad ,
88+ shardRouting .shardId (),
89+ node .nodeId ()
90+ );
91+ return Decision .NO ;
92+ }
93+
7794 return Decision .YES ;
7895 }
7996
@@ -86,4 +103,14 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
86103 return Decision .YES ;
87104 }
88105
106+ /**
107+ * Calculates the change to the node's write thread pool utilization percentage if the shard is added to the node.
108+ * Returns the percent thread pool utilization change.
109+ */
110+ private float calculateShardMovementChange (ThreadPoolUsageStats nodeWriteThreadPoolStats , double shardWriteLoad ) {
111+ assert shardWriteLoad > 0 ;
112+ // NOMERGE: move this into an utility class, should be commonly accessible with the simulator.
113+ // TODO: implement..
114+ return 0 ;
115+ }
89116}
0 commit comments