1616import org .elasticsearch .action .support .master .MasterNodeRequest ;
1717import org .elasticsearch .client .internal .Client ;
1818import org .elasticsearch .cluster .ClusterChangedEvent ;
19- import org .elasticsearch .cluster .ClusterState ;
2019import org .elasticsearch .cluster .ClusterStateListener ;
20+ import org .elasticsearch .cluster .metadata .ProjectId ;
21+ import org .elasticsearch .cluster .project .ProjectResolver ;
2122import org .elasticsearch .cluster .service .ClusterService ;
23+ import org .elasticsearch .core .NotMultiProjectCapable ;
2224import org .elasticsearch .ingest .EnterpriseGeoIpTask .EnterpriseGeoIpTaskParams ;
2325import org .elasticsearch .license .License ;
2426import org .elasticsearch .license .LicenseStateListener ;
3133import org .elasticsearch .xpack .core .XPackField ;
3234
3335import java .util .Objects ;
36+ import java .util .concurrent .ConcurrentHashMap ;
37+ import java .util .concurrent .ConcurrentMap ;
3438
3539import static org .elasticsearch .ingest .EnterpriseGeoIpTask .ENTERPRISE_GEOIP_DOWNLOADER ;
3640
3741public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener , ClusterStateListener {
3842 private static final Logger logger = LogManager .getLogger (EnterpriseGeoIpDownloaderLicenseListener .class );
39- // Note: This custom type is GeoIpMetadata .TYPE, but that class is not exposed to this plugin
43+ // Note: This custom type is IngestGeoIpMetadata .TYPE, but that class is not exposed to this plugin
4044 static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip" ;
4145
4246 private final PersistentTasksService persistentTasksService ;
@@ -47,18 +51,21 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
4751 XPackField .ENTERPRISE_GEOIP_DOWNLOADER ,
4852 License .OperationMode .PLATINUM
4953 );
50- private volatile boolean licenseIsValid = false ;
51- private volatile boolean hasIngestGeoIpMetadata = false ;
54+ private final ConcurrentMap <ProjectId , Boolean > licenseIsValid = new ConcurrentHashMap <>();
55+ private final ConcurrentMap <ProjectId , Boolean > hasIngestGeoIpMetadata = new ConcurrentHashMap <>();
56+ private final ProjectResolver projectResolver ;
5257
5358 protected EnterpriseGeoIpDownloaderLicenseListener (
5459 Client client ,
5560 ClusterService clusterService ,
5661 ThreadPool threadPool ,
57- XPackLicenseState licenseState
62+ XPackLicenseState licenseState ,
63+ ProjectResolver projectResolver
5864 ) {
5965 this .persistentTasksService = new PersistentTasksService (clusterService , threadPool , client );
6066 this .clusterService = clusterService ;
6167 this .licenseState = licenseState ;
68+ this .projectResolver = projectResolver ;
6269 }
6370
6471 private volatile boolean licenseStateListenerRegistered ;
@@ -74,47 +81,55 @@ void listenForLicenseStateChanges() {
7481 licenseState .addListener (this );
7582 }
7683
84+ @ NotMultiProjectCapable (description = "Replace DEFAULT project after enterprise license is supported in serverless and project-aware" )
7785 @ Override
7886 public void licenseStateChanged () {
79- licenseIsValid = ENTERPRISE_GEOIP_FEATURE .checkWithoutTracking (licenseState );
80- maybeUpdateTaskState (clusterService .state ());
87+ licenseIsValid .put (ProjectId .DEFAULT , ENTERPRISE_GEOIP_FEATURE .checkWithoutTracking (licenseState ));
88+ final boolean isLocalNodeMaster = clusterService .state ().nodes ().isLocalNodeElectedMaster ();
89+ maybeUpdateTaskState (ProjectId .DEFAULT , isLocalNodeMaster );
8190 }
8291
8392 @ Override
8493 public void clusterChanged (ClusterChangedEvent event ) {
85- hasIngestGeoIpMetadata = event .state ().metadata ().getProject ().custom (INGEST_GEOIP_CUSTOM_METADATA_TYPE ) != null ;
86- final boolean ingestGeoIpCustomMetaChangedInEvent = event .metadataChanged ()
87- && event .changedCustomProjectMetadataSet ().contains (INGEST_GEOIP_CUSTOM_METADATA_TYPE );
8894 final boolean masterNodeChanged = Objects .equals (
8995 event .state ().nodes ().getMasterNode (),
9096 event .previousState ().nodes ().getMasterNode ()
9197 ) == false ;
92- /*
93- * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState if this cluster change
94- * event involved the modification of custom geoip metadata OR a master node change
95- */
96- if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata )) {
97- maybeUpdateTaskState (event .state ());
98- }
98+ final boolean isLocalNodeMaster = event .state ().nodes ().isLocalNodeElectedMaster ();
99+ event .state ().metadata ().projects ().values ().forEach (projectMetadata -> {
100+ ProjectId projectId = projectMetadata .id ();
101+ final boolean hasMetadata = projectMetadata .custom (INGEST_GEOIP_CUSTOM_METADATA_TYPE ) != null ;
102+ hasIngestGeoIpMetadata .put (projectId , hasMetadata );
103+ final boolean ingestGeoIpCustomMetaChangedInEvent = event .metadataChanged ()
104+ && event .customMetadataChanged (projectId , INGEST_GEOIP_CUSTOM_METADATA_TYPE );
105+ /*
106+ * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState
107+ * if this cluster change event involved the modification of custom geoip metadata OR a master node change
108+ */
109+ if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata .getOrDefault (projectId , false ))) {
110+ maybeUpdateTaskState (projectId , isLocalNodeMaster );
111+ }
112+ });
99113 }
100114
101- private void maybeUpdateTaskState (ClusterState state ) {
115+ private void maybeUpdateTaskState (ProjectId projectId , boolean isLocalNodeMaster ) {
102116 // We should only start/stop task from single node, master is the best as it will go through it anyway
103- if (state . nodes (). isLocalNodeElectedMaster () ) {
104- if (licenseIsValid ) {
105- if (hasIngestGeoIpMetadata ) {
106- ensureTaskStarted ();
117+ if (isLocalNodeMaster ) {
118+ if (licenseIsValid . getOrDefault ( projectId , false ) ) {
119+ if (hasIngestGeoIpMetadata . getOrDefault ( projectId , false ) ) {
120+ ensureTaskStarted (projectId );
107121 }
108122 } else {
109- ensureTaskStopped ();
123+ ensureTaskStopped (projectId );
110124 }
111125 }
112126 }
113127
114- private void ensureTaskStarted () {
115- assert licenseIsValid : "Task should never be started without valid license" ;
116- persistentTasksService .sendStartRequest (
117- ENTERPRISE_GEOIP_DOWNLOADER ,
128+ private void ensureTaskStarted (ProjectId projectId ) {
129+ assert licenseIsValid .getOrDefault (projectId , false ) : "Task should never be started without valid license" ;
130+ persistentTasksService .sendProjectStartRequest (
131+ projectId ,
132+ getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
118133 ENTERPRISE_GEOIP_DOWNLOADER ,
119134 new EnterpriseGeoIpTaskParams (),
120135 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
@@ -127,7 +142,7 @@ private void ensureTaskStarted() {
127142 );
128143 }
129144
130- private void ensureTaskStopped () {
145+ private void ensureTaskStopped (ProjectId projectId ) {
131146 ActionListener <PersistentTasksCustomMetadata .PersistentTask <?>> listener = ActionListener .wrap (
132147 r -> logger .debug ("Stopped enterprise geoip downloader task" ),
133148 e -> {
@@ -137,6 +152,15 @@ private void ensureTaskStopped() {
137152 }
138153 }
139154 );
140- persistentTasksService .sendRemoveRequest (ENTERPRISE_GEOIP_DOWNLOADER , MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT , listener );
155+ persistentTasksService .sendProjectRemoveRequest (
156+ projectId ,
157+ getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
158+ MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
159+ listener
160+ );
161+ }
162+
163+ protected static String getTaskId (ProjectId projectId , boolean supportsMultipleProjects ) {
164+ return supportsMultipleProjects ? projectId + "/" + ENTERPRISE_GEOIP_DOWNLOADER : ENTERPRISE_GEOIP_DOWNLOADER ;
141165 }
142166}
0 commit comments