Skip to content

Commit 6bacb71

Browse files
committed
Add projectId context
1 parent e68c0ad commit 6bacb71

File tree

1 file changed

+24
-11
lines changed

1 file changed

+24
-11
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ private void setEnabled(boolean enabled) {
153153
}
154154

155155
private void setEagerDownload(Boolean eagerDownload) {
156+
assert projectResolver.getProjectId() != null : "projectId must be set before setting eager download";
156157
if (Objects.equals(this.eagerDownload, eagerDownload) == false) {
157158
this.eagerDownload = eagerDownload;
158159
GeoIpDownloader currentDownloader = getTask(projectResolver.getProjectId());
@@ -163,6 +164,7 @@ private void setEagerDownload(Boolean eagerDownload) {
163164
}
164165

165166
private void setPollInterval(TimeValue pollInterval) {
167+
assert projectResolver.getProjectId() != null : "projectId must be set before setting poll interval";
166168
if (Objects.equals(this.pollInterval, pollInterval) == false) {
167169
this.pollInterval = pollInterval;
168170
GeoIpDownloader currentDownloader = getTask(projectResolver.getProjectId());
@@ -234,7 +236,6 @@ public void clusterChanged(ClusterChangedEvent event) {
234236
return;
235237
}
236238

237-
// TODO: optimize to only loop over the projects that has changed
238239
for (var projectMetadataEntry : event.state().metadata().projects().entrySet()) {
239240
ProjectId projectId = projectMetadataEntry.getKey();
240241
ProjectMetadata projectMetadata = projectMetadataEntry.getValue();
@@ -247,9 +248,15 @@ public void clusterChanged(ClusterChangedEvent event) {
247248
k -> new AtomicBoolean(hasAtLeastOneGeoipProcessor(projectMetadata))
248249
);
249250
if (ENABLED_SETTING.get(event.state().getMetadata().settings(), settings)) {
250-
startTask(() -> taskIsBootstrapped.set(false));
251+
projectResolver.executeOnProject(projectId, () -> {
252+
logger.debug("Bootstrapping geoip downloader task for project [{}]", projectId);
253+
startTask(() -> taskIsBootstrapped.set(false));
254+
});
251255
} else {
252-
stopTask(() -> taskIsBootstrapped.set(false));
256+
projectResolver.executeOnProject(projectId, () -> {
257+
logger.debug("Stopping geoip downloader task for project [{}]", projectId);
258+
stopTask(() -> taskIsBootstrapped.set(false));
259+
});
253260
}
254261
}
255262

@@ -399,11 +406,11 @@ private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object>
399406
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
400407
}
401408

409+
// starts GeoIP downloader task for a single project
402410
private void startTask(Runnable onFailure) {
403-
// TODO: double check thread context project Id
404-
// in multiple project mode, starts task for a single project, projectId in thread context
411+
assert projectResolver.getProjectId() != null : "projectId must be set before starting geoIp download task";
405412
persistentTasksService.sendStartRequest(
406-
getTaskId(),
413+
getTaskId(projectResolver.getProjectId()),
407414
GEOIP_DOWNLOADER,
408415
new GeoIpTaskParams(),
409416
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
@@ -417,7 +424,10 @@ private void startTask(Runnable onFailure) {
417424
);
418425
}
419426

427+
// stops GeoIP downloader task for a single project
420428
private void stopTask(Runnable onFailure) {
429+
assert projectResolver.getProjectId() != null: "projectId must be set before stopping geoIp download task";
430+
ProjectId projectId = projectResolver.getProjectId();
421431
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = ActionListener.wrap(
422432
r -> logger.debug("Stopped geoip downloader task"),
423433
e -> {
@@ -429,18 +439,21 @@ private void stopTask(Runnable onFailure) {
429439
}
430440
);
431441
persistentTasksService.sendRemoveRequest(
432-
getTaskId(),
442+
getTaskId(projectId),
433443
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
434444
ActionListener.runAfter(listener, () -> {
435445
IndexAbstraction databasesAbstraction = clusterService.state()
436-
.projectState(projectResolver.getProjectId())
446+
.projectState(projectId)
437447
.metadata()
438448
.getIndicesLookup()
439449
.get(DATABASES_INDEX);
440450
if (databasesAbstraction != null) {
441451
// regardless of whether DATABASES_INDEX is an alias, resolve it to a concrete index
442452
Index databasesIndex = databasesAbstraction.getWriteIndex();
443-
client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {}, e -> {
453+
client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {
454+
// remove task reference in the map so it can be garbage collected
455+
tasks.remove(projectId);
456+
}, e -> {
444457
Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
445458
if (t instanceof ResourceNotFoundException == false) {
446459
logger.warn("failed to remove " + databasesIndex, e);
@@ -459,7 +472,7 @@ public GeoIpDownloader getTask(ProjectId projectId) {
459472
return null;
460473
}
461474

462-
private String getTaskId() {
463-
return projectResolver.supportsMultipleProjects() ? projectResolver.getProjectId() + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
475+
private String getTaskId(ProjectId projectId) {
476+
return projectResolver.supportsMultipleProjects() ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
464477
}
465478
}

0 commit comments

Comments
 (0)