Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/134361.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 134361
summary: Add `ThreadWatchdog` to `ClusterApplierService`
area: Cluster Coordination
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.service.ClusterApplierRecordingService.Recorder;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.ThreadWatchdog;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -75,13 +76,27 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
Setting.Property.NodeScope
);

/**
* Node-scoped time setting holding the check interval that this service hands to its {@link ThreadWatchdog} when it starts.
* Defaults to 5 minutes. The value is read once in the constructor, so it is not updated dynamically.
* NOTE(review): tests set this to zero to stop the watchdog from rescheduling itself; confirm that zero is the supported way to
* disable it.
*/
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL = Setting.positiveTimeSetting(
"cluster.service.applier.thread.watchdog.interval",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope
);

/**
* Node-scoped time setting holding the quiet time that this service hands to its {@link ThreadWatchdog} when it starts.
* Defaults to 1 hour. The value is read once in the constructor, so it is not updated dynamically.
* The watchdog's checker substitutes the check interval for this value whenever the quiet time is not greater than the interval.
* NOTE(review): presumably this bounds how often repeated hot-threads dumps are logged; confirm against ThreadWatchdog.
*/
public static final Setting<TimeValue> CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME = Setting.positiveTimeSetting(
"cluster.service.applier.thread.watchdog.quiet_time",
TimeValue.timeValueHours(1),
Setting.Property.NodeScope
);

public static final String CLUSTER_UPDATE_THREAD_NAME = "clusterApplierService#updateTask";

private final ClusterSettings clusterSettings;
private final ThreadPool threadPool;

private volatile TimeValue slowTaskLoggingThreshold;
private volatile TimeValue slowTaskThreadDumpTimeout;
private final TimeValue watchdogInterval;
private final TimeValue watchdogQuietTime;

private volatile PrioritizedEsThreadPoolExecutor threadPoolExecutor;

Expand All @@ -103,6 +118,8 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

// Watchdog for the applier thread: started in doStart(), and tasks executed by this service mark themselves as active on its
// per-thread activity tracker for the duration of their run.
private final ThreadWatchdog threadWatchdog = new ThreadWatchdog();

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
Expand All @@ -112,6 +129,9 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings

clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING, t -> slowTaskLoggingThreshold = t);
clusterSettings.initializeAndWatch(CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING, t -> slowTaskThreadDumpTimeout = t);

this.watchdogInterval = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL);
this.watchdogQuietTime = clusterSettings.get(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME);
}

public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
Expand All @@ -133,6 +153,7 @@ protected synchronized void doStart() {
Objects.requireNonNull(nodeConnectionsService, "please set the node connection service before starting");
Objects.requireNonNull(state.get(), "please set initial state before starting");
threadPoolExecutor = createThreadPoolExecutor();
threadWatchdog.run(watchdogInterval, watchdogQuietTime, threadPool, lifecycle, logger);
}

protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
Expand All @@ -156,7 +177,13 @@ class UpdateTask extends SourcePrioritizedRunnable {

@Override
public void run() {
runTask(source(), updateFunction, listener);
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
try {
activityTracker.startActivity();
runTask(source(), updateFunction, listener);
} finally {
activityTracker.stopActivity();
}
}
}

