Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122326.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122326
summary: System Index Migration Failure Results in a Non-Recoverable State
area: Infra/Core
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
}

// Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
cleanUpPreviousMigration(
taskState,
clusterState,
state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName)
);
}

private void cleanUpPreviousMigration(
SystemIndexMigrationTaskState taskState,
ClusterState currentState,
Consumer<ClusterState> listener
) {
logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
if (taskState != null && taskState.getCurrentIndex() != null) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The taskState is always null, so the cleanup logic is never invoked here.

SystemIndexMigrationInfo migrationInfo;
try {
migrationInfo = SystemIndexMigrationInfo.fromTaskState(
taskState,
systemIndices,
currentState.metadata(),
indexScopedSettings
);
} catch (Exception e) {
markAsFailed(e);
return;
}
final String newIndexName = migrationInfo.getNextIndexName();
logger.info("removing index [{}] from previous incomplete migration", newIndexName);

migrationInfo.createClient(baseClient)
.admin()
.indices()
.prepareDelete(newIndexName)
.execute(ActionListener.wrap(ackedResponse -> {
if (ackedResponse.isAcknowledged()) {
logger.debug("successfully removed index [{}]", newIndexName);
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
}
}, this::markAsFailed));
} else {
logger.debug("no incomplete index to remove");
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
}
clearResults(
Copy link
Contributor Author

@JVerwolf JVerwolf Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of clearResults is to remove the Persistent Task State left over from prior runs so that we begin the migration with a fresh start. I'm not sure we ever even store the persistent task state in practice — the call to super.markAsFailed(e) in markAsFailed ultimately removes the task state, though perhaps this wasn't the original intention.

It may be possible that there are other failure scenarios that leave the task state behind (e.g. a node getting abruptly killed), so I left this here for now until I can reason through this with @gwbrown when she's back.

clusterService,
ActionListener.wrap(
state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
this::markAsFailed
)
);
}

private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
Expand Down Expand Up @@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
}, this::markAsFailed)
);
} else {
prepareNextIndex(
clusterService.state(),
state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop),
lastMigrationInfo.getFeatureName()
);
prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
}
}

Expand All @@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
SingleFeatureMigrationResult.success(),
ActionListener.wrap(state -> {
prepareNextIndex(
state,
clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
lastMigrationInfo.getFeatureName()
);
Expand All @@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
updateTask.submit(clusterService);
}

private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clusterState was never used, so I removed this extraneous param.

private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
synchronized (migrationQueue) {
assert migrationQueue != null;
if (migrationQueue.isEmpty()) {
Expand Down Expand Up @@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
try {
createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
logger.debug(
"while migrating [{}] , got create index response: [{}]",
oldIndexName,
Expand Down Expand Up @@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
}

private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());

final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest(
"migrate-system-index",
migrationInfo.getNextIndexName(),
Expand All @@ -534,6 +496,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
);
}

private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
createIndex(migrationInfo, listener.delegateResponse((l, e) -> {
logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName());
deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we try to delete the index and retry if there is any error with the prior create call. I could instead only catch the specific "resource already exists" exception. However, I'd rather be more broad here in case there are other valid exceptions that get thrown, where deleting the existing resource is the right thing to do. Otherwise, we may end up in a non-recoverable state as a result of some scenario we haven't predicted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree that it is better to be more broad here

logger.error(
"createIndex failed after retrying, aborting system index migration. index: " + migrationInfo.getNextIndexName(),
e3
);
l.onFailure(e3);
})), e2 -> {
logger.error("deleteIndex failed, aborting system index migration. index: " + migrationInfo.getNextIndexName(), e2);
l.onFailure(e2);
}));
}));
}

private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
String newIndexName = migrationInfo.getNextIndexName();
baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is identical to the prior cleanUpPreviousMigration code except for that I'm using a baseClient here since I don't have access to the migrationInfo needed for migrationInfo.createClient(baseClient). This previous call produced an Origin Setting Client. I don't think we need that here, though I'm not 100% on this.

if (ackedResponse.isAcknowledged()) {
logger.info("successfully removed index [{}]", newIndexName);
listener.onResponse(ackedResponse);
Copy link
Contributor Author

@JVerwolf JVerwolf Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is identical to the prior cleanUpPreviousMigration code except that I don't call clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed)); here, as I've already cleaned up at the beginning of the migration, and I don't want to remove in-progress state since this now happens once the migration is already in progress.

} else {
listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]"));
}
}, listener::onFailure));
}

private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
Expand Down