Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/changelog/121120.yaml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@

package org.elasticsearch.migration;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -28,6 +31,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.SystemIndexPlugin;
Expand All @@ -50,6 +54,10 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.common.util.set.Sets.newHashSet;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -267,12 +275,18 @@ protected void assertIndexHasCorrectProperties(
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
}

public static class TestPlugin extends Plugin implements SystemIndexPlugin {
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
private final BlockingActionFilter blockingActionFilter;

public TestPlugin() {
blockingActionFilter = new BlockingActionFilter();
}

@Override
public List<ActionFilter> getActionFilters() {
return singletonList(blockingActionFilter);
}

@Override
Expand Down Expand Up @@ -311,5 +325,26 @@ public void indicesMigrationComplete(
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
listener.onResponse(true);
}

public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
private Set<String> blockedActions = emptySet();

@Override
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
if (blockedActions.contains(action)) {
throw new ElasticsearchException("force exception on [" + action + "]");
}
return true;
}

@Override
public int order() {
return 0;
}

public void blockActions(String... actions) {
blockedActions = unmodifiableSet(newHashSet(actions));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
Expand All @@ -36,10 +39,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
import org.elasticsearch.painless.PainlessPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.upgrades.FeatureMigrationResults;
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;

Expand Down Expand Up @@ -274,6 +279,60 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
});
}

@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
ensureGreen();

// Block the alias request to simulate a failure
InternalTestCluster internalTestCluster = internalCluster();
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
BlockingActionFilter blockingActionFilter = null;
for (ActionFilter filter : actionFilters.filters()) {
if (filter instanceof BlockingActionFilter) {
blockingActionFilter = (BlockingActionFilter) filter;
break;
}
}
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);

// Start the migration
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();

// Wait till the migration fails
assertBusy(() -> {
GetFeatureUpgradeStatusResponse statusResp = client().execute(
GetFeatureUpgradeStatusAction.INSTANCE,
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
});

// Get the settings to see if the write block was removed
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));

// Unblock the alias request
blockingActionFilter.blockActions();

// Retry the migration
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();

// Ensure that the migration is successful after the alias request is unblocked
assertBusy(() -> {
GetFeatureUpgradeStatusResponse statusResp = client().execute(
GetFeatureUpgradeStatusAction.INSTANCE,
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
).get();
logger.info(Strings.toString(statusResp));
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
});
}

public void testMigrationWillRunAfterError() throws Exception {
createSystemIndexForDescriptor(INTERNAL_MANAGED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,17 @@ public boolean hasErrors() {
return errors;
}

/**
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
*/
public List<ElasticsearchException> getErrors() {
if (errors == false) {
return List.of();
} else {
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
}
}

/**
* Build a response from a list of action results. Sets the errors boolean based
* on whether an of the individual results contain an error.
Expand Down Expand Up @@ -165,6 +176,13 @@ public static AliasActionResult buildSuccess(List<String> indices, AliasActions
return new AliasActionResult(indices, action, null);
}

/**
* The error result if the action failed, null if the action succeeded.
*/
public ElasticsearchException getError() {
return error;
}

private int getStatus() {
return error == null ? 200 : error.status().getStatus();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
Expand All @@ -31,7 +33,6 @@
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -58,6 +59,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -447,12 +449,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
logAndThrowExceptionForFailures(bulkByScrollResponse)
);
} else {
// Successful completion of reindexing - remove read only and delete old index
setWriteBlock(
oldIndex,
false,
delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
);
// Successful completion of reindexing. Now we need to set the alias and remove the old index.
setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
if (aliasesResponse.hasErrors()) {
var e = new ElasticsearchException("Aliases request had errors");
for (var error : aliasesResponse.getErrors()) {
e.addSuppressed(error);
}
throw e;
}
logger.info(
"Successfully migrated old index [{}] to new index [{}] from feature [{}]",
oldIndexName,
migrationInfo.getNextIndexName(),
migrationInfo.getFeatureName()
);
delegate2.onResponse(bulkByScrollResponse);
}, e -> {
logger.error(
() -> format(
"An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
oldIndexName,
migrationInfo.getFeatureName()
),
e
);
removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
}));
}
}, e -> {
logger.error(
Expand Down Expand Up @@ -504,10 +527,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
}

private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
SystemIndexMigrationInfo migrationInfo,
BulkByScrollResponse bulkByScrollResponse
) {
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
Expand All @@ -526,30 +546,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
);
});

// Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
// while we're trying to migrate them.
return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
);
aliasesRequest.execute(listener);
}

/**
* Makes the index readonly if it's not set as a readonly yet
* Sets the write block on the index to the given value.
*/
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();

metadataUpdateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
TimeValue.MINUS_ONE,
TimeValue.ZERO,
readOnlySettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
index
),
listener
);
if (readOnlyValue) {
// Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
// in-flight writes are completed before returning.
baseClient.admin()
.indices()
.addBlock(
new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(TimeValue.MINUS_ONE),
listener.delegateFailureAndWrap((l, response) -> {
if (response.isAcknowledged() == false) {
throw new ElasticsearchException("Failed to acknowledge read-only block index request");
}
l.onResponse(response);
})
);
} else {
// The only way to remove a Block is via a settings update.
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
metadataUpdateSettingsService.updateSettings(
new UpdateSettingsClusterStateUpdateRequest(
TimeValue.MINUS_ONE,
TimeValue.ZERO,
readOnlySettings,
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
index
),
listener
);
}
}

private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.security.authz.privilege;

import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
import org.elasticsearch.action.search.TransportSearchShardsAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
Expand Down Expand Up @@ -38,12 +39,13 @@ public final class SystemPrivilege extends Privilege {
RetentionLeaseActions.ADD.name() + "*", // needed for CCR to add retention leases
RetentionLeaseActions.REMOVE.name() + "*", // needed for CCR to remove retention leases
RetentionLeaseActions.RENEW.name() + "*", // needed for CCR to renew retention leases
"indices:admin/settings/update", // needed for DiskThresholdMonitor.markIndicesReadOnly
"indices:admin/settings/update", // needed for: DiskThresholdMonitor.markIndicesReadOnly, SystemIndexMigrator
CompletionPersistentTaskAction.NAME, // needed for ShardFollowTaskCleaner
"indices:data/write/*", // needed for SystemIndexMigrator
"indices:data/read/*", // needed for SystemIndexMigrator
"indices:admin/refresh", // needed for SystemIndexMigrator
"indices:admin/aliases", // needed for SystemIndexMigrator
TransportAddIndexBlockAction.TYPE.name() + "*", // needed for SystemIndexMigrator
TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs
ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials
);
Expand Down