1212import org .elasticsearch .core .Tuple ;
1313import org .elasticsearch .persistent .AllocatedPersistentTask ;
1414import org .elasticsearch .tasks .TaskId ;
15- import org .elasticsearch .threadpool .ThreadPool ;
1615
1716import java .util .ArrayList ;
1817import java .util .List ;
1918import java .util .Map ;
19+ import java .util .concurrent .atomic .AtomicInteger ;
2020
2121public class ReindexDataStreamTask extends AllocatedPersistentTask {
2222 public static final String TASK_NAME = "reindex-data-stream" ;
2323 private final long persistentTaskStartTime ;
2424 private final int totalIndices ;
2525 private final int totalIndicesToBeUpgraded ;
26- private final ThreadPool threadPool ;
2726 private boolean complete = false ;
2827 private Exception exception ;
29- private List < String > inProgress = new ArrayList <>( );
30- private List < String > pending = List . of ();
28+ private AtomicInteger inProgress = new AtomicInteger ( 0 );
29+ private AtomicInteger pending = new AtomicInteger ();
3130 private List <Tuple <String , Exception >> errors = new ArrayList <>();
3231
3332 public ReindexDataStreamTask (
3433 long persistentTaskStartTime ,
3534 int totalIndices ,
3635 int totalIndicesToBeUpgraded ,
37- ThreadPool threadPool ,
3836 long id ,
3937 String type ,
4038 String action ,
@@ -46,7 +44,6 @@ public ReindexDataStreamTask(
4644 this .persistentTaskStartTime = persistentTaskStartTime ;
4745 this .totalIndices = totalIndices ;
4846 this .totalIndicesToBeUpgraded = totalIndicesToBeUpgraded ;
49- this .threadPool = threadPool ;
5047 }
5148
5249 @ Override
@@ -57,30 +54,36 @@ public ReindexDataStreamStatus getStatus() {
5754 totalIndicesToBeUpgraded ,
5855 complete ,
5956 exception ,
60- inProgress .size (),
61- pending .size (),
57+ inProgress .get (),
58+ pending .get (),
6259 errors
6360 );
6461 }
6562
66- public void reindexSucceeded () {
63+ public void allReindexesCompleted () {
6764 this .complete = true ;
6865 }
6966
70- public void reindexFailed (Exception e ) {
67+ public void taskFailed (Exception e ) {
7168 this .complete = true ;
7269 this .exception = e ;
7370 }
7471
75- public void setInProgressIndices (List <String > inProgressIndices ) {
76- this .inProgress = inProgressIndices ;
72+ public void reindexSucceeded () {
73+ inProgress .decrementAndGet ();
74+ }
75+
76+ public void reindexFailed (String index , Exception error ) {
77+ this .errors .add (Tuple .tuple (index , error ));
78+ inProgress .decrementAndGet ();
7779 }
7880
79- public void setPendingIndices (List <String > pendingIndices ) {
80- this .pending = pendingIndices ;
81+ public void incrementInProgressIndicesCount () {
82+ inProgress .incrementAndGet ();
83+ pending .decrementAndGet ();
8184 }
8285
83- public void addErrorIndex ( String index , Exception error ) {
84- this . errors . add ( Tuple . tuple ( index , error ) );
86+ public void setPendingIndicesCount ( int size ) {
87+ pending . set ( size );
8588 }
8689}
0 commit comments