@@ -144,9 +144,9 @@ private void setEnabled(boolean enabled) {
144144 return ;
145145 }
146146 if (enabled ) {
147- startTask (() -> {});
147+ startTask (projectResolver . getProjectId (), () -> {});
148148 } else {
149- stopTask (() -> {});
149+ stopTask (projectResolver . getProjectId (), () -> {});
150150 }
151151 }
152152
@@ -196,7 +196,7 @@ protected GeoIpDownloader createTask(
196196 ) {
197197 ProjectId projectId = projectResolver .getProjectId ();
198198 return new GeoIpDownloader (
199- client ,
199+ client . projectClient ( projectId ) ,
200200 httpClient ,
201201 clusterService ,
202202 threadPool ,
@@ -236,46 +236,44 @@ public void clusterChanged(ClusterChangedEvent event) {
236236 ProjectId projectId = projectMetadataEntry .getKey ();
237237 ProjectMetadata projectMetadata = projectMetadataEntry .getValue ();
238238
239- projectResolver .executeOnProject (projectId , () -> {
240- // bootstrap task once iff it is not already bootstrapped
241- boolean taskIsBootstrapped = taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> false );
242- if (taskIsBootstrapped != true ) {
243- taskIsBootstrappedByProject .put (projectId , true );
244- this .taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> hasAtLeastOneGeoipProcessor (projectMetadata ));
245- if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
246- logger .debug ("Bootstrapping geoip downloader task for project [{}]" , projectId );
247- startTask (() -> taskIsBootstrappedByProject .put (projectId , false ));
248- } else {
249- logger .debug ("Stopping geoip downloader task for project [{}]" , projectId );
250- stopTask (() -> taskIsBootstrappedByProject .put (projectId , false ));
251- }
239+ // bootstrap task once iff it is not already bootstrapped
240+ boolean taskIsBootstrapped = taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> false );
241+ if (taskIsBootstrapped != true ) {
242+ taskIsBootstrappedByProject .put (projectId , true );
243+ this .taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> hasAtLeastOneGeoipProcessor (projectMetadata ));
244+ if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
245+ logger .debug ("Bootstrapping geoip downloader task for project [{}]" , projectId );
246+ startTask (projectId , () -> taskIsBootstrappedByProject .put (projectId , false ));
247+ } else {
248+ logger .debug ("Stopping geoip downloader task for project [{}]" , projectId );
249+ stopTask (projectId , () -> taskIsBootstrappedByProject .put (projectId , false ));
252250 }
251+ }
253252
254- boolean hasIngestPipelineChanges = event .customMetadataChanged (projectId , IngestMetadata .TYPE );
255- boolean hasIndicesChanges = false ;
256- boolean projectExisted = event .previousState ().metadata ().hasProject (projectId );
257- if (projectExisted ) {
258- hasIndicesChanges = event .previousState ()
259- .metadata ()
260- .getProject (projectId )
261- .indices ()
262- .equals (projectMetadata .indices ()) == false ;
263- }
253+ boolean hasIngestPipelineChanges = event .customMetadataChanged (projectId , IngestMetadata .TYPE );
254+ boolean hasIndicesChanges = false ;
255+ boolean projectExisted = event .previousState ().metadata ().hasProject (projectId );
256+ if (projectExisted ) {
257+ hasIndicesChanges = event .previousState ()
258+ .metadata ()
259+ .getProject (projectId )
260+ .indices ()
261+ .equals (projectMetadata .indices ()) == false ;
262+ }
264263
265- if (hasIngestPipelineChanges || hasIndicesChanges ) {
266- var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject .computeIfAbsent (projectId , k -> false );
267- boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor (projectMetadata );
264+ if (hasIngestPipelineChanges || hasIndicesChanges ) {
265+ var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject .computeIfAbsent (projectId , k -> false );
266+ boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor (projectMetadata );
268267
269- if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false ) {
270- logger .trace ("Scheduling runDownloader for project [{}] because a geoip processor has been added" , projectId );
271- GeoIpDownloader currentDownloader = getTask (projectId );
272- if (currentDownloader != null ) {
273- currentDownloader .requestReschedule ();
274- }
268+ if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false ) {
269+ logger .trace ("Scheduling runDownloader for project [{}] because a geoip processor has been added" , projectId );
270+ GeoIpDownloader currentDownloader = getTask (projectId );
271+ if (currentDownloader != null ) {
272+ currentDownloader .requestReschedule ();
275273 }
276- atLeastOneGeoipProcessorByProject .put (projectId , newAtLeastOneGeoipProcessor );
277274 }
278- });
275+ atLeastOneGeoipProcessorByProject .put (projectId , newAtLeastOneGeoipProcessor );
276+ }
279277 }
280278 }
281279
@@ -403,10 +401,10 @@ private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object>
403401 }
404402
405403 // starts GeoIP downloader task for a single project
406- private void startTask (Runnable onFailure ) {
407- ProjectId projectId = projectResolver .getProjectId ();
404+ private void startTask (ProjectId projectId , Runnable onFailure ) {
408405 assert projectId != null : "projectId must be set before starting geoIp download task" ;
409- persistentTasksService .sendStartRequest (
406+ persistentTasksService .sendProjectStartRequest (
407+ projectId ,
410408 getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
411409 GEOIP_DOWNLOADER ,
412410 new GeoIpTaskParams (),
@@ -422,9 +420,7 @@ private void startTask(Runnable onFailure) {
422420 }
423421
424422 // stops GeoIP downloader task for a single project
425- private void stopTask (Runnable onFailure ) {
426- assert projectResolver .getProjectId () != null : "projectId must be set before stopping geoIp download task" ;
427- ProjectId projectId = projectResolver .getProjectId ();
423+ private void stopTask (ProjectId projectId , Runnable onFailure ) {
428424 ActionListener <PersistentTasksCustomMetadata .PersistentTask <?>> listener = ActionListener .wrap (
429425 r -> logger .debug ("Stopped geoip downloader task" ),
430426 e -> {
@@ -435,7 +431,8 @@ private void stopTask(Runnable onFailure) {
435431 }
436432 }
437433 );
438- persistentTasksService .sendRemoveRequest (
434+ persistentTasksService .sendProjectRemoveRequest (
435+ projectId ,
439436 getTaskId (projectId , projectResolver .supportsMultipleProjects ()),
440437 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
441438 ActionListener .runAfter (listener , () -> {
@@ -447,17 +444,21 @@ private void stopTask(Runnable onFailure) {
447444 if (databasesAbstraction != null ) {
448445 // regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
449446 Index databasesIndex = databasesAbstraction .getWriteIndex ();
450- client .admin ().indices ().prepareDelete (databasesIndex .getName ()).execute (ActionListener .wrap (rr -> {
451- // remove task reference in the map so it can be garbage collected
452- tasks .remove (projectId );
453- taskIsBootstrappedByProject .remove (projectId );
454- atLeastOneGeoipProcessorByProject .remove (projectId );
455- }, e -> {
456- Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper .unwrapCause (e ) : e ;
457- if (t instanceof ResourceNotFoundException == false ) {
458- logger .warn ("failed to remove " + databasesIndex , e );
459- }
460- }));
447+ client .projectClient (projectId )
448+ .admin ()
449+ .indices ()
450+ .prepareDelete (databasesIndex .getName ())
451+ .execute (ActionListener .wrap (rr -> {
452+ // remove task reference in the map so it can be garbage collected
453+ tasks .remove (projectId );
454+ taskIsBootstrappedByProject .remove (projectId );
455+ atLeastOneGeoipProcessorByProject .remove (projectId );
456+ }, e -> {
457+ Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper .unwrapCause (e ) : e ;
458+ if (t instanceof ResourceNotFoundException == false ) {
459+ logger .warn ("failed to remove " + databasesIndex , e );
460+ }
461+ }));
461462 }
462463 })
463464 );
0 commit comments