1616import org .elasticsearch .ElasticsearchException ;
1717import org .elasticsearch .ElasticsearchSecurityException ;
1818import org .elasticsearch .ElasticsearchStatusException ;
19- import org .elasticsearch .ExceptionsHelper ;
20- import org .elasticsearch .ResourceAlreadyExistsException ;
2119import org .elasticsearch .TransportVersion ;
2220import org .elasticsearch .action .ActionListener ;
2321import org .elasticsearch .action .ActionResponse ;
3028import org .elasticsearch .cluster .ClusterState ;
3129import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
3230import org .elasticsearch .cluster .metadata .IndexTemplateMetadata ;
33- import org .elasticsearch .cluster .metadata .ProjectId ;
3431import org .elasticsearch .cluster .node .DiscoveryNode ;
3532import org .elasticsearch .cluster .node .DiscoveryNodes ;
3633import org .elasticsearch .cluster .project .ProjectResolver ;
6158import org .elasticsearch .common .util .concurrent .ThreadContext ;
6259import org .elasticsearch .common .util .concurrent .ThrottledTaskRunner ;
6360import org .elasticsearch .common .util .set .Sets ;
64- import org .elasticsearch .core .FixForMultiProject ;
6561import org .elasticsearch .core .IOUtils ;
6662import org .elasticsearch .core .Nullable ;
6763import org .elasticsearch .core .Releasable ;
68- import org .elasticsearch .core .TimeValue ;
6964import org .elasticsearch .env .Environment ;
7065import org .elasticsearch .features .FeatureService ;
7166import org .elasticsearch .features .NodeFeature ;
224219import org .elasticsearch .xpack .core .security .authz .store .ReservedRolesStore ;
225220import org .elasticsearch .xpack .core .security .authz .store .RoleRetrievalResult ;
226221import org .elasticsearch .xpack .core .security .support .Automatons ;
227- import org .elasticsearch .xpack .core .security .support .SecurityMigrationTaskParams ;
228222import org .elasticsearch .xpack .core .security .user .AnonymousUser ;
229223import org .elasticsearch .xpack .core .ssl .SSLConfigurationSettings ;
230224import org .elasticsearch .xpack .core .ssl .SSLService ;
419413import org .elasticsearch .xpack .security .support .QueryableBuiltInRolesProviderFactory ;
420414import org .elasticsearch .xpack .security .support .QueryableBuiltInRolesSynchronizer ;
421415import org .elasticsearch .xpack .security .support .ReloadableSecurityComponent ;
422- import org .elasticsearch .xpack .security .support .SecurityIndexManager ;
423- import org .elasticsearch .xpack .security .support .SecurityMigrationExecutor ;
424416import org .elasticsearch .xpack .security .support .SecurityMigrations ;
425417import org .elasticsearch .xpack .security .support .SecuritySystemIndices ;
426418import org .elasticsearch .xpack .security .transport .SecurityHttpSettings ;
449441import java .util .Objects ;
450442import java .util .Set ;
451443import java .util .concurrent .Executor ;
452- import java .util .concurrent .atomic .AtomicInteger ;
453444import java .util .function .BiConsumer ;
454445import java .util .function .Function ;
455446import java .util .function .Predicate ;
@@ -487,8 +478,6 @@ public class Security extends Plugin
487478
488479 public static final String SECURITY_CRYPTO_THREAD_POOL_NAME = XPackField .SECURITY + "-crypto" ;
489480
490- private static final int MAX_SECURITY_MIGRATION_RETRY_COUNT = 10 ;
491-
492481 // TODO: ip filtering does not actually track license usage yet
493482 public static final LicensedFeature .Momentary IP_FILTERING_FEATURE = LicensedFeature .momentaryLenient (
494483 null ,
@@ -628,7 +617,6 @@ public class Security extends Plugin
628617 private final SetOnce <GetBuiltinPrivilegesResponseTranslator > getBuiltinPrivilegesResponseTranslator = new SetOnce <>();
629618 private final SetOnce <HasPrivilegesRequestBuilderFactory > hasPrivilegesRequestBuilderFactory = new SetOnce <>();
630619
631- private final SetOnce <PersistentTasksService > persistentTasksService = new SetOnce <>();
632620 private final SetOnce <FileRolesStore > fileRolesStore = new SetOnce <>();
633621 private final SetOnce <OperatorPrivileges .OperatorPrivilegesService > operatorPrivilegesService = new SetOnce <>();
634622 private final SetOnce <ReservedRoleMappingAction > reservedRoleMappingAction = new SetOnce <>();
@@ -640,13 +628,8 @@ public class Security extends Plugin
640628 private final SetOnce <FileRoleValidator > fileRoleValidator = new SetOnce <>();
641629 private final SetOnce <SecondaryAuthActions > secondaryAuthActions = new SetOnce <>();
642630 private final SetOnce <QueryableBuiltInRolesProviderFactory > queryableRolesProviderFactory = new SetOnce <>();
643- private final SetOnce <SecurityMigrationExecutor > securityMigrationExecutor = new SetOnce <>();
644-
645- // Node local retry count for migration jobs that's checked only on the master node to make sure
646- // submit migration jobs doesn't get out of hand and retries forever if they fail. Reset by a
647- // restart or master node change.
648- private final AtomicInteger nodeLocalMigrationRetryCount = new AtomicInteger (0 );
649631
632+ private final SetOnce <SecurityMigrations .Manager > migrationManager = new SetOnce <>();
650633 private final SetOnce <List <Closeable >> closableComponents = new SetOnce <>();
651634
652635 public Security (Settings settings ) {
@@ -796,23 +779,7 @@ Collection<Object> createComponents(
796779
797780 systemIndices .init (client , featureService , clusterService , projectResolver );
798781
799- this .securityMigrationExecutor .set (
800- new SecurityMigrationExecutor (
801- SecurityMigrationTaskParams .TASK_NAME ,
802- threadPool .executor (ThreadPool .Names .MANAGEMENT ),
803- systemIndices .getMainIndexManager (),
804- client ,
805- SecurityMigrations .MIGRATIONS_BY_VERSION
806- )
807- );
808- this .persistentTasksService .set (persistentTasksService );
809-
810- systemIndices .getMainIndexManager ().addStateListener ((projectId , oldState , newState ) -> {
811- // Only consider applying migrations if it's the master node and the security index exists
812- if (clusterService .state ().nodes ().isLocalNodeElectedMaster () && newState .indexExists ()) {
813- applyPendingSecurityMigrations (projectId , newState );
814- }
815- });
782+ this .migrationManager .set (new SecurityMigrations .Manager (clusterService , persistentTasksService , systemIndices ));
816783
817784 scriptServiceReference .set (scriptService );
818785 // We need to construct the checks here while the secure settings are still available.
@@ -1362,57 +1329,6 @@ private static boolean isInternalExtension(SecurityExtension extension) {
13621329 return canonicalName .startsWith ("org.elasticsearch.xpack." ) || canonicalName .startsWith ("co.elastic.elasticsearch." );
13631330 }
13641331
1365- @ FixForMultiProject
1366- // TODO : The migration task needs to be project aware
1367- private void applyPendingSecurityMigrations (ProjectId projectId , SecurityIndexManager .IndexState newState ) {
1368- // If no migrations have been applied and the security index is on the latest version (new index), all migrations can be skipped
1369- if (newState .migrationsVersion == 0 && newState .createdOnLatestVersion ) {
1370- submitPersistentMigrationTask (SecurityMigrations .MIGRATIONS_BY_VERSION .lastKey (), false );
1371- return ;
1372- }
1373-
1374- Map .Entry <Integer , SecurityMigrations .SecurityMigration > nextMigration = SecurityMigrations .MIGRATIONS_BY_VERSION .higherEntry (
1375- newState .migrationsVersion
1376- );
1377-
1378- // Check if next migration that has not been applied is eligible to run on the current cluster
1379- if (nextMigration == null
1380- || systemIndices .getMainIndexManager ().getProject (projectId ).isEligibleSecurityMigration (nextMigration .getValue ()) == false ) {
1381- // Reset retry counter if all eligible migrations have been applied successfully
1382- nodeLocalMigrationRetryCount .set (0 );
1383- } else if (nodeLocalMigrationRetryCount .get () > MAX_SECURITY_MIGRATION_RETRY_COUNT ) {
1384- logger .warn ("Security migration failed [" + nodeLocalMigrationRetryCount .get () + "] times, restart node to retry again." );
1385- } else if (systemIndices .getMainIndexManager ().getProject (projectId ).isReadyForSecurityMigration (nextMigration .getValue ())) {
1386- submitPersistentMigrationTask (newState .migrationsVersion );
1387- }
1388- }
1389-
1390- private void submitPersistentMigrationTask (int migrationsVersion ) {
1391- submitPersistentMigrationTask (migrationsVersion , true );
1392- }
1393-
1394- private void submitPersistentMigrationTask (int migrationsVersion , boolean securityMigrationNeeded ) {
1395- nodeLocalMigrationRetryCount .incrementAndGet ();
1396- persistentTasksService .get ()
1397- .sendStartRequest (
1398- SecurityMigrationTaskParams .TASK_NAME ,
1399- SecurityMigrationTaskParams .TASK_NAME ,
1400- new SecurityMigrationTaskParams (migrationsVersion , securityMigrationNeeded ),
1401- TimeValue .THIRTY_SECONDS /* TODO should this be configurable? longer by default? infinite? */ ,
1402- ActionListener .wrap ((response ) -> {
1403- logger .debug ("Security migration task submitted" );
1404- }, (exception ) -> {
1405- // Do nothing if the task is already in progress
1406- if (ExceptionsHelper .unwrapCause (exception ) instanceof ResourceAlreadyExistsException ) {
1407- // Do not count ResourceAlreadyExistsException as failure
1408- nodeLocalMigrationRetryCount .decrementAndGet ();
1409- } else {
1410- logger .warn ("Submit security migration task failed: " + exception .getCause ());
1411- }
1412- })
1413- );
1414- }
1415-
14161332 private static Executor buildRoleBuildingExecutor (ThreadPool threadPool , Settings settings ) {
14171333 final int allocatedProcessors = EsExecutors .allocatedProcessors (settings );
14181334 final ThrottledTaskRunner throttledTaskRunner = new ThrottledTaskRunner ("build_roles" , allocatedProcessors , threadPool .generic ());
@@ -2558,7 +2474,8 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
25582474 SettingsModule settingsModule ,
25592475 IndexNameExpressionResolver expressionResolver
25602476 ) {
2561- return this .securityMigrationExecutor .get () != null ? List .of (this .securityMigrationExecutor .get ()) : List .of ();
2477+ final SecurityMigrations .Manager manager = this .migrationManager .get ();
2478+ return manager == null ? List .of () : List .of (manager .getPersistentTasksExecutor (client , threadPool ));
25622479 }
25632480
25642481 List <ReservedProjectStateHandler <?>> reservedProjectStateHandlers () {
0 commit comments