Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/reference/modules/cluster/misc.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,22 @@ left the cluster, for example), are impacted by this setting.
This setting controls how often assignment checks are performed to react to
these factors. The default is 30 seconds. The minimum permitted value is 10
seconds.


`cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down`::
+
--
(<<dynamic-cluster-setting,Dynamic>>)
Allows lightweight persistent tasks to be assigned even if all nodes are shutting down.

* `false` - (default) Lightweight tasks will not be assigned if all nodes are shutting down.
* `true` - Lightweight tasks will be assigned, even if all nodes are shutting down.

By default, persistent tasks will not be assigned to nodes that are marked for shutdown.
For certain lightweight, persistent tasks (like the health node), it makes sense to not
have this limitation.

NOTE: For the health node for example, this provides the benefit of having accurate
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this note is needed here; better add this information to the changelog.

data from the health API until cluster shutdown. Without a health node, the health API
will return `No disk usage data`.
--
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ public void apply(Settings value, Settings current, Settings previous) {
OperationRouting.USE_ADAPTIVE_REPLICA_SELECTION_SETTING,
IndexGraveyard.SETTING_MAX_TOMBSTONES,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING,
PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING,
EnableAssignmentDecider.CLUSTER_TASKS_ALLOCATION_ENABLE_SETTING,
PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING,
PeerFinder.DISCOVERY_REQUEST_PEERS_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.health.node.selection.HealthNode.TASK_NAME;
import static org.elasticsearch.persistent.PersistentTasksClusterService.CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING;

/**
* Persistent task executor that is managing the {@link HealthNode}.
Expand All @@ -64,6 +65,7 @@ public final class HealthNodeTaskExecutor extends PersistentTasksExecutor<Health
private final AtomicReference<HealthNode> currentTask = new AtomicReference<>();
private final ClusterStateListener taskStarter;
private final ClusterStateListener shutdownListener;
// Updated by the dynamic-settings update consumer registered in the constructor and read when reacting to
// cluster state changes, so it is declared volatile like the sibling `enabled` flag. A primitive is used
// because the flag is never meant to be null.
private volatile boolean allowLightweightAssignmentsToNodesShuttingDown;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why a Boolean and not a boolean?

private volatile boolean enabled;

private HealthNodeTaskExecutor(
Expand All @@ -79,6 +81,18 @@ private HealthNodeTaskExecutor(
this.taskStarter = this::startTask;
this.shutdownListener = this::shuttingDown;
this.enabled = ENABLED_SETTING.get(settings);
this.allowLightweightAssignmentsToNodesShuttingDown =
CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING,
this::setAllowLightweightAssignmentsToNodesShuttingDownFlag
);
}

/**
 * Stores a new value for the flag backed by the
 * {@code cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down} setting.
 * The constructor registers this method as the update consumer for that dynamic setting; it is public so that
 * tests can also flip the flag directly.
 *
 * @param allowLightweightAssignmentsToNodesShuttingDown the new value of the flag
 */
public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) {
this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown;
}

