diff --git a/docs/changelog/121120.yaml b/docs/changelog/121120.yaml deleted file mode 100644 index ff375d9088ac9..0000000000000 --- a/docs/changelog/121120.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 121120 -summary: Revert "Reduce Data Loss in System Indices Migration 8x" -area: Infra/Core -type: bug -issues: [] diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java index 87126aa7a0f59..3a9d2710ce412 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java @@ -9,14 +9,17 @@ package org.elasticsearch.migration; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -28,6 +31,7 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import 
org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.SystemIndexPlugin; @@ -50,6 +54,10 @@ import java.util.function.BiConsumer; import java.util.function.Function; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static java.util.Collections.unmodifiableSet; +import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -267,12 +275,18 @@ protected void assertIndexHasCorrectProperties( assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT)); } - public static class TestPlugin extends Plugin implements SystemIndexPlugin { + public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin { public final AtomicReference>> preMigrationHook = new AtomicReference<>(); public final AtomicReference>> postMigrationHook = new AtomicReference<>(); + private final BlockingActionFilter blockingActionFilter; public TestPlugin() { + blockingActionFilter = new BlockingActionFilter(); + } + @Override + public List getActionFilters() { + return singletonList(blockingActionFilter); } @Override @@ -311,5 +325,26 @@ public void indicesMigrationComplete( postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata); listener.onResponse(true); } + + public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple { + private Set blockedActions = emptySet(); + + @Override + protected boolean apply(String action, ActionRequest request, ActionListener listener) { + if (blockedActions.contains(action)) { + throw new ElasticsearchException("force exception on [" + action + "]"); + } + return true; + } + + @Override + public int order() { + return 0; + } + + public void blockActions(String... 
actions) { + blockedActions = unmodifiableSet(newHashSet(actions)); + } + } } } diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java index cd5857da078ba..af91d07f2dea5 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java @@ -17,11 +17,14 @@ import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest; import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse; import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -36,10 +39,12 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.SystemIndexDescriptor; +import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter; import org.elasticsearch.painless.PainlessPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import 
org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.upgrades.FeatureMigrationResults; import org.elasticsearch.upgrades.SingleFeatureMigrationResult; @@ -274,6 +279,60 @@ public void testMigrateIndexWithWriteBlock() throws Exception { }); } + @AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue + public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception { + createSystemIndexForDescriptor(INTERNAL_UNMANAGED); + ensureGreen(); + + // Block the alias request to simulate a failure + InternalTestCluster internalTestCluster = internalCluster(); + ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName()); + BlockingActionFilter blockingActionFilter = null; + for (ActionFilter filter : actionFilters.filters()) { + if (filter instanceof BlockingActionFilter) { + blockingActionFilter = (BlockingActionFilter) filter; + break; + } + } + assertNotNull("BlockingActionFilter should exist", blockingActionFilter); + blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME); + + // Start the migration + client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); + + // Wait till the migration fails + assertBusy(() -> { + GetFeatureUpgradeStatusResponse statusResp = client().execute( + GetFeatureUpgradeStatusAction.INSTANCE, + new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT) + ).get(); + logger.info(Strings.toString(statusResp)); + assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR)); + }); + + // Get the settings to see if the write block was removed + var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings(); + var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old"); + var writeBlock = 
internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()); + assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false")); + + // Unblock the alias request + blockingActionFilter.blockActions(); + + // Retry the migration + client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); + + // Ensure that the migration is successful after the alias request is unblocked + assertBusy(() -> { + GetFeatureUpgradeStatusResponse statusResp = client().execute( + GetFeatureUpgradeStatusAction.INSTANCE, + new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT) + ).get(); + logger.info(Strings.toString(statusResp)); + assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED)); + }); + } + public void testMigrationWillRunAfterError() throws Exception { createSystemIndexForDescriptor(INTERNAL_MANAGED); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java index 69ab9f57d2be7..071e9b42752c0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java @@ -77,6 +77,17 @@ public boolean hasErrors() { return errors; } + /** + * Get a list of all errors from the response. If there are no errors, an empty list is returned. + */ + public List getErrors() { + if (errors == false) { + return List.of(); + } else { + return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList(); + } + } + /** * Build a response from a list of action results. Sets the errors boolean based * on whether any of the individual results contain an error. 
@@ -165,6 +176,13 @@ public static AliasActionResult buildSuccess(List indices, AliasActions return new AliasActionResult(indices, action, null); } + /** + * The error result if the action failed, null if the action succeeded. + */ + public ElasticsearchException getError() { + return error; + } + private int getStatus() { return error == null ? 200 : error.status().getStatus(); } diff --git a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java index 2215fd33730eb..4a54fd373a8d4 100644 --- a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java +++ b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java @@ -15,7 +15,9 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; +import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -31,7 +33,6 @@ import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -58,6 +59,7 @@ import java.util.stream.Collectors; import static 
org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION; +import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE; import static org.elasticsearch.core.Strings.format; @@ -447,12 +449,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer { + if (aliasesResponse.hasErrors()) { + var e = new ElasticsearchException("Aliases request had errors"); + for (var error : aliasesResponse.getErrors()) { + e.addSuppressed(error); + } + throw e; + } + logger.info( + "Successfully migrated old index [{}] to new index [{}] from feature [{}]", + oldIndexName, + migrationInfo.getNextIndexName(), + migrationInfo.getFeatureName() + ); + delegate2.onResponse(bulkByScrollResponse); + }, e -> { + logger.error( + () -> format( + "An error occurred while changing aliases and removing the old index [%s] from feature [%s]", + oldIndexName, + migrationInfo.getFeatureName() + ), + e + ); + removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e); + })); } }, e -> { logger.error( @@ -504,10 +527,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener< metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener); } - private CheckedBiConsumer, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex( - SystemIndexMigrationInfo migrationInfo, - BulkByScrollResponse bulkByScrollResponse - ) { + private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases(); aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName()); aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName()); @@ -526,30 +546,42 @@ private CheckedBiConsumer, 
AcknowledgedResp ); }); - // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing - // while we're trying to migrate them. - return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute( - listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse)) - ); + aliasesRequest.execute(listener); } /** - * Makes the index readonly if it's not set as a readonly yet + * Sets the write block on the index to the given value. */ private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener listener) { - final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build(); - - metadataUpdateSettingsService.updateSettings( - new UpdateSettingsClusterStateUpdateRequest( - TimeValue.MINUS_ONE, - TimeValue.ZERO, - readOnlySettings, - UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, - UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT, - index - ), - listener - ); + if (readOnlyValue) { + // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all + // in-flight writes are completed before returning. + baseClient.admin() + .indices() + .addBlock( + new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(TimeValue.MINUS_ONE), + listener.delegateFailureAndWrap((l, response) -> { + if (response.isAcknowledged() == false) { + throw new ElasticsearchException("Failed to acknowledge read-only block index request"); + } + l.onResponse(response); + }) + ); + } else { + // The only way to remove a Block is via a settings update. 
+ final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build(); + metadataUpdateSettingsService.updateSettings( + new UpdateSettingsClusterStateUpdateRequest( + TimeValue.MINUS_ONE, + TimeValue.ZERO, + readOnlySettings, + UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, + UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT, + index + ), + listener + ); + } } private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java index 013d7cc21a54a..29e98ccee5bcb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.security.authz.privilege; +import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction; import org.elasticsearch.action.search.TransportSearchShardsAction; import org.elasticsearch.index.seqno.RetentionLeaseActions; import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; @@ -38,12 +39,13 @@ public final class SystemPrivilege extends Privilege { RetentionLeaseActions.ADD.name() + "*", // needed for CCR to add retention leases RetentionLeaseActions.REMOVE.name() + "*", // needed for CCR to remove retention leases RetentionLeaseActions.RENEW.name() + "*", // needed for CCR to renew retention leases - "indices:admin/settings/update", // needed for DiskThresholdMonitor.markIndicesReadOnly + "indices:admin/settings/update", // needed for: DiskThresholdMonitor.markIndicesReadOnly, SystemIndexMigrator CompletionPersistentTaskAction.NAME, // needed for 
ShardFollowTaskCleaner "indices:data/write/*", // needed for SystemIndexMigrator "indices:data/read/*", // needed for SystemIndexMigrator "indices:admin/refresh", // needed for SystemIndexMigrator "indices:admin/aliases", // needed for SystemIndexMigrator + TransportAddIndexBlockAction.TYPE.name() + "*", // needed for SystemIndexMigrator TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials );