diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 75eaca88c66b1..37467d1cf01ad 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -221,3 +221,22 @@ left the cluster, for example), are impacted by this setting. This setting controls how often assignment checks are performed to react to these factors. The default is 30 seconds. The minimum permitted value is 10 seconds. + + +`cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down`:: ++ +-- +(<>) +Allow lightweight persistent Tasks to be assigned even if all nodes are shutting down. + +* `false` - (default) Lightweight tasks will not be assigned if all nodes are shutting down +* `true` - Lightweight task will be assigned, even if all nodes are shutting down + +By default, persistent tasks will not be assigned to nodes that are marked for shutdown. +For certain lightweight, persistent tasks (like the health node), it makes sense to not +have this limitation. + +NOTE: For the health node for example, this provides the benefit of having accurate +data from the health API until cluster shutdown. Without a health node, the health API +will return `No disk usage data`. +-- diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 18aaaf414101b..bc002e36809cd 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -529,6 +529,7 @@ public void apply(Settings value, Settings current, Settings previous) { OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING, IndexGraveyard.SETTING_MAX_TOMBSTONES, PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, + PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING, PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING, PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java index 3357936e5f10c..a84ee90a8b9de 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.health.node.selection.HealthNode.TASK_NAME; +import static org.elasticsearch.persistent.PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING; /** * Persistent task executor that is managing the {@link HealthNode}. @@ -64,6 +65,7 @@ public final class HealthNodeTaskExecutor extends PersistentTasksExecutor currentTask = new AtomicReference<>(); private final ClusterStateListener taskStarter; private final ClusterStateListener shutdownListener; + private Boolean allowLightweightAssignmentsToNodesShuttingDown; private volatile boolean enabled; private HealthNodeTaskExecutor( @@ -79,6 +81,18 @@ private HealthNodeTaskExecutor( this.taskStarter = this::startTask; this.shutdownListener = this::shuttingDown; this.enabled = ENABLED_SETTING.get(settings); + this.allowLightweightAssignmentsToNodesShuttingDown = + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, + this::setAllowLightweightAssignmentsToNodesShuttingDownFlag + ); + } + + // visible for testing only + public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) { + this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown; } public static HealthNodeTaskExecutor create( @@ -186,9 +200,15 @@ void startTask(ClusterChangedEvent event) { // visible for testing void shuttingDown(ClusterChangedEvent event) { - DiscoveryNode node = clusterService.localNode(); - if (isNodeShuttingDown(event, node.getId())) { - abortTaskIfApplicable("node [{" + node.getName() + "}{" + node.getId() + "}] shutting down"); + DiscoveryNode localNode = clusterService.localNode(); + + if (isNodeShuttingDown(event, localNode.getId())) { + // only abort task if not every node is shutting down to avoid constant rescheduling on a cluster that is shutting down. + // only check this condition if lightweight tasks are allowed to be scheduled to nodes shutting down in the first place. + if (allowLightweightAssignmentsToNodesShuttingDown == false + || isEveryNodeShuttingDown(clusterService.state(), event) == false) { + abortTaskIfApplicable("node [{" + localNode.getName() + "}{" + localNode.getId() + "}] shutting down"); + } } } @@ -207,6 +227,10 @@ private static boolean isNodeShuttingDown(ClusterChangedEvent event, String node && event.state().metadata().nodeShutdowns().contains(nodeId); } + private static boolean isEveryNodeShuttingDown(ClusterState clusterState, ClusterChangedEvent event) { + return clusterState.nodes().getAllNodes().stream().allMatch(dn -> event.state().metadata().nodeShutdowns().contains(dn.getId())); + } + public static List getNamedXContentParsers() { return List.of( new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TASK_NAME), HealthNodeTaskParams::fromXContent) diff --git a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java index 2252bd502aa13..98fd194dec22b 100644 --- a/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java +++ b/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskParams.java @@ -68,4 +68,9 @@ public int hashCode() { public boolean equals(Object obj) { return obj instanceof HealthNodeTaskParams; } + + @Override + public boolean isLightweight() { + return true; + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java index a5f6ae9f3fb0c..9885bb84af12e 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTaskParams.java @@ -16,5 +16,13 @@ * Parameters used to start persistent task */ public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject { - + /** + * Lightweight tasks are allowed to be scheduled to nodes shutting down iff all nodes are shutting down. + * The setting + * {@link PersistentTasksClusterService#CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING} + * needs to be set for this behavior to apply. + */ + default boolean isLightweight() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java index 003aaa20d6ac5..0da7dbe2fb9db 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java @@ -57,6 +57,14 @@ public final class PersistentTasksClusterService implements ClusterStateListener Setting.Property.NodeScope ); + public static final Setting CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING = Setting + .boolSetting( + "cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class); private final ClusterService clusterService; @@ -65,6 +73,7 @@ public final class PersistentTasksClusterService implements ClusterStateListener private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; private final AtomicBoolean reassigningTasks = new AtomicBoolean(false); + private Boolean allowLightweightAssignmentsToNodesShuttingDown; public PersistentTasksClusterService( Settings settings, @@ -77,11 +86,18 @@ public PersistentTasksClusterService( this.enableDecider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings()); this.threadPool = threadPool; this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings)); + this.allowLightweightAssignmentsToNodesShuttingDown = + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings); if (DiscoveryNode.isMasterNode(settings)) { clusterService.addListener(this); } clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer( + CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING, + this::setAllowLightweightAssignmentsToNodesShuttingDownFlag + ); } // visible for testing only @@ -89,6 +105,11 @@ public void setRecheckInterval(TimeValue recheckInterval) { periodicRechecker.setInterval(recheckInterval); } + // visible for testing only + public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) { + this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown; + } + // visible for testing only PeriodicRechecker getPeriodicRechecker() { return periodicRechecker; @@ -340,19 +361,30 @@ private Assignment createAssignment( return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"); } - // Filter all nodes that are marked as shutting down, because we do not + // Filter out all nodes that are marked as shutting down, because we do not // want to assign a persistent task to a node that will shortly be - // leaving the cluster - final List candidateNodes = currentState.nodes() - .stream() + // leaving the cluster. The exception to this are lightweight tasks if no other + // nodes are available. An example of a lightweight task is the HealthNodeTask. + // This behavior needs to be enabled with the CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING + // setting. + final List allNodes = currentState.nodes().stream().toList(); + final List candidateNodes = allNodes.stream() .filter(dn -> currentState.metadata().nodeShutdowns().contains(dn.getId()) == false) .collect(Collectors.toCollection(ArrayList::new)); + + if (candidateNodes.isEmpty() && allNodes.isEmpty() == false) { // all nodes are shutting down + if (this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight()) { + candidateNodes.addAll(allNodes); + } + } + // Task assignment should not rely on node order Randomness.shuffle(candidateNodes); final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState); assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not"; - assert (assignment.getExecutorNode() == null + assert ((this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight()) + || assignment.getExecutorNode() == null || currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false) : "expected task [" + taskName diff --git a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java index 97f44f7480a72..3794fd973c4bd 100644 --- a/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutorTests.java @@ -164,6 +164,26 @@ public void testAbortOnShutdown() { } } + public void testNoAbortOnShutdownIfAssignOnShutdownSettingEnabled() { + for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) { + HealthNodeTaskExecutor executor = HealthNodeTaskExecutor.create( + clusterService, + persistentTasksService, + featureService, + settings, + clusterSettings + ); + executor.setAllowLightweightAssignmentsToNodesShuttingDownFlag(true); + HealthNode task = mock(HealthNode.class); + PersistentTaskState state = mock(PersistentTaskState.class); + executor.nodeOperation(task, new HealthNodeTaskParams(), state); + ClusterState initialState = initialState(); + ClusterState withShutdown = stateWithNodeShuttingDown(initialState, type); + executor.shuttingDown(new ClusterChangedEvent("shutdown node", withShutdown, initialState)); + verify(task, never()).markAsLocallyAborted(anyString()); + } + } + public void testAbortOnDisable() { HealthNodeTaskExecutor executor = HealthNodeTaskExecutor.create( clusterService, diff --git a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java index c568f6a38a5fb..d48a9b7e458ea 100644 --- a/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java @@ -712,6 +712,81 @@ public void testReassignOnlyOnce() throws Exception { verifyNoMoreInteractions(recheckTestClusterService); } + public void testLightWeightTaskAssignedIfAllNodesMarkedForShutdown() throws Exception { + for (SingleNodeShutdownMetadata.Type type : List.of( + SingleNodeShutdownMetadata.Type.RESTART, + SingleNodeShutdownMetadata.Type.REMOVE, + SingleNodeShutdownMetadata.Type.SIGTERM + )) { + ClusterState clusterState = initialState(); + ClusterState.Builder builder = ClusterState.builder(clusterState); + PersistentTasksCustomMetadata.Builder tasks = PersistentTasksCustomMetadata.builder( + clusterState.metadata().custom(PersistentTasksCustomMetadata.TYPE) + ); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + addTestNodes(nodes, randomIntBetween(2, 10)); + int numberOfTasks = randomIntBetween(20, 40); + for (int i = 0; i < numberOfTasks; i++) { + addLightweightTask( + tasks, + randomFrom("assign_me", "assign_one", "assign_based_on_non_cluster_state_condition"), + randomBoolean() ? null : "no_longer_exists" + ); + } + + Metadata.Builder metadata = Metadata.builder(clusterState.metadata()) + .putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); + clusterState = builder.metadata(metadata).nodes(nodes).build(); + + // Now that we have a bunch of tasks that need to be assigned, let's + // mark all the nodes as shut down and make sure the tasks are still assigned + var allNodes = clusterState.nodes(); + var shutdownMetadataMap = allNodes.stream() + .collect( + toMap( + DiscoveryNode::getId, + node -> SingleNodeShutdownMetadata.builder() + .setNodeId(node.getId()) + .setReason("shutdown for a unit test") + .setType(type) + .setStartedAtMillis(randomNonNegativeLong()) + .setGracePeriod(type == SIGTERM ? randomTimeValue() : null) + .build() + ) + ); + logger.info("--> all nodes marked as shutting down: {}", shutdownMetadataMap.keySet()); + + ClusterState shutdownState = ClusterState.builder(clusterState) + .metadata( + Metadata.builder(clusterState.metadata()) + .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(shutdownMetadataMap)) + .build() + ) + .build(); + + logger.info("--> assigning after marking all nodes as shutting down"); + nonClusterStateCondition = randomBoolean(); + clusterState = reassign(shutdownState, true); + PersistentTasksCustomMetadata tasksInProgress = clusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); + assertThat(tasksInProgress, notNullValue()); + Set nodesWithTasks = tasksInProgress.tasks() + .stream() + .map(PersistentTask::getAssignment) + .map(Assignment::getExecutorNode) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + Set shutdownNodes = shutdownMetadataMap.keySet(); + + assertTrue( + "expected shut down nodes: " + + shutdownNodes + + " to have more than zero nodes in common with nodes assigned tasks: " + + nodesWithTasks, + Sets.haveNonEmptyIntersection(shutdownNodes, nodesWithTasks) + ); + } + } + private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) { return createStateUpdateClusterState(initialState, shouldSimulateFailure, null); } @@ -750,6 +825,10 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } private ClusterState reassign(ClusterState clusterState) { + return reassign(clusterState, false); + } + + private ClusterState reassign(ClusterState clusterState, boolean allowLightweightAssignmentsToNodesShuttingDownFlag) { PersistentTasksClusterService service = createService((params, candidateNodes, currentState) -> { TestParams testParams = (TestParams) params; switch (testParams.getTestParam()) { @@ -784,6 +863,7 @@ private ClusterState reassign(ClusterState clusterState) { return NO_NODE_FOUND; }); + service.setAllowLightweightAssignmentsToNodesShuttingDownFlag(allowLightweightAssignmentsToNodesShuttingDownFlag); return service.reassignTasks(clusterState); } @@ -1022,6 +1102,12 @@ private String addTask(PersistentTasksCustomMetadata.Builder tasks, String param return id; } + private String addLightweightTask(PersistentTasksCustomMetadata.Builder tasks, String param, String node) { + String id = UUIDs.base64UUID(); + tasks.addTask(id, TestPersistentTasksExecutor.NAME, new TestParams(param, true), new Assignment(node, "explanation: " + param)); + return id; + } + private DiscoveryNode newNode(String nodeId) { return DiscoveryNodeUtils.builder(nodeId).roles(Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE)).build(); } diff --git a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java index d6582c2cc7dcf..b451b640c68d9 100644 --- a/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java +++ b/server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java @@ -139,6 +139,8 @@ public static class TestParams implements PersistentTaskParams { private String testParam = null; + private boolean isLightweight = false; + public TestParams() { this((String) null); } @@ -147,6 +149,11 @@ public TestParams(String testParam) { this(testParam, TransportVersion.current(), Optional.empty()); } + public TestParams(String testParam, boolean isLightweight) { + this(testParam); + this.isLightweight = isLightweight; + } + public TestParams(String testParam, TransportVersion minVersion, Optional feature) { this.testParam = testParam; this.minVersion = minVersion; @@ -223,6 +230,11 @@ public TransportVersion getMinimalSupportedVersion() { return minVersion; } + @Override + public boolean isLightweight() { + return isLightweight; + } + } public static class State implements PersistentTaskState {