Skip to content

Commit 5092830

Browse files
committed
Remove atomic and test change
1 parent a54d56b commit 5092830

File tree

2 files changed

+26
-61
lines changed

2 files changed

+26
-61
lines changed

modules/ingest-geoip/qa/multi-project/src/javaRestTest/java/geoip/GeoIpMultiProjectIT.java

Lines changed: 16 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ public void testGeoIpDownloader() throws Exception {
7070

7171
// download databases for project1
7272
putGeoIpPipeline(project1);
73-
assertBusy(() -> assertDatabasesLoaded(project1), 30, TimeUnit.SECONDS);
74-
assertBusy(() -> assertDatabasesNotLoaded(project2), 30, TimeUnit.SECONDS);
73+
assertBusy(() -> assertDatabases(project1, true), 30, TimeUnit.SECONDS);
74+
assertBusy(() -> assertDatabases(project2, false), 30, TimeUnit.SECONDS);
7575

7676
// download databases for project2
7777
putGeoIpPipeline(project2);
78-
assertBusy(() -> assertDatabasesLoaded(project2), 30, TimeUnit.SECONDS);
78+
assertBusy(() -> assertDatabases(project2, true), 30, TimeUnit.SECONDS);
7979
}
8080

8181
private void putGeoIpPipeline(String projectId) throws IOException {
@@ -106,38 +106,7 @@ private static Request setRequestProjectId(String projectId, Request request) {
106106
}
107107

108108
@SuppressWarnings("unchecked")
109-
private void assertDatabasesNotLoaded(String projectId) throws IOException {
110-
Request getTaskState = new Request("GET", "/_cluster/state");
111-
setRequestProjectId(projectId, getTaskState);
112-
113-
ObjectPath state = ObjectPath.createFromResponse(assertOK(client().performRequest(getTaskState)));
114-
115-
List<Map<String, ?>> tasks = state.evaluate("metadata.persistent_tasks.tasks");
116-
// Short-circuit to avoid using steams if the list is empty
117-
// task should exist but no database should be downloaded
118-
if (tasks.isEmpty()) {
119-
fail("persistent tasks list is empty, expected at least one task for geoip-downloader");
120-
}
121-
122-
// verify project task id
123-
Set<Map<String, ?>> id = tasks.stream()
124-
.filter(task -> String.format("%s/geoip-downloader", projectId).equals(task.get("id")))
125-
.collect(Collectors.toSet());
126-
assertThat(id.size(), equalTo(1));
127-
128-
// verify no database download
129-
Map<String, Object> databases = (Map<String, Object>) tasks.stream().map(task -> {
130-
try {
131-
return ObjectPath.evaluate(task, "task.geoip-downloader.state.databases");
132-
} catch (IOException e) {
133-
return null;
134-
}
135-
}).filter(Objects::nonNull).findFirst().orElse(null);
136-
assertNull(databases);
137-
}
138-
139-
@SuppressWarnings("unchecked")
140-
private void assertDatabasesLoaded(String projectId) throws IOException {
109+
private void assertDatabases(String projectId, boolean shouldDownload) throws IOException {
141110
Request getTaskState = new Request("GET", "/_cluster/state");
142111
setRequestProjectId(projectId, getTaskState);
143112

@@ -164,13 +133,18 @@ private void assertDatabasesLoaded(String projectId) throws IOException {
164133
}
165134
}).filter(Objects::nonNull).findFirst().orElse(null);
166135

167-
assertNotNull(databases);
168-
169-
for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
170-
Object database = databases.get(name);
171-
assertNotNull(database);
172-
assertNotNull(ObjectPath.evaluate(database, "md5"));
136+
if (shouldDownload) {
137+
// verify database downloaded
138+
assertNotNull(databases);
139+
for (String name : List.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb")) {
140+
Object database = databases.get(name);
141+
assertNotNull(database);
142+
assertNotNull(ObjectPath.evaluate(database, "md5"));
143+
}
144+
} else {
145+
// verify database not downloaded
146+
assertNull(databases);
173147
}
174-
}
175148

149+
}
176150
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.util.Set;
5555
import java.util.concurrent.ConcurrentHashMap;
5656
import java.util.concurrent.atomic.AtomicBoolean;
57-
import java.util.concurrent.atomic.AtomicReference;
5857

5958
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
6059
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
@@ -105,9 +104,9 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
105104
private volatile TimeValue pollInterval;
106105
private volatile boolean eagerDownload;
107106

