Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
Expand All @@ -30,7 +28,6 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
Expand Down Expand Up @@ -61,11 +58,9 @@
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.env.Environment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
Expand Down Expand Up @@ -224,7 +219,6 @@
import org.elasticsearch.xpack.core.security.authz.store.ReservedRolesStore;
import org.elasticsearch.xpack.core.security.authz.store.RoleRetrievalResult;
import org.elasticsearch.xpack.core.security.support.Automatons;
import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams;
import org.elasticsearch.xpack.core.security.user.AnonymousUser;
import org.elasticsearch.xpack.core.ssl.SSLConfigurationSettings;
import org.elasticsearch.xpack.core.ssl.SSLService;
Expand Down Expand Up @@ -419,8 +413,6 @@
import org.elasticsearch.xpack.security.support.QueryableBuiltInRolesProviderFactory;
import org.elasticsearch.xpack.security.support.QueryableBuiltInRolesSynchronizer;
import org.elasticsearch.xpack.security.support.ReloadableSecurityComponent;
import org.elasticsearch.xpack.security.support.SecurityIndexManager;
import org.elasticsearch.xpack.security.support.SecurityMigrationExecutor;
import org.elasticsearch.xpack.security.support.SecurityMigrations;
import org.elasticsearch.xpack.security.support.SecuritySystemIndices;
import org.elasticsearch.xpack.security.transport.SecurityHttpSettings;
Expand Down Expand Up @@ -449,7 +441,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -487,8 +478,6 @@ public class Security extends Plugin

public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField.SECURITY + "-crypto";

private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10;

// TODO: ip filtering does not actually track license usage yet
public static final LicensedFeature.Momentary IP_FILTERING_FEATURE = LicensedFeature.momentaryLenient(
null,
Expand Down Expand Up @@ -628,7 +617,6 @@ public class Security extends Plugin
private final SetOnce<GetBuiltinPrivilegesResponseTranslator> getBuiltinPrivilegesResponseTranslator = new SetOnce<>();
private final SetOnce<HasPrivilegesRequestBuilderFactory> hasPrivilegesRequestBuilderFactory = new SetOnce<>();

private final SetOnce<PersistentTasksService> persistentTasksService = new SetOnce<>();
private final SetOnce<FileRolesStore> fileRolesStore = new SetOnce<>();
private final SetOnce<OperatorPrivileges.OperatorPrivilegesService> operatorPrivilegesService = new SetOnce<>();
private final SetOnce<ReservedRoleMappingAction> reservedRoleMappingAction = new SetOnce<>();
Expand All @@ -640,13 +628,8 @@ public class Security extends Plugin
private final SetOnce<FileRoleValidator> fileRoleValidator = new SetOnce<>();
private final SetOnce<SecondaryAuthActions> secondaryAuthActions = new SetOnce<>();
private final SetOnce<QueryableBuiltInRolesProviderFactory> queryableRolesProviderFactory = new SetOnce<>();
private final SetOnce<SecurityMigrationExecutor> securityMigrationExecutor = new SetOnce<>();

// Node local retry count for migration jobs that's checked only on the master node to make sure
// submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a
// restart or master node change.
private final AtomicInteger nodeLocalMigrationRetryCount = new AtomicInteger(0);

private final SetOnce<SecurityMigrations.Manager> migrationManager = new SetOnce<>();
private final SetOnce<List<Closeable>> closableComponents = new SetOnce<>();

public Security(Settings settings) {
Expand Down Expand Up @@ -796,23 +779,7 @@ Collection<Object> createComponents(

systemIndices.init(client, featureService, clusterService, projectResolver);

this.securityMigrationExecutor.set(
new SecurityMigrationExecutor(
SecurityMigrationTaskParams.TASK_NAME,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
systemIndices.getMainIndexManager(),
client,
SecurityMigrations.MIGRATIONS_BY_VERSION
)
);
this.persistentTasksService.set(persistentTasksService);

systemIndices.getMainIndexManager().addStateListener((projectId, oldState, newState) -> {
// Only consider applying migrations if it's the master node and the security index exists
if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) {
applyPendingSecurityMigrations(projectId, newState);
}
});
this.migrationManager.set(new SecurityMigrations.Manager(clusterService, persistentTasksService, systemIndices));

scriptServiceReference.set(scriptService);
// We need to construct the checks here while the secure settings are still available.
Expand Down Expand Up @@ -1362,57 +1329,6 @@ private static boolean isInternalExtension(SecurityExtension extension) {
return canonicalName.startsWith("org.elasticsearch.xpack.") || canonicalName.startsWith("co.elastic.elasticsearch.");
}

@FixForMultiProject
// TODO : The migration task needs to be project aware
private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexManager.IndexState newState) {
// If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped
if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) {
submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false);
return;
}

