@@ -108,8 +108,9 @@ protected void nodeOperation(
108108 ReindexDataStreamTaskParams params ,
109109 PersistentTaskState persistentTaskState
110110 ) {
111- if (isComplete (persistentTaskState )) {
112- task .markAsCompleted ();
111+ Long completionTime = getCompletionTime (persistentTaskState );
112+ if (completionTime != null && task instanceof ReindexDataStreamTask reindexDataStreamTask ) {
113+ reindexDataStreamTask .allReindexesCompleted (threadPool , getTimeToLive (completionTime ));
113114 return ;
114115 }
115116 ReindexDataStreamPersistentTaskState state = (ReindexDataStreamPersistentTaskState ) persistentTaskState ;
@@ -320,14 +321,14 @@ private void completeFailedPersistentTask(
320321 persistentTask .taskFailed (threadPool , updateCompletionTimeAndGetTimeToLive (persistentTask , state ), e );
321322 }
322323
323- private boolean isComplete (PersistentTaskState persistentTaskState ) {
324+ private Long getCompletionTime (PersistentTaskState persistentTaskState ) {
324325 if (persistentTaskState == null ) {
325- return false ;
326+ return null ;
326327 } else {
327328 if (persistentTaskState instanceof ReindexDataStreamPersistentTaskState state ) {
328- return state .isComplete ();
329+ return state .completionTime ();
329330 } else {
330- return false ;
331+ return null ;
331332 }
332333 }
333334 }
@@ -361,6 +362,10 @@ private TimeValue updateCompletionTimeAndGetTimeToLive(
361362 completionTime = state .completionTime ();
362363 }
363364 }
364- return TimeValue .timeValueMillis (TASK_KEEP_ALIVE_TIME .millis () - (threadPool .absoluteTimeInMillis () - completionTime ));
365+ return getTimeToLive (completionTime );
366+ }
367+
368+ private TimeValue getTimeToLive (long completionTimeInMillis ) {
369+ return TimeValue .timeValueMillis (TASK_KEEP_ALIVE_TIME .millis () - (threadPool .absoluteTimeInMillis () - completionTimeInMillis ));
365370 }
366371}
0 commit comments