@@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
207207 }
208208
209209 // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
210- cleanUpPreviousMigration (
211- taskState ,
212- clusterState ,
213- state -> prepareNextIndex (state , state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName )
214- );
215- }
216-
217- private void cleanUpPreviousMigration (
218- SystemIndexMigrationTaskState taskState ,
219- ClusterState currentState ,
220- Consumer <ClusterState > listener
221- ) {
222210 logger .debug ("cleaning up previous migration, task state: [{}]" , taskState == null ? "null" : Strings .toString (taskState ));
223- if (taskState != null && taskState .getCurrentIndex () != null ) {
224- SystemIndexMigrationInfo migrationInfo ;
225- try {
226- migrationInfo = SystemIndexMigrationInfo .fromTaskState (
227- taskState ,
228- systemIndices ,
229- currentState .metadata (),
230- indexScopedSettings
231- );
232- } catch (Exception e ) {
233- markAsFailed (e );
234- return ;
235- }
236- final String newIndexName = migrationInfo .getNextIndexName ();
237- logger .info ("removing index [{}] from previous incomplete migration" , newIndexName );
238-
239- migrationInfo .createClient (baseClient )
240- .admin ()
241- .indices ()
242- .prepareDelete (newIndexName )
243- .execute (ActionListener .wrap (ackedResponse -> {
244- if (ackedResponse .isAcknowledged ()) {
245- logger .debug ("successfully removed index [{}]" , newIndexName );
246- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
247- }
248- }, this ::markAsFailed ));
249- } else {
250- logger .debug ("no incomplete index to remove" );
251- clearResults (clusterService , ActionListener .wrap (listener ::accept , this ::markAsFailed ));
252- }
211+ clearResults (
212+ clusterService ,
213+ ActionListener .wrap (
214+ state -> prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), stateFeatureName ),
215+ this ::markAsFailed
216+ )
217+ );
253218 }
254219
255220 private void finishIndexAndLoop (BulkByScrollResponse bulkResponse ) {
@@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
289254 }, this ::markAsFailed )
290255 );
291256 } else {
292- prepareNextIndex (
293- clusterService .state (),
294- state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ),
295- lastMigrationInfo .getFeatureName ()
296- );
257+ prepareNextIndex (state2 -> migrateSingleIndex (state2 , this ::finishIndexAndLoop ), lastMigrationInfo .getFeatureName ());
297258 }
298259 }
299260
@@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
303264 SingleFeatureMigrationResult .success (),
304265 ActionListener .wrap (state -> {
305266 prepareNextIndex (
306- state ,
307267 clusterState -> migrateSingleIndex (clusterState , this ::finishIndexAndLoop ),
308268 lastMigrationInfo .getFeatureName ()
309269 );
@@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
312272 updateTask .submit (clusterService );
313273 }
314274
315- private void prepareNextIndex (ClusterState clusterState , Consumer <ClusterState > listener , String lastFeatureName ) {
275+ private void prepareNextIndex (Consumer <ClusterState > listener , String lastFeatureName ) {
316276 synchronized (migrationQueue ) {
317277 assert migrationQueue != null ;
318278 if (migrationQueue .isEmpty ()) {
@@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
424384 logger .info ("migrating index [{}] from feature [{}] to new index [{}]" , oldIndexName , migrationInfo .getFeatureName (), newIndexName );
425385 ActionListener <BulkByScrollResponse > innerListener = ActionListener .wrap (listener ::accept , this ::markAsFailed );
426386 try {
427- createIndex (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
387+ createIndexRetryOnFailure (migrationInfo , innerListener .delegateFailureAndWrap ((delegate , shardsAcknowledgedResponse ) -> {
428388 logger .debug (
429389 "while migrating [{}] , got create index response: [{}]" ,
430390 oldIndexName ,
@@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
509469 }
510470
511471 private void createIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
472+ logger .info ("creating new system index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
473+
512474 final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest (
513475 "migrate-system-index" ,
514476 migrationInfo .getNextIndexName (),
@@ -534,6 +496,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
534496 );
535497 }
536498
499+ private void createIndexRetryOnFailure (SystemIndexMigrationInfo migrationInfo , ActionListener <ShardsAcknowledgedResponse > listener ) {
500+ createIndex (migrationInfo , listener .delegateResponse ((l , e ) -> {
501+ logger .warn ("createIndex failed, retrying after removing index [{}] from previous attempt" , migrationInfo .getNextIndexName ());
502+ deleteIndex (migrationInfo , ActionListener .wrap (cleanupResponse -> createIndex (migrationInfo , l .delegateResponse ((l3 , e3 ) -> {
503+ logger .error (
504+ "createIndex failed after retrying, aborting system index migration. index: " + migrationInfo .getNextIndexName (),
505+ e3
506+ );
507+ l .onFailure (e3 );
508+ })), e2 -> {
509+ logger .error ("deleteIndex failed, aborting system index migration. index: " + migrationInfo .getNextIndexName (), e2 );
510+ l .onFailure (e2 );
511+ }));
512+ }));
513+ }
514+
515+ private <T > void deleteIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <AcknowledgedResponse > listener ) {
516+ logger .info ("removing index [{}] from feature [{}]" , migrationInfo .getNextIndexName (), migrationInfo .getFeatureName ());
517+ String newIndexName = migrationInfo .getNextIndexName ();
518+ baseClient .admin ().indices ().prepareDelete (newIndexName ).execute (ActionListener .wrap (ackedResponse -> {
519+ if (ackedResponse .isAcknowledged ()) {
520+ logger .info ("successfully removed index [{}]" , newIndexName );
521+ listener .onResponse (ackedResponse );
522+ } else {
523+ listener .onFailure (new ElasticsearchException ("Failed to acknowledge index deletion for [" + newIndexName + "]" ));
524+ }
525+ }, listener ::onFailure ));
526+ }
527+
537528 private void setAliasAndRemoveOldIndex (SystemIndexMigrationInfo migrationInfo , ActionListener <IndicesAliasesResponse > listener ) {
538529 final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo .createClient (baseClient ).admin ().indices ().prepareAliases ();
539530 aliasesRequest .removeIndex (migrationInfo .getCurrentIndexName ());
0 commit comments