Skip to content

Commit 8909630

Browse files
authored
Bugfix/fix privileges in system migration block (elastic#122217)
* Fix privileges for system index migration WRITE block * Update release notes * Delete docs/changelog/122217.yaml
1 parent 7456e79 commit 8909630

File tree

6 files changed

+178
-37
lines changed

6 files changed

+178
-37
lines changed

docs/changelog/121120.yaml

Lines changed: 0 additions & 5 deletions
This file was deleted.

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99

1010
package org.elasticsearch.migration;
1111

12+
import org.elasticsearch.ElasticsearchException;
1213
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.ActionRequest;
1416
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
1517
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
1618
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
1719
import org.elasticsearch.action.admin.indices.stats.IndexStats;
1820
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
1921
import org.elasticsearch.action.index.IndexRequestBuilder;
22+
import org.elasticsearch.action.support.ActionFilter;
2023
import org.elasticsearch.action.support.ActiveShardCount;
2124
import org.elasticsearch.client.internal.Client;
2225
import org.elasticsearch.cluster.ClusterState;
@@ -28,6 +31,7 @@
2831
import org.elasticsearch.index.IndexVersion;
2932
import org.elasticsearch.indices.AssociatedIndexDescriptor;
3033
import org.elasticsearch.indices.SystemIndexDescriptor;
34+
import org.elasticsearch.plugins.ActionPlugin;
3135
import org.elasticsearch.plugins.Plugin;
3236
import org.elasticsearch.plugins.PluginsService;
3337
import org.elasticsearch.plugins.SystemIndexPlugin;
@@ -50,6 +54,10 @@
5054
import java.util.function.BiConsumer;
5155
import java.util.function.Function;
5256

57+
import static java.util.Collections.emptySet;
58+
import static java.util.Collections.singletonList;
59+
import static java.util.Collections.unmodifiableSet;
60+
import static org.elasticsearch.common.util.set.Sets.newHashSet;
5361
import static org.hamcrest.Matchers.containsInAnyOrder;
5462
import static org.hamcrest.Matchers.endsWith;
5563
import static org.hamcrest.Matchers.equalTo;
@@ -267,12 +275,18 @@ protected void assertIndexHasCorrectProperties(
267275
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
268276
}
269277

270-
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
278+
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
271279
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
272280
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
281+
private final BlockingActionFilter blockingActionFilter;
273282

274283
public TestPlugin() {
284+
blockingActionFilter = new BlockingActionFilter();
285+
}
275286

287+
@Override
288+
public List<ActionFilter> getActionFilters() {
289+
return singletonList(blockingActionFilter);
276290
}
277291

278292
@Override
@@ -311,5 +325,26 @@ public void indicesMigrationComplete(
311325
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
312326
listener.onResponse(true);
313327
}
328+
329+
public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
330+
private Set<String> blockedActions = emptySet();
331+
332+
@Override
333+
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
334+
if (blockedActions.contains(action)) {
335+
throw new ElasticsearchException("force exception on [" + action + "]");
336+
}
337+
return true;
338+
}
339+
340+
@Override
341+
public int order() {
342+
return 0;
343+
}
344+
345+
public void blockActions(String... actions) {
346+
blockedActions = unmodifiableSet(newHashSet(actions));
347+
}
348+
}
314349
}
315350
}

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
1818
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
1919
import org.elasticsearch.action.admin.indices.alias.Alias;
20+
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
2021
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
2122
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2223
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
2324
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2425
import org.elasticsearch.action.search.SearchRequestBuilder;
26+
import org.elasticsearch.action.support.ActionFilter;
27+
import org.elasticsearch.action.support.ActionFilters;
2528
import org.elasticsearch.action.support.ActiveShardCount;
2629
import org.elasticsearch.cluster.ClusterState;
2730
import org.elasticsearch.cluster.ClusterStateUpdateTask;
@@ -36,10 +39,12 @@
3639
import org.elasticsearch.common.settings.Settings;
3740
import org.elasticsearch.index.query.QueryBuilders;
3841
import org.elasticsearch.indices.SystemIndexDescriptor;
42+
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
3943
import org.elasticsearch.painless.PainlessPlugin;
4044
import org.elasticsearch.plugins.Plugin;
4145
import org.elasticsearch.plugins.SystemIndexPlugin;
4246
import org.elasticsearch.reindex.ReindexPlugin;
47+
import org.elasticsearch.test.InternalTestCluster;
4348
import org.elasticsearch.upgrades.FeatureMigrationResults;
4449
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
4550

