@@ -207,49 +207,11 @@ public void run(SystemIndexMigrationTaskState taskState) {
207207 }
208208
209209 // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
210- cleanUpPreviousMigration (
211- taskState ,
212- clusterState ,
213- state -> prepareNextIndex (state , state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName )
214- );
215- }
216-
217- private void cleanUpPreviousMigration (
218- SystemIndexMigrationTaskState taskState ,
219- ClusterState currentState ,
220- Consumer <ClusterState > listener
221- ) {
222210 logger .debug ("cleaning up previous migration, task state: [{}]" , taskState == null ? "null" : Strings .toString (taskState ));
223- if (taskState != null && taskState .getCurrentIndex () != null ) {
224- SystemIndexMigrationInfo migrationInfo ;
225- try {
226- migrationInfo = SystemIndexMigrationInfo .fromTaskState (
227- taskState ,
228- systemIndices ,
229- currentState .metadata (),
230- indexScopedSettings
231- );
232- } catch (Exception e ) {
233- markAsFailed (e );
234- return ;
235- }
236- final String newIndexName = migrationInfo .getNextIndexName ();
237- logger .info ("removing index [{}] from previous incomplete migration" , newIndexName );
238-
239- migrationInfo .createClient (baseClient )
240- .admin ()
241- .indices ()
242- .prepareDelete (newIndexName )
243- .execute (ActionListener .wrap (ackedResponse -> {
244- if (ackedResponse .isAcknowledged ()) {
245- logger .debug ("successfully removed index [{}]" , newIndexName );
246- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
247- }
248- }, this ::markAsFailed ));
249- } else {
250- logger .debug ("no incomplete index to remove" );
251- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
252- }
211+ clearResults (clusterService , ActionListener .wrap (state ->
212+ prepareNextIndex (state2 ->
213+ migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
214+ this ::markAsFailed ));
253215 }
254216
255217 private void finishIndexAndLoop (BulkByScrollResponse bulkResponse ) {
@@ -290,7 +252,6 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
290252 );
291253 } else {
292254 prepareNextIndex (
293- clusterService .state (),
294255 state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
295256 lastMigrationInfo .getFeatureName ()
296257 );
@@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
303264 SingleFeatureMigrationResult .success (),
304265 ActionListener .wrap (state -> {
305266 prepareNextIndex (
306- state ,
307267 clusterState -> migrateSingleIndex (clusterState , this ::finishIndexAndLoop ),
308268 lastMigrationInfo .getFeatureName ()
309269 );
@@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
312272 updateTask .submit (clusterService );
313273 }
314274
315- private void prepareNextIndex (ClusterState clusterState , Consumer <ClusterState > listener , String lastFeatureName ) {
275+ private void prepareNextIndex (Consumer <ClusterState > listener , String lastFeatureName ) {
316276 synchronized (migrationQueue ) {
317277 assert migrationQueue != null ;
318278 if (migrationQueue .isEmpty ()) {
@@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
424384 logger .info ("migrating index [{}] from feature [{}] to new index [{}]" , oldIndexName , migrationInfo .getFeatureName (), newIndexName );
425385 ActionListener <BulkByScrollResponse > innerListener = ActionListener .wrap (listener ::accept , this ::markAsFailed );
426386 try {
427- createIndex (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
387+ createIndexRetryOnFailure (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
428388 logger .debug (
429389 "while migrating [{}] , got create index response: [{}]" ,
430390 oldIndexName ,
@@ -508,7 +468,26 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
508468 }
509469 }
510470
471+ // ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
472+ private <T > void retryAfterFailureHandler (
473+ ActionListener <T > listener ,
474+ Consumer <ActionListener <T >> retryableAction ,
475+ Consumer <Exception > failureHandler
476+ ) {
477+ retryableAction .accept (ActionListener .wrap (listener ::onResponse , e -> {
478+ logger .error ("error occurred while executing retryable action" , e );
479+ failureHandler .accept (e );
480+ listener .onFailure (e );
481+ }));
482+ }
483+
511484 private void createIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
485+ logger .info (
486+ "creating new system index [{}] from feature [{}]" ,
487+ migrationInfo .getNextIndexName (),
488+ migrationInfo .getFeatureName ()
489+ );
490+
512491 final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest (
513492 "migrate-system-index" ,
514493 migrationInfo .getNextIndexName (),
@@ -534,6 +513,40 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
534513 );
535514 }
536515
516+
517+ private void createIndexRetryOnFailure (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
518+ createIndex (migrationInfo , ActionListener .wrap (listener ::onResponse , e -> {
519+ logger .warn (
520+ "createIndex failed, retrying after removing index [{}] from previous attempt" ,
521+ migrationInfo .getNextIndexName ()
522+ );
523+ deleteIndex (migrationInfo , ActionListener .wrap (
524+ cleanupResponse -> createIndex (migrationInfo , listener ),
525+ e2 -> {
526+ logger .warn ("createIndex failed after retrying, aborting" , e2 );
527+ listener .onFailure (e2 );
528+ }
529+ ));
530+ }));
531+ }
532+
533+ private <T > void deleteIndex (
534+ SystemIndexMigrationInfo migrationInfo ,
535+ ActionListener <AcknowledgedResponse > listener
536+ ) {
537+ logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
538+ String newIndexName = migrationInfo .getNextIndexName ();
539+ baseClient .admin ()
540+ .indices ()
541+ .prepareDelete (newIndexName )
542+ .execute (ActionListener .wrap (ackedResponse -> {
543+ if (ackedResponse .isAcknowledged ()) {
544+ logger .info ("successfully removed index [{}]" , newIndexName );
545+ listener .onResponse (ackedResponse );
546+ }
547+ }, listener ::onFailure ));
548+ }
549+
537550 private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
538551 final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
539552 aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
@@ -638,7 +651,7 @@ public void markAsFailed(Exception e) {
638651 String featureName = Optional .ofNullable (migrationInfo ).map (SystemIndexMigrationInfo ::getFeatureName ).orElse ("<unknown feature>" );
639652 String indexName = Optional .ofNullable (migrationInfo ).map (SystemIndexMigrationInfo ::getCurrentIndexName ).orElse ("<unknown index>" );
640653
641- MigrationResultsUpdateTask .upsert (
654+ MigrationResultsUpdateTask .upsert ( // this is for the Custom Metadata, not the Persistent Task
642655 featureName ,
643656 SingleFeatureMigrationResult .failure (indexName , e ),
644657 ActionListener .wrap (state -> super .markAsFailed (e ), exception -> super .markAsFailed (e ))
0 commit comments