Skip to content

Commit e27a1ec

Browse files
authored
Revert "Reduce Data Loss in System Indices Migration 8x" (#121120)
Reverts #120566 The original PR is causing the following exception to be thrown when security is enabled: ``` system-indices-testing-es01-1 | org.elasticsearch.ElasticsearchSecurityException: action [indices:admin/block/add] is unauthorized for user [_system] with effective roles [_system], this action is granted by the index privileges [manage,all] ```
1 parent 9e098ff commit e27a1ec

File tree

5 files changed

+36
-175
lines changed

5 files changed

+36
-175
lines changed

docs/changelog/121120.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121120
2+
summary: Revert "Reduce Data Loss in System Indices Migration 8x"
3+
area: Infra/Core
4+
type: bug
5+
issues: []

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@
99

1010
package org.elasticsearch.migration;
1111

12-
import org.elasticsearch.ElasticsearchException;
1312
import org.elasticsearch.Version;
1413
import org.elasticsearch.action.ActionListener;
15-
import org.elasticsearch.action.ActionRequest;
1614
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
1715
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
1816
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1917
import org.elasticsearch.action.admin.indices.stats.IndexStats;
2018
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
2119
import org.elasticsearch.action.index.IndexRequestBuilder;
22-
import org.elasticsearch.action.support.ActionFilter;
2320
import org.elasticsearch.action.support.ActiveShardCount;
2421
import org.elasticsearch.client.internal.Client;
2522
import org.elasticsearch.cluster.ClusterState;
@@ -31,7 +28,6 @@
3128
import org.elasticsearch.index.IndexVersion;
3229
import org.elasticsearch.indices.AssociatedIndexDescriptor;
3330
import org.elasticsearch.indices.SystemIndexDescriptor;
34-
import org.elasticsearch.plugins.ActionPlugin;
3531
import org.elasticsearch.plugins.Plugin;
3632
import org.elasticsearch.plugins.PluginsService;
3733
import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -54,10 +50,6 @@
5450
import java.util.function.BiConsumer;
5551
import java.util.function.Function;
5652

57-
import static java.util.Collections.emptySet;
58-
import static java.util.Collections.singletonList;
59-
import static java.util.Collections.unmodifiableSet;
60-
import static org.elasticsearch.common.util.set.Sets.newHashSet;
6153
import static org.hamcrest.Matchers.containsInAnyOrder;
6254
import static org.hamcrest.Matchers.endsWith;
6355
import static org.hamcrest.Matchers.equalTo;
@@ -275,18 +267,12 @@ protected void assertIndexHasCorrectProperties(
275267
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
276268
}
277269

278-
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
270+
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
279271
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
280272
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
281-
private final BlockingActionFilter blockingActionFilter;
282273

283274
public TestPlugin() {
284-
blockingActionFilter = new BlockingActionFilter();
285-
}
286275

287-
@Override
288-
public List<ActionFilter> getActionFilters() {
289-
return singletonList(blockingActionFilter);
290276
}
291277

292278
@Override
@@ -325,26 +311,5 @@ public void indicesMigrationComplete(
325311
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
326312
listener.onResponse(true);
327313
}
328-
329-
public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
330-
private Set<String> blockedActions = emptySet();
331-
332-
@Override
333-
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
334-
if (blockedActions.contains(action)) {
335-
throw new ElasticsearchException("force exception on [" + action + "]");
336-
}
337-
return true;
338-
}
339-
340-
@Override
341-
public int order() {
342-
return 0;
343-
}
344-
345-
public void blockActions(String... actions) {
346-
blockedActions = unmodifiableSet(newHashSet(actions));
347-
}
348-
}
349314
}
350315
}

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@
1717
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
1818
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
1919
import org.elasticsearch.action.admin.indices.alias.Alias;
20-
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
2120
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
2221
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2322
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2423
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2524
import org.elasticsearch.action.search.SearchRequestBuilder;
26-
import org.elasticsearch.action.support.ActionFilter;
27-
import org.elasticsearch.action.support.ActionFilters;
2825
import org.elasticsearch.action.support.ActiveShardCount;
2926
import org.elasticsearch.cluster.ClusterState;
3027
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -39,12 +36,10 @@
3936
import org.elasticsearch.common.settings.Settings;
4037
import org.elasticsearch.index.query.QueryBuilders;
4138
import org.elasticsearch.indices.SystemIndexDescriptor;
42-
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
4339
import org.elasticsearch.painless.PainlessPlugin;
4440
import org.elasticsearch.plugins.Plugin;
4541
import org.elasticsearch.plugins.SystemIndexPlugin;
4642
import org.elasticsearch.reindex.ReindexPlugin;
47-
import org.elasticsearch.test.InternalTestCluster;
4843
import org.elasticsearch.upgrades.FeatureMigrationResults;
4944
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
5045

@@ -279,60 +274,6 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
279274
});
280275
}
281276

