Skip to content

Commit 7d8b8be

Browse files
nicktindall authored and Kubik42 committed
Add WriteLoadConstraintMonitorIT (elastic#136459)
1 parent 5433259 commit 7d8b8be

File tree

2 files changed

+200
-1
lines changed

2 files changed

+200
-1
lines changed
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.allocation;
11+
12+
import org.apache.logging.log4j.Level;
13+
import org.elasticsearch.action.admin.cluster.node.usage.NodeUsageStatsForThreadPoolsAction;
14+
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodeUsageStatsForThreadPoolsAction;
15+
import org.elasticsearch.action.support.ChannelActionListener;
16+
import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
17+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor;
18+
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
19+
import org.elasticsearch.common.Strings;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.util.CollectionUtils;
22+
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.plugins.Plugin;
24+
import org.elasticsearch.test.ESIntegTestCase;
25+
import org.elasticsearch.test.MockLog;
26+
import org.elasticsearch.test.junit.annotations.TestLogging;
27+
import org.elasticsearch.test.transport.MockTransportService;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.TestTransportChannel;
30+
31+
import java.util.Collection;
32+
import java.util.Map;
33+
import java.util.stream.Collectors;
34+
35+
@ESIntegTestCase.ClusterScope(numDataNodes = 0)
36+
public class WriteLoadConstraintMonitorIT extends ESIntegTestCase {
37+
38+
@Override
39+
protected Collection<Class<? extends Plugin>> nodePlugins() {
40+
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
41+
}
42+
43+
@TestLogging(
44+
value = "org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintMonitor:TRACE",
45+
reason = "so we can see what the monitor is doing"
46+
)
47+
public void testRerouteIsCalledWhenHotSpotAppears() {
48+
// Set the threshold very high so we don't get any non-synthetic hot-spotting occurring
49+
final long queueLatencyThresholdMillis = randomLongBetween(50_000, 100_000);
50+
final Settings settings = enabledWriteLoadDeciderSettings(queueLatencyThresholdMillis);
51+
internalCluster().startMasterOnlyNode(settings);
52+
final String dataNodeOne = internalCluster().startDataOnlyNode(settings);
53+
final String dataNodeTwo = internalCluster().startDataOnlyNode(settings);
54+
55+
// Unmodified cluster info should detect no hot-spotting nodes
56+
MockLog.awaitLogger(
57+
ESIntegTestCase::refreshClusterInfo,
58+
WriteLoadConstraintMonitor.class,
59+
new MockLog.SeenEventExpectation(
60+
"no hot-spots detected",
61+
WriteLoadConstraintMonitor.class.getCanonicalName(),
62+
Level.TRACE,
63+
"No hot-spotting nodes detected"
64+
)
65+
);
66+
67+
// Simulate hot-spotting on a node
68+
simulateHotSpottingOnNode(dataNodeOne, queueLatencyThresholdMillis);
69+
70+
// Single node hot-spotting should trigger reroute
71+
MockLog.awaitLogger(
72+
ESIntegTestCase::refreshClusterInfo,
73+
WriteLoadConstraintMonitor.class,
74+
new MockLog.SeenEventExpectation(
75+
"hot spot detected message",
76+
WriteLoadConstraintMonitor.class.getCanonicalName(),
77+
Level.DEBUG,
78+
Strings.format("""
79+
Nodes [[%s]] are hot-spotting, of 3 total cluster nodes. Reroute for hot-spotting has never previously been called. \
80+
Previously hot-spotting nodes are [0 nodes]. The write thread pool queue latency threshold is [%s]. \
81+
Triggering reroute.
82+
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))
83+
)
84+
);
85+
86+
// We should skip another re-route if no additional nodes are hot-spotting
87+
MockLog.awaitLogger(
88+
ESIntegTestCase::refreshClusterInfo,
89+
WriteLoadConstraintMonitor.class,
90+
new MockLog.SeenEventExpectation(
91+
"reroute skipped due to being called recently",
92+
WriteLoadConstraintMonitor.class.getCanonicalName(),
93+
Level.DEBUG,
94+
Strings.format(
95+
"Not calling reroute because we called reroute [*] ago and there are no new hot spots",
96+
getNodeId(dataNodeOne),
97+
TimeValue.timeValueMillis(queueLatencyThresholdMillis)
98+
)
99+
)
100+
);
101+
102+
// Simulate hot-spotting on an additional node
103+
simulateHotSpottingOnNode(dataNodeTwo, queueLatencyThresholdMillis);
104+
105+
// Additional node hot-spotting should trigger reroute
106+
MockLog.awaitLogger(
107+
ESIntegTestCase::refreshClusterInfo,
108+
WriteLoadConstraintMonitor.class,
109+
new MockLog.SeenEventExpectation(
110+
"hot spot detected message",
111+
WriteLoadConstraintMonitor.class.getCanonicalName(),
112+
Level.DEBUG,
113+
Strings.format("""
114+
Nodes [[*]] are hot-spotting, of 3 total cluster nodes. \
115+
Reroute for hot-spotting was last called [*] ago. Previously hot-spotting nodes are [[%s]]. \
116+
The write thread pool queue latency threshold is [%s]. Triggering reroute.
117+
""", getNodeId(dataNodeOne), TimeValue.timeValueMillis(queueLatencyThresholdMillis))
118+
)
119+
);
120+
121+
// Clear simulated hot-spotting
122+
MockTransportService.getInstance(dataNodeOne).clearAllRules();
123+
MockTransportService.getInstance(dataNodeTwo).clearAllRules();
124+
125+
// We should again detect no hot-spotting nodes
126+
MockLog.awaitLogger(
127+
ESIntegTestCase::refreshClusterInfo,
128+
WriteLoadConstraintMonitor.class,
129+
new MockLog.SeenEventExpectation(
130+
"no hot-spots detected",
131+
WriteLoadConstraintMonitor.class.getCanonicalName(),
132+
Level.TRACE,
133+
"No hot-spotting nodes detected"
134+
)
135+
);
136+
}
137+
138+
private void simulateHotSpottingOnNode(String nodeName, long queueLatencyThresholdMillis) {
139+
MockTransportService.getInstance(nodeName)
140+
.addRequestHandlingBehavior(TransportNodeUsageStatsForThreadPoolsAction.NAME + "[n]", (handler, request, channel, task) -> {
141+
handler.messageReceived(
142+
request,
143+
new TestTransportChannel(new ChannelActionListener<>(channel).delegateFailure((l, response) -> {
144+
NodeUsageStatsForThreadPoolsAction.NodeResponse r = (NodeUsageStatsForThreadPoolsAction.NodeResponse) response;
145+
l.onResponse(
146+
new NodeUsageStatsForThreadPoolsAction.NodeResponse(
147+
r.getNode(),
148+
new NodeUsageStatsForThreadPools(
149+
r.getNodeUsageStatsForThreadPools().nodeId(),
150+
addQueueLatencyToWriteThreadPool(
151+
r.getNodeUsageStatsForThreadPools().threadPoolUsageStatsMap(),
152+
queueLatencyThresholdMillis
153+
)
154+
)
155+
)
156+
);
157+
})),
158+
task
159+
);
160+
});
161+
}
162+
163+
private Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> addQueueLatencyToWriteThreadPool(
164+
Map<String, NodeUsageStatsForThreadPools.ThreadPoolUsageStats> stringThreadPoolUsageStatsMap,
165+
long queueLatencyThresholdMillis
166+
) {
167+
return stringThreadPoolUsageStatsMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> {
168+
NodeUsageStatsForThreadPools.ThreadPoolUsageStats originalStats = e.getValue();
169+
if (e.getKey().equals(ThreadPool.Names.WRITE)) {
170+
return new NodeUsageStatsForThreadPools.ThreadPoolUsageStats(
171+
originalStats.totalThreadPoolThreads(),
172+
originalStats.averageThreadPoolUtilization(),
173+
randomLongBetween(queueLatencyThresholdMillis * 2, queueLatencyThresholdMillis * 3)
174+
);
175+
}
176+
return originalStats;
177+
}));
178+
179+
}
180+
181+
/**
182+
* Enables the write-load decider and overrides other write load decider settings.
183+
* @param queueLatencyThresholdMillis Exceeding this is what makes the monitor call re-route
184+
*/
185+
private Settings enabledWriteLoadDeciderSettings(long queueLatencyThresholdMillis) {
186+
return Settings.builder()
187+
.put(
188+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
189+
WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
190+
)
191+
.put(
192+
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING.getKey(),
193+
TimeValue.timeValueMillis(queueLatencyThresholdMillis)
194+
)
195+
// Make the re-route interval large so we can test it
196+
.put(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(5))
197+
.build();
198+
}
199+
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/WriteLoadConstraintMonitor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void onNewInfo(ClusterInfo clusterInfo) {
111111
? "has never previously been called"
112112
: "was last called [" + TimeValue.timeValueMillis(timeSinceLastRerouteMillis) + "] ago",
113113
nodeSummary(lastSetOfHotSpottedNodes),
114-
state.nodes().size()
114+
writeLoadConstraintSettings.getQueueLatencyThreshold()
115115
);
116116
}
117117
final String reason = "hot-spotting detected by write load constraint monitor";

0 commit comments

Comments
 (0)