Map.Entry<Integer, SecurityMigrations.SecurityMigration> nextMigration = SecurityMigrations.MIGRATIONS_BY_VERSION.higherEntry(
newState.migrationsVersion
);

// Check if next migration that has not been applied is eligible to run on the current cluster
if (nextMigration == null
|| systemIndices.getMainIndexManager().getProject(projectId).isEligibleSecurityMigration(nextMigration.getValue()) == false) {
// Reset retry counter if all eligible migrations have been applied successfully
nodeLocalMigrationRetryCount.set(0);
} else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) {
logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again.");
} else if (systemIndices.getMainIndexManager().getProject(projectId).isReadyForSecurityMigration(nextMigration.getValue())) {
submitPersistentMigrationTask(newState.migrationsVersion);
}
}

private void submitPersistentMigrationTask(int migrationsVersion) {
submitPersistentMigrationTask(migrationsVersion, true);
}

private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) {
nodeLocalMigrationRetryCount.incrementAndGet();
persistentTasksService.get()
.sendStartRequest(
SecurityMigrationTaskParams.TASK_NAME,
SecurityMigrationTaskParams.TASK_NAME,
new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded),
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap((response) -> {
logger.debug("Security migration task submitted");
}, (exception) -> {
// Do nothing if the task is already in progress
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
// Do not count ResourceAlreadyExistsException as failure
nodeLocalMigrationRetryCount.decrementAndGet();
} else {
logger.warn("Submit security migration task failed: " + exception.getCause());
}
})
);
}

private static Executor buildRoleBuildingExecutor(ThreadPool threadPool, Settings settings) {
final int allocatedProcessors = EsExecutors.allocatedProcessors(settings);
final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner("build_roles", allocatedProcessors, threadPool.generic());
Expand Down Expand Up @@ -2558,7 +2474,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
SettingsModule settingsModule,
IndexNameExpressionResolver expressionResolver
) {
return this.securityMigrationExecutor.get() != null ? List.of(this.securityMigrationExecutor.get()) : List.of();
final SecurityMigrations.Manager manager = this.migrationManager.get();
return manager == null ? List.of() : List.of(manager.getPersistentTasksExecutor(client, threadPool));
}

List<ReservedProjectStateHandler<?>> reservedProjectStateHandlers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,36 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.UpdateByQueryAction;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingAction;
import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingRequestBuilder;
import org.elasticsearch.xpack.core.security.action.rolemapping.DeleteRoleMappingResponse;
import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsAction;
import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsRequestBuilder;
import org.elasticsearch.xpack.core.security.action.rolemapping.GetRoleMappingsResponse;
import org.elasticsearch.xpack.core.security.authc.support.mapper.ExpressionRoleMapping;
import org.elasticsearch.xpack.core.security.support.SecurityMigrationTaskParams;
import org.elasticsearch.xpack.security.support.SecurityIndexManager.IndexState;

