From dbf3e96c7720aa5ecca014c9f606fda5782c8c91 Mon Sep 17 00:00:00 2001 From: Nicolas Mesot Date: Thu, 12 Sep 2024 18:01:43 +0200 Subject: [PATCH] Make the health node available even if all nodes are marked for shutdown (#92193) This fixes the behavior of the health node not being available as soon as all nodes are marked for shutdown, but still running. A cluster without an operating health node will return "No disk usage data" through its health API. This introduces a cluster setting to enable this behavior, with the default being the previous behavior. I also had to modify the part of the code that immediately evicts any running tasks as soon as a node is marked for shutdown, leading to an endless loop of eviction -> reassignment -> eviction -> ... Fixes #92193 --- docs/reference/modules/cluster/misc.asciidoc | 19 ++++ .../common/settings/ClusterSettings.java | 1 + .../selection/HealthNodeTaskExecutor.java | 30 ++++++- .../node/selection/HealthNodeTaskParams.java | 5 ++ .../persistent/PersistentTaskParams.java | 10 ++- .../PersistentTasksClusterService.java | 42 +++++++-- .../HealthNodeTaskExecutorTests.java | 20 +++++ .../PersistentTasksClusterServiceTests.java | 86 +++++++++++++++++++ .../persistent/TestPersistentTasksPlugin.java | 12 +++ 9 files changed, 216 insertions(+), 9 deletions(-) diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 75eaca88c66b1..37467d1cf01ad 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -221,3 +221,22 @@ left the cluster, for example), are impacted by this setting. This setting controls how often assignment checks are performed to react to these factors. The default is 30 seconds. The minimum permitted value is 10 seconds. + + +`cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down`:: ++ +-- +(<>) +Allow lightweight persistent Tasks to be assigned even if all nodes are shutting down. + +* `false` - (default) Lightweight tasks will not be assigned if all nodes are shutting down +* `true` - Lightweight task will be assigned, even if all nodes are shutting down + +By default, persistent tasks will not be assigned to nodes that are marked for shutdown. +For certain lightweight, persistent tasks (like the health node), it makes sense to not +have this limitation. + +NOTE: For the health node for example, this provides the benefit of having accurate +data from the health API until cluster shutdown. Without a health node, the health API +will return `No disk usage data`. +-- diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index fb219f9093a96..a977b1deaec5b 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -528,6 +528,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, + PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java index 42a2854350fdb..8292d258f0e61 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.health.node.selection.HealthNode.TASK_NAME; +import static org.elasticsearch.persistent.PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING; /** * Persistent task executor that is managing the {@link HealthNode}. @@ -63,6 +64,7 @@ public final class HealthNodeTaskExecutor extends PersistentTasksExecutor currentTask = new AtomicReference<>(); private final ClusterStateListener taskStarter; private final ClusterStateListener shutdownListener; + private Boolean allowLightweightAssignmentsToNodesShuttingDown; private volatile boolean enabled; private HealthNodeTaskExecutor( @@ -78,6 +80,18 @@ private HealthNodeTaskExecutor( this.taskStarter = this::startTask; this.shutdownListener = this::shuttingDown; this.enabled = ENABLED_SETTING.get(settings); + this.allowLightweightAssignmentsToNodesShuttingDown = + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, + this::setAllowLightweightAssignmentsToNodesShuttingDownFlag + ); + } + + // visible for testing only + public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) { + this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown; } public static HealthNodeTaskExecutor create( @@ -185,9 +199,15 @@ void startTask(ClusterChangedEvent event) { // visible for testing void shuttingDown(ClusterChangedEvent event) { - DiscoveryNode node = clusterService.localNode(); - if (isNodeShuttingDown(event, node.getId())) { - abortTaskIfApplicable("node [{" + node.getName() + "}{" + node.getId() + "}] shutting down"); + DiscoveryNode localNode = clusterService.localNode(); + + if (isNodeShuttingDown(event, localNode.getId())) { + // only abort task if not every node is shutting down to avoid constant rescheduling on a cluster that is shutting down. + // only check this condition if lightweight tasks are allowed to be scheduled to nodes shutting down in the first place. + if (allowLightweightAssignmentsToNodesShuttingDown == false + || isEveryNodeShuttingDown(clusterService.state(), event) == false) { + abortTaskIfApplicable("node [{" + localNode.getName() + "}{" + localNode.getId() + "}] shutting down"); + } } } @@ -206,6 +226,10 @@ private static boolean isNodeShuttingDown(ClusterChangedEvent event, String node && event.state().metadata().nodeShutdowns().contains(nodeId); } + private static boolean isEveryNodeShuttingDown(ClusterState clusterState, ClusterChangedEvent event) { + return clusterState.nodes().getAllNodes().stream().allMatch(dn -> event.state().metadata().nodeShutdowns().contains(dn.getId())); + } + public static List getNamedXContentParsers() { return List.of( new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TASK_NAME), HealthNodeTaskParams::fromXContent) diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java index a18f9c5c10b33..ca215eb0e3134 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java @@ -67,4 +67,9 @@ public int hashCode() { public boolean equals(Object obj) { return obj instanceof HealthNodeTaskParams; } + + @Override + public boolean isLightweight() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java index e525e3ae83e74..0c986bebf89e1 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java @@ -15,5 +15,13 @@ * Parameters used to start persistent task */ public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject { - + /** + * Lightweight tasks are allowed to be scheduled to nodes shutting down iff all nodes are shutting down. + * The setting + * {@link PersistentTasksClusterService#CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING} + * needs to be set for this behavior to apply. + */ + default boolean isLightweight() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index ba7b4bb51d9c7..6fc4a10ae0075 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -56,6 +56,14 @@ public final class PersistentTasksClusterService implements ClusterStateListener Setting.Property.NodeScope ); + public static final Setting CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING = Setting + .boolSetting( + "cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class); private final ClusterService clusterService; @@ -64,6 +72,7 @@ public final class PersistentTasksClusterService implements ClusterStateListener private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; private final AtomicBoolean reassigningTasks = new AtomicBoolean(false); + private Boolean allowLightweightAssignmentsToNodesShuttingDown; public PersistentTasksClusterService( Settings settings, @@ -76,11 +85,18 @@ public PersistentTasksClusterService( this.enableDecider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); this.threadPool = threadPool; this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); + this.allowLightweightAssignmentsToNodesShuttingDown = + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings); if (DiscoveryNode.isMasterNode(settings)) { clusterService.addListener(this); } clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, + this::setAllowLightweightAssignmentsToNodesShuttingDownFlag + ); } // visible for testing only @@ -88,6 +104,11 @@ public void setRecheckInterval(TimeValue recheckInterval) { periodicRechecker.setInterval(recheckInterval); } + // visible for testing only + public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) { + this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown; + } + // visible for testing only PeriodicRechecker getPeriodicRechecker() { return periodicRechecker; @@ -339,19 +360,30 @@ private Assignment createAssignment( return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); } - // Filter all nodes that are marked as shutting down, because we do not + // Filter out all nodes that are marked as shutting down, because we do not // want to assign a persistent task to a node that will shortly be - // leaving the cluster - final List candidateNodes = currentState.nodes() - .stream() + // leaving the cluster. The exception to this are lightweight tasks if no other + // nodes are available. An example of a lightweight task is the HealthNodeTask. + // This behavior needs to be enabled with the CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING + // setting. + final List allNodes = currentState.nodes().stream().toList(); + final List candidateNodes = allNodes.stream() .filter(dn -> currentState.metadata().nodeShutdowns().contains(dn.getId()) == false) .collect(Collectors.toCollection(ArrayList::new)); + + if (candidateNodes.isEmpty() && allNodes.isEmpty() == false) { // all nodes are shutting down + if (this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight()) { + candidateNodes.addAll(allNodes); + } + } + // Task assignment should not rely on node order Randomness.shuffle(candidateNodes); final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState); assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not"; - assert (assignment.getExecutorNode() == null + assert ((this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight()) + || assignment.getExecutorNode() == null || currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false) : "expected task [" + taskName diff --git a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java index 1227239277ef8..f0c70984b650a 100644 --- a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java @@ -163,6 +163,26 @@ public void testAbortOnShutdown() { } } + public void testNoAbortOnShutdownIfAssignOnShutdownSettingEnabled() { + for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) { + HealthNodeTaskExecutor executor = HealthNodeTaskExecutor.create( + clusterService, + persistentTasksService, + featureService, + settings, + clusterSettings + ); + executor.setAllowLightweightAssignmentsToNodesShuttingDownFlag(true); + HealthNode task = mock(HealthNode.class); + PersistentTaskState state = mock(PersistentTaskState.class); + executor.nodeOperation(task, new HealthNodeTaskParams(), state); + ClusterState initialState = initialState(); + ClusterState withShutdown = stateWithNodeShuttingDown(initialState, type); + executor.shuttingDown(new ClusterChangedEvent("shutdown node", withShutdown, initialState)); + verify(task, never()).markAsLocallyAborted(anyString()); + } + } + public void testAbortOnDisable() { HealthNodeTaskExecutor executor = HealthNodeTaskExecutor.create( clusterService, diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index 199e72c3c3311..ea51000b29d6a 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -711,6 +711,81 @@ public void testReassignOnlyOnce() throws Exception { verifyNoMoreInteractions(recheckTestClusterService); } + public void testLightWeightTaskAssignedIfAllNodesMarkedForShutdown() throws Exception { + for (SingleNodeShutdownMetadata.Type type : List.of( + SingleNodeShutdownMetadata.Type.RESTART, + SingleNodeShutdownMetadata.Type.REMOVE, + SingleNodeShutdownMetadata.Type.SIGTERM + )) { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder( + clusterState.metadata().custom(PersistentTasksCustomMetadata.TYPE) + ); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(2, 10)); + int numberOfTasks = randomIntBetween(20, 40); + for (int i = 0; i < numberOfTasks; i++) { + addLightweightTask( + tasks, + randomFrom("assign_me", "assign_one", "assign_based_on_non_cluster_state_condition"), + randomBoolean() ? null : "no_longer_exists" + ); + } + + Metadata.Builder metadata = Metadata.builder(clusterState.metadata()) + .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); + clusterState = builder.metadata(metadata).nodes(nodes).build(); + + // Now that we have a bunch of tasks that need to be assigned, let's + // mark all the nodes as shut down and make sure the tasks are still assigned + var allNodes = clusterState.nodes(); + var shutdownMetadataMap = allNodes.stream() + .collect( + toMap( + DiscoveryNode::getId, + node -> SingleNodeShutdownMetadata.builder() + .setNodeId(node.getId()) + .setReason("shutdown for a unit test") + .setType(type) + .setStartedAtMillis(randomNonNegativeLong()) + .setGracePeriod(type == SIGTERM ? randomTimeValue() : null) + .build() + ) + ); + logger.info("--> all nodes marked as shutting down: {}", shutdownMetadataMap.keySet()); + + ClusterState shutdownState = ClusterState.builder(clusterState) + .metadata( + Metadata.builder(clusterState.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(shutdownMetadataMap)) + .build() + ) + .build(); + + logger.info("--> assigning after marking all nodes as shutting down"); + nonClusterStateCondition = randomBoolean(); + clusterState = reassign(shutdownState, true); + PersistentTasksCustomMetadata tasksInProgress = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + assertThat(tasksInProgress, notNullValue()); + Set nodesWithTasks = tasksInProgress.tasks() + .stream() + .map(PersistentTask::getAssignment) + .map(Assignment::getExecutorNode) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + Set shutdownNodes = shutdownMetadataMap.keySet(); + + assertTrue( + "expected shut down nodes: " + + shutdownNodes + + " to have more than zero nodes in common with nodes assigned tasks: " + + nodesWithTasks, + Sets.haveNonEmptyIntersection(shutdownNodes, nodesWithTasks) + ); + } + } + private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) { return createStateUpdateClusterState(initialState, shouldSimulateFailure, null); } @@ -749,6 +824,10 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } private ClusterState reassign(ClusterState clusterState) { + return reassign(clusterState, false); + } + + private ClusterState reassign(ClusterState clusterState, boolean allowLightweightAssignmentsToNodesShuttingDownFlag) { PersistentTasksClusterService service = createService((params, candidateNodes, currentState) -> { TestParams testParams = (TestParams) params; switch (testParams.getTestParam()) { @@ -783,6 +862,7 @@ private ClusterState reassign(ClusterState clusterState) { return NO_NODE_FOUND; }); + service.setAllowLightweightAssignmentsToNodesShuttingDownFlag(allowLightweightAssignmentsToNodesShuttingDownFlag); return service.reassignTasks(clusterState); } @@ -1021,6 +1101,12 @@ private String addTask(PersistentTasksCustomMetadata.Builder tasks, String param return id; } + private String addLightweightTask(PersistentTasksCustomMetadata.Builder tasks, String param, String node) { + String id = UUIDs.base64UUID(); + tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param, true), new Assignment(node, "explanation: " + param)); + return id; + } + private DiscoveryNode newNode(String nodeId) { return DiscoveryNodeUtils.builder(nodeId).roles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE)).build(); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index 6406ee640380b..7ffcfdfa10b5c 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -138,6 +138,8 @@ public static class TestParams implements PersistentTaskParams { private String testParam = null; + private boolean isLightweight = false; + public TestParams() { this((String) null); } @@ -146,6 +148,11 @@ public TestParams(String testParam) { this(testParam, TransportVersion.current(), Optional.empty()); } + public TestParams(String testParam, boolean isLightweight) { + this(testParam); + this.isLightweight = isLightweight; + } + public TestParams(String testParam, TransportVersion minVersion, Optional feature) { this.testParam = testParam; this.minVersion = minVersion; @@ -222,6 +229,11 @@ public TransportVersion getMinimalSupportedVersion() { return minVersion; } + @Override + public boolean isLightweight() { + return isLightweight; + } + } public static class State implements PersistentTaskState {