@@ -206,49 +206,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
206206 }
207207
208208 // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
209- cleanUpPreviousMigration (
210- taskState ,
211- clusterState ,
212- state -> prepareNextIndex (state , state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName )
213- );
214- }
215-
216- private void cleanUpPreviousMigration (
217- SystemIndexMigrationTaskState taskState ,
218- ClusterState currentState ,
219- Consumer <ClusterState > listener
220- ) {
221209 logger .debug ("cleaning up previous migration, task state: [{}]" , taskState == null ? "null" : Strings .toString (taskState ));
222- if (taskState != null && taskState .getCurrentIndex () != null ) {
223- SystemIndexMigrationInfo migrationInfo ;
224- try {
225- migrationInfo = SystemIndexMigrationInfo .fromTaskState (
226- taskState ,
227- systemIndices ,
228- currentState .metadata (),
229- indexScopedSettings
230- );
231- } catch (Exception e ) {
232- markAsFailed (e );
233- return ;
234- }
235- final String newIndexName = migrationInfo .getNextIndexName ();
236- logger .info ("removing index [{}] from previous incomplete migration" , newIndexName );
237-
238- migrationInfo .createClient (baseClient )
239- .admin ()
240- .indices ()
241- .prepareDelete (newIndexName )
242- .execute (ActionListener .wrap (ackedResponse -> {
243- if (ackedResponse .isAcknowledged ()) {
244- logger .debug ("successfully removed index [{}]" , newIndexName );
245- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
246- }
247- }, this ::markAsFailed ));
248- } else {
249- logger .debug ("no incomplete index to remove" );
250- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
251- }
210+ clearResults (
211+ clusterService ,
212+ ActionListener .wrap (
213+ state -> prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
214+ this ::markAsFailed
215+ )
216+ );
252217 }
253218
254219 private void finishIndexAndLoop (BulkByScrollResponse bulkResponse ) {
@@ -288,11 +253,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
288253 }, this ::markAsFailed )
289254 );
290255 } else {
291- prepareNextIndex (
292- clusterService .state (),
293- state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
294- lastMigrationInfo .getFeatureName ()
295- );
256+ prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), lastMigrationInfo .getFeatureName ());
296257 }
297258 }
298259
@@ -302,7 +263,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
302263 SingleFeatureMigrationResult .success (),
303264 ActionListener .wrap (state -> {
304265 prepareNextIndex (
305- state ,
306266 clusterState -> migrateSingleIndex (clusterState , this ::finishIndexAndLoop ),
307267 lastMigrationInfo .getFeatureName ()
308268 );
@@ -311,7 +271,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
311271 updateTask .submit (clusterService );
312272 }
313273
314- private void prepareNextIndex (ClusterState clusterState , Consumer <ClusterState > listener , String lastFeatureName ) {
274+ private void prepareNextIndex (Consumer <ClusterState > listener , String lastFeatureName ) {
315275 synchronized (migrationQueue ) {
316276 assert migrationQueue != null ;
317277 if (migrationQueue .isEmpty ()) {
@@ -423,7 +383,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
423383 logger .info ("migrating index [{}] from feature [{}] to new index [{}]" , oldIndexName , migrationInfo .getFeatureName (), newIndexName );
424384 ActionListener <BulkByScrollResponse > innerListener = ActionListener .wrap (listener ::accept , this ::markAsFailed );
425385 try {
426- createIndex (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
386+ createIndexRetryOnFailure (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
427387 logger .debug (
428388 "while migrating [{}] , got create index response: [{}]" ,
429389 oldIndexName ,
@@ -508,6 +468,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
508468 }
509469
510470 private void createIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
471+ logger .info ("creating new system index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
472+
511473 final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest (
512474 "migrate-system-index" ,
513475 migrationInfo .getNextIndexName (),
@@ -527,6 +489,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
527489 metadataCreateIndexService .createIndex (TimeValue .MINUS_ONE , TimeValue .ZERO , null , createRequest , listener );
528490 }
529491
492+ private void createIndexRetryOnFailure (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
493+ createIndex (migrationInfo , listener .delegateResponse ((l , e ) -> {
494+ logger .warn ("createIndex failed, retrying after removing index [{}] from previous attempt" , migrationInfo .getNextIndexName ());
495+ deleteIndex (migrationInfo , ActionListener .wrap (cleanupResponse -> createIndex (migrationInfo , l .delegateResponse ((l3 , e3 ) -> {
496+ logger .error (
497+ "createIndex failed after retrying, aborting system index migration. index: " + migrationInfo .getNextIndexName (),
498+ e3
499+ );
500+ l .onFailure (e3 );
501+ })), e2 -> {
502+ logger .error ("deleteIndex failed, aborting system index migration. index: " + migrationInfo .getNextIndexName (), e2 );
503+ l .onFailure (e2 );
504+ }));
505+ }));
506+ }
507+
508+ private <T > void deleteIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <AcknowledgedResponse > listener ) {
509+ logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
510+ String newIndexName = migrationInfo .getNextIndexName ();
511+ baseClient .admin ().indices ().prepareDelete (newIndexName ).execute (ActionListener .wrap (ackedResponse -> {
512+ if (ackedResponse .isAcknowledged ()) {
513+ logger .info ("successfully removed index [{}]" , newIndexName );
514+ listener .onResponse (ackedResponse );
515+ } else {
516+ listener .onFailure (new ElasticsearchException ("Failed to acknowledge index deletion for [" + newIndexName + "]" ));
517+ }
518+ }, listener ::onFailure ));
519+ }
520+
530521 private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
531522 final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
532523 aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
0 commit comments