diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index 2b927b531b5d6..f92db78b32071 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -16,8 +16,6 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; @@ -30,7 +28,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; -import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.project.ProjectResolver; @@ -61,11 +58,9 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.env.Environment; import org.elasticsearch.features.FeatureService; import org.elasticsearch.features.NodeFeature; @@ -224,7 +219,6 @@ import org.elasticsearch.xpack.core.security.authz.store.ReservedRolesStore; import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult; import org.elasticsearch.xpack.core.security.support.Automatons; -import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams; import org.elasticsearch.xpack.core.security.user.AnonymousUser; import org.elasticsearch.xpack.core.ssl.SSLConfigurationSettings; import org.elasticsearch.xpack.core.ssl.SSLService; @@ -419,8 +413,6 @@ import org.elasticsearch.xpack.security.support.QueryableBuiltInRolesProviderFactory; import org.elasticsearch.xpack.security.support.QueryableBuiltInRolesSynchronizer; import org.elasticsearch.xpack.security.support.ReloadableSecurityComponent; -import org.elasticsearch.xpack.security.support.SecurityIndexManager; -import org.elasticsearch.xpack.security.support.SecurityMigrationExecutor; import org.elasticsearch.xpack.security.support.SecurityMigrations; import org.elasticsearch.xpack.security.support.SecuritySystemIndices; import org.elasticsearch.xpack.security.transport.SecurityHttpSettings; @@ -449,7 +441,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; @@ -487,8 +478,6 @@ public class Security extends Plugin public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto"; - private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10; - // TODO: ip filtering does not actually track license usage yet public static final LicensedFeature.Momentary IP_FILTERING_FEATURE = LicensedFeature.momentaryLenient( null, @@ -628,7 +617,6 @@ public class Security extends Plugin private final SetOnce getBuiltinPrivilegesResponseTranslator = new SetOnce<>(); private final SetOnce hasPrivilegesRequestBuilderFactory = new SetOnce<>(); - private final SetOnce persistentTasksService = new SetOnce<>(); private final SetOnce fileRolesStore = new SetOnce<>(); private final SetOnce operatorPrivilegesService = new SetOnce<>(); private final SetOnce reservedRoleMappingAction = new SetOnce<>(); @@ -640,13 +628,8 @@ public class Security extends Plugin private final SetOnce fileRoleValidator = new SetOnce<>(); private final SetOnce secondaryAuthActions = new SetOnce<>(); private final SetOnce queryableRolesProviderFactory = new SetOnce<>(); - private final SetOnce securityMigrationExecutor = new SetOnce<>(); - - // Node local retry count for migration jobs that's checked only on the master node to make sure - // submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a - // restart or master node change. - private final AtomicInteger nodeLocalMigrationRetryCount = new AtomicInteger(0); + private final SetOnce migrationManager = new SetOnce<>(); private final SetOnce> closableComponents = new SetOnce<>(); public Security(Settings settings) { @@ -796,23 +779,7 @@ Collection createComponents( systemIndices.init(client, featureService, clusterService, projectResolver); - this.securityMigrationExecutor.set( - new SecurityMigrationExecutor( - SecurityMigrationTaskParams.TASK_NAME, - threadPool.executor(ThreadPool.Names.MANAGEMENT), - systemIndices.getMainIndexManager(), - client, - SecurityMigrations.MIGRATIONS_BY_VERSION - ) - ); - this.persistentTasksService.set(persistentTasksService); - - systemIndices.getMainIndexManager().addStateListener((projectId, oldState, newState) -> { - // Only consider applying migrations if it's the master node and the security index exists - if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) { - applyPendingSecurityMigrations(projectId, newState); - } - }); + this.migrationManager.set(new SecurityMigrations.Manager(clusterService, persistentTasksService, systemIndices)); scriptServiceReference.set(scriptService); // We need to construct the checks here while the secure settings are still available. @@ -1362,57 +1329,6 @@ private static boolean isInternalExtension(SecurityExtension extension) { return canonicalName.startsWith("org.elasticsearch.xpack.") || canonicalName.startsWith("co.elastic.elasticsearch."); } - @FixForMultiProject - // TODO : The migration task needs to be project aware - private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexManager.IndexState newState) { - // If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped - if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) { - submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false); - return; - } - - Map.Entry nextMigration = SecurityMigrations.MIGRATIONS_BY_VERSION.higherEntry( - newState.migrationsVersion - ); - - // Check if next migration that has not been applied is eligible to run on the current cluster - if (nextMigration == null - || systemIndices.getMainIndexManager().getProject(projectId).isEligibleSecurityMigration(nextMigration.getValue()) == false) { - // Reset retry counter if all eligible migrations have been applied successfully - nodeLocalMigrationRetryCount.set(0); - } else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) { - logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again."); - } else if (systemIndices.getMainIndexManager().getProject(projectId).isReadyForSecurityMigration(nextMigration.getValue())) { - submitPersistentMigrationTask(newState.migrationsVersion); - } - } - - private void submitPersistentMigrationTask(int migrationsVersion) { - submitPersistentMigrationTask(migrationsVersion, true); - } - - private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) { - nodeLocalMigrationRetryCount.incrementAndGet(); - persistentTasksService.get() - .sendStartRequest( - SecurityMigrationTaskParams.TASK_NAME, - SecurityMigrationTaskParams.TASK_NAME, - new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded), - TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, - ActionListener.wrap((response) -> { - logger.debug("Security migration task submitted"); - }, (exception) -> { - // Do nothing if the task is already in progress - if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { - // Do not count ResourceAlreadyExistsException as failure - nodeLocalMigrationRetryCount.decrementAndGet(); - } else { - logger.warn("Submit security migration task failed: " + exception.getCause()); - } - }) - ); - } - private static Executor buildRoleBuildingExecutor(ThreadPool threadPool, Settings settings) { final int allocatedProcessors = EsExecutors.allocatedProcessors(settings); final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("build_roles", allocatedProcessors, threadPool.generic()); @@ -2558,7 +2474,8 @@ public List> getPersistentTasksExecutor( SettingsModule settingsModule, IndexNameExpressionResolver expressionResolver ) { - return this.securityMigrationExecutor.get() != null ? List.of(this.securityMigrationExecutor.get()) : List.of(); + final SecurityMigrations.Manager manager = this.migrationManager.get(); + return manager == null ? List.of() : List.of(manager.getPersistentTasksExecutor(client, threadPool)); } List> reservedProjectStateHandlers() { diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java index 2830a755cde66..17c9b3270dff9 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityMigrations.java @@ -9,19 +9,28 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.core.FixForMultiProject; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.features.NodeFeature; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.UpdateByQueryAction; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.persistent.PersistentTasksExecutor; +import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingAction; import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingRequestBuilder; import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingResponse; @@ -29,6 +38,7 @@ import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequestBuilder; import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsResponse; import org.elasticsearch.xpack.core.security.authc.support.mapper.ExpressionRoleMapping; +import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams; import org.elasticsearch.xpack.security.support.SecurityIndexManager.IndexState; import java.util.Arrays; @@ -37,6 +47,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN; @@ -250,4 +261,91 @@ protected static List getDuplicateRoleMappingNames(ExpressionRoleMapping .toList(); } } + + public static class Manager { + + private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10; + + private final PersistentTasksService persistentTasksService; + private final SecuritySystemIndices systemIndices; + + // Node local retry count for migration jobs that's checked only on the master node to make sure + // submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a + // restart or master node change. + private final AtomicInteger nodeLocalMigrationRetryCount; + + public Manager(ClusterService clusterService, PersistentTasksService persistentTasksService, SecuritySystemIndices systemIndices) { + this.persistentTasksService = persistentTasksService; + this.systemIndices = systemIndices; + this.nodeLocalMigrationRetryCount = new AtomicInteger(0); + systemIndices.getMainIndexManager().addStateListener((projectId, oldState, newState) -> { + // Only consider applying migrations if it's the master node and the security index exists + if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) { + applyPendingSecurityMigrations(projectId, newState); + } + }); + } + + @FixForMultiProject + // TODO : The migration task needs to be project aware + private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexManager.IndexState newState) { + // If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped + if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) { + submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false); + return; + } + + Map.Entry nextMigration = SecurityMigrations.MIGRATIONS_BY_VERSION.higherEntry( + newState.migrationsVersion + ); + + // Check if next migration that has not been applied is eligible to run on the current cluster + if (nextMigration == null + || systemIndices.getMainIndexManager() + .getProject(projectId) + .isEligibleSecurityMigration(nextMigration.getValue()) == false) { + // Reset retry counter if all eligible migrations have been applied successfully + nodeLocalMigrationRetryCount.set(0); + } else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) { + logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again."); + } else if (systemIndices.getMainIndexManager().getProject(projectId).isReadyForSecurityMigration(nextMigration.getValue())) { + submitPersistentMigrationTask(newState.migrationsVersion); + } + } + + private void submitPersistentMigrationTask(int migrationsVersion) { + submitPersistentMigrationTask(migrationsVersion, true); + } + + private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) { + nodeLocalMigrationRetryCount.incrementAndGet(); + persistentTasksService.sendStartRequest( + SecurityMigrationTaskParams.TASK_NAME, + SecurityMigrationTaskParams.TASK_NAME, + new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded), + TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */, + ActionListener.wrap((response) -> { + logger.debug("Security migration task submitted"); + }, (exception) -> { + // Do nothing if the task is already in progress + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // Do not count ResourceAlreadyExistsException as failure + nodeLocalMigrationRetryCount.decrementAndGet(); + } else { + logger.warn("Submit security migration task failed: " + exception.getCause()); + } + }) + ); + } + + public PersistentTasksExecutor getPersistentTasksExecutor(Client client, ThreadPool threadPool) { + return new SecurityMigrationExecutor( + SecurityMigrationTaskParams.TASK_NAME, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + systemIndices.getMainIndexManager(), + client, + SecurityMigrations.MIGRATIONS_BY_VERSION + ); + } + } }