Skip to content

Commit 66e9e81

Browse files
authored
Add ThreadWatchdog to ClusterApplierService (#134361)
Adds another layer of detection of slow activity on the cluster applier thread. In particular this can detect activity that isn't included in an `UpdateTask`, which particularly may include completing an expensive listener attached to a `ClusterStateObserver`. Moreover it captures a thread dump if slow activity is detected.
1 parent f081e45 commit 66e9e81

File tree

25 files changed

+311
-40
lines changed

25 files changed

+311
-40
lines changed

docs/changelog/134361.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 134361
2+
summary: Add `ThreadWatchdog` to `ClusterApplierService`
3+
area: Cluster Coordination
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
2626
import org.elasticsearch.common.Priority;
2727
import org.elasticsearch.common.component.AbstractLifecycleComponent;
28+
import org.elasticsearch.common.network.ThreadWatchdog;
2829
import org.elasticsearch.common.settings.ClusterSettings;
2930
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
@@ -75,13 +76,27 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
7576
Setting.Property.NodeScope
7677
);
7778

79+
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting(
80+
"cluster.service.applier.thread.watchdog.interval",
81+
TimeValue.timeValueMinutes(5),
82+
Setting.Property.NodeScope
83+
);
84+
85+
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting(
86+
"cluster.service.applier.thread.watchdog.quiet_time",
87+
TimeValue.timeValueHours(1),
88+
Setting.Property.NodeScope
89+
);
90+
7891
public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
7992

8093
private final ClusterSettings clusterSettings;
8194
private final ThreadPool threadPool;
8295

8396
private volatile TimeValue slowTaskLoggingThreshold;
8497
private volatile TimeValue slowTaskThreadDumpTimeout;
98+
private final TimeValue watchdogInterval;
99+
private final TimeValue watchdogQuietTime;
85100

86101
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
87102

@@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
103118

104119
private NodeConnectionsService nodeConnectionsService;
105120

121+
private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();
122+
106123
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
107124
this.clusterSettings = clusterSettings;
108125
this.threadPool = threadPool;
@@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
112129

113130
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t);
114131
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t);
132+
133+
this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL);
134+
this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME);
115135
}
116136

117137
public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
@@ -133,6 +153,7 @@ protected synchronized void doStart() {
133153
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
134154
Objects.requireNonNull(state.get(), "please set initial state before starting");
135155
threadPoolExecutor = createThreadPoolExecutor();
156+
threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger);
136157
}
137158

138159
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
@@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable {
156177

157178
@Override
158179
public void run() {
159-
runTask(source(), updateFunction, listener);
180+
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
181+
try {
182+
activityTracker.startActivity();
183+
runTask(source(), updateFunction, listener);
184+
} finally {
185+
activityTracker.stopActivity();
186+
}
160187
}
161188
}
162189

@@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC
289316
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
290317
@Override
291318
public void run() {
292-
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
293-
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
294-
assert previous == null : "Added same listener [" + listener + "]";
295-
if (lifecycle.stoppedOrClosed()) {
296-
listener.onClose();
297-
return;
298-
}
299-
if (timeout != null) {
300-
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
319+
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
320+
try {
321+
activityTracker.startActivity();
322+
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
323+
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
324+
assert previous == null : "Added same listener [" + listener + "]";
325+
if (lifecycle.stoppedOrClosed()) {
326+
listener.onClose();
327+
return;
328+
}
329+
if (timeout != null) {
330+
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
331+
}
332+
listener.postAdded();
333+
} finally {
334+
activityTracker.stopActivity();
301335
}
302-
listener.postAdded();
303336
}
304337
});
305338
} catch (EsRejectedExecutionException e) {

server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ public class ThreadWatchdog {
5050
Setting.Property.NodeScope
5151
);
5252

53-
private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class);
54-
5553
/**
5654
* Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit.
5755
*/
@@ -169,8 +167,17 @@ private static boolean isIdle(long value) {
169167
}
170168

171169
public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) {
172-
new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle)
173-
.run();
170+
run(
171+
NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings),
172+
NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings),
173+
threadPool,
174+
lifecycle,
175+
LogManager.getLogger(ThreadWatchdog.class)
176+
);
177+
}
178+
179+
public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) {
180+
new Checker(threadPool, interval, quietTime, lifecycle, logger).run();
174181
}
175182

