99
1010import org .elasticsearch .ElasticsearchException ;
1111import org .elasticsearch .action .ActionListener ;
12+ import org .elasticsearch .action .admin .indices .rollover .RolloverAction ;
13+ import org .elasticsearch .action .admin .indices .rollover .RolloverRequest ;
1214import org .elasticsearch .action .datastreams .GetDataStreamAction ;
15+ import org .elasticsearch .action .datastreams .ModifyDataStreamsAction ;
16+ import org .elasticsearch .action .support .CountDownActionListener ;
17+ import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1318import org .elasticsearch .client .internal .Client ;
19+ import org .elasticsearch .cluster .metadata .DataStream ;
20+ import org .elasticsearch .cluster .metadata .DataStreamAction ;
1421import org .elasticsearch .cluster .service .ClusterService ;
1522import org .elasticsearch .core .TimeValue ;
1623import org .elasticsearch .index .Index ;
2027import org .elasticsearch .persistent .PersistentTasksExecutor ;
2128import org .elasticsearch .tasks .TaskId ;
2229import org .elasticsearch .threadpool .ThreadPool ;
30+ import org .elasticsearch .xpack .migrate .action .ReindexDataStreamIndexAction ;
2331
32+ import java .util .ArrayList ;
33+ import java .util .Collections ;
2434import java .util .List ;
2535import java .util .Map ;
2636
@@ -72,22 +82,109 @@ protected void nodeOperation(AllocatedPersistentTask task, ReindexDataStreamTask
7282 reindexClient .execute (GetDataStreamAction .INSTANCE , request , ActionListener .wrap (response -> {
7383 List <GetDataStreamAction .Response .DataStreamInfo > dataStreamInfos = response .getDataStreams ();
7484 if (dataStreamInfos .size () == 1 ) {
75- List <Index > indices = dataStreamInfos .get (0 ).getDataStream ().getIndices ();
76- List <Index > indicesToBeReindexed = indices .stream ()
77- .filter (getReindexRequiredPredicate (clusterService .state ().metadata ()))
78- .toList ();
79- reindexDataStreamTask .setPendingIndicesCount (indicesToBeReindexed .size ());
80- for (Index index : indicesToBeReindexed ) {
81- reindexDataStreamTask .incrementInProgressIndicesCount (index .getName ());
82- // TODO This is just a placeholder. This is where the real data stream reindex logic will go
83- reindexDataStreamTask .reindexSucceeded (index .getName ());
85+ DataStream dataStream = dataStreamInfos .get (0 ).getDataStream ();
86+ if (getReindexRequiredPredicate (clusterService .state ().metadata ()).test (dataStream .getWriteIndex ())) {
87+ reindexClient .execute (
88+ RolloverAction .INSTANCE ,
89+ new RolloverRequest (sourceDataStream , null ),
90+ ActionListener .wrap (
91+ rolloverResponse -> reindexIndices (dataStream , reindexDataStreamTask , reindexClient , sourceDataStream ),
92+ e -> completeFailedPersistentTask (reindexDataStreamTask , e )
93+ )
94+ );
95+ } else {
96+ reindexIndices (dataStream , reindexDataStreamTask , reindexClient , sourceDataStream );
8497 }
85-
86- completeSuccessfulPersistentTask (reindexDataStreamTask );
8798 } else {
8899 completeFailedPersistentTask (reindexDataStreamTask , new ElasticsearchException ("data stream does not exist" ));
89100 }
90- }, reindexDataStreamTask ::markAsFailed ));
101+ }, exception -> completeFailedPersistentTask (reindexDataStreamTask , exception )));
102+ }
103+
104+ private void reindexIndices (
105+ DataStream dataStream ,
106+ ReindexDataStreamTask reindexDataStreamTask ,
107+ ExecuteWithHeadersClient reindexClient ,
108+ String sourceDataStream
109+ ) {
110+ List <Index > indices = dataStream .getIndices ();
111+ List <Index > indicesToBeReindexed = indices .stream ().filter (getReindexRequiredPredicate (clusterService .state ().metadata ())).toList ();
112+ reindexDataStreamTask .setPendingIndicesCount (indicesToBeReindexed .size ());
113+ // The CountDownActionListener is 1 more than the number of indices so that the count is not 0 if we have no indices
114+ CountDownActionListener listener = new CountDownActionListener (indicesToBeReindexed .size () + 1 , ActionListener .wrap (response1 -> {
115+ completeSuccessfulPersistentTask (reindexDataStreamTask );
116+ }, exception -> { completeFailedPersistentTask (reindexDataStreamTask , exception ); }));
117+ List <Index > indicesRemaining = Collections .synchronizedList (new ArrayList <>(indicesToBeReindexed ));
118+ final int maxConcurrentIndices = 1 ;
119+ for (int i = 0 ; i < maxConcurrentIndices ; i ++) {
120+ maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , reindexClient , sourceDataStream , listener );
121+ }
122+ // This takes care of the additional latch count referenced above:
123+ listener .onResponse (null );
124+ }
125+
126+ private void maybeProcessNextIndex (
127+ List <Index > indicesRemaining ,
128+ ReindexDataStreamTask reindexDataStreamTask ,
129+ ExecuteWithHeadersClient reindexClient ,
130+ String sourceDataStream ,
131+ CountDownActionListener listener
132+ ) {
133+ if (indicesRemaining .isEmpty ()) {
134+ return ;
135+ }
136+ Index index ;
137+ try {
138+ index = indicesRemaining .remove (0 );
139+ } catch (IndexOutOfBoundsException e ) {
140+ return ;
141+ }
142+ reindexDataStreamTask .incrementInProgressIndicesCount (index .getName ());
143+ reindexClient .execute (
144+ ReindexDataStreamIndexAction .INSTANCE ,
145+ new ReindexDataStreamIndexAction .Request (index .getName ()),
146+ ActionListener .wrap (response1 -> {
147+ updateDataStream (sourceDataStream , index .getName (), response1 .getDestIndex (), ActionListener .wrap (unused -> {
148+ reindexDataStreamTask .reindexSucceeded (index .getName ());
149+ listener .onResponse (null );
150+ maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , reindexClient , sourceDataStream , listener );
151+ }, exception -> {
152+ reindexDataStreamTask .reindexFailed (index .getName (), exception );
153+ listener .onResponse (null );
154+ }), reindexClient );
155+ }, exception -> {
156+ reindexDataStreamTask .reindexFailed (index .getName (), exception );
157+ listener .onResponse (null );
158+ })
159+ );
160+ }
161+
162+ private void updateDataStream (
163+ String dataStream ,
164+ String oldIndex ,
165+ String newIndex ,
166+ ActionListener <Void > listener ,
167+ ExecuteWithHeadersClient reindexClient
168+ ) {
169+ reindexClient .execute (
170+ ModifyDataStreamsAction .INSTANCE ,
171+ new ModifyDataStreamsAction .Request (
172+ TimeValue .MAX_VALUE ,
173+ TimeValue .MAX_VALUE ,
174+ List .of (DataStreamAction .removeBackingIndex (dataStream , oldIndex ), DataStreamAction .addBackingIndex (dataStream , newIndex ))
175+ ),
176+ new ActionListener <>() {
177+ @ Override
178+ public void onResponse (AcknowledgedResponse response ) {
179+ listener .onResponse (null );
180+ }
181+
182+ @ Override
183+ public void onFailure (Exception e ) {
184+ listener .onFailure (e );
185+ }
186+ }
187+ );
91188 }
92189
93190 private void completeSuccessfulPersistentTask (ReindexDataStreamTask persistentTask ) {
@@ -105,6 +202,9 @@ private TimeValue getTimeToLive(ReindexDataStreamTask reindexDataStreamTask) {
105202 PersistentTasksCustomMetadata .PersistentTask <?> persistentTask = persistentTasksCustomMetadata .getTask (
106203 reindexDataStreamTask .getPersistentTaskId ()
107204 );
205+ if (persistentTask == null ) {
206+ return TimeValue .timeValueMillis (0 );
207+ }
108208 PersistentTaskState state = persistentTask .getState ();
109209 final long completionTime ;
110210 if (state == null ) {
0 commit comments