diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/UpdateIndexMigrationVersionAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/UpdateIndexMigrationVersionAction.java index 674354c15702b..9b8965784c7a1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/UpdateIndexMigrationVersionAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/action/UpdateIndexMigrationVersionAction.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.core.security.action; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; @@ -19,7 +21,10 @@ import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -41,6 +46,8 @@ */ public class UpdateIndexMigrationVersionAction extends ActionType { + private static final Logger logger = LogManager.getLogger(UpdateIndexMigrationVersionAction.class); + public static final UpdateIndexMigrationVersionAction INSTANCE = new UpdateIndexMigrationVersionAction(); public static final String NAME = "internal:index/metadata/migration_version/update"; public static final String MIGRATION_VERSION_CUSTOM_KEY = "migration_version"; @@ -89,13 +96,15 @@ public String getIndexName() { public static class TransportAction extends TransportMasterNodeAction { private final MasterServiceTaskQueue updateIndexMigrationVersionTaskQueue; + private final ProjectResolver projectResolver; @Inject public TransportAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( UpdateIndexMigrationVersionAction.NAME, @@ -112,6 +121,7 @@ public TransportAction( Priority.LOW, UPDATE_INDEX_MIGRATION_VERSION_TASK_EXECUTOR ); + this.projectResolver = projectResolver; } private static final SimpleBatchedExecutor UPDATE_INDEX_MIGRATION_VERSION_TASK_EXECUTOR = @@ -131,15 +141,34 @@ static class UpdateIndexMigrationVersionTask implements ClusterStateTaskListener private final ActionListener listener; private final int indexMigrationVersion; private final String indexName; - - UpdateIndexMigrationVersionTask(ActionListener listener, int indexMigrationVersion, String indexName) { + private final ProjectId projectId; + + UpdateIndexMigrationVersionTask( + ActionListener listener, + int indexMigrationVersion, + String indexName, + ProjectId projectId + ) { this.listener = listener; this.indexMigrationVersion = indexMigrationVersion; this.indexName = indexName; + this.projectId = projectId; } ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getProject(); + final Metadata metadata = currentState.metadata(); + if (metadata.hasProject(projectId) == false) { + // project has been deleted? nothing to do + logger.warn( + "Cannot update security index [{}] in project [{}] to migration-version [{}]" + + " because it does not exist in cluster state", + indexName, + projectId, + indexMigrationVersion + ); + return currentState; + } + final var project = metadata.getProject(projectId); IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(project.indices().get(indexName)); indexMetadataBuilder.putCustom( MIGRATION_VERSION_CUSTOM_KEY, @@ -168,20 +197,30 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { + final ProjectId projectId = projectResolver.getProjectId(); updateIndexMigrationVersionTaskQueue.submitTask( "Updating cluster state with a new index migration version", - new UpdateIndexMigrationVersionTask( - ActionListener.wrap(response -> listener.onResponse(new UpdateIndexMigrationVersionResponse()), listener::onFailure), - request.getIndexMigrationVersion(), - request.getIndexName() - ), + new UpdateIndexMigrationVersionTask(ActionListener.wrap(response -> { + logger.info( + "Updated project=[{}] index=[{}] to migration-version=[{}]", + projectId, + request.getIndexName(), + request.getIndexMigrationVersion() + ); + listener.onResponse(new UpdateIndexMigrationVersionResponse()); + }, listener::onFailure), request.getIndexMigrationVersion(), request.getIndexName(), projectId), null ); } @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { - return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[] { request.getIndexName() }); + return state.blocks() + .indicesBlockedException( + projectResolver.getProjectId(), + ClusterBlockLevel.METADATA_WRITE, + new String[] { request.getIndexName() } + ); } } } diff --git a/x-pack/plugin/security/qa/multi-project/build.gradle b/x-pack/plugin/security/qa/multi-project/build.gradle index 8f8c291e96dd8..2ff94887a5b4f 100644 --- a/x-pack/plugin/security/qa/multi-project/build.gradle +++ b/x-pack/plugin/security/qa/multi-project/build.gradle @@ -1,9 +1,11 @@ apply plugin: 'elasticsearch.internal-java-rest-test' dependencies { - javaRestTestImplementation "com.nimbusds:nimbus-jose-jwt:10.0.2" clusterModules project(':test:external-modules:test-multi-project') clusterModules project(':modules:analysis-common') + javaRestTestImplementation project(':x-pack:plugin:core') + javaRestTestImplementation project(':x-pack:plugin:security') + javaRestTestImplementation testArtifact(project(':x-pack:plugin:security')) } tasks.named('javaRestTest') { diff --git a/x-pack/plugin/security/qa/multi-project/src/javaRestTest/java/org/elasticsearch/xpack/security/support/SecurityIndexMigrationMultiProjectIT.java b/x-pack/plugin/security/qa/multi-project/src/javaRestTest/java/org/elasticsearch/xpack/security/support/SecurityIndexMigrationMultiProjectIT.java new file mode 100644 index 0000000000000..d240d13546a91 --- /dev/null +++ b/x-pack/plugin/security/qa/multi-project/src/javaRestTest/java/org/elasticsearch/xpack/security/support/SecurityIndexMigrationMultiProjectIT.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.security.support; + +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.TestSecurityClient; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; +import org.elasticsearch.xpack.core.security.user.User; +import org.junit.ClassRule; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; + +public class SecurityIndexMigrationMultiProjectIT extends ESRestTestCase { + + private static final String USER = "admin-user"; + private static final String PASS = "admin-password"; + + @ClassRule + public static ElasticsearchCluster CLUSTER = ElasticsearchCluster.local() + .distribution(DistributionType.INTEG_TEST) + .module("test-multi-project") + .module("analysis-common") + .nodes(2) + .setting("test.multi_project.enabled", "true") + .setting("xpack.security.enabled", "true") + .user(USER, PASS) + .build(); + + @Override + protected String getTestRestCluster() { + return CLUSTER.getHttpAddresses(); + } + + @Override + protected Settings restClientSettings() { + String token = basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray())); + final Settings.Builder builder = Settings.builder() + .put(super.restClientSettings()) + .put(ThreadContext.PREFIX + ".Authorization", token); + return builder.build(); + } + + public void testMigrateSecurityIndex() throws Exception { + final String expectedMigrationVersion = SecurityMigrations.MIGRATIONS_BY_VERSION.keySet() + .stream() + .max(Comparator.naturalOrder()) + .map(String::valueOf) + .orElseThrow(); + + final List projectIds = randomList(1, 4, ESTestCase::randomIdentifier); + for (String projectId : projectIds) { + logger.info("Creating project [{}]", projectId); + createProject(projectId); + final TestSecurityClient securityClient = new TestSecurityClient( + adminClient(), + RequestOptions.DEFAULT.toBuilder().addHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId).build() + ); + // Trigger creation of the security index + final String username = randomAlphaOfLength(8); + securityClient.putUser(new User(username), new SecureString(randomAlphaOfLength(12).toCharArray())); + logger.info("Created user [{}] in project [{}]", username, projectId); + } + + assertBusy(() -> { + // Get the index state for every security index + final Map clusterState = getAsMap( + adminClient(), + "/_cluster/state/metadata/" + SecuritySystemIndices.SECURITY_MAIN_ALIAS + "?multi_project=true" + ); + final Map> projectStateById = ObjectPath.>>evaluate( + clusterState, + "metadata.projects" + ).stream().collect(Collectors.toMap(obj -> obj.get("id"), Function.identity())); + + for (var projectId : projectIds) { + var projectState = projectStateById.get(projectId); + assertThat("project [" + projectId + "] is not available in cluster state", projectState, notNullValue()); + + Map indices = ObjectPath.evaluate(projectState, "indices"); + assertThat("project [ " + projectId + "] should have a single security index", indices, aMapWithSize(1)); + + final var entry = Iterables.get(indices.entrySet(), 0); + Object migrationVersion = ObjectPath.evaluate(entry.getValue(), "migration_version.version"); + assertThat( + "project [" + projectId + ", index [" + entry.getKey() + "] should have been migrated", + migrationVersion, + equalTo(expectedMigrationVersion) + ); + } + }); + } +} diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java index db3ef6b6f2397..cfa4c189087b0 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java @@ -482,9 +482,10 @@ public void prepareIndexIfNeededThenExecute(final Consumer consumer, consumer.accept(new IllegalStateException(error)); } else { logger.info( - "security index does not exist, creating [{}] with alias [{}]", + "security index does not exist, creating [{}] with alias [{}] in project [{}]", this.concreteIndexName, - descriptorForVersion.getAliasName() + descriptorForVersion.getAliasName(), + this.projectId ); // Although `TransportCreateIndexAction` is capable of automatically applying the right mappings, settings and // aliases diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java index 17c9b3270dff9..7a6b842366dbd 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java @@ -17,8 +17,8 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectDeletedListener; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -47,6 +47,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -264,20 +265,23 @@ protected static List getDuplicateRoleMappingNames(ExpressionRoleMapping public static class Manager { - private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10; + private static final int MAX_SECURITY_MIGRATION_ATTEMPT_COUNT = 10; private final PersistentTasksService persistentTasksService; private final SecuritySystemIndices systemIndices; // Node local retry count for migration jobs that's checked only on the master node to make sure - // submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a - // restart or master node change. - private final AtomicInteger nodeLocalMigrationRetryCount; + // submit migration jobs doesn't get out of hand and retries forever if they fail. + // Reset by a restart or master node change. + // This tracks the number of tasks that have been started up until the migration is complete (at which point it is cleared) + // It just tracks attempts to start jobs (not whether they completed successfully) + private final Map taskSubmissionAttemptCounter; public Manager(ClusterService clusterService, PersistentTasksService persistentTasksService, SecuritySystemIndices systemIndices) { this.persistentTasksService = persistentTasksService; this.systemIndices = systemIndices; - this.nodeLocalMigrationRetryCount = new AtomicInteger(0); + this.taskSubmissionAttemptCounter = new ConcurrentHashMap<>(); + new ProjectDeletedListener(this::projectDeleted).attach(clusterService); systemIndices.getMainIndexManager().addStateListener((projectId, oldState, newState) -> { // Only consider applying migrations if it's the master node and the security index exists if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) { @@ -286,12 +290,10 @@ public Manager(ClusterService clusterService, PersistentTasksService persistentT }); } - @FixForMultiProject - // TODO : The migration task needs to be project aware private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexManager.IndexState newState) { // If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) { - submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false); + submitPersistentMigrationTask(projectId, SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false); return; } @@ -300,26 +302,27 @@ private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexMa ); // Check if next migration that has not been applied is eligible to run on the current cluster - if (nextMigration == null - || systemIndices.getMainIndexManager() - .getProject(projectId) - .isEligibleSecurityMigration(nextMigration.getValue()) == false) { + if (nextMigration == null || newState.isEligibleSecurityMigration(nextMigration.getValue()) == false) { // Reset retry counter if all eligible migrations have been applied successfully - nodeLocalMigrationRetryCount.set(0); - } else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) { - logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again."); - } else if (systemIndices.getMainIndexManager().getProject(projectId).isReadyForSecurityMigration(nextMigration.getValue())) { - submitPersistentMigrationTask(newState.migrationsVersion); + resetNumberOfAttempts(projectId); + } else { + var retryCount = getNumberOfAttempts(projectId); + if (retryCount > MAX_SECURITY_MIGRATION_ATTEMPT_COUNT) { + logger.warn("Security migration attempted [" + retryCount + "] times, restart node to retry again."); + } else if (newState.isReadyForSecurityMigration(nextMigration.getValue())) { + submitPersistentMigrationTask(projectId, newState.migrationsVersion); + } } } - private void submitPersistentMigrationTask(int migrationsVersion) { - submitPersistentMigrationTask(migrationsVersion, true); + private void submitPersistentMigrationTask(ProjectId projectId, int migrationsVersion) { + submitPersistentMigrationTask(projectId, migrationsVersion, true); } - private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) { - nodeLocalMigrationRetryCount.incrementAndGet(); - persistentTasksService.sendStartRequest( + private void submitPersistentMigrationTask(ProjectId projectId, int migrationsVersion, boolean securityMigrationNeeded) { + incrementAttemptCount(projectId); + persistentTasksService.sendProjectStartRequest( + projectId, SecurityMigrationTaskParams.TASK_NAME, SecurityMigrationTaskParams.TASK_NAME, new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded), @@ -330,7 +333,7 @@ private void submitPersistentMigrationTask(int migrationsVersion, boolean securi // Do nothing if the task is already in progress if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { // Do not count ResourceAlreadyExistsException as failure - nodeLocalMigrationRetryCount.decrementAndGet(); + decrementAttemptCount(projectId); } else { logger.warn("Submit security migration task failed: " + exception.getCause()); } @@ -347,5 +350,30 @@ public PersistentTasksExecutor getPersistentTasksExecutor(Client client, Thre SecurityMigrations.MIGRATIONS_BY_VERSION ); } + + private int getNumberOfAttempts(ProjectId project) { + var retryCount = taskSubmissionAttemptCounter.get(project); + if (retryCount == null) { + return 0; + } + return retryCount.get(); + } + + private void projectDeleted(ProjectId projectId) { + resetNumberOfAttempts(projectId); + } + + private void resetNumberOfAttempts(ProjectId project) { + taskSubmissionAttemptCounter.remove(project); + } + + private void incrementAttemptCount(ProjectId project) { + taskSubmissionAttemptCounter.computeIfAbsent(project, ignore -> new AtomicInteger(0)).incrementAndGet(); + } + + private void decrementAttemptCount(ProjectId project) { + taskSubmissionAttemptCounter.computeIfAbsent(project, ignore -> new AtomicInteger(0)).decrementAndGet(); + } + } }