Skip to content

Commit b0e00ab

Browse files
committed
Add ThreadWatchdog to ClusterApplierService
Adds another layer of detection of slow activity on the cluster applier thread. In particular this can detect activity that isn't included in an `UpdateTask`, which particularly may include completing an expensive listener attached to a `ClusterStateObserver`. Moreover it captures a thread dump if slow activity is detected.
1 parent e5c91ca commit b0e00ab

File tree

5 files changed

+181
-17
lines changed

5 files changed

+181
-17
lines changed

server/src/main/java/org/elasticsearch/cluster/service/ClusterApplierService.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
2626
import org.elasticsearch.common.Priority;
2727
import org.elasticsearch.common.component.AbstractLifecycleComponent;
28+
import org.elasticsearch.common.network.ThreadWatchdog;
2829
import org.elasticsearch.common.settings.ClusterSettings;
2930
import org.elasticsearch.common.settings.Setting;
3031
import org.elasticsearch.common.settings.Settings;
@@ -75,13 +76,27 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
7576
Setting.Property.NodeScope
7677
);
7778

79+
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting(
80+
"cluster.service.applier.thread.watchdog.interval",
81+
TimeValue.timeValueMinutes(5),
82+
Setting.Property.NodeScope
83+
);
84+
85+
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting(
86+
"cluster.service.applier.thread.watchdog.quiet_time",
87+
TimeValue.timeValueHours(1),
88+
Setting.Property.NodeScope
89+
);
90+
7891
public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";
7992

8093
private final ClusterSettings clusterSettings;
8194
private final ThreadPool threadPool;
8295

8396
private volatile TimeValue slowTaskLoggingThreshold;
8497
private volatile TimeValue slowTaskThreadDumpTimeout;
98+
private final TimeValue watchdogInterval;
99+
private final TimeValue watchdogQuietTime;
85100

86101
private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;
87102

@@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
103118

104119
private NodeConnectionsService nodeConnectionsService;
105120

121+
private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();
122+
106123
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
107124
this.clusterSettings = clusterSettings;
108125
this.threadPool = threadPool;
@@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
112129

113130
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t);
114131
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t);
132+
133+
this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL);
134+
this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME);
115135
}
116136

117137
public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
@@ -133,6 +153,7 @@ protected synchronized void doStart() {
133153
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
134154
Objects.requireNonNull(state.get(), "please set initial state before starting");
135155
threadPoolExecutor = createThreadPoolExecutor();
156+
threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger);
136157
}
137158

138159
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
@@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable {
156177

157178
@Override
158179
public void run() {
159-
runTask(source(), updateFunction, listener);
180+
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
181+
try {
182+
activityTracker.startActivity();
183+
runTask(source(), updateFunction, listener);
184+
} finally {
185+
activityTracker.stopActivity();
186+
}
160187
}
161188
}
162189

@@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC
289316
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
290317
@Override
291318
public void run() {
292-
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
293-
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
294-
assert previous == null : "Added same listener [" + listener + "]";
295-
if (lifecycle.stoppedOrClosed()) {
296-
listener.onClose();
297-
return;
298-
}
299-
if (timeout != null) {
300-
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
319+
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
320+
try {
321+
activityTracker.startActivity();
322+
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
323+
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
324+
assert previous == null : "Added same listener [" + listener + "]";
325+
if (lifecycle.stoppedOrClosed()) {
326+
listener.onClose();
327+
return;
328+
}
329+
if (timeout != null) {
330+
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
331+
}
332+
listener.postAdded();
333+
} finally {
334+
activityTracker.stopActivity();
301335
}
302-
listener.postAdded();
303336
}
304337
});
305338
} catch (EsRejectedExecutionException e) {

server/src/main/java/org/elasticsearch/common/network/ThreadWatchdog.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,6 @@ public class ThreadWatchdog {
5050
Setting.Property.NodeScope
5151
);
5252

53-
private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class);
54-
5553
/**
5654
* Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit.
5755
*/
@@ -169,8 +167,17 @@ private static boolean isIdle(long value) {
169167
}
170168

171169
public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) {
172-
new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle)
173-
.run();
170+
run(
171+
NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings),
172+
NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings),
173+
threadPool,
174+
lifecycle,
175+
LogManager.getLogger(ThreadWatchdog.class)
176+
);
177+
}
178+
179+
public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) {
180+
new Checker(threadPool, interval, quietTime, lifecycle, logger).run();
174181
}
175182

