- 
                Notifications
    You must be signed in to change notification settings 
- Fork 25.6k
System Index Migration Failure Results in a Non-Recoverable State #122326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
6fa041c
              abf0f97
              aa2b112
              98cabbd
              ed44d0d
              b3faf74
              b1d8538
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 122326 | ||
| summary: System Index Migration Failure Results in a Non-Recoverable State | ||
| area: Infra/Core | ||
| type: bug | ||
| issues: [] | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) { | |
| } | ||
|  | ||
| // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex | ||
| cleanUpPreviousMigration( | ||
| taskState, | ||
| clusterState, | ||
| state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName) | ||
| ); | ||
| } | ||
|  | ||
| private void cleanUpPreviousMigration( | ||
| SystemIndexMigrationTaskState taskState, | ||
| ClusterState currentState, | ||
| Consumer<ClusterState> listener | ||
| ) { | ||
| logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState)); | ||
| if (taskState != null && taskState.getCurrentIndex() != null) { | ||
| SystemIndexMigrationInfo migrationInfo; | ||
| try { | ||
| migrationInfo = SystemIndexMigrationInfo.fromTaskState( | ||
| taskState, | ||
| systemIndices, | ||
| currentState.metadata(), | ||
| indexScopedSettings | ||
| ); | ||
| } catch (Exception e) { | ||
| markAsFailed(e); | ||
| return; | ||
| } | ||
| final String newIndexName = migrationInfo.getNextIndexName(); | ||
| logger.info("removing index [{}] from previous incomplete migration", newIndexName); | ||
|  | ||
| migrationInfo.createClient(baseClient) | ||
| .admin() | ||
| .indices() | ||
| .prepareDelete(newIndexName) | ||
| .execute(ActionListener.wrap(ackedResponse -> { | ||
| if (ackedResponse.isAcknowledged()) { | ||
| logger.debug("successfully removed index [{}]", newIndexName); | ||
| clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed)); | ||
| } | ||
| }, this::markAsFailed)); | ||
| } else { | ||
| logger.debug("no incomplete index to remove"); | ||
| clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed)); | ||
| } | ||
| clearResults( | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The purpose of It may be possible that there are other failure scenarios that leave the teask sate (e.g. a node getting abruptly killed). So I left this herere for now untill I can reasont through this with @gwbrown when she's back. | ||
| clusterService, | ||
| ActionListener.wrap( | ||
| state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName), | ||
| this::markAsFailed | ||
| ) | ||
| ); | ||
| } | ||
|  | ||
| private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { | ||
|  | @@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) { | |
| }, this::markAsFailed) | ||
| ); | ||
| } else { | ||
| prepareNextIndex( | ||
| clusterService.state(), | ||
| state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), | ||
| lastMigrationInfo.getFeatureName() | ||
| ); | ||
| prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName()); | ||
| } | ||
| } | ||
|  | ||
|  | @@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI | |
| SingleFeatureMigrationResult.success(), | ||
| ActionListener.wrap(state -> { | ||
| prepareNextIndex( | ||
| state, | ||
| clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop), | ||
| lastMigrationInfo.getFeatureName() | ||
| ); | ||
|  | @@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI | |
| updateTask.submit(clusterService); | ||
| } | ||
|  | ||
| private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The clusterState was never used, so I removed this extraneous param. | ||
| private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) { | ||
| synchronized (migrationQueue) { | ||
| assert migrationQueue != null; | ||
| if (migrationQueue.isEmpty()) { | ||
|  | @@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll | |
| logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName); | ||
| ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed); | ||
| try { | ||
| createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> { | ||
| createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> { | ||
| logger.debug( | ||
| "while migrating [{}] , got create index response: [{}]", | ||
| oldIndexName, | ||
|  | @@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll | |
| } | ||
|  | ||
| private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) { | ||
| logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName()); | ||
|  | ||
| final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest( | ||
| "migrate-system-index", | ||
| migrationInfo.getNextIndexName(), | ||
|  | @@ -534,6 +496,29 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener< | |
| ); | ||
| } | ||
|  | ||
| private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) { | ||
| createIndex(migrationInfo, ActionListener.wrap(listener::onResponse, e -> { | ||
| logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName()); | ||
| deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, listener), e2 -> { | ||
|         
                  alexey-ivanov-es marked this conversation as resolved.
              Outdated
          
            Show resolved
            Hide resolved | ||
| logger.warn("createIndex failed after retrying, aborting", e2); | ||
|          | ||
| listener.onFailure(e2); | ||
| })); | ||
| })); | ||
| } | ||
|  | ||
| private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) { | ||
| logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName()); | ||
| String newIndexName = migrationInfo.getNextIndexName(); | ||
| baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is identical to the prior  | ||
| if (ackedResponse.isAcknowledged()) { | ||
| logger.info("successfully removed index [{}]", newIndexName); | ||
| listener.onResponse(ackedResponse); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is identical to the prior cleanUpPreviousMigration code except that I don't call  | ||
| } else { | ||
| listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]")); | ||
| } | ||
| }, listener::onFailure)); | ||
| } | ||
|  | ||
| private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) { | ||
| final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases(); | ||
| aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName()); | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The taskState is always null, so the cleanup logic is never invoked here.