Expand Down Expand Up @@ -289,17 +316,23 @@ public void addTimeoutListener(@Nullable final TimeValue timeout, final TimeoutC
threadPoolExecutor.execute(new SourcePrioritizedRunnable(Priority.HIGH, "_add_listener_") {
@Override
public void run() {
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
assert previous == null : "Added same listener [" + listener + "]";
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
final var activityTracker = threadWatchdog.getActivityTrackerForCurrentThread();
try {
activityTracker.startActivity();
final NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
final NotifyTimeout previous = timeoutClusterStateListeners.put(listener, notifyTimeout);
assert previous == null : "Added same listener [" + listener + "]";
if (lifecycle.stoppedOrClosed()) {
listener.onClose();
return;
}
if (timeout != null) {
notifyTimeout.cancellable = threadPool.schedule(notifyTimeout, timeout, threadPool.generic());
}
listener.postAdded();
} finally {
activityTracker.stopActivity();
}
listener.postAdded();
}
});
} catch (EsRejectedExecutionException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class ThreadWatchdog {
Setting.Property.NodeScope
);

private static final Logger logger = LogManager.getLogger(ThreadWatchdog.class);

/**
* Activity tracker for the current thread. Thread-locals are only retained by the owning thread so these will be GCd after thread exit.
*/
Expand Down Expand Up @@ -169,8 +167,17 @@ private static boolean isIdle(long value) {
}

public void run(Settings settings, ThreadPool threadPool, Lifecycle lifecycle) {
new Checker(threadPool, NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings), NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings), lifecycle)
.run();
run(
NETWORK_THREAD_WATCHDOG_INTERVAL.get(settings),
NETWORK_THREAD_WATCHDOG_QUIET_TIME.get(settings),
threadPool,
lifecycle,
LogManager.getLogger(ThreadWatchdog.class)
);
}

/**
* Starts the watchdog checker using explicitly supplied timings rather than reading them from node settings.
*
* @param interval   check interval handed to the checker
* @param quietTime  quiet time handed to the checker; the checker uses {@code interval} instead if this is not greater than it
* @param threadPool thread pool handed to the checker
* @param lifecycle  lifecycle of the owning component, handed to the checker
* @param logger     logger handed to the checker, allowing callers to report under their own logger name
*/
public void run(TimeValue interval, TimeValue quietTime, ThreadPool threadPool, Lifecycle lifecycle, Logger logger) {
final Checker checker = new Checker(threadPool, interval, quietTime, lifecycle, logger);
checker.run();
}