176183
/**
@@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable {
182189
private final TimeValue interval;
183190
private final TimeValue quietTime;
184191
private final Lifecycle lifecycle;
192+
private final Logger logger;
185193

186-
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) {
194+
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) {
187195
this.threadPool = threadPool;
188196
this.interval = interval;
189197
this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime;
190198
this.lifecycle = lifecycle;
199+
this.logger = logger;
191200
assert this.interval.millis() <= this.quietTime.millis();
192201
}
193202

server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) {
358358
IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING,
359359
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
360360
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING,
361+
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL,
362+
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME,
361363
ClusterService.USER_DEFINED_METADATA,
362364
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
363365
MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING,
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.service;
11+
12+
import org.apache.logging.log4j.Level;
13+
import org.elasticsearch.action.ActionListener;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.NodeConnectionsService;
16+
import org.elasticsearch.common.Priority;
17+
import org.elasticsearch.common.settings.ClusterSettings;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
20+
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
21+
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.logging.LogManager;
23+
import org.elasticsearch.logging.Logger;
24+
import org.elasticsearch.test.ESTestCase;
25+
import org.elasticsearch.test.MockLog;
26+
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
29+
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL;
30+
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME;
31+
import static org.mockito.Mockito.mock;
32+
33+
public class ClusterApplierServiceWatchdogTests extends ESTestCase {
34+
35+
private static final Logger logger = LogManager.getLogger(ClusterApplierServiceWatchdogTests.class);
36+
37+
public void testThreadWatchdogLogging() {
38+
final var deterministicTaskQueue = new DeterministicTaskQueue();
39+
40+
final var settingsBuilder = Settings.builder();
41+
42+
final long intervalMillis;
43+
if (randomBoolean()) {
44+
intervalMillis = randomLongBetween(1, 1_000_000);
45+
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis));
46+
} else {
47+
intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis();
48+
}
49+
50+
final long quietTimeMillis;
51+
if (randomBoolean()) {
52+
quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis);
53+
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis));
54+
} else {
55+
quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis();
56+
}
57+
58+
final var settings = settingsBuilder.build();
59+
60+
try (
61+
var clusterApplierService = new ClusterApplierService(
62+
randomIdentifier(),
63+
settings,
64+
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
65+
deterministicTaskQueue.getThreadPool()
66+
) {
67+
@Override
68+
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
69+
return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
70+
}
71+
};
72+
var mockLog = MockLog.capture(ClusterApplierService.class)
73+
) {
74+
clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class));
75+
clusterApplierService.setInitialState(ClusterState.EMPTY_STATE);
76+
clusterApplierService.start();
77+
78+
final AtomicBoolean completedTask = new AtomicBoolean();
79+
80+
clusterApplierService.runOnApplierThread("blocking task", randomFrom(Priority.values()), ignored -> {
81+
82+
final var startMillis = deterministicTaskQueue.getCurrentTimeMillis();
83+
84+
for (int i = 0; i < 3; i++) {
85+
mockLog.addExpectation(
86+
new MockLog.SeenEventExpectation(
87+
"hot threads dump [" + i + "]",
88+
ClusterApplierService.class.getCanonicalName(),
89+
Level.WARN,
90+
"hot threads dump due to active threads not making progress"
91+
)
92+
);
93+
94+
while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) {
95+
deterministicTaskQueue.advanceTime();
96+
deterministicTaskQueue.runAllRunnableTasks();
97+
}
98+
99+
mockLog.assertAllExpectationsMatched();
100+
}
101+
}, ActionListener.running(() -> completedTask.set(true)));
102+
103+
deterministicTaskQueue.runAllRunnableTasks();
104+
105+
assertTrue(completedTask.get());
106+
}
107+
}
108+
109+
}

server/src/test/java/org/elasticsearch/common/network/ThreadWatchdogTests.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.common.network;
1111

1212
import org.apache.logging.log4j.Level;
13+
import org.apache.logging.log4j.LogManager;
1314
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.component.Lifecycle;
1516
import org.elasticsearch.common.settings.Settings;
@@ -222,7 +223,17 @@ public void testLoggingAndScheduling() {
222223
settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), timeValueMillis(quietTimeMillis));
223224
}
224225

225-
watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle);
226+
if (randomBoolean()) {
227+
watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle);
228+
} else {
229+
watchdog.run(
230+
TimeValue.timeValueMillis(checkIntervalMillis),
231+
TimeValue.timeValueMillis(quietTimeMillis),
232+
deterministicTaskQueue.getThreadPool(),
233+
lifecycle,
234+
LogManager.getLogger(ThreadWatchdog.class)
235+
);
236+
}
226237

227238
for (int i = 0; i < 3; i++) {
228239
assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);

0 commit comments

Comments
 (0)