Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.Objects;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -328,12 +330,22 @@ public void testUnassignRunningPersistentTask() throws Exception {
// Disallow re-assignment after it is unassigned to verify master and node state
TestPersistentTasksExecutor.setNonClusterStateCondition(false);

persistentTasksClusterService.unassignPersistentTask(taskId, task.getAllocationId() + 1, "unassignment test", unassignmentFuture);
logger.info("unassigning persistent task");
persistentTasksClusterService.unassignPersistentTask(
taskId,
task.getAllocationId() + 1,
PersistentTasksCustomMetadata.Explanation.GENERIC_REASON,
"unassignment test",
unassignmentFuture
);
PersistentTask<?> unassignedTask = unassignmentFuture.get();
assertThat(unassignedTask.getId(), equalTo(taskId));
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
assertThat(
unassignedTask.getAssignment().getExplanationCodes(),
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
);
assertThat(unassignedTask.getAssignment().getExplanation(), containsString("unassignment test"));
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));

assertBusy(() -> {
// Verify that the task is NOT running on the node
List<TaskInfo> tasks = clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks();
Expand All @@ -342,13 +354,14 @@ public void testUnassignRunningPersistentTask() throws Exception {
// Verify that the task is STILL in internal cluster state
assertClusterStateHasTask(taskId);
});
logger.info("persistent task unassigned");

// Allow it to be reassigned again to the same node
TestPersistentTasksExecutor.setNonClusterStateCondition(true);

logger.info("task is allowed to start again");
// Verify it starts again
waitForTaskToStart();

logger.info("task has started again");
assertClusterStateHasTask(taskId);

// Complete or cancel the running task
Expand Down Expand Up @@ -406,6 +419,12 @@ public void testAbortLocally() throws Exception {
task.getAssignment().getExplanation(),
either(equalTo("Simulating local abort")).or(equalTo("non cluster state condition prevents assignment"))
);
assertThat(
task.getAssignment().getExplanationCodes(),
either(contains(PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY)).or(
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
)
);
});

// Allow it to be reassigned again
Expand All @@ -420,7 +439,7 @@ public void testAbortLocally() throws Exception {
// reason has not been published, hence the busy wait here.)
assertBusy(() -> {
PersistentTask<?> task = assertClusterStateHasTask(taskId);
assertThat(task.getAssignment().getExplanation(), not(equalTo("Simulating local abort")));
assertThat(task.getAssignment().getExplanation(), not(containsString("Simulating local abort")));
});

// Complete or cancel the running task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_DONT_DELETE_WHEN_SEMANTIC_TEXT_EXISTS = def(8_703_00_0);
public static final TransportVersion INFERENCE_ADAPTIVE_ALLOCATIONS = def(8_704_00_0);
public static final TransportVersion INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN = def(8_705_00_0);
public static final TransportVersion PERSISTENT_TASK_CUSTOM_METADATA_ASSIGNMENT_REASON_ENUM = def(8_706_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down Expand Up @@ -276,7 +277,7 @@ static TransportVersion def(int id) {
* Reference to the minimum transport version that can be used with CCS.
* This should be the transport version used by the previous minor release.
*/
public static final TransportVersion MINIMUM_CCS_VERSION = SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14;
public static final TransportVersion MINIMUM_CCS_VERSION = V_8_13_0;

static final NavigableMap<Integer, TransportVersion> VERSION_IDS = getAllVersionIds(TransportVersions.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
if (discoveryNode == null) {
return NO_NODE_FOUND;
} else {
return new PersistentTasksCustomMetadata.Assignment(discoveryNode.getId(), "");
return new PersistentTasksCustomMetadata.Assignment(
discoveryNode.getId(),
PersistentTasksCustomMetadata.Explanation.ASSIGNMENT_SUCCESSFUL
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package org.elasticsearch.persistent;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
Expand Down Expand Up @@ -36,6 +38,7 @@
*/
public class CompletionPersistentTaskAction extends ActionType<PersistentTaskResponse> {

private static final Logger logger = LogManager.getLogger(CompletionPersistentTaskAction.class);
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
public static final String NAME = "cluster:admin/persistent/completion";

Expand Down Expand Up @@ -157,9 +160,11 @@ protected final void masterOperation(
if (request.localAbortReason != null) {
assert request.exception == null
: "request has both exception " + request.exception + " and local abort reason " + request.localAbortReason;
logger.info("Persistent task unassigned due to local abort reason: [{}]", request.localAbortReason);
persistentTasksClusterService.unassignPersistentTask(
request.taskId,
request.allocationId,
PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY,
request.localAbortReason,
listener.map(PersistentTaskResponse::new)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,12 +286,14 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
* @param taskId the id of a persistent task
* @param taskAllocationId the expected allocation id of the persistent task
* @param reason the reason for unassigning the task from any node
* @param reasonDetails the detailed reason string for unassigning the task from any node
* @param listener the listener that will be called when task is unassigned
*/
public void unassignPersistentTask(
final String taskId,
final long taskAllocationId,
final String reason,
final PersistentTasksCustomMetadata.Explanation reason,
final String reasonDetails,
final ActionListener<PersistentTask<?>> listener
) {
submitUnbatchedTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
Expand All @@ -300,7 +302,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
PersistentTasksCustomMetadata.Builder tasksInProgress = builder(currentState);
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason, reasonDetails)));
} else {
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
}
Expand Down Expand Up @@ -336,7 +338,10 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(

AssignmentDecision decision = enableDecider.canAssign();
if (decision.getType() == AssignmentDecision.Type.NO) {
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
return unassignedAssignment(
PersistentTasksCustomMetadata.Explanation.ASSIGNMENTS_NOT_ALLOWED,
"persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"
);
}

// Filter all nodes that are marked as shutting down, because we do not
Expand Down Expand Up @@ -515,8 +520,8 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
}
}

private static Assignment unassignedAssignment(String reason) {
return new Assignment(null, reason);
private static Assignment unassignedAssignment(PersistentTasksCustomMetadata.Explanation reason, String details) {
return new Assignment(null, details, reason);
}

/**
Expand Down
Loading