@@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
207207        }
208208
209209        // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex 
210-         cleanUpPreviousMigration (
211-             taskState ,
212-             clusterState ,
213-             state  -> prepareNextIndex (state , state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName )
214-         );
215-     }
216- 
217-     private  void  cleanUpPreviousMigration (
218-         SystemIndexMigrationTaskState  taskState ,
219-         ClusterState  currentState ,
220-         Consumer <ClusterState > listener 
221-     ) {
222210        logger .debug ("cleaning up previous migration, task state: [{}]" , taskState  == null  ? "null"  : Strings .toString (taskState ));
223-         if  (taskState  != null  && taskState .getCurrentIndex () != null ) {
224-             SystemIndexMigrationInfo  migrationInfo ;
225-             try  {
226-                 migrationInfo  = SystemIndexMigrationInfo .fromTaskState (
227-                     taskState ,
228-                     systemIndices ,
229-                     currentState .metadata (),
230-                     indexScopedSettings 
231-                 );
232-             } catch  (Exception  e ) {
233-                 markAsFailed (e );
234-                 return ;
235-             }
236-             final  String  newIndexName  = migrationInfo .getNextIndexName ();
237-             logger .info ("removing index [{}] from previous incomplete migration" , newIndexName );
238- 
239-             migrationInfo .createClient (baseClient )
240-                 .admin ()
241-                 .indices ()
242-                 .prepareDelete (newIndexName )
243-                 .execute (ActionListener .wrap (ackedResponse  -> {
244-                     if  (ackedResponse .isAcknowledged ()) {
245-                         logger .debug ("successfully removed index [{}]" , newIndexName );
246-                         clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
247-                     }
248-                 }, this ::markAsFailed ));
249-         } else  {
250-             logger .debug ("no incomplete index to remove" );
251-             clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
252-         }
211+         clearResults (
212+             clusterService ,
213+             ActionListener .wrap (
214+                 state  -> prepareNextIndex (state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
215+                 this ::markAsFailed 
216+             )
217+         );
253218    }
254219
255220    private  void  finishIndexAndLoop (BulkByScrollResponse  bulkResponse ) {
@@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
289254                }, this ::markAsFailed )
290255            );
291256        } else  {
292-             prepareNextIndex (
293-                 clusterService .state (),
294-                 state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
295-                 lastMigrationInfo .getFeatureName ()
296-             );
257+             prepareNextIndex (state2  -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), lastMigrationInfo .getFeatureName ());
297258        }
298259    }
299260
@@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
303264            SingleFeatureMigrationResult .success (),
304265            ActionListener .wrap (state  -> {
305266                prepareNextIndex (
306-                     state ,
307267                    clusterState  -> migrateSingleIndex (clusterState , this ::finishIndexAndLoop ),
308268                    lastMigrationInfo .getFeatureName ()
309269                );
@@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
312272        updateTask .submit (clusterService );
313273    }
314274
315-     private  void  prepareNextIndex (ClusterState   clusterState ,  Consumer <ClusterState > listener , String  lastFeatureName ) {
275+     private  void  prepareNextIndex (Consumer <ClusterState > listener , String  lastFeatureName ) {
316276        synchronized  (migrationQueue ) {
317277            assert  migrationQueue  != null ;
318278            if  (migrationQueue .isEmpty ()) {
@@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
424384        logger .info ("migrating index [{}] from feature [{}] to new index [{}]" , oldIndexName , migrationInfo .getFeatureName (), newIndexName );
425385        ActionListener <BulkByScrollResponse > innerListener  = ActionListener .wrap (listener ::accept , this ::markAsFailed );
426386        try  {
427-             createIndex (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
387+             createIndexRetryOnFailure (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
428388                logger .debug (
429389                    "while migrating [{}] , got create index response: [{}]" ,
430390                    oldIndexName ,
@@ -508,7 +468,22 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
508468        }
509469    }
510470
471+     // ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed); 
472+     private  <T > void  retryAfterFailureHandler (
473+         ActionListener <T > listener ,
474+         Consumer <ActionListener <T >> retryableAction ,
475+         Consumer <Exception > failureHandler 
476+     ) {
477+         retryableAction .accept (ActionListener .wrap (listener ::onResponse , e  -> {
478+             logger .error ("error occurred while executing retryable action" , e );
479+             failureHandler .accept (e );
480+             listener .onFailure (e );
481+         }));
482+     }
483+ 
511484    private  void  createIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
485+         logger .info ("creating new system index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
486+ 
512487        final  CreateIndexClusterStateUpdateRequest  createRequest  = new  CreateIndexClusterStateUpdateRequest (
513488            "migrate-system-index" ,
514489            migrationInfo .getNextIndexName (),
@@ -534,6 +509,27 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
534509        );
535510    }
536511
512+     private  void  createIndexRetryOnFailure (SystemIndexMigrationInfo  migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
513+         createIndex (migrationInfo , ActionListener .wrap (listener ::onResponse , e  -> {
514+             logger .warn ("createIndex failed, retrying after removing index [{}] from previous attempt" , migrationInfo .getNextIndexName ());
515+             deleteIndex (migrationInfo , ActionListener .wrap (cleanupResponse  -> createIndex (migrationInfo , listener ), e2  -> {
516+                 logger .warn ("createIndex failed after retrying, aborting" , e2 );
517+                 listener .onFailure (e2 );
518+             }));
519+         }));
520+     }
521+ 
522+     private  <T > void  deleteIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <AcknowledgedResponse > listener ) {
523+         logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
524+         String  newIndexName  = migrationInfo .getNextIndexName ();
525+         baseClient .admin ().indices ().prepareDelete (newIndexName ).execute (ActionListener .wrap (ackedResponse  -> {
526+             if  (ackedResponse .isAcknowledged ()) {
527+                 logger .info ("successfully removed index [{}]" , newIndexName );
528+                 listener .onResponse (ackedResponse );
529+             }
530+         }, listener ::onFailure ));
531+     }
532+ 
537533    private  void  setAliasAndRemoveOldIndex (SystemIndexMigrationInfo  migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
538534        final  IndicesAliasesRequestBuilder  aliasesRequest  = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
539535        aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
@@ -639,6 +635,7 @@ public void markAsFailed(Exception e) {
639635        String  indexName  = Optional .ofNullable (migrationInfo ).map (SystemIndexMigrationInfo ::getCurrentIndexName ).orElse ("<unknown index>" );
640636
641637        MigrationResultsUpdateTask .upsert (
638+             // this is for the Custom Metadata, not the Persistent Task 
642639            featureName ,
643640            SingleFeatureMigrationResult .failure (indexName , e ),
644641            ActionListener .wrap (state  -> super .markAsFailed (e ), exception  -> super .markAsFailed (e ))
0 commit comments