108-
private final ConcurrentHashMap<ProjectId, AtomicBoolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();
107+
private final ConcurrentHashMap<ProjectId, Boolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();
109108
private final ConcurrentHashMap<ProjectId, AtomicBoolean> taskIsBootstrappedByProject = new ConcurrentHashMap<>();
110-
private final ConcurrentHashMap<ProjectId, AtomicReference<GeoIpDownloader>> tasks = new ConcurrentHashMap<>();
109+
private final ConcurrentHashMap<ProjectId, GeoIpDownloader> tasks = new ConcurrentHashMap<>();
111110
private final ProjectResolver projectResolver;
112111

113112
GeoIpDownloaderTaskExecutor(
@@ -178,16 +177,10 @@ private void setPollInterval(TimeValue pollInterval) {
178177

179178
@Override
180179
protected void nodeOperation(AllocatedPersistentTask task, GeoIpTaskParams params, PersistentTaskState state) {
181-
logger.info("Executing node operation for GeoIpDownloader task id [{}] for project [{}]", task.getId(), task.getProjectId());
182-
183180
GeoIpDownloader downloader = (GeoIpDownloader) task;
184181
GeoIpTaskState geoIpTaskState = (state == null) ? GeoIpTaskState.EMPTY : (GeoIpTaskState) state;
185182
downloader.setState(geoIpTaskState);
186-
AtomicReference<GeoIpDownloader> downloaderReference = tasks.computeIfAbsent(
187-
projectResolver.getProjectId(),
188-
k -> new AtomicReference<>()
189-
);
190-
downloaderReference.set(downloader);
183+
tasks.put(projectResolver.getProjectId(), downloader);
191184
if (ENABLED_SETTING.get(clusterService.state().metadata().settings(), settings)) {
192185
downloader.runDownloader();
193186
}
@@ -217,7 +210,7 @@ protected GeoIpDownloader createTask(
217210
headers,
218211
() -> pollInterval,
219212
() -> eagerDownload,
220-
() -> atLeastOneGeoipProcessorByProject.get(projectId) != null && atLeastOneGeoipProcessorByProject.get(projectId).get(),
213+
() -> atLeastOneGeoipProcessorByProject.getOrDefault(projectId, false),
221214
projectId,
222215
projectResolver
223216
);
@@ -275,18 +268,18 @@ public void clusterChanged(ClusterChangedEvent event) {
275268
if (hasIngestPipelineChanges || hasIndicesChanges) {
276269
var atLeastOneGeoipProcessor = atLeastOneGeoipProcessorByProject.computeIfAbsent(
277270
projectId,
278-
k -> new AtomicBoolean(false)
271+
k -> false
279272
);
280273
boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(projectMetadata);
281274

282-
if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor.get() == false) {
275+
if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
283276
logger.trace("Scheduling runDownloader for project [{}] because a geoip processor has been added", projectId);
284277
GeoIpDownloader currentDownloader = getTask(projectId);
285278
if (currentDownloader != null) {
286279
currentDownloader.requestReschedule();
287280
}
288281
}
289-
atLeastOneGeoipProcessor.set(newAtLeastOneGeoipProcessor);
282+
atLeastOneGeoipProcessorByProject.put(projectId, newAtLeastOneGeoipProcessor);
290283
}
291284
});
292285
}
@@ -463,6 +456,8 @@ private void stopTask(Runnable onFailure) {
463456
client.admin().indices().prepareDelete(databasesIndex.getName()).execute(ActionListener.wrap(rr -> {
464457
// remove task reference in the map so it can be garbage collected
465458
tasks.remove(projectId);
459+
taskIsBootstrappedByProject.remove(projectId);
460+
atLeastOneGeoipProcessorByProject.remove(projectId);
466461
}, e -> {
467462
Throwable t = e instanceof RemoteTransportException ? ExceptionsHelper.unwrapCause(e) : e;
468463
if (t instanceof ResourceNotFoundException == false) {
@@ -475,11 +470,7 @@ private void stopTask(Runnable onFailure) {
475470
}
476471

477472
public GeoIpDownloader getTask(ProjectId projectId) {
478-
var taskReference = tasks.get(projectId);
479-
if (taskReference != null) {
480-
return taskReference.get();
481-
}
482-
return null;
473+
return tasks.get(projectId);
483474
}
484475

485476
public static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) {

0 commit comments

Comments
 (0)