@@ -206,49 +206,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
206206        }
207207
208208        // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex 
209-         cleanUpPreviousMigration (
210-             taskState ,
211-             clusterState ,
212-             state  -> prepareNextIndex (state , state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName )
213-         );
214-     }
215- 
216-     private  void  cleanUpPreviousMigration (
217-         SystemIndexMigrationTaskState  taskState ,
218-         ClusterState  currentState ,
219-         Consumer <ClusterState > listener 
220-     ) {
221209        logger .debug ("cleaning up previous migration, task state: [{}]" , taskState  == null  ? "null"  : Strings .toString (taskState ));
222-         if  (taskState  != null  && taskState .getCurrentIndex () != null ) {
223-             SystemIndexMigrationInfo  migrationInfo ;
224-             try  {
225-                 migrationInfo  = SystemIndexMigrationInfo .fromTaskState (
226-                     taskState ,
227-                     systemIndices ,
228-                     currentState .metadata (),
229-                     indexScopedSettings 
230-                 );
231-             } catch  (Exception  e ) {
232-                 markAsFailed (e );
233-                 return ;
234-             }
235-             final  String  newIndexName  = migrationInfo .getNextIndexName ();
236-             logger .info ("removing index [{}] from previous incomplete migration" , newIndexName );
237- 
238-             migrationInfo .createClient (baseClient )
239-                 .admin ()
240-                 .indices ()
241-                 .prepareDelete (newIndexName )
242-                 .execute (ActionListener .wrap (ackedResponse  -> {
243-                     if  (ackedResponse .isAcknowledged ()) {
244-                         logger .debug ("successfully removed index [{}]" , newIndexName );
245-                         clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
246-                     }
247-                 }, this ::markAsFailed ));
248-         } else  {
249-             logger .debug ("no incomplete index to remove" );
250-             clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
251-         }
210+         clearResults (
211+             clusterService ,
212+             ActionListener .wrap (
213+                 state  -> prepareNextIndex (state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
214+                 this ::markAsFailed 
215+             )
216+         );
252217    }
253218
254219    private  void  finishIndexAndLoop (BulkByScrollResponse  bulkResponse ) {
@@ -288,11 +253,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
288253                }, this ::markAsFailed )
289254            );
290255        } else  {
291-             prepareNextIndex (
292-                 clusterService .state (),
293-                 state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
294-                 lastMigrationInfo .getFeatureName ()
295-             );
256+             prepareNextIndex (state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), lastMigrationInfo .getFeatureName ());
296257        }
297258    }
298259
@@ -302,7 +263,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
302263            SingleFeatureMigrationResult .success (),
303264            ActionListener .wrap (state  -> {
304265                prepareNextIndex (
305-                     state ,
306266                    clusterState  -> migrateSingleIndex (clusterState , this ::finishIndexAndLoop ),
307267                    lastMigrationInfo .getFeatureName ()
308268                );
@@ -311,7 +271,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
311271        updateTask .submit (clusterService );
312272    }
313273
314-     private  void  prepareNextIndex (ClusterState   clusterState ,  Consumer <ClusterState > listener , String  lastFeatureName ) {
274+     private  void  prepareNextIndex (Consumer <ClusterState > listener , String  lastFeatureName ) {
315275        synchronized  (migrationQueue ) {
316276            assert  migrationQueue  != null ;
317277            if  (migrationQueue .isEmpty ()) {
@@ -423,7 +383,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
423383        logger .info ("migrating index [{}] from feature [{}] to new index [{}]" , oldIndexName , migrationInfo .getFeatureName (), newIndexName );
424384        ActionListener <BulkByScrollResponse > innerListener  = ActionListener .wrap (listener ::accept , this ::markAsFailed );
425385        try  {
426-             createIndex (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
386+             createIndexRetryOnFailure (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
427387                logger .debug (
428388                    "while migrating [{}] , got create index response: [{}]" ,
429389                    oldIndexName ,
@@ -508,6 +468,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
508468    }
509469
510470    private  void  createIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
471+         logger .info ("creating new system index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
472+ 
511473        final  CreateIndexClusterStateUpdateRequest  createRequest  = new  CreateIndexClusterStateUpdateRequest (
512474            "migrate-system-index" ,
513475            migrationInfo .getNextIndexName (),
@@ -527,6 +489,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
527489        metadataCreateIndexService .createIndex (TimeValue .MINUS_ONE , TimeValue .ZERO , null , createRequest , listener );
528490    }
529491
492+     private  void  createIndexRetryOnFailure (SystemIndexMigrationInfo  migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
493+         createIndex (migrationInfo , listener .delegateResponse ((l , e ) -> {
494+             logger .warn ("createIndex failed, retrying after removing index [{}] from previous attempt" , migrationInfo .getNextIndexName ());
495+             deleteIndex (migrationInfo , ActionListener .wrap (cleanupResponse  -> createIndex (migrationInfo , l .delegateResponse ((l3 , e3 ) -> {
496+                 logger .error (
497+                     "createIndex failed after retrying, aborting system index migration. index: "  + migrationInfo .getNextIndexName (),
498+                     e3 
499+                 );
500+                 l .onFailure (e3 );
501+             })), e2  -> {
502+                 logger .error ("deleteIndex failed, aborting system index migration. index: "  + migrationInfo .getNextIndexName (), e2 );
503+                 l .onFailure (e2 );
504+             }));
505+         }));
506+     }
507+ 
508+     private  <T > void  deleteIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <AcknowledgedResponse > listener ) {
509+         logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
510+         String  newIndexName  = migrationInfo .getNextIndexName ();
511+         baseClient .admin ().indices ().prepareDelete (newIndexName ).execute (ActionListener .wrap (ackedResponse  -> {
512+             if  (ackedResponse .isAcknowledged ()) {
513+                 logger .info ("successfully removed index [{}]" , newIndexName );
514+                 listener .onResponse (ackedResponse );
515+             } else  {
516+                 listener .onFailure (new  ElasticsearchException ("Failed to acknowledge index deletion for ["  + newIndexName  + "]" ));
517+             }
518+         }, listener ::onFailure ));
519+     }
520+ 
530521    private  void  setAliasAndRemoveOldIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
531522        final  IndicesAliasesRequestBuilder  aliasesRequest  = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
532523        aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
0 commit comments