1616import org .elasticsearch .action .support .master .MasterNodeRequest ;
1717import org .elasticsearch .client .internal .Client ;
1818import org .elasticsearch .cluster .ClusterChangedEvent ;
19- import org .elasticsearch .cluster .ClusterState ;
2019import org .elasticsearch .cluster .ClusterStateListener ;
20+ import org .elasticsearch .cluster .ProjectState ;
21+ import org .elasticsearch .cluster .metadata .ProjectId ;
22+ import org .elasticsearch .cluster .project .ProjectResolver ;
2123import org .elasticsearch .cluster .service .ClusterService ;
24+ import org .elasticsearch .core .FixForMultiProject ;
2225import org .elasticsearch .ingest .EnterpriseGeoIpTask .EnterpriseGeoIpTaskParams ;
2326import org .elasticsearch .license .License ;
2427import org .elasticsearch .license .LicenseStateListener ;
3134import org .elasticsearch .xpack .core .XPackField ;
3235
3336import java .util .Objects ;
37+ import java .util .concurrent .ConcurrentHashMap ;
38+ import java .util .concurrent .ConcurrentMap ;
3439
3540import static org .elasticsearch .ingest .EnterpriseGeoIpTask .ENTERPRISE_GEOIP_DOWNLOADER ;
3641
3742public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateListener , ClusterStateListener {
3843 private static final Logger logger = LogManager .getLogger (EnterpriseGeoIpDownloaderLicenseListener .class );
39- // Note: This custom type is GeoIpMetadata .TYPE, but that class is not exposed to this plugin
44+ // Note: This custom type is IngestGeoIpMetadata .TYPE, but that class is not exposed to this plugin
4045 static final String INGEST_GEOIP_CUSTOM_METADATA_TYPE = "ingest_geoip" ;
4146
4247 private final PersistentTasksService persistentTasksService ;
@@ -47,18 +52,21 @@ public class EnterpriseGeoIpDownloaderLicenseListener implements LicenseStateLis
4752 XPackField .ENTERPRISE_GEOIP_DOWNLOADER ,
4853 License .OperationMode .PLATINUM
4954 );
50- private volatile boolean licenseIsValid = false ;
51- private volatile boolean hasIngestGeoIpMetadata = false ;
55+ private final ConcurrentMap <ProjectId , Boolean > licenseIsValid = new ConcurrentHashMap <>();
56+ private final ConcurrentMap <ProjectId , Boolean > hasIngestGeoIpMetadata = new ConcurrentHashMap <>();
57+ private final ProjectResolver projectResolver ;
5258
5359 protected EnterpriseGeoIpDownloaderLicenseListener (
5460 Client client ,
5561 ClusterService clusterService ,
5662 ThreadPool threadPool ,
57- XPackLicenseState licenseState
63+ XPackLicenseState licenseState ,
64+ ProjectResolver projectResolver
5865 ) {
5966 this .persistentTasksService = new PersistentTasksService (clusterService , threadPool , client );
6067 this .clusterService = clusterService ;
6168 this .licenseState = licenseState ;
69+ this .projectResolver = projectResolver ;
6270 }
6371
6472 private volatile boolean licenseStateListenerRegistered ;
@@ -74,47 +82,54 @@ void listenForLicenseStateChanges() {
7482 licenseState .addListener (this );
7583 }
7684
85+ @ FixForMultiProject (description = "Replace DEFAULT project after license is project-aware" )
7786 @ Override
7887 public void licenseStateChanged () {
79- licenseIsValid = ENTERPRISE_GEOIP_FEATURE .checkWithoutTracking (licenseState );
80- maybeUpdateTaskState (clusterService .state ());
88+ licenseIsValid . put ( ProjectId . DEFAULT , ENTERPRISE_GEOIP_FEATURE .checkWithoutTracking (licenseState ) );
89+ maybeUpdateTaskState (clusterService .state (). projectState ( ProjectId . DEFAULT ) );
8190 }
8291
8392 @ Override
8493 public void clusterChanged (ClusterChangedEvent event ) {
85- hasIngestGeoIpMetadata = event .state ().metadata ().getProject ().custom (INGEST_GEOIP_CUSTOM_METADATA_TYPE ) != null ;
86- final boolean ingestGeoIpCustomMetaChangedInEvent = event .metadataChanged ()
87- && event .changedCustomProjectMetadataSet ().contains (INGEST_GEOIP_CUSTOM_METADATA_TYPE );
88- final boolean masterNodeChanged = Objects .equals (
89- event .state ().nodes ().getMasterNode (),
90- event .previousState ().nodes ().getMasterNode ()
91- ) == false ;
92- /*
93- * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState if this cluster change
94- * event involved the modification of custom geoip metadata OR a master node change
95- */
96- if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata )) {
97- maybeUpdateTaskState (event .state ());
98- }
94+ event .state ().forEachProject (projectState -> {
95+ ProjectId projectId = projectState .projectId ();
96+ final boolean hasMetadata = event .state ().metadata ().getProject (projectId ).custom (INGEST_GEOIP_CUSTOM_METADATA_TYPE ) != null ;
97+ hasIngestGeoIpMetadata .put (projectId , hasMetadata );
98+ final boolean ingestGeoIpCustomMetaChangedInEvent = event .metadataChanged ()
99+ && event .customMetadataChanged (projectId , INGEST_GEOIP_CUSTOM_METADATA_TYPE );
100+ final boolean masterNodeChanged = Objects .equals (
101+ event .state ().nodes ().getMasterNode (),
102+ event .previousState ().nodes ().getMasterNode ()
103+ ) == false ;
104+ /*
105+ * We don't want to potentially start the task on every cluster state change, so only maybeUpdateTaskState
106+ * if this cluster change event involved the modification of custom geoip metadata OR a master node change
107+ */
108+ if (ingestGeoIpCustomMetaChangedInEvent || (masterNodeChanged && hasIngestGeoIpMetadata .getOrDefault (projectId , false ))) {
109+ maybeUpdateTaskState (projectState );
110+ }
111+ });
99112 }
100113
101- private void maybeUpdateTaskState (ClusterState state ) {
114+ private void maybeUpdateTaskState (ProjectState projectState ) {
115+ ProjectId projectId = projectState .projectId ();
102116 // We should only start/stop task from single node, master is the best as it will go through it anyway
103- if (state .nodes ().isLocalNodeElectedMaster ()) {
104- if (licenseIsValid ) {
105- if (hasIngestGeoIpMetadata ) {
106- ensureTaskStarted ();
117+ if (projectState . cluster () .nodes ().isLocalNodeElectedMaster ()) {
118+ if (licenseIsValid . getOrDefault ( projectId , false ) ) {
119+ if (hasIngestGeoIpMetadata . getOrDefault ( projectId , false ) ) {
120+ ensureTaskStarted (projectId );
107121 }
108122 } else {
109- ensureTaskStopped ();
123+ ensureTaskStopped (projectId );
110124 }
111125 }
112126 }
113127
114- private void ensureTaskStarted () {
115- assert licenseIsValid : "Task should never be started without valid license" ;
116- persistentTasksService .sendStartRequest (
117- ENTERPRISE_GEOIP_DOWNLOADER ,
128+ private void ensureTaskStarted (ProjectId projectId ) {
129+ assert licenseIsValid .getOrDefault (projectId , false ) : "Task should never be started without valid license" ;
130+ persistentTasksService .sendProjectStartRequest (
131+ projectId ,
132+ getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
118133 ENTERPRISE_GEOIP_DOWNLOADER ,
119134 new EnterpriseGeoIpTaskParams (),
120135 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
@@ -127,7 +142,7 @@ private void ensureTaskStarted() {
127142 );
128143 }
129144
130- private void ensureTaskStopped () {
145+ private void ensureTaskStopped (ProjectId projectId ) {
131146 ActionListener <PersistentTasksCustomMetadata .PersistentTask <?>> listener = ActionListener .wrap (
132147 r -> logger .debug ("Stopped enterprise geoip downloader task" ),
133148 e -> {
@@ -137,6 +152,14 @@ private void ensureTaskStopped() {
137152 }
138153 }
139154 );
140- persistentTasksService .sendRemoveRequest (ENTERPRISE_GEOIP_DOWNLOADER , MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT , listener );
155+ persistentTasksService .sendRemoveRequest (
156+ getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
157+ MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
158+ listener
159+ );
160+ }
161+
162+ protected static String getTaskId (ProjectId projectId , boolean supportsMultipleProjects ) {
163+ return supportsMultipleProjects ? projectId + "/" + ENTERPRISE_GEOIP_DOWNLOADER : ENTERPRISE_GEOIP_DOWNLOADER ;
141164 }
142165}
0 commit comments