5353import java .util .Objects ;
5454import java .util .Set ;
5555import java .util .concurrent .ConcurrentHashMap ;
56+ import java .util .concurrent .atomic .AtomicBoolean ;
5657
5758import static org .elasticsearch .ingest .geoip .GeoIpDownloader .DATABASES_INDEX ;
5859import static org .elasticsearch .ingest .geoip .GeoIpDownloader .GEOIP_DOWNLOADER ;
@@ -104,7 +105,7 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
104105 private volatile boolean eagerDownload ;
105106
106107 private final ConcurrentHashMap <ProjectId , Boolean > atLeastOneGeoipProcessorByProject = new ConcurrentHashMap <>();
107- private final ConcurrentHashMap <ProjectId , Boolean > taskIsBootstrappedByProject = new ConcurrentHashMap <>();
108+ private final ConcurrentHashMap <ProjectId , AtomicBoolean > taskIsBootstrappedByProject = new ConcurrentHashMap <>();
108109 private final ConcurrentHashMap <ProjectId , GeoIpDownloader > tasks = new ConcurrentHashMap <>();
109110 private final ProjectResolver projectResolver ;
110111
@@ -133,24 +134,22 @@ public void init() {
133134
134135 @ FixForMultiProject (description = "Should execute in the context of the current project after settings are project-aware" )
135136 private void setEnabled (boolean enabled ) {
136- assert projectResolver .getProjectId () != null : "projectId must be set before enabling download" ;
137137 if (clusterService .state ().nodes ().isLocalNodeElectedMaster () == false ) {
138138 // we should only start/stop task from single node, master is the best as it will go through it anyway
139139 return ;
140140 }
141141 if (enabled ) {
142- startTask (projectResolver . getProjectId () , () -> {});
142+ startTask (ProjectId . DEFAULT , () -> {});
143143 } else {
144- stopTask (projectResolver . getProjectId () , () -> {});
144+ stopTask (ProjectId . DEFAULT , () -> {});
145145 }
146146 }
147147
148148 @ FixForMultiProject (description = "Should execute in the context of the current project after settings are project-aware" )
149149 private void setEagerDownload (Boolean eagerDownload ) {
150- assert projectResolver .getProjectId () != null : "projectId must be set before setting eager download" ;
151150 if (Objects .equals (this .eagerDownload , eagerDownload ) == false ) {
152151 this .eagerDownload = eagerDownload ;
153- GeoIpDownloader currentDownloader = getTask (projectResolver . getProjectId () );
152+ GeoIpDownloader currentDownloader = getTask (ProjectId . DEFAULT );
154153 if (currentDownloader != null && Objects .equals (eagerDownload , Boolean .TRUE )) {
155154 currentDownloader .requestReschedule ();
156155 }
@@ -159,10 +158,9 @@ private void setEagerDownload(Boolean eagerDownload) {
159158
160159 @ FixForMultiProject (description = "Should execute in the context of the current project after settings are project-aware" )
161160 private void setPollInterval (TimeValue pollInterval ) {
162- assert projectResolver .getProjectId () != null : "projectId must be set before setting poll interval" ;
163161 if (Objects .equals (this .pollInterval , pollInterval ) == false ) {
164162 this .pollInterval = pollInterval ;
165- GeoIpDownloader currentDownloader = getTask (projectResolver . getProjectId () );
163+ GeoIpDownloader currentDownloader = getTask (ProjectId . DEFAULT );
166164 if (currentDownloader != null ) {
167165 currentDownloader .requestReschedule ();
168166 }
@@ -209,6 +207,7 @@ protected GeoIpDownloader createTask(
209207 );
210208 }
211209
210+ @ FixForMultiProject (description = "Make sure removed project tasks are cancelled: https://elasticco.atlassian.net/browse/ES-12054" )
212211 @ Override
213212 public void clusterChanged (ClusterChangedEvent event ) {
214213 if (event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK )) {
@@ -226,21 +225,20 @@ public void clusterChanged(ClusterChangedEvent event) {
226225 return ;
227226 }
228227
229- for (var projectMetadataEntry : event .state ().metadata ().projects ().entrySet ()) {
230- ProjectId projectId = projectMetadataEntry .getKey ();
231- ProjectMetadata projectMetadata = projectMetadataEntry .getValue ();
228+ for (var projectMetadata : event .state ().metadata ().projects ().values ()) {
229+ ProjectId projectId = projectMetadata .id ();
232230
233231 // bootstrap task once iff it is not already bootstrapped
234- boolean taskIsBootstrapped = taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> false );
235- if (taskIsBootstrapped != true ) {
236- taskIsBootstrappedByProject .put (projectId , true );
237- this . taskIsBootstrappedByProject . computeIfAbsent ( projectId , k -> hasAtLeastOneGeoipProcessor (projectMetadata ));
232+ AtomicBoolean taskIsBootstrapped = taskIsBootstrappedByProject .computeIfAbsent (projectId , k -> new AtomicBoolean ( false ) );
233+ if (taskIsBootstrapped . getAndSet ( true ) == false ) {
234+ this . taskIsBootstrappedByProject .computeIfAbsent (projectId ,
235+ k -> new AtomicBoolean ( hasAtLeastOneGeoipProcessor (projectMetadata ) ));
238236 if (ENABLED_SETTING .get (event .state ().getMetadata ().settings (), settings )) {
239237 logger .debug ("Bootstrapping geoip downloader task for project [{}]" , projectId );
240- startTask (projectId , () -> taskIsBootstrappedByProject . put ( projectId , false ));
238+ startTask (projectId , () -> taskIsBootstrapped . set ( false ));
241239 } else {
242240 logger .debug ("Stopping geoip downloader task for project [{}]" , projectId );
243- stopTask (projectId , () -> taskIsBootstrappedByProject . put ( projectId , false ));
241+ stopTask (projectId , () -> taskIsBootstrapped . set ( false ));
244242 }
245243 }
246244
@@ -430,8 +428,8 @@ private void stopTask(ProjectId projectId, Runnable onFailure) {
430428 MasterNodeRequest .INFINITE_MASTER_NODE_TIMEOUT ,
431429 ActionListener .runAfter (listener , () -> {
432430 IndexAbstraction databasesAbstraction = clusterService .state ()
433- .projectState (projectId )
434431 .metadata ()
432+ .getProject (projectId )
435433 .getIndicesLookup ()
436434 .get (DATABASES_INDEX );
437435 if (databasesAbstraction != null ) {
0 commit comments