1919import org .elasticsearch .client .internal .Client ;
2020import org .elasticsearch .client .internal .OriginSettingClient ;
2121import org .elasticsearch .cluster .ClusterChangedEvent ;
22- import org .elasticsearch .cluster .ClusterState ;
2322import org .elasticsearch .cluster .ClusterStateListener ;
2423import org .elasticsearch .cluster .metadata .IndexAbstraction ;
24+ import org .elasticsearch .cluster .metadata .ProjectId ;
25+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
2526import org .elasticsearch .cluster .node .DiscoveryNode ;
27+ import org .elasticsearch .cluster .project .ProjectResolver ;
2628import org .elasticsearch .cluster .service .ClusterService ;
2729import org .elasticsearch .common .settings .Setting ;
2830import org .elasticsearch .common .settings .Settings ;
31+ import org .elasticsearch .core .FixForMultiProject ;
2932import org .elasticsearch .core .TimeValue ;
3033import org .elasticsearch .gateway .GatewayService ;
3134import org .elasticsearch .index .Index ;
4952import java .util .Map ;
5053import java .util .Objects ;
5154import java .util .Set ;
55+ import java .util .concurrent .ConcurrentHashMap ;
5256import java .util .concurrent .atomic .AtomicBoolean ;
5357import java .util .concurrent .atomic .AtomicReference ;
5458
6468 */
6569public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor <GeoIpTaskParams > implements ClusterStateListener {
6670
71+ @ FixForMultiProject (description = "These settings need to be project-scoped" )
6772 private static final boolean ENABLED_DEFAULT = "false" .equals (
6873 System .getProperty ("ingest.geoip.downloader.enabled.default" , "true" )
6974 ) == false ;
@@ -97,11 +102,15 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
97102 private final ThreadPool threadPool ;
98103 private final Settings settings ;
99104 private final PersistentTasksService persistentTasksService ;
100- private final AtomicReference < GeoIpDownloader > currentTask = new AtomicReference <>();
105+ @ FixForMultiProject ( description = "These settings need to be project-scoped" )
101106 private volatile TimeValue pollInterval ;
102107 private volatile boolean eagerDownload ;
103- private volatile boolean atLeastOneGeoipProcessor ;
104- private final AtomicBoolean taskIsBootstrapped = new AtomicBoolean (false );
108+
109+ private final ConcurrentHashMap <ProjectId , AtomicBoolean > atLeastOneGeoipProcessorByProject = new ConcurrentHashMap <>();
110+ private final ConcurrentHashMap <ProjectId , AtomicBoolean > taskIsBootstrappedByProject = new ConcurrentHashMap <>();
111+ // TODO: start task add to map, stop task remove from map
112+ private final ConcurrentHashMap <ProjectId , AtomicReference <GeoIpDownloader >> tasks = new ConcurrentHashMap <>();
113+ private final ProjectResolver projectResolver ;
105114
106115 GeoIpDownloaderTaskExecutor (Client client , HttpClient httpClient , ClusterService clusterService , ThreadPool threadPool ) {
107116 super (GEOIP_DOWNLOADER , threadPool .generic ());
@@ -113,6 +122,27 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
113122 this .persistentTasksService = new PersistentTasksService (clusterService , threadPool , client );
114123 this .pollInterval = POLL_INTERVAL_SETTING .get (settings );
115124 this .eagerDownload = EAGER_DOWNLOAD_SETTING .get (settings );
125+ this .projectResolver = null ;
126+ }
127+
128+ // TODO: consolidate with the other constructor
129+ GeoIpDownloaderTaskExecutor (
130+ Client client ,
131+ HttpClient httpClient ,
132+ ClusterService clusterService ,
133+ ThreadPool threadPool ,
134+ ProjectResolver projectResolver
135+ ) {
136+ super (GEOIP_DOWNLOADER , threadPool .generic ());
137+ this .client = new OriginSettingClient (client , IngestService .INGEST_ORIGIN );
138+ this .httpClient = httpClient ;
139+ this .clusterService = clusterService ;
140+ this .threadPool = threadPool ;
141+ this .settings = clusterService .getSettings ();
142+ this .persistentTasksService = new PersistentTasksService (clusterService , threadPool , client );
143+ this .pollInterval = POLL_INTERVAL_SETTING .get (settings );
144+ this .eagerDownload = EAGER_DOWNLOAD_SETTING .get (settings );
145+ this .projectResolver = projectResolver ;
116146 }
117147
118148 /**
@@ -140,7 +170,7 @@ private void setEnabled(boolean enabled) {
140170 private void setEagerDownload (Boolean eagerDownload ) {
141171 if (Objects .equals (this .eagerDownload , eagerDownload ) == false ) {
142172 this .eagerDownload = eagerDownload ;
143- GeoIpDownloader currentDownloader = getCurrentTask ( );
173+ GeoIpDownloader currentDownloader = getTask ( projectResolver . getProjectId () );
144174 if (currentDownloader != null && Objects .equals (eagerDownload , Boolean .TRUE )) {
145175 currentDownloader .requestReschedule ();
146176 }
@@ -150,7 +180,7 @@ private void setEagerDownload(Boolean eagerDownload) {
150180 private void setPollInterval (TimeValue pollInterval ) {
151181 if (Objects .equals (this .pollInterval , pollInterval ) == false ) {
152182 this .pollInterval = pollInterval ;
153- GeoIpDownloader currentDownloader = getCurrentTask ( );
183+ GeoIpDownloader currentDownloader = getTask ( projectResolver . getProjectId () );
154184 if (currentDownloader != null ) {
155185 currentDownloader .requestReschedule ();
156186 }
@@ -162,7 +192,11 @@ protected void nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams param
162192 GeoIpDownloader downloader = (GeoIpDownloader ) task ;
163193 GeoIpTaskState geoIpTaskState = (state == null ) ? GeoIpTaskState .EMPTY : (GeoIpTaskState ) state ;
164194 downloader .setState (geoIpTaskState );
165- currentTask .set (downloader );
195+ AtomicReference <GeoIpDownloader > downloaderReference = tasks .computeIfAbsent (
196+ projectResolver .getProjectId (),
197+ k -> new AtomicReference <>()
198+ );
199+ downloaderReference .set (downloader );
166200 if (ENABLED_SETTING .get (clusterService .state ().metadata ().settings (), settings )) {
167201 downloader .runDownloader ();
168202 }
@@ -177,6 +211,7 @@ protected GeoIpDownloader createTask(
177211 PersistentTasksCustomMetadata .PersistentTask <GeoIpTaskParams > taskInProgress ,
178212 Map <String , String > headers
179213 ) {
214+ AtomicBoolean atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject .get (projectResolver .getProjectId ());
180215 return new GeoIpDownloader (
181216 client ,
182217 httpClient ,
@@ -191,7 +226,8 @@ protected GeoIpDownloader createTask(
191226 headers ,
192227 () -> pollInterval ,
193228 () -> eagerDownload ,
194- () -> atLeastOneGeoipProcessor
229+ () -> atLeastOneGeoipProcessor != null && atLeastOneGeoipProcessor .get (),
230+ projectResolver
195231 );
196232 }
197233
@@ -208,52 +244,63 @@ public void clusterChanged(ClusterChangedEvent event) {
208244 return ;
209245 }
210246
211- if (taskIsBootstrapped .getAndSet (true ) == false ) {
212- this .atLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor (event .state ());
213- if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
214- startTask (() -> taskIsBootstrapped .set (false ));
215- } else {
216- stopTask (() -> taskIsBootstrapped .set (false ));
217- }
218- }
219-
220247 if (event .metadataChanged () == false ) {
221248 return ;
222249 }
223250
224- boolean hasIndicesChanges = event .previousState ()
225- .metadata ()
226- .getProject ()
227- .indices ()
228- .equals (event .state ().metadata ().getProject ().indices ()) == false ;
229- boolean hasIngestPipelineChanges = event .metadataChanged () && event .changedCustomProjectMetadataSet ().contains (IngestMetadata .TYPE );
230-
231- if (hasIngestPipelineChanges || hasIndicesChanges ) {
232- boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor (event .state ());
233- if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false ) {
234- atLeastOneGeoipProcessor = true ;
235- logger .trace ("Scheduling runDownloader because a geoip processor has been added" );
236- GeoIpDownloader currentDownloader = getCurrentTask ();
237- if (currentDownloader != null ) {
238- currentDownloader .requestReschedule ();
251+ // TODO: optimize to only loop over the projects that has changed
252+ for (var projectMetadataEntry : event .state ().metadata ().projects ().entrySet ()) {
253+ ProjectId projectId = projectMetadataEntry .getKey ();
254+ ProjectMetadata projectMetadata = projectMetadataEntry .getValue ();
255+
256+ // bootstrap task once iff it is not already bootstrapped
257+ AtomicBoolean taskIsBootstrapped = taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> new AtomicBoolean (false ));
258+ if (taskIsBootstrapped .getAndSet (true ) == false ) {
259+ this .taskIsBootstrappedByProject .computeIfAbsent (
260+ projectId ,
261+ k -> new AtomicBoolean (hasAtLeastOneGeoipProcessor (projectMetadata ))
262+ );
263+ if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
264+ startTask (() -> taskIsBootstrapped .set (false ));
265+ } else {
266+ stopTask (() -> taskIsBootstrapped .set (false ));
267+ }
268+ }
269+
270+ boolean hasIngestPipelineChanges = event .changedCustomProjectMetadataSet (projectId ).contains (IngestMetadata .TYPE );
271+ boolean hasIndicesChanges = event .previousState ()
272+ .metadata ()
273+ .getProject (projectId )
274+ .indices ()
275+ .equals (projectMetadata .indices ()) == false ;
276+
277+ if (hasIngestPipelineChanges || hasIndicesChanges ) {
278+ var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject .computeIfAbsent (projectId , k -> new AtomicBoolean (false ));
279+ boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor (projectMetadata );
280+
281+ if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor .get () == false ) {
282+ logger .trace ("Scheduling runDownloader because a geoip processor has been added" );
283+ GeoIpDownloader currentDownloader = getTask (projectId );
284+ if (currentDownloader != null ) {
285+ currentDownloader .requestReschedule ();
286+ }
239287 }
240- } else {
241- atLeastOneGeoipProcessor = newAtLeastOneGeoipProcessor ;
288+ atLeastOneGeoipProcessor .set (newAtLeastOneGeoipProcessor );
242289 }
243290 }
244291 }
245292
246- static boolean hasAtLeastOneGeoipProcessor (ClusterState clusterState ) {
247- if (pipelinesWithGeoIpProcessor (clusterState , true ).isEmpty () == false ) {
293+ static boolean hasAtLeastOneGeoipProcessor (ProjectMetadata projectMetadata ) {
294+ if (pipelinesWithGeoIpProcessor (projectMetadata , true ).isEmpty () == false ) {
248295 return true ;
249296 }
250297
251- final Set <String > checkReferencedPipelines = pipelinesWithGeoIpProcessor (clusterState , false );
298+ final Set <String > checkReferencedPipelines = pipelinesWithGeoIpProcessor (projectMetadata , false );
252299 if (checkReferencedPipelines .isEmpty ()) {
253300 return false ;
254301 }
255302
256- return clusterState . getMetadata (). getProject () .indices ().values ().stream ().anyMatch (indexMetadata -> {
303+ return projectMetadata .indices ().values ().stream ().anyMatch (indexMetadata -> {
257304 String defaultPipeline = IndexSettings .DEFAULT_PIPELINE .get (indexMetadata .getSettings ());
258305 String finalPipeline = IndexSettings .FINAL_PIPELINE .get (indexMetadata .getSettings ());
259306 return checkReferencedPipelines .contains (defaultPipeline ) || checkReferencedPipelines .contains (finalPipeline );
@@ -262,14 +309,14 @@ static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
262309
263310 /**
264311 * Retrieve the set of pipeline ids that have at least one geoip processor.
265- * @param clusterState Cluster state.
312+ * @param projectMetadata project metadata
266313 * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipeline with the download_database_on_pipeline_creation
267314 * matching the param.
268315 * @return A set of pipeline ids matching criteria.
269316 */
270317 @ SuppressWarnings ("unchecked" )
271- private static Set <String > pipelinesWithGeoIpProcessor (ClusterState clusterState , boolean downloadDatabaseOnPipelineCreation ) {
272- List <PipelineConfiguration > configurations = IngestService .getPipelines (clusterState . metadata (). getProject () );
318+ private static Set <String > pipelinesWithGeoIpProcessor (ProjectMetadata projectMetadata , boolean downloadDatabaseOnPipelineCreation ) {
319+ List <PipelineConfiguration > configurations = IngestService .getPipelines (projectMetadata );
273320 Set <String > ids = new HashSet <>();
274321 // note: this loop is unrolled rather than streaming-style because it's hot enough to show up in a flamegraph
275322 for (PipelineConfiguration configuration : configurations ) {
@@ -367,8 +414,10 @@ private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object>
367414 }
368415
369416 private void startTask (Runnable onFailure ) {
417+ // TODO: double check thread context project Id
418+ // in multiple project mode, starts task for a single project, projectId in thread context
370419 persistentTasksService .sendStartRequest (
371- GEOIP_DOWNLOADER ,
420+ getTaskId () ,
372421 GEOIP_DOWNLOADER ,
373422 new GeoIpTaskParams (),
374423 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
@@ -394,12 +443,12 @@ private void stopTask(Runnable onFailure) {
394443 }
395444 );
396445 persistentTasksService .sendRemoveRequest (
397- GEOIP_DOWNLOADER ,
446+ getTaskId () ,
398447 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
399448 ActionListener .runAfter (listener , () -> {
400449 IndexAbstraction databasesAbstraction = clusterService .state ()
450+ .projectState (projectResolver .getProjectId ())
401451 .metadata ()
402- .getDefaultProject ()
403452 .getIndicesLookup ()
404453 .get (DATABASES_INDEX );
405454 if (databasesAbstraction != null ) {
@@ -416,7 +465,15 @@ private void stopTask(Runnable onFailure) {
416465 );
417466 }
418467
419- public GeoIpDownloader getCurrentTask () {
420- return currentTask .get ();
468+ public GeoIpDownloader getTask (ProjectId projectId ) {
469+ var taskReference = tasks .get (projectId );
470+ if (taskReference != null ) {
471+ return taskReference .get ();
472+ }
473+ return null ;
474+ }
475+
476+ private String getTaskId () {
477+ return projectResolver .supportsMultipleProjects () ? projectResolver .getProjectId () + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER ;
421478 }
422479}
0 commit comments