Skip to content

Commit c9c50ea

Browse files
authored
Log total ingest nodes; don't call reroute when only ML nodes are not hot-spotting (#137417)
1 parent bb71252 commit c9c50ea

File tree

4 files changed

+47
-26
lines changed

4 files changed

+47
-26
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/allocation/WriteLoadConstraintMonitorIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() {
7878
WriteLoadConstraintMonitor.class.getCanonicalName(),
7979
Level.DEBUG,
8080
Strings.format("""
81-
Nodes [[%s]] are hot-spotting, of 4 total cluster nodes. Reroute for hot-spotting has never previously been called. \
81+
Nodes [[%s]] are hot-spotting, of 3 total ingest nodes. Reroute for hot-spotting has never previously been called. \
8282
Previously hot-spotting nodes are [0 nodes]. The write thread pool queue latency threshold is [%s]. \
8383
Triggering reroute.
8484
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))
@@ -113,7 +113,7 @@ public void testRerouteIsCalledWhenHotSpotAppears() {
113113
WriteLoadConstraintMonitor.class.getCanonicalName(),
114114
Level.DEBUG,
115115
Strings.format("""
116-
Nodes [[*]] are hot-spotting, of 4 total cluster nodes. \
116+
Nodes [[*]] are hot-spotting, of 3 total ingest nodes. \
117117
Reroute for hot-spotting was last called [*] ago. Previously hot-spotting nodes are [[%s]]. \
118118
The write thread pool queue latency threshold is [%s]. Triggering reroute.
119119
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.elasticsearch.threadpool.ThreadPool;
2828

2929
import java.util.Set;
30-
import java.util.concurrent.atomic.AtomicBoolean;
3130
import java.util.function.LongSupplier;
3231
import java.util.function.Supplier;
3332

@@ -78,28 +77,33 @@ public void onNewInfo(ClusterInfo clusterInfo) {
7877

7978
final int numberOfNodes = clusterInfo.getNodeUsageStatsForThreadPools().size();
8079
final Set<String> writeNodeIdsExceedingQueueLatencyThreshold = Sets.newHashSetWithExpectedSize(numberOfNodes);
81-
AtomicBoolean haveWriteNodesBelowQueueLatencyThreshold = new AtomicBoolean(false);
82-
clusterInfo.getNodeUsageStatsForThreadPools().forEach((nodeId, usageStats) -> {
83-
if (state.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
84-
// Search nodes are not expected to have write load hot-spots and are not considered for shard relocation.
80+
var haveWriteNodesBelowQueueLatencyThreshold = false;
81+
var totalIngestNodes = 0;
82+
for (var entry : clusterInfo.getNodeUsageStatsForThreadPools().entrySet()) {
83+
final var nodeId = entry.getKey();
84+
final var usageStats = entry.getValue();
85+
final var nodeRoles = state.getNodes().get(nodeId).getRoles();
86+
if (nodeRoles.contains(DiscoveryNodeRole.SEARCH_ROLE) || nodeRoles.contains(DiscoveryNodeRole.ML_ROLE)) {
87+
// Search & ML nodes are not expected to have write load hot-spots and are not considered for shard relocation.
8588
// TODO (ES-13314): consider stateful data tiers
86-
return;
89+
continue;
8790
}
91+
totalIngestNodes++;
8892
final NodeUsageStatsForThreadPools.ThreadPoolUsageStats writeThreadPoolStats = usageStats.threadPoolUsageStatsMap()
8993
.get(ThreadPool.Names.WRITE);
9094
assert writeThreadPoolStats != null : "Write thread pool is not publishing usage stats for node [" + nodeId + "]";
9195
if (writeThreadPoolStats.maxThreadPoolQueueLatencyMillis() >= writeLoadConstraintSettings.getQueueLatencyThreshold().millis()) {
9296
writeNodeIdsExceedingQueueLatencyThreshold.add(nodeId);
9397
} else {
94-
haveWriteNodesBelowQueueLatencyThreshold.set(true);
98+
haveWriteNodesBelowQueueLatencyThreshold = true;
9599
}
96-
});
100+
}
97101

98102
if (writeNodeIdsExceedingQueueLatencyThreshold.isEmpty()) {
99103
logger.trace("No hot-spotting write nodes detected");
100104
return;
101105
}
102-
if (haveWriteNodesBelowQueueLatencyThreshold.get() == false) {
106+
if (haveWriteNodesBelowQueueLatencyThreshold == false) {
103107
logger.debug("""
104108
Nodes [{}] are above the queue latency threshold, but there are no write nodes below the threshold. \
105109
Cannot rebalance shards.""", nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold));
@@ -118,11 +122,11 @@ public void onNewInfo(ClusterInfo clusterInfo) {
118122
if (logger.isDebugEnabled()) {
119123
logger.debug(
120124
"""
121-
Nodes [{}] are hot-spotting, of {} total cluster nodes. Reroute for hot-spotting {}. \
125+
Nodes [{}] are hot-spotting, of {} total ingest nodes. Reroute for hot-spotting {}. \
122126
Previously hot-spotting nodes are [{}]. The write thread pool queue latency threshold is [{}]. Triggering reroute.
123127
""",
124128
nodeSummary(writeNodeIdsExceedingQueueLatencyThreshold),
125-
state.nodes().size(),
129+
totalIngestNodes,
126130
lastRerouteTimeMillis == 0
127131
? "has never previously been called"
128132
: "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago",

server/src/test/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitorTests.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ public void testRerouteIsNotCalledInAnAllNodesAreHotSpottingCluster() {
173173
final TestState testState = createTestStateWithNumberOfNodesAndHotSpots(
174174
numberOfIndexNodes,
175175
randomIntBetween(1, 5), // Search nodes should not be considered to address write load hot-spots.
176+
randomIntBetween(1, 5), // ML nodes should not be considered to address write load hot-spots.
176177
numberOfIndexNodes
177178
);
178179
final WriteLoadConstraintMonitor writeLoadConstraintMonitor = new WriteLoadConstraintMonitor(
@@ -282,7 +283,7 @@ public void testRerouteIsCalledBeforeMinimumIntervalHasPassedIfNewNodesBecomeHot
282283
Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsWithExtraHotSpot = new HashMap<>();
283284
for (var entry : testState.clusterInfo.getNodeUsageStatsForThreadPools().entrySet()) {
284285
if (thresholdIncreased.get() == false
285-
&& nonSearchNodeBelowQueueLatencyThreshold(
286+
&& indexingNodeBelowQueueLatencyThreshold(
286287
testState.clusterState,
287288
entry.getKey(),
288289
entry.getValue(),
@@ -316,13 +317,15 @@ && nonSearchNodeBelowQueueLatencyThreshold(
316317
verify(testState.mockRerouteService).reroute(anyString(), eq(Priority.NORMAL), any());
317318
}
318319

319-
private boolean nonSearchNodeBelowQueueLatencyThreshold(
320+
private boolean indexingNodeBelowQueueLatencyThreshold(
320321
ClusterState clusterState,
321322
String nodeId,
322323
NodeUsageStatsForThreadPools nodeUsageStats,
323324
long latencyThresholdMillis
324325
) {
325-
return clusterState.getNodes().get(nodeId).getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE) == false
326+
final var nodeRoles = clusterState.getNodes().get(nodeId).getRoles();
327+
return nodeRoles.contains(DiscoveryNodeRole.SEARCH_ROLE) == false
328+
&& nodeRoles.contains(DiscoveryNodeRole.ML_ROLE) == false
326329
&& nodeUsageStats.threadPoolUsageStatsMap()
327330
.get(ThreadPool.Names.WRITE)
328331
.maxThreadPoolQueueLatencyMillis() < latencyThresholdMillis;
@@ -331,12 +334,18 @@ private boolean nonSearchNodeBelowQueueLatencyThreshold(
331334
private TestState createRandomTestStateThatWillTriggerReroute() {
332335
int numberOfNodes = randomIntBetween(3, 10);
333336
int numberOfHotSpottingNodes = numberOfNodes - 2; // Leave at least 2 non-hot-spotting nodes.
334-
return createTestStateWithNumberOfNodesAndHotSpots(numberOfNodes, randomIntBetween(0, 5), numberOfHotSpottingNodes);
337+
return createTestStateWithNumberOfNodesAndHotSpots(
338+
numberOfNodes,
339+
randomIntBetween(0, 5), // search nodes
340+
randomIntBetween(0, 2), // ML nodes
341+
numberOfHotSpottingNodes
342+
);
335343
}
336344

337345
private TestState createTestStateWithNumberOfNodesAndHotSpots(
338346
int numberOfIndexNodes,
339347
int numberOfSearchNodes,
348+
int numberOfMLNodes,
340349
int numberOfHotSpottingNodes
341350
) {
342351
assert numberOfHotSpottingNodes <= numberOfIndexNodes;
@@ -350,8 +359,9 @@ private TestState createTestStateWithNumberOfNodesAndHotSpots(
350359
final ClusterState state = ClusterStateCreationUtils.buildServerlessRoleNodes(
351360
randomIdentifier(), // index name
352361
randomIntBetween(1, numberOfIndexNodes), // num shard primaries
353-
numberOfIndexNodes, // number of index role nodes
354-
numberOfSearchNodes // number of search role nodes
362+
numberOfIndexNodes,
363+
numberOfSearchNodes,
364+
numberOfMLNodes
355365
);
356366

357367
final RerouteService rerouteService = mock(RerouteService.class);
@@ -400,7 +410,8 @@ private static ClusterSettings createClusterSettings(
400410
/**
401411
* Create a {@link ClusterInfo} with the specified number of hot spotting index nodes,
402412
* all other index nodes will have no queue latency and have utilization below the specified
403-
* high-utilization threshold. Any search nodes in the cluster will have zero usage write load stats.
413+
* high-utilization threshold. Any search or ML nodes in the cluster will have zero usage
414+
* write load stats.
404415
*
405416
* @param state The cluster state
406417
* @param numberOfNodesHotSpotting The number of nodes that should be hot-spotting
@@ -429,8 +440,8 @@ private static ClusterInfo createClusterInfoWithHotSpots(
429440
final AtomicInteger hotSpottingNodes = new AtomicInteger(numberOfNodesHotSpotting);
430441
return ClusterInfo.builder()
431442
.nodeUsageStatsForThreadPools(state.nodes().stream().collect(Collectors.toMap(DiscoveryNode::getId, node -> {
432-
if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE)) {
433-
// Search nodes are skipped for write load hot-spots.
443+
if (node.getRoles().contains(DiscoveryNodeRole.SEARCH_ROLE) || node.getRoles().contains(DiscoveryNodeRole.ML_ROLE)) {
444+
// Search & ML nodes are skipped for write load hot-spots.
434445
return new NodeUsageStatsForThreadPools(node.getId(), ZERO_USAGE_THREAD_POOL_USAGE_MAP);
435446
}
436447
if (hotSpottingNodes.getAndDecrement() > 0) {

test/framework/src/main/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -334,15 +334,17 @@ public static ClusterState state(ProjectId projectId, String indexName, int numb
334334
}
335335

336336
/**
337-
* Creates cluster state with an index that has #(numberOfPrimaries) primary shards in the started state and no replicas. The cluster
338-
* state contains #(numberOfIndexNodes) nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards to those nodes,
339-
* and #(numberOfSearchNodes) nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}.
337+
* Creates cluster state with an index that has {@code numberOfPrimaries} primary shards in the started state and no replicas. The
338+
* cluster state contains {@code numberOfIndexNodes} nodes with {@link DiscoveryNodeRole#INDEX_ROLE}, assigning the primary shards
339+
* to those nodes, {@code numberOfSearchNodes} nodes with {@link DiscoveryNodeRole#SEARCH_ROLE}, and {@code numberOfMLNodes} nodes with
340+
* {@link DiscoveryNodeRole#ML_ROLE}.
340341
*/
341342
public static ClusterState buildServerlessRoleNodes(
342343
String indexName,
343344
int numberOfPrimaries,
344345
int numberOfIndexNodes,
345-
int numberOfSearchNodes
346+
int numberOfSearchNodes,
347+
int numberOfMLNodes
346348
) {
347349
ProjectId projectId = Metadata.DEFAULT_PROJECT_ID;
348350
DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
@@ -356,6 +358,10 @@ public static ClusterState buildServerlessRoleNodes(
356358
final DiscoveryNode node = DiscoveryNodeUtils.builder("search_" + i).roles(Set.of(DiscoveryNodeRole.SEARCH_ROLE)).build();
357359
discoBuilder = discoBuilder.add(node);
358360
}
361+
for (int i = 0; i < numberOfMLNodes; i++) {
362+
final DiscoveryNode node = DiscoveryNodeUtils.builder("ml_" + i).roles(Set.of(DiscoveryNodeRole.ML_ROLE)).build();
363+
discoBuilder = discoBuilder.add(node);
364+
}
359365
discoBuilder.localNodeId(randomFrom(indexNodeIds));
360366
discoBuilder.masterNodeId(randomFrom(indexNodeIds));
361367
IndexState index = buildIndex(indexName, numberOfPrimaries, indexNodeIds);

0 commit comments

Comments
 (0)