176183
/**
@@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable {
182189
private final TimeValue interval;
183190
private final TimeValue quietTime;
184191
private final Lifecycle lifecycle;
192+
private final Logger logger;
185193

186-
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) {
194+
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) {
187195
this.threadPool = threadPool;
188196
this.interval = interval;
189197
this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime;
190198
this.lifecycle = lifecycle;
199+
this.logger = logger;
191200
assert this.interval.millis() <= this.quietTime.millis();
192201
}
193202

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) {
358358
IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING,
359359
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
360360
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING,
361+
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL,
362+
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME,
361363
ClusterService.USER_DEFINED_METADATA,
362364
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
363365
MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING,

server/src/test/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresActionTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424
import org.elasticsearch.cluster.node.DiscoveryNodes;
2525
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2626
import org.elasticsearch.cluster.routing.RoutingTable;
27+
import org.elasticsearch.cluster.service.ClusterApplierService;
2728
import org.elasticsearch.cluster.service.ClusterService;
2829
import org.elasticsearch.common.settings.ClusterSettings;
2930
import org.elasticsearch.common.settings.Settings;
3031
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
32+
import org.elasticsearch.core.TimeValue;
3133
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
3234
import org.elasticsearch.index.IndexVersion;
3335
import org.elasticsearch.index.shard.ShardId;
@@ -192,7 +194,11 @@ private abstract static class TestHarness implements Closeable {
192194

193195
final var threadPool = deterministicTaskQueue.getThreadPool();
194196

195-
final var settings = Settings.EMPTY;
197+
final var settings = Settings.builder()
198+
// disable thread watchdog to avoid infinitely repeating task
199+
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
200+
.build();
201+
196202
final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
197203

198204
final var transportService = new TransportService(

server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.settings.Settings;
3434
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
3535
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
36+
import org.elasticsearch.core.TimeValue;
3637
import org.elasticsearch.node.Node;
3738
import org.elasticsearch.test.ClusterServiceUtils;
3839
import org.elasticsearch.test.ESTestCase;
@@ -63,6 +64,7 @@ public void testScheduling() {
6364
final Settings.Builder settingsBuilder = Settings.builder()
6465
.put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
6566
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
67+
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
6668
.put(
6769
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
6870
randomBoolean()

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,10 @@ public void testBalanceQuality() throws IOException {
216216
final var deterministicTaskQueue = new DeterministicTaskQueue();
217217
final var threadPool = deterministicTaskQueue.getThreadPool();
218218

219-
final var settings = Settings.EMPTY;
219+
final var settings = Settings.builder()
220+
// disable thread watchdog to avoid infinitely repeating task
221+
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
222+
.build();
220223
final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
221224

222225
final var masterService = new MasterService(

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,11 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer<
145145
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
146146
.build();
147147

148-
var settings = Settings.EMPTY;
148+
final var settings = Settings.builder()
149+
// disable thread watchdog to avoid infinitely repeating task
150+
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
151+
.build();
152+
149153
var clusterSettings = createBuiltInClusterSettings(settings);
150154
var clusterService = new ClusterService(
151155
settings,
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.service;
11+
12+
import org.apache.logging.log4j.Level;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.cluster.ClusterChangedEvent;
15+
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.NodeConnectionsService;
17+
import org.elasticsearch.cluster.TimeoutClusterStateListener;
18+
import org.elasticsearch.common.Priority;
19+
import org.elasticsearch.common.settings.ClusterSettings;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
22+
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
23+
import org.elasticsearch.core.TimeValue;
24+
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.test.MockLog;
26+
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
29+
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL;
30+
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME;
31+
import static org.mockito.Mockito.mock;
32+
33+
public class ClusterApplierServiceWatchdogTests extends ESTestCase {
34+
35+
public void testThreadWatchdogLogging() {
36+
final var deterministicTaskQueue = new DeterministicTaskQueue();
37+
38+
final var settingsBuilder = Settings.builder();
39+
40+
final long intervalMillis;
41+
if (randomBoolean()) {
42+
intervalMillis = randomLongBetween(1, 1_000_000);
43+
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis));
44+
} else {
45+
intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis();
46+
}
47+
48+
final long quietTimeMillis;
49+
if (randomBoolean()) {
50+
quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis);
51+
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis));
52+
} else {
53+
quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis();
54+
}
55+
56+
final var settings = settingsBuilder.build();
57+
58+
try (
59+
var clusterApplierService = new ClusterApplierService(
60+
randomIdentifier(),
61+
settings,
62+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
63+
deterministicTaskQueue.getThreadPool()
64+
) {
65+
@Override
66+
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
67+
return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
68+
}
69+
};
70+
var mockLog = MockLog.capture(ClusterApplierService.class)
71+
) {
72+
clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class));
73+
clusterApplierService.setInitialState(ClusterState.EMPTY_STATE);
74+
clusterApplierService.start();
75+
76+
final AtomicBoolean completedTask = new AtomicBoolean();
77+
78+
final Runnable hotThreadsDumpsAsserter = () -> {
79+
final var startMillis = deterministicTaskQueue.getCurrentTimeMillis();
80+
81+
for (int i = 0; i < 3; i++) {
82+
mockLog.addExpectation(
83+
new MockLog.SeenEventExpectation(
84+
"hot threads dump [" + i + "]",
85+
ClusterApplierService.class.getCanonicalName(),
86+
Level.WARN,
87+
"hot threads dump due to active threads not making progress"
88+
)
89+
);
90+
91+
while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) {
92+
deterministicTaskQueue.advanceTime();
93+
deterministicTaskQueue.runAllRunnableTasks();
94+
}
95+
96+
mockLog.assertAllExpectationsMatched();
97+
}
98+
};
99+
100+
if (randomBoolean()) {
101+
clusterApplierService.runOnApplierThread(
102+
"slow task",
103+
randomFrom(Priority.values()),
104+
ignored -> hotThreadsDumpsAsserter.run(),
105+
ActionListener.running(() -> assertTrue(completedTask.compareAndSet(false, true)))
106+
);
107+
} else {
108+
class TestListener implements TimeoutClusterStateListener {
109+
@Override
110+
public void postAdded() {
111+
hotThreadsDumpsAsserter.run();
112+
}
113+
114+
@Override
115+
public void onClose() {
116+
fail("should time out before closing");
117+
}
118+
119+
@Override
120+
public void onTimeout(TimeValue timeout) {
121+
assertTrue(completedTask.compareAndSet(false, true));
122+
clusterApplierService.removeTimeoutListener(TestListener.this);
123+
}
124+
125+
@Override
126+
public void clusterChanged(ClusterChangedEvent event) {
127+
fail("no cluster state updates expected");
128+
}
129+
}
130+
131+
clusterApplierService.addTimeoutListener(
132+
// timeout sufficiently short that it elapses while postAdded() is still running
133+
TimeValue.timeValueMillis(randomLongBetween(0, 2 * intervalMillis + 2 * quietTimeMillis)),
134+
new TestListener()
135+
);
136+
}
137+
138+
deterministicTaskQueue.runAllRunnableTasks();
139+
140+
assertTrue(completedTask.get());
141+
}
142+
}
143+
}

0 commit comments

Comments
 (0)