1818import org .elasticsearch .action .datastreams .GetDataStreamAction ;
1919import org .elasticsearch .action .datastreams .ModifyDataStreamsAction ;
2020import org .elasticsearch .action .support .CountDownActionListener ;
21+ import org .elasticsearch .action .support .IndicesOptions ;
2122import org .elasticsearch .action .support .SubscribableListener ;
2223import org .elasticsearch .action .support .master .AcknowledgedResponse ;
2324import org .elasticsearch .client .internal .Client ;
2425import org .elasticsearch .cluster .metadata .DataStream ;
2526import org .elasticsearch .cluster .metadata .DataStreamAction ;
27+ import org .elasticsearch .cluster .metadata .IndexMetadata ;
2628import org .elasticsearch .cluster .service .ClusterService ;
2729import org .elasticsearch .common .settings .Setting ;
2830import org .elasticsearch .core .Nullable ;
3537import org .elasticsearch .tasks .TaskId ;
3638import org .elasticsearch .threadpool .ThreadPool ;
3739import org .elasticsearch .xpack .migrate .action .ReindexDataStreamIndexAction ;
40+ import org .elasticsearch .xpack .migrate .action .ReindexDataStreamIndexTransportAction ;
3841
3942import java .util .ArrayList ;
4043import java .util .Collections ;
@@ -59,6 +62,7 @@ public class ReindexDataStreamPersistentTaskExecutor extends PersistentTasksExec
5962 );
6063 private static final Logger logger = LogManager .getLogger (ReindexDataStreamPersistentTaskExecutor .class );
6164 private static final TimeValue TASK_KEEP_ALIVE_TIME = TimeValue .timeValueDays (1 );
65+ private static final IndicesOptions IGNORE_MISSING_OPTIONS = IndicesOptions .fromOptions (true , true , false , false );
6266 private final Client client ;
6367 private final ClusterService clusterService ;
6468 private final ThreadPool threadPool ;
@@ -224,9 +228,26 @@ private void maybeProcessNextIndex(
224228 listener .onResponse (null );
225229 maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , sourceDataStream , listener , parentTaskId );
226230 }, e -> {
227- reindexDataStreamTask .reindexFailed (index .getName (), e );
228- listener .onResponse (null );
229- maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , sourceDataStream , listener , parentTaskId );
231+ /*
232+ * One possible cause of exception here is that the source index no longer exists. This can happen if ILM performs certain
233+ * actions while reindex is happening, or if a user manually deletes it. In this case we don't want to fail the task. We
234+ * treat it as a success and move on after making an attempt at deleting the destination index if it has been created yet.
235+ */
236+ IndexMetadata sourceIndex = clusterService .state ().getMetadata ().index (index .getName ());
237+ if (sourceIndex == null ) {
238+ String destIndexName = ReindexDataStreamIndexTransportAction .generateDestIndexName (index .getName ());
239+ logMissingSourceIndex (sourceDataStream , index .getName (), destIndexName );
240+ deleteIndex (destIndexName , parentTaskId , ActionListener .wrap (acknowledgedResponse -> {
241+ reindexDataStreamTask .reindexSucceeded (index .getName ());
242+ listener .onResponse (null );
243+ maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , sourceDataStream , listener , parentTaskId );
244+ }, listener ::onFailure ));
245+ } else {
246+ // The source index still exists, so this is a real problem we want to record
247+ reindexDataStreamTask .reindexFailed (index .getName (), e );
248+ listener .onResponse (null );
249+ maybeProcessNextIndex (indicesRemaining , reindexDataStreamTask , sourceDataStream , listener , parentTaskId );
250+ }
230251 }));
231252 }
232253
@@ -237,17 +258,32 @@ private void updateDataStream(
237258 ActionListener <AcknowledgedResponse > listener ,
238259 TaskId parentTaskId
239260 ) {
240- ModifyDataStreamsAction .Request modifyDataStreamRequest = new ModifyDataStreamsAction .Request (
241- TimeValue .MAX_VALUE ,
242- TimeValue .MAX_VALUE ,
243- List .of (DataStreamAction .removeBackingIndex (dataStream , oldIndex ), DataStreamAction .addBackingIndex (dataStream , newIndex ))
261+ IndexMetadata sourceIndex = clusterService .state ().getMetadata ().index (oldIndex );
262+ if (sourceIndex == null ) {
263+ logMissingSourceIndex (dataStream , oldIndex , newIndex );
264+ deleteIndex (newIndex , parentTaskId , listener );
265+ } else {
266+ ModifyDataStreamsAction .Request modifyDataStreamRequest = new ModifyDataStreamsAction .Request (
267+ TimeValue .MAX_VALUE ,
268+ TimeValue .MAX_VALUE ,
269+ List .of (DataStreamAction .removeBackingIndex (dataStream , oldIndex ), DataStreamAction .addBackingIndex (dataStream , newIndex ))
270+ );
271+ modifyDataStreamRequest .setParentTask (parentTaskId );
272+ client .execute (ModifyDataStreamsAction .INSTANCE , modifyDataStreamRequest , listener );
273+ }
274+ }
275+
276+ private void logMissingSourceIndex (String dataStreamName , String sourceIndexName , String destinationIndexName ) {
277+ logger .debug (
278+ "Index {} in data stream {} no longer exists after reindexing completed. Deleting reindexed index {}" ,
279+ sourceIndexName ,
280+ dataStreamName ,
281+ destinationIndexName
244282 );
245- modifyDataStreamRequest .setParentTask (parentTaskId );
246- client .execute (ModifyDataStreamsAction .INSTANCE , modifyDataStreamRequest , listener );
247283 }
248284
249285 private void deleteIndex (String indexName , TaskId parentTaskId , ActionListener <AcknowledgedResponse > listener ) {
250- DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest (indexName );
286+ DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest (indexName ). indicesOptions ( IGNORE_MISSING_OPTIONS ) ;
251287 deleteIndexRequest .setParentTask (parentTaskId );
252288 client .execute (TransportDeleteIndexAction .TYPE , deleteIndexRequest , listener );
253289 }
0 commit comments