99
1010import  org .elasticsearch .ElasticsearchException ;
1111import  org .elasticsearch .action .ActionListener ;
12+ import  org .elasticsearch .action .admin .indices .rollover .RolloverAction ;
13+ import  org .elasticsearch .action .admin .indices .rollover .RolloverRequest ;
1214import  org .elasticsearch .action .datastreams .GetDataStreamAction ;
15+ import  org .elasticsearch .action .datastreams .ModifyDataStreamsAction ;
16+ import  org .elasticsearch .action .support .CountDownActionListener ;
17+ import  org .elasticsearch .action .support .master .AcknowledgedResponse ;
1318import  org .elasticsearch .client .internal .Client ;
19+ import  org .elasticsearch .cluster .metadata .DataStream ;
20+ import  org .elasticsearch .cluster .metadata .DataStreamAction ;
1421import  org .elasticsearch .cluster .service .ClusterService ;
1522import  org .elasticsearch .core .TimeValue ;
1623import  org .elasticsearch .index .Index ;
2027import  org .elasticsearch .persistent .PersistentTasksExecutor ;
2128import  org .elasticsearch .tasks .TaskId ;
2229import  org .elasticsearch .threadpool .ThreadPool ;
30+ import  org .elasticsearch .xpack .migrate .action .ReindexDataStreamIndexAction ;
2331
32+ import  java .util .ArrayList ;
33+ import  java .util .Collections ;
2434import  java .util .List ;
2535import  java .util .Map ;
36+ import  java .util .NoSuchElementException ;
2637
2738import  static  org .elasticsearch .xpack .migrate .action .ReindexDataStreamAction .getOldIndexVersionPredicate ;
2839
@@ -72,22 +83,109 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
7283        reindexClient .execute (GetDataStreamAction .INSTANCE , request , ActionListener .wrap (response  -> {
7384            List <GetDataStreamAction .Response .DataStreamInfo > dataStreamInfos  = response .getDataStreams ();
7485            if  (dataStreamInfos .size () == 1 ) {
75-                 List <Index > indices  = dataStreamInfos .getFirst ().getDataStream ().getIndices ();
76-                 List <Index > indicesToBeReindexed  = indices .stream ()
77-                     .filter (getOldIndexVersionPredicate (clusterService .state ().metadata ()))
78-                     .toList ();
79-                 reindexDataStreamTask .setPendingIndicesCount (indicesToBeReindexed .size ());
80-                 for  (Index  index  : indicesToBeReindexed ) {
81-                     reindexDataStreamTask .incrementInProgressIndicesCount (index .getName ());
82-                     // TODO This is just a placeholder. This is where the real data stream reindex logic will go 
83-                     reindexDataStreamTask .reindexSucceeded (index .getName ());
86+                 DataStream  dataStream  = dataStreamInfos .getFirst ().getDataStream ();
87+                 if  (getOldIndexVersionPredicate (clusterService .state ().metadata ()).test (dataStream .getWriteIndex ())) {
88+                     reindexClient .execute (
89+                         RolloverAction .INSTANCE ,
90+                         new  RolloverRequest (sourceDataStream , null ),
91+                         ActionListener .wrap (
92+                             rolloverResponse  -> reindexIndices (dataStream , reindexDataStreamTask , reindexClient , sourceDataStream ),
93+                             e  -> completeFailedPersistentTask (reindexDataStreamTask , e )
94+                         )
95+                     );
96+                 } else  {
97+                     reindexIndices (dataStream , reindexDataStreamTask , reindexClient , sourceDataStream );
8498                }
85- 
86-                 completeSuccessfulPersistentTask (reindexDataStreamTask );
8799            } else  {
88100                completeFailedPersistentTask (reindexDataStreamTask , new  ElasticsearchException ("data stream does not exist" ));
89101            }
90-         }, reindexDataStreamTask ::markAsFailed ));
102+         }, exception  -> completeFailedPersistentTask (reindexDataStreamTask , exception )));
103+     }
104+ 
105+     private  void  reindexIndices (
106+         DataStream  dataStream ,
107+         ReindexDataStreamTask  reindexDataStreamTask ,
108+         ExecuteWithHeadersClient  reindexClient ,
109+         String  sourceDataStream 
110+     ) {
111+         List <Index > indices  = dataStream .getIndices ();
112+         List <Index > indicesToBeReindexed  = indices .stream ().filter (getOldIndexVersionPredicate (clusterService .state ().metadata ())).toList ();
113+         reindexDataStreamTask .setPendingIndicesCount (indicesToBeReindexed .size ());
114+         // The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices 
115+         CountDownActionListener  listener  = new  CountDownActionListener (indicesToBeReindexed .size () + 1 , ActionListener .wrap (response1  -> {
116+             completeSuccessfulPersistentTask (reindexDataStreamTask );
117+         }, exception  -> { completeFailedPersistentTask (reindexDataStreamTask , exception ); }));
118+         List <Index > indicesRemaining  = Collections .synchronizedList (new  ArrayList <>(indicesToBeReindexed ));
119+         final  int  maxConcurrentIndices  = 1 ;
120+         for  (int  i  = 0 ; i  < maxConcurrentIndices ; i ++) {
121+             maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , reindexClient , sourceDataStream , listener );
122+         }
123+         // This takes care of the additional latch count referenced above: 
124+         listener .onResponse (null );
125+     }
126+ 
127+     private  void  maybeProcessNextIndex (
128+         List <Index > indicesRemaining ,
129+         ReindexDataStreamTask  reindexDataStreamTask ,
130+         ExecuteWithHeadersClient  reindexClient ,
131+         String  sourceDataStream ,
132+         CountDownActionListener  listener 
133+     ) {
134+         if  (indicesRemaining .isEmpty ()) {
135+             return ;
136+         }
137+         Index  index ;
138+         try  {
139+             index  = indicesRemaining .removeFirst ();
140+         } catch  (NoSuchElementException  e ) {
141+             return ;
142+         }
143+         reindexDataStreamTask .incrementInProgressIndicesCount (index .getName ());
144+         reindexClient .execute (
145+             ReindexDataStreamIndexAction .INSTANCE ,
146+             new  ReindexDataStreamIndexAction .Request (index .getName ()),
147+             ActionListener .wrap (response1  -> {
148+                 updateDataStream (sourceDataStream , index .getName (), response1 .getDestIndex (), ActionListener .wrap (unused  -> {
149+                     reindexDataStreamTask .reindexSucceeded (index .getName ());
150+                     listener .onResponse (null );
151+                     maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , reindexClient , sourceDataStream , listener );
152+                 }, exception  -> {
153+                     reindexDataStreamTask .reindexFailed (index .getName (), exception );
154+                     listener .onResponse (null );
155+                 }), reindexClient );
156+             }, exception  -> {
157+                 reindexDataStreamTask .reindexFailed (index .getName (), exception );
158+                 listener .onResponse (null );
159+             })
160+         );
161+     }
162+ 
163+     private  void  updateDataStream (
164+         String  dataStream ,
165+         String  oldIndex ,
166+         String  newIndex ,
167+         ActionListener <Void > listener ,
168+         ExecuteWithHeadersClient  reindexClient 
169+     ) {
170+         reindexClient .execute (
171+             ModifyDataStreamsAction .INSTANCE ,
172+             new  ModifyDataStreamsAction .Request (
173+                 TimeValue .MAX_VALUE ,
174+                 TimeValue .MAX_VALUE ,
175+                 List .of (DataStreamAction .removeBackingIndex (dataStream , oldIndex ), DataStreamAction .addBackingIndex (dataStream , newIndex ))
176+             ),
177+             new  ActionListener <>() {
178+                 @ Override 
179+                 public  void  onResponse (AcknowledgedResponse  response ) {
180+                     listener .onResponse (null );
181+                 }
182+ 
183+                 @ Override 
184+                 public  void  onFailure (Exception  e ) {
185+                     listener .onFailure (e );
186+                 }
187+             }
188+         );
91189    }
92190
93191    private  void  completeSuccessfulPersistentTask (ReindexDataStreamTask  persistentTask ) {
@@ -105,6 +203,9 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
105203        PersistentTasksCustomMetadata .PersistentTask <?> persistentTask  = persistentTasksCustomMetadata .getTask (
106204            reindexDataStreamTask .getPersistentTaskId ()
107205        );
206+         if  (persistentTask  == null ) {
207+             return  TimeValue .timeValueMillis (0 );
208+         }
108209        PersistentTaskState  state  = persistentTask .getState ();
109210        final  long  completionTime ;
110211        if  (state  == null ) {
0 commit comments