Skip to content

Commit 9076ac4

Browse files
authored
System Index Migration Failure Results in a Non-Recoverable State (#122326)
This PR changes the code to no-longer rely on the persistent task state for the cleanup logic of existing indices.
1 parent 482a49f commit 9076ac4

File tree

3 files changed

+46
-51
lines changed

3 files changed

+46
-51
lines changed

docs/changelog/122326.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122326
2+
summary: System Index Migration Failure Results in a Non-Recoverable State
3+
area: Infra/Core
4+
type: bug
5+
issues: []

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,6 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
277277
});
278278
}
279279

280-
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
281280
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
282281
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
283282
ensureGreen();

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 41 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -207,49 +207,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
207207
}
208208

209209
// Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
210-
cleanUpPreviousMigration(
211-
taskState,
212-
clusterState,
213-
state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName)
214-
);
215-
}
216-
217-
private void cleanUpPreviousMigration(
218-
SystemIndexMigrationTaskState taskState,
219-
ClusterState currentState,
220-
Consumer<ClusterState> listener
221-
) {
222210
logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
223-
if (taskState != null && taskState.getCurrentIndex() != null) {
224-
SystemIndexMigrationInfo migrationInfo;
225-
try {
226-
migrationInfo = SystemIndexMigrationInfo.fromTaskState(
227-
taskState,
228-
systemIndices,
229-
currentState.metadata(),
230-
indexScopedSettings
231-
);
232-
} catch (Exception e) {
233-
markAsFailed(e);
234-
return;
235-
}
236-
final String newIndexName = migrationInfo.getNextIndexName();
237-
logger.info("removing index [{}] from previous incomplete migration", newIndexName);
238-
239-
migrationInfo.createClient(baseClient)
240-
.admin()
241-
.indices()
242-
.prepareDelete(newIndexName)
243-
.execute(ActionListener.wrap(ackedResponse -> {
244-
if (ackedResponse.isAcknowledged()) {
245-
logger.debug("successfully removed index [{}]", newIndexName);
246-
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
247-
}
248-
}, this::markAsFailed));
249-
} else {
250-
logger.debug("no incomplete index to remove");
251-
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
252-
}
211+
clearResults(
212+
clusterService,
213+
ActionListener.wrap(
214+
state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
215+
this::markAsFailed
216+
)
217+
);
253218
}
254219

255220
private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
@@ -289,11 +254,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
289254
}, this::markAsFailed)
290255
);
291256
} else {
292-
prepareNextIndex(
293-
clusterService.state(),
294-
state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop),
295-
lastMigrationInfo.getFeatureName()
296-
);
257+
prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
297258
}
298259
}
299260

@@ -303,7 +264,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
303264
SingleFeatureMigrationResult.success(),
304265
ActionListener.wrap(state -> {
305266
prepareNextIndex(
306-
state,
307267
clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
308268
lastMigrationInfo.getFeatureName()
309269
);
@@ -312,7 +272,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
312272
updateTask.submit(clusterService);
313273
}
314274

315-
private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) {
275+
private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
316276
synchronized (migrationQueue) {
317277
assert migrationQueue != null;
318278
if (migrationQueue.isEmpty()) {
@@ -424,7 +384,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
424384
logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
425385
ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
426386
try {
427-
createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
387+
createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
428388
logger.debug(
429389
"while migrating [{}] , got create index response: [{}]",
430390
oldIndexName,
@@ -509,6 +469,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
509469
}
510470

511471
private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
472+
logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
473+
512474
final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest(
513475
"migrate-system-index",
514476
migrationInfo.getNextIndexName(),
@@ -534,6 +496,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
534496
);
535497
}
536498

499+
private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
500+
createIndex(migrationInfo, listener.delegateResponse((l, e) -> {
501+
logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName());
502+
deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> {
503+
logger.error(
504+
"createIndex failed after retrying, aborting system index migration. index: " + migrationInfo.getNextIndexName(),
505+
e3
506+
);
507+
l.onFailure(e3);
508+
})), e2 -> {
509+
logger.error("deleteIndex failed, aborting system index migration. index: " + migrationInfo.getNextIndexName(), e2);
510+
l.onFailure(e2);
511+
}));
512+
}));
513+
}
514+
515+
private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
516+
logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
517+
String newIndexName = migrationInfo.getNextIndexName();
518+
baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
519+
if (ackedResponse.isAcknowledged()) {
520+
logger.info("successfully removed index [{}]", newIndexName);
521+
listener.onResponse(ackedResponse);
522+
} else {
523+
listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]"));
524+
}
525+
}, listener::onFailure));
526+
}
527+
537528
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
538529
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
539530
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());

0 commit comments

Comments
 (0)