@@ -208,10 +208,13 @@ public void run(SystemIndexMigrationTaskState taskState) {
208208
209209 // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
210210 logger .debug ("cleaning up previous migration, task state: [{}]" , taskState == null ? "null" : Strings .toString (taskState ));
211- clearResults (clusterService , ActionListener .wrap (state ->
212- prepareNextIndex (state2 ->
213- migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
214- this ::markAsFailed ));
211+ clearResults (
212+ clusterService ,
213+ ActionListener .wrap (
214+ state -> prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
215+ this ::markAsFailed
216+ )
217+ );
215218 }
216219
217220 private void finishIndexAndLoop (BulkByScrollResponse bulkResponse ) {
@@ -251,10 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
251254 }, this ::markAsFailed )
252255 );
253256 } else {
254- prepareNextIndex (
255- state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
256- lastMigrationInfo .getFeatureName ()
257- );
257+ prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), lastMigrationInfo .getFeatureName ());
258258 }
259259 }
260260
@@ -468,7 +468,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
468468 }
469469 }
470470
471- // ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
471+ // ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
472472 private <T > void retryAfterFailureHandler (
473473 ActionListener <T > listener ,
474474 Consumer <ActionListener <T >> retryableAction ,
@@ -482,11 +482,7 @@ private <T> void retryAfterFailureHandler(
482482 }
483483
484484 private void createIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
485- logger .info (
486- "creating new system index [{}] from feature [{}]" ,
487- migrationInfo .getNextIndexName (),
488- migrationInfo .getFeatureName ()
489- );
485+ logger .info ("creating new system index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
490486
491487 final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest (
492488 "migrate-system-index" ,
@@ -513,38 +509,25 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
513509 );
514510 }
515511
516-
517512 private void createIndexRetryOnFailure (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
518513 createIndex (migrationInfo , ActionListener .wrap (listener ::onResponse , e -> {
519- logger .warn (
520- "createIndex failed, retrying after removing index [{}] from previous attempt" ,
521- migrationInfo .getNextIndexName ()
522- );
523- deleteIndex (migrationInfo , ActionListener .wrap (
524- cleanupResponse -> createIndex (migrationInfo , listener ),
525- e2 -> {
526- logger .warn ("createIndex failed after retrying, aborting" , e2 );
527- listener .onFailure (e2 );
528- }
529- ));
514+ logger .warn ("createIndex failed, retrying after removing index [{}] from previous attempt" , migrationInfo .getNextIndexName ());
515+ deleteIndex (migrationInfo , ActionListener .wrap (cleanupResponse -> createIndex (migrationInfo , listener ), e2 -> {
516+ logger .warn ("createIndex failed after retrying, aborting" , e2 );
517+ listener .onFailure (e2 );
518+ }));
530519 }));
531520 }
532521
533- private <T > void deleteIndex (
534- SystemIndexMigrationInfo migrationInfo ,
535- ActionListener <AcknowledgedResponse > listener
536- ) {
522+ private <T > void deleteIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <AcknowledgedResponse > listener ) {
537523 logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
538524 String newIndexName = migrationInfo .getNextIndexName ();
539- baseClient .admin ()
540- .indices ()
541- .prepareDelete (newIndexName )
542- .execute (ActionListener .wrap (ackedResponse -> {
543- if (ackedResponse .isAcknowledged ()) {
544- logger .info ("successfully removed index [{}]" , newIndexName );
545- listener .onResponse (ackedResponse );
546- }
547- }, listener ::onFailure ));
525+ baseClient .admin ().indices ().prepareDelete (newIndexName ).execute (ActionListener .wrap (ackedResponse -> {
526+ if (ackedResponse .isAcknowledged ()) {
527+ logger .info ("successfully removed index [{}]" , newIndexName );
528+ listener .onResponse (ackedResponse );
529+ }
530+ }, listener ::onFailure ));
548531 }
549532
550533 private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
@@ -651,7 +634,8 @@ public void markAsFailed(Exception e) {
651634 String featureName = Optional .ofNullable (migrationInfo ).map (SystemIndexMigrationInfo ::getFeatureName ).orElse ("<unknown feature>" );
652635 String indexName = Optional .ofNullable (migrationInfo ).map (SystemIndexMigrationInfo ::getCurrentIndexName ).orElse ("<unknown index>" );
653636
654- MigrationResultsUpdateTask .upsert ( // this is for the Custom Metadata, not the Persistent Task
637+ MigrationResultsUpdateTask .upsert (
638+ // this is for the Custom Metadata, not the Persistent Task
655639 featureName ,
656640 SingleFeatureMigrationResult .failure (indexName , e ),
657641 ActionListener .wrap (state -> super .markAsFailed (e ), exception -> super .markAsFailed (e ))
0 commit comments