@@ -274,6 +279,60 @@ public void testMigrateIndexWithWriteBlock() throws Exception {
274279
});
275280
}
276281

282+
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
283+
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
284+
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
285+
ensureGreen();
286+
287+
// Block the alias request to simulate a failure
288+
InternalTestCluster internalTestCluster = internalCluster();
289+
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
290+
BlockingActionFilter blockingActionFilter = null;
291+
for (ActionFilter filter : actionFilters.filters()) {
292+
if (filter instanceof BlockingActionFilter) {
293+
blockingActionFilter = (BlockingActionFilter) filter;
294+
break;
295+
}
296+
}
297+
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
298+
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);
299+
300+
// Start the migration
301+
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
302+
303+
// Wait till the migration fails
304+
assertBusy(() -> {
305+
GetFeatureUpgradeStatusResponse statusResp = client().execute(
306+
GetFeatureUpgradeStatusAction.INSTANCE,
307+
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
308+
).get();
309+
logger.info(Strings.toString(statusResp));
310+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
311+
});
312+
313+
// Get the settings to see if the write block was removed
314+
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
315+
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
316+
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
317+
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));
318+
319+
// Unblock the alias request
320+
blockingActionFilter.blockActions();
321+
322+
// Retry the migration
323+
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
324+
325+
// Ensure that the migration is successful after the alias request is unblocked
326+
assertBusy(() -> {
327+
GetFeatureUpgradeStatusResponse statusResp = client().execute(
328+
GetFeatureUpgradeStatusAction.INSTANCE,
329+
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
330+
).get();
331+
logger.info(Strings.toString(statusResp));
332+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
333+
});
334+
}
335+
277336
public void testMigrationWillRunAfterError() throws Exception {
278337
createSystemIndexForDescriptor(INTERNAL_MANAGED);
279338

server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,17 @@ public boolean hasErrors() {
7777
return errors;
7878
}
7979

80+
/**
81+
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
82+
*/
83+
public List<ElasticsearchException> getErrors() {
84+
if (errors == false) {
85+
return List.of();
86+
} else {
87+
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
88+
}
89+
}
90+
8091
/**
8192
* Build a response from a list of action results. Sets the errors boolean based
8293
* on whether an of the individual results contain an error.
@@ -165,6 +176,13 @@ public static AliasActionResult buildSuccess(List<String> indices, AliasActions
165176
return new AliasActionResult(indices, action, null);
166177
}
167178

179+
/**
180+
* The error result if the action failed, null if the action succeeded.
181+
*/
182+
public ElasticsearchException getError() {
183+
return error;
184+
}
185+
168186
private int getStatus() {
169187
return error == null ? 200 : error.status().getStatus();
170188
}

server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java

Lines changed: 62 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.elasticsearch.action.ActionListener;
1616
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1717
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
18+
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
1819
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
20+
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
1921
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
2022
import org.elasticsearch.action.support.ActiveShardCount;
2123
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -31,7 +33,6 @@
3133
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
3234
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
3335
import org.elasticsearch.cluster.service.ClusterService;
34-
import org.elasticsearch.common.CheckedBiConsumer;
3536
import org.elasticsearch.common.Strings;
3637
import org.elasticsearch.common.settings.IndexScopedSettings;
3738
import org.elasticsearch.common.settings.Settings;
@@ -58,6 +59,7 @@
5859
import java.util.stream.Collectors;
5960

6061
import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
62+
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
6163
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
6264
import static org.elasticsearch.core.Strings.format;
6365

@@ -447,12 +449,33 @@ private void migrateSingleIndex(ClusterState clusterState, Consumer<BulkByScroll
447449
logAndThrowExceptionForFailures(bulkByScrollResponse)
448450
);
449451
} else {
450-
// Successful completion of reindexing - remove read only and delete old index
451-
setWriteBlock(
452-
oldIndex,
453-
false,
454-
delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
455-
);
452+
// Successful completion of reindexing. Now we need to set the alias and remove the old index.
453+
setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
454+
if (aliasesResponse.hasErrors()) {
455+
var e = new ElasticsearchException("Aliases request had errors");
456+
for (var error : aliasesResponse.getErrors()) {
457+
e.addSuppressed(error);
458+
}
459+
throw e;
460+
}
461+
logger.info(
462+
"Successfully migrated old index [{}] to new index [{}] from feature [{}]",
463+
oldIndexName,
464+
migrationInfo.getNextIndexName(),
465+
migrationInfo.getFeatureName()
466+
);
467+
delegate2.onResponse(bulkByScrollResponse);
468+
}, e -> {
469+
logger.error(
470+
() -> format(
471+
"An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
472+
oldIndexName,
473+
migrationInfo.getFeatureName()
474+
),
475+
e
476+
);
477+
removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
478+
}));
456479
}
457480
}, e -> {
458481
logger.error(
@@ -504,10 +527,7 @@ private void createIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<
504527
metadataCreateIndexService.createIndex(TimeValue.MINUS_ONE, TimeValue.ZERO, null, createRequest, listener);
505528
}
506529

507-
private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
508-
SystemIndexMigrationInfo migrationInfo,
509-
BulkByScrollResponse bulkByScrollResponse
510-
) {
530+
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
511531
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
512532
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
513533
aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
@@ -526,30 +546,42 @@ private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResp
526546
);
527547
});
528548

