@@ -30,6 +30,7 @@ public class ReindexDataStreamTask extends AllocatedPersistentTask {
3030 private final long persistentTaskStartTime ;
3131 private final int initialTotalIndices ;
3232 private final int initialTotalIndicesToBeUpgraded ;
33+ private boolean isCompleteLocally = false ;
3334 private volatile Exception exception ;
3435 private final Set <String > inProgress = Collections .synchronizedSet (new HashSet <>());
3536 private final AtomicInteger pending = new AtomicInteger ();
@@ -100,6 +101,7 @@ public ReindexDataStreamStatus getStatus() {
100101 }
101102
102103 public void allReindexesCompleted (ThreadPool threadPool , TimeValue timeToLive ) {
104+ isCompleteLocally = true ;
103105 if (isCancelled ()) {
104106 completeTask .run ();
105107 } else {
@@ -126,7 +128,7 @@ public void incrementInProgressIndicesCount(String index) {
126128 pending .decrementAndGet ();
127129 }
128130
129- private boolean isComplete () {
131+ private boolean isCompleteInClusterState () {
130132 PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService .state ()
131133 .getMetadata ()
132134 .getProject ()
@@ -154,8 +156,10 @@ public void onCancelled() {
154156 * If the task is complete, but just waiting for its scheduled removal, we go ahead and call markAsCompleted/markAsFailed
155157 * immediately. This results in the running task being removed from the task manager. If the task is not complete, then one of
156158 * allReindexesCompleted or taskFailed will be called in the future, resulting in the same thing.
159+ * We check both the cluster state and isCompleteLocally -- it is possible (especially in tests) that hte cluster state
160+ * update has not happened in between when allReindexesCompleted was called and when this is called.
157161 */
158- if (isComplete () ) {
162+ if (isCompleteInClusterState () || isCompleteLocally ) {
159163 completeTask .run ();
160164 }
161165 }
0 commit comments