Skip to content

Commit 1925187

Browse files
committed
Add a programmer-friendly code to Persistent Task Assignments (#53711)
When a task cannot be assigned, the persistent task framework is able to report back a reason in the form of a string. This feature is also used for successful assignments, for example in the task that adds a health node. Callers might need to interpret an assignment failure, e.g. to return an HTTP status code. A string is hard to interpret programmatically and is therefore not suitable for this purpose. Therefore, the Assignment can now take a programmer-friendly code. It is still possible to provide a free-text String, as some context-dependent information (e.g. nodeId) cannot be stored in an Enum. Because some tasks provided multiple reasons for being unable to assign (all concatenated in a string), it is possible to provide multiple codes. This commit also adapts the serialization and deserialization methods for backwards compatibility. Closes #53711
1 parent 3f9fea2 commit 1925187

File tree

60 files changed

+1239
-315
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+1239
-315
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import java.util.Objects;
3535

3636
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
37+
import static org.hamcrest.Matchers.contains;
38+
import static org.hamcrest.Matchers.containsString;
3739
import static org.hamcrest.Matchers.either;
3840
import static org.hamcrest.Matchers.empty;
3941
import static org.hamcrest.Matchers.equalTo;
@@ -328,12 +330,22 @@ public void testUnassignRunningPersistentTask() throws Exception {
328330
// Disallow re-assignment after it is unassigned to verify master and node state
329331
TestPersistentTasksExecutor.setNonClusterStateCondition(false);
330332

331-
persistentTasksClusterService.unassignPersistentTask(taskId, task.getAllocationId() + 1, "unassignment test", unassignmentFuture);
333+
logger.info("unassigning persistent task");
334+
persistentTasksClusterService.unassignPersistentTask(
335+
taskId,
336+
task.getAllocationId() + 1,
337+
PersistentTasksCustomMetadata.Explanation.GENERIC_REASON,
338+
"unassignment test",
339+
unassignmentFuture
340+
);
332341
PersistentTask<?> unassignedTask = unassignmentFuture.get();
333342
assertThat(unassignedTask.getId(), equalTo(taskId));
334-
assertThat(unassignedTask.getAssignment().getExplanation(), equalTo("unassignment test"));
343+
assertThat(
344+
unassignedTask.getAssignment().getExplanationCodes(),
345+
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
346+
);
347+
assertThat(unassignedTask.getAssignment().getExplanation(), containsString("unassignment test"));
335348
assertThat(unassignedTask.getAssignment().getExecutorNode(), is(nullValue()));
336-
337349
assertBusy(() -> {
338350
// Verify that the task is NOT running on the node
339351
List<TaskInfo> tasks = clusterAdmin().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]").get().getTasks();
@@ -342,13 +354,14 @@ public void testUnassignRunningPersistentTask() throws Exception {
342354
// Verify that the task is STILL in internal cluster state
343355
assertClusterStateHasTask(taskId);
344356
});
357+
logger.info("persistent task unassigned");
345358

346359
// Allow it to be reassigned again to the same node
347360
TestPersistentTasksExecutor.setNonClusterStateCondition(true);
348-
361+
logger.info("task is allowed to start again");
349362
// Verify it starts again
350363
waitForTaskToStart();
351-
364+
logger.info("task has started again");
352365
assertClusterStateHasTask(taskId);
353366

354367
// Complete or cancel the running task
@@ -406,6 +419,12 @@ public void testAbortLocally() throws Exception {
406419
task.getAssignment().getExplanation(),
407420
either(equalTo("Simulating local abort")).or(equalTo("non cluster state condition prevents assignment"))
408421
);
422+
assertThat(
423+
task.getAssignment().getExplanationCodes(),
424+
either(contains(PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY)).or(
425+
contains(PersistentTasksCustomMetadata.Explanation.GENERIC_REASON)
426+
)
427+
);
409428
});
410429

