Skip to content

Commit c1f8c64

Browse files
authored
Fix race condition when storing Feature migration callback metadata (#80049)
This PR fixes a race condition where the callback metadata was stored in a callback which probably won't run right away, and just immediately proceeds, meaning we move on before the callback's been called. The existing tests didn't catch this because all the test callbacks are actually synchronous. I'd like to add a test that catches this, but doing so may be complex and I'd like to get the fix in sooner rather than later.
1 parent 8b5b762 commit c1f8c64

File tree

1 file changed

+11
-10
lines changed

1 file changed

+11
-10
lines changed

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -331,29 +331,30 @@ private void prepareNextIndex(ClusterState clusterState, Consumer<ClusterState>
331331
migrationInfo.getFeatureName(),
332332
migrationInfo.getNextIndexName()
333333
);
334-
final AtomicReference<Map<String, Object>> updatedTaskStateFeatureMetadata = new AtomicReference<>();
335334
if (migrationInfo.getFeatureName().equals(lastFeatureName) == false) {
336335
// And then invoke the pre-migration hook for the next one.
337-
migrationInfo.prepareForIndicesMigration(
338-
clusterService,
339-
baseClient,
340-
ActionListener.wrap(updatedTaskStateFeatureMetadata::set, this::markAsFailed)
341-
);
336+
migrationInfo.prepareForIndicesMigration(clusterService, baseClient, ActionListener.wrap(newMetadata -> {
337+
currentFeatureCallbackMetadata.set(newMetadata);
338+
updateTaskState(migrationInfo, listener, newMetadata);
339+
}, this::markAsFailed));
342340
} else {
343341
// Otherwise, just re-use what we already have.
344-
updatedTaskStateFeatureMetadata.set(currentFeatureCallbackMetadata.get());
342+
updateTaskState(migrationInfo, listener, currentFeatureCallbackMetadata.get());
345343
}
344+
}
345+
346+
private void updateTaskState(SystemIndexMigrationInfo migrationInfo, Consumer<ClusterState> listener, Map<String, Object> metadata) {
346347
final SystemIndexMigrationTaskState newTaskState = new SystemIndexMigrationTaskState(
347348
migrationInfo.getCurrentIndexName(),
348349
migrationInfo.getFeatureName(),
349-
updatedTaskStateFeatureMetadata.get()
350+
metadata
350351
);
351352
logger.debug("updating task state to [{}]", Strings.toString(newTaskState));
352-
currentFeatureCallbackMetadata.set(updatedTaskStateFeatureMetadata.get());
353+
currentFeatureCallbackMetadata.set(metadata);
353354
updatePersistentTaskState(newTaskState, ActionListener.wrap(task -> {
354355
assert newTaskState.equals(task.getState()) : "task state returned by update method did not match submitted task state";
355356
logger.debug("new task state [{}] accepted", Strings.toString(newTaskState));
356-
listener.accept(clusterState);
357+
listener.accept(clusterService.state());
357358
}, this::markAsFailed));
358359
}
359360

0 commit comments

Comments
 (0)