Skip to content

Commit 6cd602c

Browse files
committed
Add a programmer friendly code to Persistent Task Assignments (#53711)
When a task can not be assigned, the ptask framework is able to message back a reason in form of a string. This feature also started to be used for successful assignments, for example in the task that ads a health node. Callers might need to interpret an assignment failure to e.g. return a HTTP status code. A string is hard to interpret and therefore not suitable. Therefore, the Assignment can now take a programmer friendly code. It is still possible to provide a freetext String, as some context dependent information (e.g. nodeId) can not be stored in an Enum. Because some tasks provided multiple reasons for being unable to assign (all concatenated in a string), it is possible to provide multiple codes. This commit also adapted the serialization and deserialization methods for backwards compatibility. Closes #53711
1 parent cfd873c commit 6cd602c

File tree

60 files changed

+1231
-303
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1231
-303
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.Objects;
3535

3636
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
37+
import static org.hamcrest.Matchers.contains;
38+
import static org.hamcrest.Matchers.containsString;
3739
import static org.hamcrest.Matchers.either;
3840
import static org.hamcrest.Matchers.empty;
3941
import static org.hamcrest.Matchers.equalTo;
@@ -328,10 +330,20 @@ public void testUnassignRunningPersistentTask() throws Exception {
328330
// Disallow re-assignment after it is unassigned to verify master and node state
329331
TestPersistentTasksExecutor.setNonClusterStateCondition(false);
330332

331-
persistentTasksClusterService.unassignPersistentTask(taskId, task.getAllocationId() + 1, "unassignment test", unassignmentFuture);
333+
persistentTasksClusterService.unassignPersistentTask(
334+
taskId,
335+
task.getAllocationId() + 1,
336+
PersistentTasksCustomMetadata.Explanation.GENERIC_REASON,
337+
"unassignment test",
338+
unassignmentFuture
339+
);
332340
PersistentTask<?> unassignedTask = unassignmentFuture.get();
333341
assertThat(unassignedTask.getId(), equalTo(taskId));
334-
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
342+
assertThat(
343+
unassignedTask.getAssignment().getExplanationCodes(),
344+
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
345+
);
346+
assertThat(unassignedTask.getAssignment().getExplanation(), containsString("unassignment test"));
335347
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));
336348

337349
assertBusy(() -> {
@@ -406,6 +418,12 @@ public void testAbortLocally() throws Exception {
406418
task.getAssignment().getExplanation(),
407419
either(equalTo("Simulating local abort")).or(equalTo("non cluster state condition prevents assignment"))
408420
);
421+
assertThat(
422+
task.getAssignment().getExplanationCodes(),
423+
either(contains(PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY)).or(
424+
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
425+
)
426+
);
409427
});
410428

411429
// Allow it to be reassigned again
@@ -420,7 +438,7 @@ public void testAbortLocally() throws Exception {
420438
// reason has not been published, hence the busy wait here.)
421439
assertBusy(() -> {
422440
PersistentTask<?> task = assertClusterStateHasTask(taskId);
423-
assertThat(task.getAssignment().getExplanation(), not(equalTo("Simulating local abort")));
441+
assertThat(task.getAssignment().getExplanation(), not(containsString("Simulating local abort")));
424442
});
425443

426444
// Complete or cancel the running task

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ static TransportVersion def(int id) {
196196
public static final TransportVersion VERSION_SUPPORTING_SPARSE_VECTOR_STATS = def(8_687_00_0);
197197
public static final TransportVersion ML_AD_OUTPUT_MEMORY_ALLOCATOR_FIELD = def(8_688_00_0);
198198
public static final TransportVersion FAILURE_STORE_LAZY_CREATION = def(8_689_00_0);
199+
public static final TransportVersion PERSISTENT_TASK_CUSTOM_METADATA_ASSIGNMENT_REASON_ENUM = def(8_690_00_0);
199200

200201
/*
201202
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,10 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
150150
if (discoveryNode == null) {
151151
return NO_NODE_FOUND;
152152
} else {
153-
return new PersistentTasksCustomMetadata.Assignment(discoveryNode.getId(), "");
153+
return new PersistentTasksCustomMetadata.Assignment(
154+
discoveryNode.getId(),
155+
PersistentTasksCustomMetadata.Explanation.ASSIGNMENT_SUCCESSFUL
156+
);
154157
}
155158
}
156159

server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
package org.elasticsearch.persistent;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.ActionRequestValidationException;
1214
import org.elasticsearch.action.ActionType;
@@ -36,6 +38,7 @@
3638
*/
3739
public class CompletionPersistentTaskAction extends ActionType<PersistentTaskResponse> {
3840

41+
private static final Logger logger = LogManager.getLogger(CompletionPersistentTaskAction.class);
3942
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
4043
public static final String NAME = "cluster:admin/persistent/completion";
4144

@@ -157,9 +160,11 @@ protected final void masterOperation(
157160
if (request.localAbortReason != null) {
158161
assert request.exception == null
159162
: "request has both exception " + request.exception + " and local abort reason " + request.localAbortReason;
163+
logger.info("Persistent task unassigned due to local abort reason: [{}]", request.localAbortReason);
160164
persistentTasksClusterService.unassignPersistentTask(
161165
request.taskId,
162166
request.allocationId,
167+
PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY,
163168
request.localAbortReason,
164169
listener.map(PersistentTaskResponse::new)
165170
);

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,14 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
286286
* @param taskId the id of a persistent task
287287
* @param taskAllocationId the expected allocation id of the persistent task
288288
* @param reason the reason for unassigning the task from any node
289+
* @param reasonDetails the detailed reason string for unassigning the task from any node
289290
* @param listener the listener that will be called when task is unassigned
290291
*/
291292
public void unassignPersistentTask(
292293
final String taskId,
293294
final long taskAllocationId,
294-
final String reason,
295+
final PersistentTasksCustomMetadata.Explanation reason,
296+
final String reasonDetails,
295297
final ActionListener<PersistentTask<?>> listener
296298
) {
297299
submitUnbatchedTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
@@ -300,7 +302,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
300302
PersistentTasksCustomMetadata.Builder tasksInProgress = builder(currentState);
301303
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
302304
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
303-
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
305+
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason, reasonDetails)));
304306
} else {
305307
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
306308
}
@@ -336,7 +338,10 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
336338

337339
AssignmentDecision decision = enableDecider.canAssign();
338340
if (decision.getType() == AssignmentDecision.Type.NO) {
339-
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
341+
return unassignedAssignment(
342+
PersistentTasksCustomMetadata.Explanation.ASSIGNMENTS_NOT_ALLOWED,
343+
"persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"
344+
);
340345
}
341346

342347
// Filter all nodes that are marked as shutting down, because we do not
@@ -515,8 +520,8 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
515520
}
516521
}
517522

518-
private static Assignment unassignedAssignment(String reason) {
519-
return new Assignment(null, reason);
523+
private static Assignment unassignedAssignment(PersistentTasksCustomMetadata.Explanation reason, String details) {
524+
return new Assignment(null, reason, details);
520525
}
521526

522527
/**

0 commit comments

Comments
 (0)