411430
// Allow it to be reassigned again
@@ -420,7 +439,7 @@ public void testAbortLocally() throws Exception {
420439
// reason has not been published, hence the busy wait here.)
421440
assertBusy(() -> {
422441
PersistentTask<?> task = assertClusterStateHasTask(taskId);
423-
assertThat(task.getAssignment().getExplanation(), not(equalTo("Simulating local abort")));
442+
assertThat(task.getAssignment().getExplanation(), not(containsString("Simulating local abort")));
424443
});
425444

426445
// Complete or cancel the running task

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ static TransportVersion def(int id) {
212212
public static final TransportVersion ML_INFERENCE_DONT_DELETE_WHEN_SEMANTIC_TEXT_EXISTS = def(8_703_00_0);
213213
public static final TransportVersion INFERENCE_ADAPTIVE_ALLOCATIONS = def(8_704_00_0);
214214
public static final TransportVersion INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN = def(8_705_00_0);
215+
public static final TransportVersion PERSISTENT_TASK_CUSTOM_METADATA_ASSIGNMENT_REASON_ENUM = def(8_706_00_0);
215216

216217
/*
217218
* STOP! READ THIS FIRST! No, really,
@@ -276,7 +277,7 @@ static TransportVersion def(int id) {
276277
* Reference to the minimum transport version that can be used with CCS.
277278
* This should be the transport version used by the previous minor release.
278279
*/
279-
public static final TransportVersion MINIMUM_CCS_VERSION = SHUTDOWN_REQUEST_TIMEOUTS_FIX_8_14;
280+
public static final TransportVersion MINIMUM_CCS_VERSION = V_8_13_0;
280281

281282
static final NavigableMap<Integer, TransportVersion> VERSION_IDS = getAllVersionIds(TransportVersions.class);
282283

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,10 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
150150
if (discoveryNode == null) {
151151
return NO_NODE_FOUND;
152152
} else {
153-
return new PersistentTasksCustomMetadata.Assignment(discoveryNode.getId(), "");
153+
return new PersistentTasksCustomMetadata.Assignment(
154+
discoveryNode.getId(),
155+
PersistentTasksCustomMetadata.Explanation.ASSIGNMENT_SUCCESSFUL
156+
);
154157
}
155158
}
156159

server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
*/
88
package org.elasticsearch.persistent;
99

10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.action.ActionRequestValidationException;
1214
import org.elasticsearch.action.ActionType;
@@ -36,6 +38,7 @@
3638
*/
3739
public class CompletionPersistentTaskAction extends ActionType<PersistentTaskResponse> {
3840

41+
private static final Logger logger = LogManager.getLogger(CompletionPersistentTaskAction.class);
3942
public static final CompletionPersistentTaskAction INSTANCE = new CompletionPersistentTaskAction();
4043
public static final String NAME = "cluster:admin/persistent/completion";
4144

@@ -157,9 +160,11 @@ protected final void masterOperation(
157160
if (request.localAbortReason != null) {
158161
assert request.exception == null
159162
: "request has both exception " + request.exception + " and local abort reason " + request.localAbortReason;
163+
logger.info("Persistent task unassigned due to local abort reason: [{}]", request.localAbortReason);
160164
persistentTasksClusterService.unassignPersistentTask(
161165
request.taskId,
162166
request.allocationId,
167+
PersistentTasksCustomMetadata.Explanation.ABORTED_LOCALLY,
163168
request.localAbortReason,
164169
listener.map(PersistentTaskResponse::new)
165170
);

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -286,12 +286,14 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
286286
* @param taskId the id of a persistent task
287287
* @param taskAllocationId the expected allocation id of the persistent task
288288
* @param reason the reason for unassigning the task from any node
289+
* @param reasonDetails the detailed reason string for unassigning the task from any node
289290
* @param listener the listener that will be called when task is unassigned
290291
*/
291292
public void unassignPersistentTask(
292293
final String taskId,
293294
final long taskAllocationId,
294-
final String reason,
295+
final PersistentTasksCustomMetadata.Explanation reason,
296+
final String reasonDetails,
295297
final ActionListener<PersistentTask<?>> listener
296298
) {
297299
submitUnbatchedTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
@@ -300,7 +302,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
300302
PersistentTasksCustomMetadata.Builder tasksInProgress = builder(currentState);
301303
if (tasksInProgress.hasTask(taskId, taskAllocationId)) {
302304
logger.trace("Unassigning task {} with allocation id {}", taskId, taskAllocationId);
303-
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason)));
305+
return update(currentState, tasksInProgress.reassignTask(taskId, unassignedAssignment(reason, reasonDetails)));
304306
} else {
305307
throw new ResourceNotFoundException("the task with id {} and allocation id {} doesn't exist", taskId, taskAllocationId);
306308
}
@@ -336,7 +338,10 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
336338

337339
AssignmentDecision decision = enableDecider.canAssign();
338340
if (decision.getType() == AssignmentDecision.Type.NO) {
339-
return unassignedAssignment("persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]");
341+
return unassignedAssignment(
342+
PersistentTasksCustomMetadata.Explanation.ASSIGNMENTS_NOT_ALLOWED,
343+
"persistent task [" + taskName + "] cannot be assigned [" + decision.getReason() + "]"
344+
);
340345
}
341346

342347
// Filter all nodes that are marked as shutting down, because we do not
@@ -515,8 +520,8 @@ private static ClusterState update(ClusterState currentState, PersistentTasksCus
515520
}
516521
}
517522

518-
private static Assignment unassignedAssignment(String reason) {
519-
return new Assignment(null, reason);
523+
private static Assignment unassignedAssignment(PersistentTasksCustomMetadata.Explanation reason, String details) {
524+
return new Assignment(null, details, reason);
520525
}
521526

522527
/**

0 commit comments

Comments
 (0)