import java.util.Arrays;
Expand All @@ -37,6 +47,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.elasticsearch.xpack.core.ClientHelper.SECURITY_ORIGIN;
Expand Down Expand Up @@ -250,4 +261,91 @@ protected static List<String> getDuplicateRoleMappingNames(ExpressionRoleMapping
.toList();
}
}

public static class Manager {

private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10;

private final PersistentTasksService persistentTasksService;
private final SecuritySystemIndices systemIndices;

// Node local retry count for migration jobs that's checked only on the master node to make sure
// submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a
// restart or master node change.
private final AtomicInteger nodeLocalMigrationRetryCount;

public Manager(ClusterService clusterService, PersistentTasksService persistentTasksService, SecuritySystemIndices systemIndices) {
this.persistentTasksService = persistentTasksService;
this.systemIndices = systemIndices;
this.nodeLocalMigrationRetryCount = new AtomicInteger(0);
systemIndices.getMainIndexManager().addStateListener((projectId, oldState, newState) -> {
// Only consider applying migrations if it's the master node and the security index exists
if (clusterService.state().nodes().isLocalNodeElectedMaster() && newState.indexExists()) {
applyPendingSecurityMigrations(projectId, newState);
}
});
}

@FixForMultiProject
// TODO : The migration task needs to be project aware
private void applyPendingSecurityMigrations(ProjectId projectId, SecurityIndexManager.IndexState newState) {
// If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped
if (newState.migrationsVersion == 0 && newState.createdOnLatestVersion) {
submitPersistentMigrationTask(SecurityMigrations.MIGRATIONS_BY_VERSION.lastKey(), false);
return;
}

Map.Entry<Integer, SecurityMigrations.SecurityMigration> nextMigration = SecurityMigrations.MIGRATIONS_BY_VERSION.higherEntry(
newState.migrationsVersion
);

// Check if next migration that has not been applied is eligible to run on the current cluster
if (nextMigration == null
|| systemIndices.getMainIndexManager()
.getProject(projectId)
.isEligibleSecurityMigration(nextMigration.getValue()) == false) {
// Reset retry counter if all eligible migrations have been applied successfully
nodeLocalMigrationRetryCount.set(0);
} else if (nodeLocalMigrationRetryCount.get() > MAX_SECURITY_MIGRATION_RETRY_COUNT) {
logger.warn("Security migration failed [" + nodeLocalMigrationRetryCount.get() + "] times, restart node to retry again.");
} else if (systemIndices.getMainIndexManager().getProject(projectId).isReadyForSecurityMigration(nextMigration.getValue())) {
submitPersistentMigrationTask(newState.migrationsVersion);
}
}

private void submitPersistentMigrationTask(int migrationsVersion) {
submitPersistentMigrationTask(migrationsVersion, true);
}

private void submitPersistentMigrationTask(int migrationsVersion, boolean securityMigrationNeeded) {
nodeLocalMigrationRetryCount.incrementAndGet();
persistentTasksService.sendStartRequest(
SecurityMigrationTaskParams.TASK_NAME,
SecurityMigrationTaskParams.TASK_NAME,
new SecurityMigrationTaskParams(migrationsVersion, securityMigrationNeeded),
TimeValue.THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */,
ActionListener.wrap((response) -> {
logger.debug("Security migration task submitted");
}, (exception) -> {
// Do nothing if the task is already in progress
if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) {
// Do not count ResourceAlreadyExistsException as failure
nodeLocalMigrationRetryCount.decrementAndGet();
} else {
logger.warn("Submit security migration task failed: " + exception.getCause());
}
})
);
}

public PersistentTasksExecutor<?> getPersistentTasksExecutor(Client client, ThreadPool threadPool) {
return new SecurityMigrationExecutor(
SecurityMigrationTaskParams.TASK_NAME,
threadPool.executor(ThreadPool.Names.MANAGEMENT),
systemIndices.getMainIndexManager(),
client,
SecurityMigrations.MIGRATIONS_BY_VERSION
);
}
}
}