/**
Expand All @@ -182,12 +189,14 @@ private final class Checker extends AbstractRunnable {
private final TimeValue interval;
private final TimeValue quietTime;
private final Lifecycle lifecycle;
private final Logger logger;

Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle) {
Checker(ThreadPool threadPool, TimeValue interval, TimeValue quietTime, Lifecycle lifecycle, Logger logger) {
this.threadPool = threadPool;
this.interval = interval;
this.quietTime = quietTime.compareTo(interval) <= 0 ? interval : quietTime;
this.lifecycle = lifecycle;
this.logger = logger;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The hot-threads dumper still links the doc for ReferenceDocs.NETWORK_THREADING_MODEL which is not applicable in the new case. Do we care?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, you're right. I can't think of a great place to document this. I mean, it's kind of the same principle: this thread should be frequently idle, just like the transport_worker threads. I think I'm going to say we don't care.

assert this.interval.millis() <= this.quietTime.millis();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndexSettings.NODE_DEFAULT_REFRESH_INTERVAL_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
ClusterApplierService.CLUSTER_SERVICE_SLOW_TASK_THREAD_DUMP_TIMEOUT_SETTING,
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL,
ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME,
ClusterService.USER_DEFINED_METADATA,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
MasterService.MASTER_SERVICE_STARVATION_LOGGING_THRESHOLD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -192,7 +194,11 @@ private abstract static class TestHarness implements Closeable {

final var threadPool = deterministicTaskQueue.getThreadPool();

final var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();

final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);

final var transportService = new TransportService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -63,6 +64,7 @@ public void testScheduling() {
final Settings.Builder settingsBuilder = Settings.builder()
.put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
.put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_ESTIMATED_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.put(
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
randomBoolean()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ public void testBalanceQuality() throws IOException {
final var deterministicTaskQueue = new DeterministicTaskQueue();
final var threadPool = deterministicTaskQueue.getThreadPool();

final var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();
final var clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);

final var masterService = new MasterService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ public void testAllocate(AllocateUnassignedHandler allocateUnassigned, Consumer<
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();

var settings = Settings.EMPTY;
final var settings = Settings.builder()
// disable thread watchdog to avoid infinitely repeating task
.put(ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.ZERO)
.build();

var clusterSettings = createBuiltInClusterSettings(settings);
var clusterService = new ClusterService(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.cluster.service;

import org.apache.logging.log4j.Level;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;

import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL;
import static org.elasticsearch.cluster.service.ClusterApplierService.CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME;
import static org.mockito.Mockito.mock;

public class ClusterApplierServiceWatchdogTests extends ESTestCase {

private static final Logger logger = LogManager.getLogger(ClusterApplierServiceWatchdogTests.class);

public void testThreadWatchdogLogging() {
final var deterministicTaskQueue = new DeterministicTaskQueue();

final var settingsBuilder = Settings.builder();

final long intervalMillis;
if (randomBoolean()) {
intervalMillis = randomLongBetween(1, 1_000_000);
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.getKey(), TimeValue.timeValueMillis(intervalMillis));
} else {
intervalMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_INTERVAL.get(Settings.EMPTY).millis();
}

final long quietTimeMillis;
if (randomBoolean()) {
quietTimeMillis = randomLongBetween(intervalMillis, 3 * intervalMillis);
settingsBuilder.put(CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.getKey(), TimeValue.timeValueMillis(quietTimeMillis));
} else {
quietTimeMillis = CLUSTER_APPLIER_THREAD_WATCHDOG_QUIET_TIME.get(Settings.EMPTY).millis();
}

final var settings = settingsBuilder.build();

try (
var clusterApplierService = new ClusterApplierService(
randomIdentifier(),
settings,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
deterministicTaskQueue.getThreadPool()
) {
@Override
protected PrioritizedEsThreadPoolExecutor createThreadPoolExecutor() {
return deterministicTaskQueue.getPrioritizedEsThreadPoolExecutor();
}
};
var mockLog = MockLog.capture(ClusterApplierService.class)
) {
clusterApplierService.setNodeConnectionsService(mock(NodeConnectionsService.class));
clusterApplierService.setInitialState(ClusterState.EMPTY_STATE);
clusterApplierService.start();

final AtomicBoolean completedTask = new AtomicBoolean();

clusterApplierService.runOnApplierThread("blocking task", randomFrom(Priority.values()), ignored -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a test for immediate listener firing in ClusterApplierService#addTimeoutListener as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good point, see ea0fca3.


final var startMillis = deterministicTaskQueue.getCurrentTimeMillis();

for (int i = 0; i < 3; i++) {
mockLog.addExpectation(
new MockLog.SeenEventExpectation(
"hot threads dump [" + i + "]",
ClusterApplierService.class.getCanonicalName(),
Level.WARN,
"hot threads dump due to active threads not making progress"
)
);

while (deterministicTaskQueue.getCurrentTimeMillis() < startMillis + 2 * intervalMillis + i * quietTimeMillis) {
deterministicTaskQueue.advanceTime();
deterministicTaskQueue.runAllRunnableTasks();
}

mockLog.assertAllExpectationsMatched();
}
}, ActionListener.running(() -> completedTask.set(true)));

deterministicTaskQueue.runAllRunnableTasks();

assertTrue(completedTask.get());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.common.network;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -222,7 +223,17 @@ public void testLoggingAndScheduling() {
settings.put(ThreadWatchdog.NETWORK_THREAD_WATCHDOG_QUIET_TIME.getKey(), timeValueMillis(quietTimeMillis));
}

watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle);
if (randomBoolean()) {
watchdog.run(settings.build(), deterministicTaskQueue.getThreadPool(), lifecycle);
} else {
watchdog.run(
TimeValue.timeValueMillis(checkIntervalMillis),
TimeValue.timeValueMillis(quietTimeMillis),
deterministicTaskQueue.getThreadPool(),
lifecycle,
LogManager.getLogger(ThreadWatchdog.class)
);
}

for (int i = 0; i < 3; i++) {
assertAdvanceTime(deterministicTaskQueue, checkIntervalMillis);
Expand Down
Loading