diff --git a/docs/changelog/122326.yaml b/docs/changelog/122326.yaml new file mode 100644 index 0000000000000..91c71041d58fc --- /dev/null +++ b/docs/changelog/122326.yaml @@ -0,0 +1,5 @@ +pr: 122326 +summary: System Index Migration Failure Results in a Non-Recoverable State +area: Infra/Core +type: bug +issues: [] diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java index ee95ce5513820..efca437d14eb4 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java @@ -277,7 +277,6 @@ public void testMigrateIndexWithWriteBlock() throws Exception { }); } - @AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception { createSystemIndexForDescriptor(INTERNAL_UNMANAGED); ensureGreen(); diff --git a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java index cdd466c567e8b..9947606470178 100644 --- a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java +++ b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java @@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) { } // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex - cleanUpPreviousMigration( - taskState, - clusterState, - state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName) - ); - } - - private void cleanUpPreviousMigration( - SystemIndexMigrationTaskState taskState, - ClusterState currentState, - Consumer listener - ) { logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState)); - if (taskState != null && taskState.getCurrentIndex() != null) { - SystemIndexMigrationInfo migrationInfo; - try { - migrationInfo = SystemIndexMigrationInfo.fromTaskState( - taskState, - systemIndices, - currentState.metadata(), - indexScopedSettings - ); - } catch (Exception e) { - markAsFailed(e); - return; - } - final String newIndexName = migrationInfo.getNextIndexName(); - logger.info("removing index [{}] from previous incomplete migration", newIndexName); - - migrationInfo.createClient(baseClient) - .admin() - .indices() - .prepareDelete(newIndexName) - .execute(ActionListener.wrap(ackedResponse -> { - if (ackedResponse.isAcknowledged()) { - logger.debug("successfully removed index [{}]", newIndexName); - clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed)); - } - }, this::markAsFailed)); - } else { - logger.debug("no incomplete index to remove"); - clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed)); - } + clearResults( + clusterService, + ActionListener.wrap( + state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName), + this::markAsFailed + ) + ); } private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { @@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { }, this::markAsFailed) ); } else { - prepareNextIndex( - clusterService.state(), - state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), - lastMigrationInfo.getFeatureName() - ); + prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName()); } } @@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI SingleFeatureMigrationResult.success(), ActionListener.wrap(state -> { prepareNextIndex( - state, clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName() ); @@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI updateTask.submit(clusterService); } - private void prepareNextIndex(ClusterState clusterState, Consumer listener, String lastFeatureName) { + private void prepareNextIndex(Consumer listener, String lastFeatureName) { synchronized (migrationQueue) { assert migrationQueue != null; if (migrationQueue.isEmpty()) { @@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer innerListener = ActionListener.wrap(listener::accept, this::markAsFailed); try { - createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> { + createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> { logger.debug( "while migrating [{}] , got create index response: [{}]", oldIndexName, @@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer listener) { + logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName()); + final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest( "migrate-system-index", migrationInfo.getNextIndexName(), @@ -534,6 +496,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener< ); } + private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { + createIndex(migrationInfo, listener.delegateResponse((l, e) -> { + logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName()); + deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> { + logger.error( + "createIndex failed after retrying, aborting system index migration. index: " + migrationInfo.getNextIndexName(), + e3 + ); + l.onFailure(e3); + })), e2 -> { + logger.error("deleteIndex failed, aborting system index migration. index: " + migrationInfo.getNextIndexName(), e2); + l.onFailure(e2); + })); + })); + } + + private void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { + logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName()); + String newIndexName = migrationInfo.getNextIndexName(); + baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> { + if (ackedResponse.isAcknowledged()) { + logger.info("successfully removed index [{}]", newIndexName); + listener.onResponse(ackedResponse); + } else { + listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]")); + } + }, listener::onFailure)); + } + private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases(); aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());