public static HealthNodeTaskExecutor create(
Expand Down Expand Up @@ -186,9 +200,15 @@ void startTask(ClusterChangedEvent event) {

// visible for testing
void shuttingDown(ClusterChangedEvent event) {
DiscoveryNode node = clusterService.localNode();
if (isNodeShuttingDown(event, node.getId())) {
abortTaskIfApplicable("node [{" + node.getName() + "}{" + node.getId() + "}] shutting down");
DiscoveryNode localNode = clusterService.localNode();

if (isNodeShuttingDown(event, localNode.getId())) {
// only abort task if not every node is shutting down to avoid constant rescheduling on a cluster that is shutting down.
// only check this condition if lightweight tasks are allowed to be scheduled to nodes shutting down in the first place.
if (allowLightweightAssignmentsToNodesShuttingDown == false
|| isEveryNodeShuttingDown(clusterService.state(), event) == false) {
abortTaskIfApplicable("node [{" + localNode.getName() + "}{" + localNode.getId() + "}] shutting down");
}
}
}

Expand All @@ -207,6 +227,10 @@ private static boolean isNodeShuttingDown(ClusterChangedEvent event, String node
&& event.state().metadata().nodeShutdowns().contains(nodeId);
}

private static boolean isEveryNodeShuttingDown(ClusterState clusterState, ClusterChangedEvent event) {
return clusterState.nodes().getAllNodes().stream().allMatch(dn -> event.state().metadata().nodeShutdowns().contains(dn.getId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the clusterState from clusterService.state() when you already have event.state()? Am I missing something here?

}

public static List<NamedXContentRegistry.Entry> getNamedXContentParsers() {
return List.of(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(TASK_NAME), HealthNodeTaskParams::fromXContent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public int hashCode() {
public boolean equals(Object obj) {
return obj instanceof HealthNodeTaskParams;
}

/**
 * Health node task parameters always report themselves as lightweight.
 *
 * @return always {@code true}
 */
@Override
public boolean isLightweight() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,13 @@
* Parameters used to start persistent task
*/
public interface PersistentTaskParams extends VersionedNamedWriteable, ToXContentObject {

/**
* Lightweight tasks may be assigned to nodes that are shutting down if and only if all nodes are shutting down.
* The setting
* {@link PersistentTasksClusterService#CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING}
* must be enabled for this behavior to apply.
*/
default boolean isLightweight() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing this term was inspired by the original issue. However, this term is not established and it was more used to describe the health node task. Since we do not have any other usage for this method, I suggest we give a more specific name, something along the lines of keepRunningDuringShutdown or canRunOnShuttingDownNodes. What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ for canRunOnShuttingDownNodes

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public final class PersistentTasksClusterService implements ClusterStateListener
Setting.Property.NodeScope
);

/**
 * Dynamic, node-scoped boolean setting that defaults to {@code false}. When it is enabled, tasks whose
 * parameters return {@code true} from {@link PersistentTaskParams#isLightweight()} may be offered nodes that
 * are marked for shutdown, but only when no node without a shutdown marker is left in the cluster.
 */
public static final Setting<Boolean> CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING = Setting
.boolSetting(
"cluster.persistent_tasks.allocation.allow_lightweight_assignments_to_nodes_shutting_down",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

Comment on lines +60 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change is safe enough to not add a switch. Each task that defines itself as lightweight should ensure that it is safe to run always like that or that it needs the flag. What do you think?

private static final Logger logger = LogManager.getLogger(PersistentTasksClusterService.class);

private final ClusterService clusterService;
Expand All @@ -65,6 +73,7 @@ public final class PersistentTasksClusterService implements ClusterStateListener
private final ThreadPool threadPool;
private final PeriodicRechecker periodicRechecker;
private final AtomicBoolean reassigningTasks = new AtomicBoolean(false);
// Updated by the dynamic-settings update consumer registered in the constructor and read while computing task
// assignments, so it is declared volatile to make updates visible to readers. A primitive is used because the
// flag is never meant to be null.
private volatile boolean allowLightweightAssignmentsToNodesShuttingDown;

public PersistentTasksClusterService(
Settings settings,
Expand All @@ -77,18 +86,30 @@ public PersistentTasksClusterService(
this.enableDecider = new EnableAssignmentDecider(settings, clusterService.getClusterSettings());
this.threadPool = threadPool;
this.periodicRechecker = new PeriodicRechecker(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING.get(settings));
this.allowLightweightAssignmentsToNodesShuttingDown =
CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING.get(settings);
if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(this);
}
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING,
this::setAllowLightweightAssignmentsToNodesShuttingDownFlag
);
}

/**
 * Passes a new interval on to the periodic rechecker. The constructor registers this method as the update
 * consumer for {@link #CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING}; it is public so that tests can
 * also call it directly.
 *
 * @param recheckInterval the new interval between assignment rechecks
 */
public void setRecheckInterval(TimeValue recheckInterval) {
periodicRechecker.setInterval(recheckInterval);
}

/**
 * Stores a new value for the flag backed by
 * {@link #CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING}.
 * The constructor registers this method as the update consumer for that dynamic setting; it is public so that
 * tests can also flip the flag directly.
 *
 * @param allowLightweightAssignmentsToNodesShuttingDown the new value of the flag
 */
public void setAllowLightweightAssignmentsToNodesShuttingDownFlag(Boolean allowLightweightAssignmentsToNodesShuttingDown) {
this.allowLightweightAssignmentsToNodesShuttingDown = allowLightweightAssignmentsToNodesShuttingDown;
}

// visible for testing only
PeriodicRechecker getPeriodicRechecker() {
return periodicRechecker;
Expand Down Expand Up @@ -340,19 +361,30 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
}

// Filter all nodes that are marked as shutting down, because we do not
// Filter out all nodes that are marked as shutting down, because we do not
// want to assign a persistent task to a node that will shortly be
// leaving the cluster
final List<DiscoveryNode> candidateNodes = currentState.nodes()
.stream()
// leaving the cluster. The exception to this are lightweight tasks if no other
// nodes are available. An example of a lightweight task is the HealthNodeTask.
// This behavior needs to be enabled with the CLUSTER_TASKS_ALLOCATION_ALLOW_LIGHTWEIGHT_ASSIGNMENTS_TO_NODES_SHUTTING_DOWN_SETTING
// setting.
Comment on lines +367 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the comment is clear enough without the example. Someone can always check which tasks implement the isLightweight.

final List<DiscoveryNode> allNodes = currentState.nodes().stream().toList();
final List<DiscoveryNode> candidateNodes = allNodes.stream()
.filter(dn -> currentState.metadata().nodeShutdowns().contains(dn.getId()) == false)
.collect(Collectors.toCollection(ArrayList::new));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use .toList() here too.


if (candidateNodes.isEmpty() && allNodes.isEmpty() == false) { // all nodes are shutting down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think creating a new collection allNodes is not necessary. I think currentState.nodes().getNodes().isEmpty() will not create new objects and therefore be more efficient.

if (this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight()) {
candidateNodes.addAll(allNodes);
}
}

// Task assignment should not rely on node order
Randomness.shuffle(candidateNodes);

final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
assert (assignment.getExecutorNode() == null
assert ((this.allowLightweightAssignmentsToNodesShuttingDown && taskParams.isLightweight())
|| assignment.getExecutorNode() == null
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
: "expected task ["
+ taskName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,26 @@ public void testAbortOnShutdown() {
}
}

/**
 * For every shutdown type in {@code REMOVE_SHUTDOWN_TYPES}: with the allow-lightweight-assignments flag switched
 * on, delivering a "node shutting down" cluster change event must not locally abort the running health node task.
 */
public void testNoAbortOnShutdownIfAssignOnShutdownSettingEnabled() {
    for (SingleNodeShutdownMetadata.Type shutdownType : REMOVE_SHUTDOWN_TYPES) {
        // Fresh executor per shutdown type, with the flag explicitly enabled.
        HealthNodeTaskExecutor healthNodeExecutor = HealthNodeTaskExecutor.create(
            clusterService,
            persistentTasksService,
            featureService,
            settings,
            clusterSettings
        );
        healthNodeExecutor.setAllowLightweightAssignmentsToNodesShuttingDownFlag(true);

        // Start a mocked task so there is something that could be aborted.
        HealthNode runningTask = mock(HealthNode.class);
        PersistentTaskState taskState = mock(PersistentTaskState.class);
        healthNodeExecutor.nodeOperation(runningTask, new HealthNodeTaskParams(), taskState);

        // Deliver the transition from the initial state to the state carrying the shutdown marker.
        ClusterState before = initialState();
        ClusterState after = stateWithNodeShuttingDown(before, shutdownType);
        ClusterChangedEvent shutdownEvent = new ClusterChangedEvent("shutdown node", after, before);
        healthNodeExecutor.shuttingDown(shutdownEvent);

        verify(runningTask, never()).markAsLocallyAborted(anyString());
    }
}

public void testAbortOnDisable() {
HealthNodeTaskExecutor executor = HealthNodeTaskExecutor.create(
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,81 @@ public void testReassignOnlyOnce() throws Exception {
verifyNoMoreInteractions(recheckTestClusterService);
}

/**
 * For each of the RESTART, REMOVE and SIGTERM shutdown types: creates a batch of lightweight tasks, marks every
 * node of the cluster as shutting down, reassigns with the allow-lightweight-assignments flag enabled, and checks
 * that at least one of the shutting-down nodes ended up with a task.
 */
public void testLightWeightTaskAssignedIfAllNodesMarkedForShutdown() throws Exception {
    final List<SingleNodeShutdownMetadata.Type> shutdownTypes = List.of(
        SingleNodeShutdownMetadata.Type.RESTART,
        SingleNodeShutdownMetadata.Type.REMOVE,
        SingleNodeShutdownMetadata.Type.SIGTERM
    );

    for (SingleNodeShutdownMetadata.Type shutdownType : shutdownTypes) {
        ClusterState state = initialState();
        ClusterState.Builder stateBuilder = ClusterState.builder(state);
        PersistentTasksCustomMetadata.Builder taskBuilder = PersistentTasksCustomMetadata.builder(
            state.metadata().custom(PersistentTasksCustomMetadata.TYPE)
        );
        DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder(state.nodes());
        addTestNodes(nodeBuilder, randomIntBetween(2, 10));

        // Register a batch of lightweight tasks that are either unassigned or point at a node that is gone.
        int tasksLeftToAdd = randomIntBetween(20, 40);
        while (tasksLeftToAdd-- > 0) {
            String testParam = randomFrom("assign_me", "assign_one", "assign_based_on_non_cluster_state_condition");
            String previousNode = randomBoolean() ? null : "no_longer_exists";
            addLightweightTask(taskBuilder, testParam, previousNode);
        }

        state = stateBuilder.metadata(
            Metadata.builder(state.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, taskBuilder.build())
        ).nodes(nodeBuilder).build();

        // Now that we have a bunch of tasks that need to be assigned, let's
        // mark all the nodes as shut down and make sure the tasks are still assigned
        var shutdownsByNodeId = state.nodes()
            .stream()
            .collect(
                toMap(
                    DiscoveryNode::getId,
                    node -> SingleNodeShutdownMetadata.builder()
                        .setNodeId(node.getId())
                        .setReason("shutdown for a unit test")
                        .setType(shutdownType)
                        .setStartedAtMillis(randomNonNegativeLong())
                        .setGracePeriod(shutdownType == SIGTERM ? randomTimeValue() : null)
                        .build()
                )
            );
        Set<String> shutdownNodes = shutdownsByNodeId.keySet();
        logger.info("--> all nodes marked as shutting down: {}", shutdownNodes);

        Metadata metadataWithShutdowns = Metadata.builder(state.metadata())
            .putCustom(NodesShutdownMetadata.TYPE, new NodesShutdownMetadata(shutdownsByNodeId))
            .build();
        ClusterState allShuttingDown = ClusterState.builder(state).metadata(metadataWithShutdowns).build();

        logger.info("--> assigning after marking all nodes as shutting down");
        nonClusterStateCondition = randomBoolean();
        state = reassign(allShuttingDown, true);

        PersistentTasksCustomMetadata assignedTasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
        assertThat(assignedTasks, notNullValue());

        // Collect every node id that received at least one task.
        Set<String> nodesWithTasks = assignedTasks.tasks()
            .stream()
            .map(task -> task.getAssignment().getExecutorNode())
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());

        String failureMessage = "expected shut down nodes: "
            + shutdownNodes
            + " to have more than zero nodes in common with nodes assigned tasks: "
            + nodesWithTasks;
        assertTrue(failureMessage, Sets.haveNonEmptyIntersection(shutdownNodes, nodesWithTasks));
    }
}

/**
 * Convenience overload that forwards to the three-argument variant, passing {@code null} as the third argument.
 */
private ClusterService createStateUpdateClusterState(ClusterState initialState, boolean shouldSimulateFailure) {
return createStateUpdateClusterState(initialState, shouldSimulateFailure, null);
}
Expand Down Expand Up @@ -750,6 +825,10 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount)
}

/**
 * Reassigns the tasks of the given cluster state with the allow-lightweight-assignments flag switched off, by
 * forwarding to the two-argument overload with {@code false}.
 */
private ClusterState reassign(ClusterState clusterState) {
return reassign(clusterState, false);
}

private ClusterState reassign(ClusterState clusterState, boolean allowLightweightAssignmentsToNodesShuttingDownFlag) {
PersistentTasksClusterService service = createService((params, candidateNodes, currentState) -> {
TestParams testParams = (TestParams) params;
switch (testParams.getTestParam()) {
Expand Down Expand Up @@ -784,6 +863,7 @@ private ClusterState reassign(ClusterState clusterState) {
return NO_NODE_FOUND;
});

service.setAllowLightweightAssignmentsToNodesShuttingDownFlag(allowLightweightAssignmentsToNodesShuttingDownFlag);
return service.reassignTasks(clusterState);
}

Expand Down Expand Up @@ -1022,6 +1102,12 @@ private String addTask(PersistentTasksCustomMetadata.Builder tasks, String param
return id;
}

/**
 * Registers a task whose {@link TestParams} are flagged as lightweight under a freshly generated id.
 *
 * @param tasks the builder the task is added to
 * @param param the test parameter, also embedded in the assignment explanation
 * @param node  the executor node of the initial assignment; callers may pass {@code null}
 * @return the generated task id
 */
private String addLightweightTask(PersistentTasksCustomMetadata.Builder tasks, String param, String node) {
    final String taskId = UUIDs.base64UUID();
    final TestParams lightweightParams = new TestParams(param, true);
    final Assignment initialAssignment = new Assignment(node, "explanation: " + param);
    tasks.addTask(taskId, TestPersistentTasksExecutor.NAME, lightweightParams, initialAssignment);
    return taskId;
}

/**
 * Builds a discovery node with the given id that carries both the master and the data role.
 *
 * @param nodeId the id of the node to build
 * @return the new node
 */
private DiscoveryNode newNode(String nodeId) {
    final Set<DiscoveryNodeRole> masterAndDataRoles = Set.of(DiscoveryNodeRole.MASTER_ROLE, DiscoveryNodeRole.DATA_ROLE);
    return DiscoveryNodeUtils.builder(nodeId).roles(masterAndDataRoles).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public static class TestParams implements PersistentTaskParams {

private String testParam = null;

private boolean isLightweight = false;

/**
 * Creates parameters with a {@code null} test parameter by delegating to the single-{@code String} constructor;
 * the cast selects that overload for the {@code null} literal.
 */
public TestParams() {
this((String) null);
}
Expand All @@ -147,6 +149,11 @@ public TestParams(String testParam) {
this(testParam, TransportVersion.current(), Optional.empty());
}

/**
 * Creates parameters via the single-{@code String} constructor and then records whether they should report
 * themselves as lightweight.
 *
 * @param testParam     the test parameter passed on to the single-{@code String} constructor
 * @param isLightweight the value later returned by {@link #isLightweight()}
 */
public TestParams(String testParam, boolean isLightweight) {
this(testParam);
this.isLightweight = isLightweight;
}

public TestParams(String testParam, TransportVersion minVersion, Optional<String> feature) {
this.testParam = testParam;
this.minVersion = minVersion;
Expand Down Expand Up @@ -223,6 +230,11 @@ public TransportVersion getMinimalSupportedVersion() {
return minVersion;
}

/**
 * @return the value of the {@code isLightweight} field of these test parameters
 */
@Override
public boolean isLightweight() {
return isLightweight;
}

}

public static class State implements PersistentTaskState {
Expand Down