System Index Migration Failure Results in a Non-Recoverable State #122326
Changes from 6 commits: 6fa041c, abf0f97, aa2b112, 98cabbd, ed44d0d, b3faf74, b1d8538
New changelog entry:

@@ -0,0 +1,5 @@
+pr: 122326
+summary: System Index Migration Failure Results in a Non-Recoverable State
+area: Infra/Core
+type: bug
+issues: []
Code changes:

@@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
         }

         // Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
-        cleanUpPreviousMigration(
-            taskState,
-            clusterState,
-            state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName)
-        );
-    }
-
-    private void cleanUpPreviousMigration(
-        SystemIndexMigrationTaskState taskState,
-        ClusterState currentState,
-        Consumer<ClusterState> listener
-    ) {
-        logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
-        if (taskState != null && taskState.getCurrentIndex() != null) {
-            SystemIndexMigrationInfo migrationInfo;
-            try {
-                migrationInfo = SystemIndexMigrationInfo.fromTaskState(
-                    taskState,
-                    systemIndices,
-                    currentState.metadata(),
-                    indexScopedSettings
-                );
-            } catch (Exception e) {
-                markAsFailed(e);
-                return;
-            }
-            final String newIndexName = migrationInfo.getNextIndexName();
-            logger.info("removing index [{}] from previous incomplete migration", newIndexName);
-
-            migrationInfo.createClient(baseClient)
-                .admin()
-                .indices()
-                .prepareDelete(newIndexName)
-                .execute(ActionListener.wrap(ackedResponse -> {
-                    if (ackedResponse.isAcknowledged()) {
-                        logger.debug("successfully removed index [{}]", newIndexName);
-                        clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
-                    }
-                }, this::markAsFailed));
-        } else {
-            logger.debug("no incomplete index to remove");
-            clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
-        }
+        clearResults(
+            clusterService,
+            ActionListener.wrap(
+                state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
+                this::markAsFailed
+            )
+        );
     }

Review comment (on the clearResults call): It may be possible that there are other failure scenarios that leave the task state (e.g. a node getting abruptly killed). So I left this here for now until I can reason through this with @gwbrown when she's back.
@@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
                 }, this::markAsFailed)
             );
         } else {
-            prepareNextIndex(
-                clusterService.state(),
-                state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop),
-                lastMigrationInfo.getFeatureName()
-            );
+            prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
         }
     }
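(For context, the migration proceeds as a callback loop rather than an explicit while-loop: each async step, on success, schedules the next one. Below is a minimal, runnable sketch of that pattern — illustrative only, not the Elasticsearch code; the class and index names are made up.)

import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class CallbackLoopSketch {
    private final Queue<String> migrationQueue;

    CallbackLoopSketch(List<String> indices) {
        this.migrationQueue = new ArrayDeque<>(indices);
    }

    void run() {
        // Kick off the "loop": each completed step schedules the next.
        prepareNextIndex(this::migrateSingleIndex);
    }

    private void prepareNextIndex(Consumer<String> listener) {
        String next = migrationQueue.poll();
        if (next == null) {
            System.out.println("queue empty, migration complete");
            return; // nothing re-enters the loop, so it simply stops
        }
        listener.accept(next);
    }

    private void migrateSingleIndex(String index) {
        System.out.println("migrating " + index);
        finishIndexAndLoop(index); // in the real code this runs in an async callback
    }

    private void finishIndexAndLoop(String index) {
        System.out.println("finished " + index);
        prepareNextIndex(this::migrateSingleIndex); // call back into the loop
    }

    public static void main(String[] args) {
        new CallbackLoopSketch(List.of(".example-index-1", ".example-index-2")).run();
    }
}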
@@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationInfo) {
             SingleFeatureMigrationResult.success(),
             ActionListener.wrap(state -> {
                 prepareNextIndex(
-                    state,
                     clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
                     lastMigrationInfo.getFeatureName()
                 );
@@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationInfo) {
         updateTask.submit(clusterService);
     }

-    private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) {
+    private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
         synchronized (migrationQueue) {
             assert migrationQueue != null;
             if (migrationQueue.isEmpty()) {

Review comment: The clusterState was never used, so I removed this extraneous param.
@@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScrollResponse> listener) {
         logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
         ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
         try {
-            createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
+            createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
                 logger.debug(
                     "while migrating [{}] , got create index response: [{}]",
                     oldIndexName,
@@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScrollResponse> listener) {
     }

     private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
+        logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
+
         final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest(
             "migrate-system-index",
             migrationInfo.getNextIndexName(),
@@ -534,6 +496,36 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
         );
     }

+    private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
+        createIndex(migrationInfo, listener.delegateResponse((l, e) -> {
+            logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName());
+            deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> {
+                logger.error(
+                    "createIndex failed after retrying, aborting; index [{}] will be left in an inconsistent state",
+                    migrationInfo.getNextIndexName(),
+                    e3
+                );
+                l.onFailure(e3);
+            })), e2 -> {
+                logger.error("deleteIndex failed after retrying, aborting", e2);
+                l.onFailure(e2);
+            }));
+        }));
+    }
+
+    private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
+        logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
+        String newIndexName = migrationInfo.getNextIndexName();
+        baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
+            if (ackedResponse.isAcknowledged()) {
+                logger.info("successfully removed index [{}]", newIndexName);
+                listener.onResponse(ackedResponse);
+            } else {
+                listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]"));
+            }
+        }, listener::onFailure));
+    }
+
     private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
         final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
         aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());

Review comment (on createIndexRetryOnFailure): Here, we try to delete the index and retry if there is any error with the prior create call. I could instead only catch the specific "resource already exists" exception. However, I'd rather be more broad here in case there are other valid exceptions that get thrown, where deleting the existing resource is the right thing to do. Otherwise, we may end up in a non-recoverable state as a result of some scenario we haven't predicted.

Review comment: Agree that it is better to be more broad here.

Review comment (on deleteIndex): This code is identical to the prior cleanUpPreviousMigration code except that I don't call clearResults.
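(A minimal sketch of the retry shape discussed in the comment above — illustrative only; the Listener interface below is a stand-in for Elasticsearch's ActionListener, and all names are hypothetical.)

// Stand-in for the success/failure callback pair of ActionListener.
interface Listener<T> {
    void onResponse(T value);
    void onFailure(Exception e);
}

class CreateRetrySketch {
    // On any create failure: delete the half-created index, then retry the
    // create exactly once. A failed delete, or a failed second create, is
    // propagated to the caller rather than retried again.
    void createIndexRetryOnFailure(String indexName, Listener<String> listener) {
        createIndex(indexName, new Listener<String>() {
            @Override
            public void onResponse(String response) {
                listener.onResponse(response);
            }

            @Override
            public void onFailure(Exception createFailure) {
                deleteIndex(indexName, new Listener<String>() {
                    @Override
                    public void onResponse(String ignored) {
                        createIndex(indexName, listener); // single retry; its failure reaches the caller
                    }

                    @Override
                    public void onFailure(Exception deleteFailure) {
                        listener.onFailure(deleteFailure); // cleanup itself failed, abort
                    }
                });
            }
        });
    }

    // Stubs standing in for the real asynchronous index operations.
    void createIndex(String indexName, Listener<String> listener) {
        listener.onResponse("created " + indexName);
    }

    void deleteIndex(String indexName, Listener<String> listener) {
        listener.onResponse("deleted " + indexName);
    }
}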
Review comment: The taskState is always null, so the cleanup logic is never invoked here.