Skip to content

Commit 71e45ae

Browse files
authored
Make persistent task name and ID available in update source (#122334)
Add the task name (when possible) and ID as part of the source string for updating cluster state. This helps better identifying the source of a task. The updatePersistentTaskState method already does it. This PR ensures it is the case in other places.
1 parent f789277 commit 71e45ae

File tree

3 files changed

+10
-7
lines changed

3 files changed

+10
-7
lines changed

server/src/internalClusterTest/java/org/elasticsearch/persistent/PersistentTaskCreationFailureIT.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ public ClusterState execute(ClusterState currentState) {
7777
.pendingTasks()
7878
.stream()
7979
.filter(
80-
pendingClusterTask -> pendingClusterTask.getSource().string().equals("finish persistent task (failed)")
80+
pendingClusterTask -> pendingClusterTask.getSource()
81+
.string()
82+
.matches("finish persistent task \\[.*] \\(failed\\)")
8183
)
8284
.count();
8385
assertThat(completePersistentTaskPendingTasksCount, lessThanOrEqualTo(1L));

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public <Params extends PersistentTaskParams> void createPersistentTask(
113113
Params taskParams,
114114
ActionListener<PersistentTask<?>> listener
115115
) {
116-
submitUnbatchedTask("create persistent task", new ClusterStateUpdateTask() {
116+
submitUnbatchedTask("create persistent task " + taskName + " [" + taskId + "]", new ClusterStateUpdateTask() {
117117
@Override
118118
public ClusterState execute(ClusterState currentState) {
119119
PersistentTasksCustomMetadata.Builder builder = builder(currentState);
@@ -166,9 +166,9 @@ public void completePersistentTask(String id, long allocationId, Exception failu
166166
final String source;
167167
if (failure != null) {
168168
logger.warn("persistent task " + id + " failed", failure);
169-
source = "finish persistent task (failed)";
169+
source = "finish persistent task [" + id + "] (failed)";
170170
} else {
171-
source = "finish persistent task (success)";
171+
source = "finish persistent task [" + id + "] (success)";
172172
}
173173
submitUnbatchedTask(source, new ClusterStateUpdateTask() {
174174
@Override
@@ -212,7 +212,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
212212
* @param listener the listener that will be called when task is removed
213213
*/
214214
public void removePersistentTask(String id, ActionListener<PersistentTask<?>> listener) {
215-
submitUnbatchedTask("remove persistent task", new ClusterStateUpdateTask() {
215+
submitUnbatchedTask("remove persistent task [" + id + "]", new ClusterStateUpdateTask() {
216216
@Override
217217
public ClusterState execute(ClusterState currentState) {
218218
PersistentTasksCustomMetadata.Builder tasksInProgress = builder(currentState);
@@ -295,7 +295,7 @@ public void unassignPersistentTask(
295295
final String reason,
296296
final ActionListener<PersistentTask<?>> listener
297297
) {
298-
submitUnbatchedTask("unassign persistent task from any node", new ClusterStateUpdateTask() {
298+
submitUnbatchedTask("unassign persistent task [" + taskId + "] from any node", new ClusterStateUpdateTask() {
299299
@Override
300300
public ClusterState execute(ClusterState currentState) throws Exception {
301301
PersistentTasksCustomMetadata.Builder tasksInProgress = builder(currentState);

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/action/TransportSetTransformUpgradeModeActionTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static org.hamcrest.Matchers.is;
3939
import static org.mockito.ArgumentMatchers.any;
4040
import static org.mockito.ArgumentMatchers.eq;
41+
import static org.mockito.ArgumentMatchers.matches;
4142
import static org.mockito.Mockito.doAnswer;
4243
import static org.mockito.Mockito.mock;
4344
import static org.mockito.Mockito.never;
@@ -176,7 +177,7 @@ public void testEnableUpgradeMode() throws InterruptedException {
176177

177178
upgradeModeSuccessfullyChanged(stateWithTransformTask(), assertNoFailureListener(r -> {
178179
assertThat(r, is(AcknowledgedResponse.TRUE));
179-
verify(clusterService).submitUnbatchedStateUpdateTask(eq("unassign persistent task from any node"), any());
180+
verify(clusterService).submitUnbatchedStateUpdateTask(matches("unassign persistent task \\[.*\\] from any node"), any());
180181
}));
181182
}
182183

0 commit comments

Comments
 (0)