282-
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
283-
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
284-
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
285-
ensureGreen();
286-
287-
// Block the alias request to simulate a failure
288-
InternalTestCluster internalTestCluster = internalCluster();
289-
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
290-
BlockingActionFilter blockingActionFilter = null;
291-
for (ActionFilter filter : actionFilters.filters()) {
292-
if (filter instanceof BlockingActionFilter) {
293-
blockingActionFilter = (BlockingActionFilter) filter;
294-
break;
295-
}
296-
}
297-
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
298-
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);
299-
300-
// Start the migration
301-
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
302-
303-
// Wait till the migration fails
304-
assertBusy(() -> {
305-
GetFeatureUpgradeStatusResponse statusResp = client().execute(
306-
GetFeatureUpgradeStatusAction.INSTANCE,
307-
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
308-
).get();
309-
logger.info(Strings.toString(statusResp));
310-
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
311-
});
312-
313-
// Get the settings to see if the write block was removed
314-
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
315-
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
316-
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
317-
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));
318-
319-
// Unblock the alias request
320-
blockingActionFilter.blockActions();
321-
322-
// Retry the migration
323-
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
324-
325-
// Ensure that the migration is successful after the alias request is unblocked
326-
assertBusy(() -> {
327-
GetFeatureUpgradeStatusResponse statusResp = client().execute(
328-
GetFeatureUpgradeStatusAction.INSTANCE,
329-
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
330-
).get();
331-
logger.info(Strings.toString(statusResp));
332-
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
333-
});
334-
}
335-
336277
public void testMigrationWillRunAfterError() throws Exception {
337278
createSystemIndexForDescriptor(INTERNAL_MANAGED);
338279

server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,6 @@ public boolean hasErrors() {
7777
return errors;
7878
}
7979

80-
/**
81-
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
82-
*/
83-
public List<ElasticsearchException> getErrors() {
84-
if (errors == false) {
85-
return List.of();
86-
} else {
87-
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
88-
}
89-
}
90-
9180
/**
9281
* Build a response from a list of action results. Sets the errors boolean based
9382
* on whether an of the individual results contain an error.
@@ -176,13 +165,6 @@ public static AliasActionResult buildSuccess(List<String> indices, AliasActions
176165
return new AliasActionResult(indices, action, null);
177166
}
178167

179-
/**
180-
* The error result if the action failed, null if the action succeeded.
181-
*/
182-
public ElasticsearchException getError() {
183-
return error;
184-
}
185-
186168
private int getStatus() {
187169
return error == null ? 200 : error.status().getStatus();
188170
}

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 30 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1717
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
18-
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
1918
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
20-
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
2119
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
2220
import org.elasticsearch.action.support.ActiveShardCount;
2321
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -33,6 +31,7 @@
3331
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
3432
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
3533
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.CheckedBiConsumer;
3635
import org.elasticsearch.common.Strings;
3736
import org.elasticsearch.common.settings.IndexScopedSettings;
3837
import org.elasticsearch.common.settings.Settings;
@@ -59,7 +58,6 @@
5958
import java.util.stream.Collectors;
6059

6160
import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
62-
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
6361
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
6462
import static org.elasticsearch.core.Strings.format;
6563

@@ -449,33 +447,12 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
449447
logAndThrowExceptionForFailures(bulkByScrollResponse)
450448
);
451449
} else {
452-
// Successful completion of reindexing. Now we need to set the alias and remove the old index.
453-
setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
454-
if (aliasesResponse.hasErrors()) {
455-
var e = new ElasticsearchException("Aliases request had errors");
456-
for (var error : aliasesResponse.getErrors()) {
457-
e.addSuppressed(error);
458-
}
459-
throw e;
460-
}
461-
logger.info(
462-
"Successfully migrated old index [{}] to new index [{}] from feature [{}]",
463-
oldIndexName,
464-
migrationInfo.getNextIndexName(),
465-
migrationInfo.getFeatureName()
466-
);
467-
delegate2.onResponse(bulkByScrollResponse);
468-
}, e -> {
469-
logger.error(
470-
() -> format(
471-
"An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
472-
oldIndexName,
473-
migrationInfo.getFeatureName()
474-
),
475-
e
476-
);
477-
removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
478-
}));
450+
// Successful completion of reindexing - remove read only and delete old index
451+
setWriteBlock(
452+
oldIndex,
453+
false,
454+
delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
455+
);
479456
}
480457
}, e -> {
481458
logger.error(
@@ -527,7 +504,10 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
527504
metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
528505
}
529506

530-
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
507+
private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
508+
SystemIndexMigrationInfo migrationInfo,
509+
BulkByScrollResponse bulkByScrollResponse
510+
) {
531511
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
532512
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
533513
aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
@@ -546,42 +526,30 @@ private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, A
546526
);
547527
});
548528

549-
aliasesRequest.execute(listener);
529+
// Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
530+
// while we're trying to migrate them.
531+
return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
532+
listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
533+
);
550534
}
551535

552536
/**
553-
* Sets the write block on the index to the given value.
537+
* Makes the index readonly if it's not set as a readonly yet
554538
*/
555539
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
556-
if (readOnlyValue) {
557-
// Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
558-
// in-flight writes are completed before returning.
559-
baseClient.admin()
560-
.indices()
561-
.addBlock(
562-
new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(TimeValue.MINUS_ONE),
563-
listener.delegateFailureAndWrap((l, response) -> {
564-
if (response.isAcknowledged() == false) {
565-
throw new ElasticsearchException("Failed to acknowledge read-only block index request");
566-
}
567-
l.onResponse(response);
568-
})
569-
);
570-
} else {
571-
// The only way to remove a Block is via a settings update.
572-
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
573-
metadataUpdateSettingsService.updateSettings(
574-
new UpdateSettingsClusterStateUpdateRequest(
575-
TimeValue.MINUS_ONE,
576-
TimeValue.ZERO,
577-
readOnlySettings,
578-
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
579-
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
580-
index
581-
),
582-
listener
583-
);
584-
}
540+
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();
541+
542+
metadataUpdateSettingsService.updateSettings(
543+
new UpdateSettingsClusterStateUpdateRequest(
544+
TimeValue.MINUS_ONE,
545+
TimeValue.ZERO,
546+
readOnlySettings,
547+
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
548+
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
549+
index
550+
),
551+
listener
552+
);
585553
}
586554

587555
private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {

0 commit comments

Comments
 (0)