Skip to content

Commit f0e4e03

Browse files
authored
System Index Migration Failure Results in a Non-Recoverable State (#122326) (#122874)
This PR changes the code to no longer rely on the persistent task state for the cleanup logic of existing indices. (cherry picked from commit 9076ac4)
1 parent b00a24f commit f0e4e03

File tree

3 files changed

+46
-51
lines changed

3 files changed

+46
-51
lines changed

docs/changelog/122326.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122326
2+
summary: System Index Migration Failure Results in a Non-Recoverable State
3+
area: Infra/Core
4+
type: bug
5+
issues: []

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
279279
});
280280
}
281281

282-
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
283282
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
284283
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
285284
ensureGreen();

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 41 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -206,49 +206,14 @@ public void run(SystemIndexMigrationTaskState taskState) {
206206
}
207207

208208
// Kick off our callback "loop" - finishIndexAndLoop calls back into prepareNextIndex
209-
cleanUpPreviousMigration(
210-
taskState,
211-
clusterState,
212-
state -> prepareNextIndex(state, state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName)
213-
);
214-
}
215-
216-
private void cleanUpPreviousMigration(
217-
SystemIndexMigrationTaskState taskState,
218-
ClusterState currentState,
219-
Consumer<ClusterState> listener
220-
) {
221209
logger.debug("cleaning up previous migration, task state: [{}]", taskState == null ? "null" : Strings.toString(taskState));
222-
if (taskState != null && taskState.getCurrentIndex() != null) {
223-
SystemIndexMigrationInfo migrationInfo;
224-
try {
225-
migrationInfo = SystemIndexMigrationInfo.fromTaskState(
226-
taskState,
227-
systemIndices,
228-
currentState.metadata(),
229-
indexScopedSettings
230-
);
231-
} catch (Exception e) {
232-
markAsFailed(e);
233-
return;
234-
}
235-
final String newIndexName = migrationInfo.getNextIndexName();
236-
logger.info("removing index [{}] from previous incomplete migration", newIndexName);
237-
238-
migrationInfo.createClient(baseClient)
239-
.admin()
240-
.indices()
241-
.prepareDelete(newIndexName)
242-
.execute(ActionListener.wrap(ackedResponse -> {
243-
if (ackedResponse.isAcknowledged()) {
244-
logger.debug("successfully removed index [{}]", newIndexName);
245-
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
246-
}
247-
}, this::markAsFailed));
248-
} else {
249-
logger.debug("no incomplete index to remove");
250-
clearResults(clusterService, ActionListener.wrap(listener::accept, this::markAsFailed));
251-
}
210+
clearResults(
211+
clusterService,
212+
ActionListener.wrap(
213+
state -> prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), stateFeatureName),
214+
this::markAsFailed
215+
)
216+
);
252217
}
253218

254219
private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
@@ -288,11 +253,7 @@ private void finishIndexAndLoop(BulkByScrollResponse bulkResponse) {
288253
}, this::markAsFailed)
289254
);
290255
} else {
291-
prepareNextIndex(
292-
clusterService.state(),
293-
state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop),
294-
lastMigrationInfo.getFeatureName()
295-
);
256+
prepareNextIndex(state2 -> migrateSingleIndex(state2, this::finishIndexAndLoop), lastMigrationInfo.getFeatureName());
296257
}
297258
}
298259

@@ -302,7 +263,6 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
302263
SingleFeatureMigrationResult.success(),
303264
ActionListener.wrap(state -> {
304265
prepareNextIndex(
305-
state,
306266
clusterState -> migrateSingleIndex(clusterState, this::finishIndexAndLoop),
307267
lastMigrationInfo.getFeatureName()
308268
);
@@ -311,7 +271,7 @@ private void recordIndexMigrationSuccess(SystemIndexMigrationInfo lastMigrationI
311271
updateTask.submit(clusterService);
312272
}
313273

314-
private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState> listener, String lastFeatureName) {
274+
private void prepareNextIndex(Consumer<ClusterState> listener, String lastFeatureName) {
315275
synchronized (migrationQueue) {
316276
assert migrationQueue != null;
317277
if (migrationQueue.isEmpty()) {
@@ -423,7 +383,7 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
423383
logger.info("migrating index [{}] from feature [{}] to new index [{}]", oldIndexName, migrationInfo.getFeatureName(), newIndexName);
424384
ActionListener<BulkByScrollResponse> innerListener = ActionListener.wrap(listener::accept, this::markAsFailed);
425385
try {
426-
createIndex(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
386+
createIndexRetryOnFailure(migrationInfo, innerListener.delegateFailureAndWrap((delegate, shardsAcknowledgedResponse) -> {
427387
logger.debug(
428388
"while migrating [{}] , got create index response: [{}]",
429389
oldIndexName,
@@ -508,6 +468,8 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
508468
}
509469

510470
private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
471+
logger.info("creating new system index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
472+
511473
final CreateIndexClusterStateUpdateRequest createRequest = new CreateIndexClusterStateUpdateRequest(
512474
"migrate-system-index",
513475
migrationInfo.getNextIndexName(),
@@ -527,6 +489,35 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
527489
metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
528490
}
529491

492+
private void createIndexRetryOnFailure(SystemIndexMigrationInfo migrationInfo, ActionListener<ShardsAcknowledgedResponse> listener) {
493+
createIndex(migrationInfo, listener.delegateResponse((l, e) -> {
494+
logger.warn("createIndex failed, retrying after removing index [{}] from previous attempt", migrationInfo.getNextIndexName());
495+
deleteIndex(migrationInfo, ActionListener.wrap(cleanupResponse -> createIndex(migrationInfo, l.delegateResponse((l3, e3) -> {
496+
logger.error(
497+
"createIndex failed after retrying, aborting system index migration. index: " + migrationInfo.getNextIndexName(),
498+
e3
499+
);
500+
l.onFailure(e3);
501+
})), e2 -> {
502+
logger.error("deleteIndex failed, aborting system index migration. index: " + migrationInfo.getNextIndexName(), e2);
503+
l.onFailure(e2);
504+
}));
505+
}));
506+
}
507+
508+
private <T> void deleteIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<AcknowledgedResponse> listener) {
509+
logger.info("removing index [{}] from feature [{}]", migrationInfo.getNextIndexName(), migrationInfo.getFeatureName());
510+
String newIndexName = migrationInfo.getNextIndexName();
511+
baseClient.admin().indices().prepareDelete(newIndexName).execute(ActionListener.wrap(ackedResponse -> {
512+
if (ackedResponse.isAcknowledged()) {
513+
logger.info("successfully removed index [{}]", newIndexName);
514+
listener.onResponse(ackedResponse);
515+
} else {
516+
listener.onFailure(new ElasticsearchException("Failed to acknowledge index deletion for [" + newIndexName + "]"));
517+
}
518+
}, listener::onFailure));
519+
}
520+
530521
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
531522
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
532523
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());

0 commit comments

Comments
 (0)