529-
// Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
530-
// while we're trying to migrate them.
531-
return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
532-
listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
533-
);
549+
aliasesRequest.execute(listener);
534550
}
535551

536552
/**
537-
* Makes the index readonly if it's not set as a readonly yet
553+
* Sets the write block on the index to the given value.
538554
*/
539555
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
540-
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();
541-
542-
metadataUpdateSettingsService.updateSettings(
543-
new UpdateSettingsClusterStateUpdateRequest(
544-
TimeValue.MINUS_ONE,
545-
TimeValue.ZERO,
546-
readOnlySettings,
547-
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
548-
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
549-
index
550-
),
551-
listener
552-
);
556+
if (readOnlyValue) {
557+
// Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
558+
// in-flight writes are completed before returning.
559+
baseClient.admin()
560+
.indices()
561+
.addBlock(
562+
new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(TimeValue.MINUS_ONE),
563+
listener.delegateFailureAndWrap((l, response) -> {
564+
if (response.isAcknowledged() == false) {
565+
throw new ElasticsearchException("Failed to acknowledge read-only block index request");
566+
}
567+
l.onResponse(response);
568+
})
569+
);
570+
} else {
571+
// The only way to remove a Block is via a settings update.
572+
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
573+
metadataUpdateSettingsService.updateSettings(
574+
new UpdateSettingsClusterStateUpdateRequest(
575+
TimeValue.MINUS_ONE,
576+
TimeValue.ZERO,
577+
readOnlySettings,
578+
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
579+
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
580+
index
581+
),
582+
listener
583+
);
584+
}
553585
}
554586

555587
private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/SystemPrivilege.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
*/
77
package org.elasticsearch.xpack.core.security.authz.privilege;
88

9+
import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
910
import org.elasticsearch.action.search.TransportSearchShardsAction;
1011
import org.elasticsearch.index.seqno.RetentionLeaseActions;
1112
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
@@ -38,12 +39,13 @@ public final class SystemPrivilege extends Privilege {
3839
RetentionLeaseActions.ADD.name() + "*", // needed for CCR to add retention leases
3940
RetentionLeaseActions.REMOVE.name() + "*", // needed for CCR to remove retention leases
4041
RetentionLeaseActions.RENEW.name() + "*", // needed for CCR to renew retention leases
41-
"indices:admin/settings/update", // needed for DiskThresholdMonitor.markIndicesReadOnly
42+
"indices:admin/settings/update", // needed for: DiskThresholdMonitor.markIndicesReadOnly, SystemIndexMigrator
4243
CompletionPersistentTaskAction.NAME, // needed for ShardFollowTaskCleaner
4344
"indices:data/write/*", // needed for SystemIndexMigrator
4445
"indices:data/read/*", // needed for SystemIndexMigrator
4546
"indices:admin/refresh", // needed for SystemIndexMigrator
4647
"indices:admin/aliases", // needed for SystemIndexMigrator
48+
TransportAddIndexBlockAction.TYPE.name() + "*", // needed for SystemIndexMigrator
4749
TransportSearchShardsAction.TYPE.name(), // added so this API can be called with the system user by other APIs
4850
ActionTypes.RELOAD_REMOTE_CLUSTER_CREDENTIALS_ACTION.name() // needed for Security plugin reload of remote cluster credentials
4951
);

0 commit comments

Comments
 (0)