Skip to content

Commit 2ddbdea

Browse files
committed
WIP: make GeoIp search project aware
1 parent b561598 commit 2ddbdea

File tree

12 files changed

+343
-243
lines changed

12 files changed

+343
-243
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/ConfigDatabases.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ final class ConfigDatabases implements Closeable {
3939
private final GeoIpCache cache;
4040
private final Path geoipConfigDir;
4141

42+
// private final ConcurrentMap<ProjectId, ConcurrentMap<String, DatabaseReaderLazyLoader>> configDatabasesByProject;
4243
private final ConcurrentMap<String, DatabaseReaderLazyLoader> configDatabases;
4344

4445
ConfigDatabases(Environment environment, GeoIpCache cache) {

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseNodeService.java

Lines changed: 250 additions & 191 deletions
Large diffs are not rendered by default.

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,13 @@ public final String getDatabaseType() throws IOException {
8888
return databaseType.get();
8989
}
9090

91+
/**
92+
* Prepares the database for lookup by incrementing the usage count.
93+
* If the usage count is already negative, it indicates that the database is being closed,
94+
* and this method will return false to indicate that no lookup should be performed.
95+
*
96+
* @return true if the database is ready for lookup, false if it is being closed
97+
*/
9198
boolean preLookup() {
9299
return currentUsages.updateAndGet(current -> current < 0 ? current : current + 1) > 0;
93100
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/EnterpriseGeoIpTaskState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public void writeTo(StreamOutput out) throws IOException {
146146
* @return the geoip downloader's task state or null if there is not a state to read
147147
*/
148148
@Nullable
149+
@Deprecated(forRemoval = true)
149150
static EnterpriseGeoIpTaskState getEnterpriseGeoIpTaskState(ClusterState state) {
150151
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, EnterpriseGeoIpTask.ENTERPRISE_GEOIP_DOWNLOADER);
151152
return (task == null) ? null : (EnterpriseGeoIpTaskState) task.getState();

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Objects;
5454
import java.util.Set;
5555
import java.util.concurrent.ConcurrentHashMap;
56+
import java.util.concurrent.ConcurrentMap;
5657
import java.util.concurrent.atomic.AtomicBoolean;
5758
import java.util.concurrent.atomic.AtomicReference;
5859

@@ -106,9 +107,9 @@ public final class GeoIpDownloaderTaskExecutor extends PersistentTasksExecutor<G
106107
private volatile TimeValue pollInterval;
107108
private volatile boolean eagerDownload;
108109

109-
private final ConcurrentHashMap<ProjectId, AtomicBoolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();
110-
private final ConcurrentHashMap<ProjectId, AtomicBoolean> taskIsBootstrappedByProject = new ConcurrentHashMap<>();
111-
private final ConcurrentHashMap<ProjectId, AtomicReference<GeoIpDownloader>> tasks = new ConcurrentHashMap<>();
110+
private final ConcurrentMap<ProjectId, AtomicBoolean> atLeastOneGeoipProcessorByProject = new ConcurrentHashMap<>();
111+
private final ConcurrentMap<ProjectId, AtomicBoolean> taskIsBootstrappedByProject = new ConcurrentHashMap<>();
112+
private final ConcurrentMap<ProjectId, AtomicReference<GeoIpDownloader>> tasks = new ConcurrentHashMap<>();
112113
private final ProjectResolver projectResolver;
113114

114115
GeoIpDownloaderTaskExecutor(
@@ -409,7 +410,7 @@ private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object>
409410
private void startTask(Runnable onFailure) {
410411
assert projectResolver.getProjectId() != null : "projectId must be set before starting geoIp download task";
411412
persistentTasksService.sendStartRequest(
412-
getTaskId(projectResolver.getProjectId()),
413+
getTaskId(projectResolver.getProjectId(), projectResolver.supportsMultipleProjects()),
413414
GEOIP_DOWNLOADER,
414415
new GeoIpTaskParams(),
415416
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
@@ -438,7 +439,7 @@ private void stopTask(Runnable onFailure) {
438439
}
439440
);
440441
persistentTasksService.sendRemoveRequest(
441-
getTaskId(projectId),
442+
getTaskId(projectId, projectResolver.supportsMultipleProjects()),
442443
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
443444
ActionListener.runAfter(listener, () -> {
444445
IndexAbstraction databasesAbstraction = clusterService.state()
@@ -471,7 +472,7 @@ public GeoIpDownloader getTask(ProjectId projectId) {
471472
return null;
472473
}
473474

474-
private String getTaskId(ProjectId projectId) {
475-
return projectResolver.supportsMultipleProjects() ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
475+
public static String getTaskId(ProjectId projectId, boolean supportsMultipleProjects) {
476+
return supportsMultipleProjects ? projectId + "/" + GEOIP_DOWNLOADER : GEOIP_DOWNLOADER;
476477
}
477478
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -180,16 +180,19 @@ public static final class DatabaseVerifyingSupplier implements CheckedSupplier<I
180180
private final IpDatabaseProvider ipDatabaseProvider;
181181
private final String databaseFile;
182182
private final String databaseType;
183+
private final ProjectId projectId;
183184

184-
public DatabaseVerifyingSupplier(IpDatabaseProvider ipDatabaseProvider, String databaseFile, String databaseType) {
185+
public DatabaseVerifyingSupplier(IpDatabaseProvider ipDatabaseProvider, String databaseFile, String databaseType,
186+
ProjectId projectId) {
185187
this.ipDatabaseProvider = ipDatabaseProvider;
186188
this.databaseFile = databaseFile;
187189
this.databaseType = databaseType;
190+
this.projectId = projectId;
188191
}
189192

190193
@Override
191194
public IpDatabase get() throws IOException {
192-
IpDatabase loader = ipDatabaseProvider.getDatabase(databaseFile);
195+
IpDatabase loader = ipDatabaseProvider.getDatabase(projectId, databaseFile);
193196
if (loader == null) {
194197
return null;
195198
}
@@ -242,7 +245,7 @@ public Processor create(
242245
readBooleanProperty(type, processorTag, config, "download_database_on_pipeline_creation", true);
243246

244247
final String databaseType;
245-
try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(databaseFile)) {
248+
try (IpDatabase ipDatabase = ipDatabaseProvider.getDatabase(projectId, databaseFile)) {
246249
if (ipDatabase == null) {
247250
// It's possible that the database could be downloaded via the GeoipDownloader process and could become available
248251
// at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced
@@ -302,8 +305,8 @@ public Processor create(
302305
processorTag,
303306
description,
304307
ipField,
305-
new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType),
306-
() -> ipDatabaseProvider.isValid(databaseFile),
308+
new DatabaseVerifyingSupplier(ipDatabaseProvider, databaseFile, databaseType, projectId),
309+
() -> ipDatabaseProvider.isValid(projectId, databaseFile),
307310
targetField,
308311
ipDataLookup,
309312
ignoreMissing,

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpTaskState.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.TransportVersion;
1313
import org.elasticsearch.TransportVersions;
1414
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1516
import org.elasticsearch.common.io.stream.StreamInput;
1617
import org.elasticsearch.common.io.stream.StreamOutput;
1718
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
@@ -252,16 +253,31 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
252253
}
253254
}
254255

256+
// /**
257+
// * Retrieves the geoip downloader's task state from the cluster state. This may return null in some circumstances,
258+
// * for example if the geoip downloader task hasn't been created yet (which it wouldn't be if it's disabled).
259+
// *
260+
// * @param state the cluster state to read the task state from
261+
// * @return the geoip downloader's task state or null if there is not a state to read
262+
// */
263+
// @Nullable
264+
// @Deprecated(forRemoval = true)
265+
// static GeoIpTaskState getGeoIpTaskState(ClusterState state) {
266+
// PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
267+
// return (task == null) ? null : (GeoIpTaskState) task.getState();
268+
// }
269+
255270
/**
256-
* Retrieves the geoip downloader's task state from the cluster state. This may return null in some circumstances,
271+
* Retrieves the geoip downloader's task state from the project metadata. This may return null in some circumstances,
257272
* for example if the geoip downloader task hasn't been created yet (which it wouldn't be if it's disabled).
258273
*
259-
* @param state the cluster state to read the task state from
274+
* @param projectMetadata the project metatdata to read the task state from.
275+
* @param taskId the task ID of the geoip downloader task to read the state for.
260276
* @return the geoip downloader's task state or null if there is not a state to read
261277
*/
262278
@Nullable
263-
static GeoIpTaskState getGeoIpTaskState(ClusterState state) {
264-
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(state, GeoIpDownloader.GEOIP_DOWNLOADER);
279+
static GeoIpTaskState getGeoIpTaskState(ProjectMetadata projectMetadata, String taskId) {
280+
PersistentTasksCustomMetadata.PersistentTask<?> task = getTaskWithId(projectMetadata, taskId);
265281
return (task == null) ? null : (GeoIpTaskState) task.getState();
266282
}
267283

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
139139
public Collection<?> createComponents(PluginServices services) {
140140
try {
141141
String nodeId = services.nodeEnvironment().nodeId();
142-
databaseRegistry.get().initialize(nodeId, services.resourceWatcherService(), ingestService.get());
142+
databaseRegistry.get().initialize(nodeId, services.resourceWatcherService(), ingestService.get(), services.projectResolver());
143143
} catch (IOException e) {
144144
throw new UncheckedIOException(e);
145145
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabaseProvider.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.ingest.geoip;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
1214
/**
1315
* Provides construction and initialization logic for {@link IpDatabase} instances.
1416
*/
@@ -20,15 +22,17 @@ public interface IpDatabaseProvider {
2022
* Verifying database expiration is left to each provider implementation to determine. A return value of <code>false</code> does not
2123
* preclude the possibility of a provider returning <code>true</code> in the future.
2224
*
25+
* @param projectId projectId to look for database.
2326
* @param name the name of the database to provide.
2427
* @return <code>false</code> IFF the requested database file is expired,
2528
* <code>true</code> for all other cases (including unknown file name, file missing, wrong database type, etc).
2629
*/
27-
Boolean isValid(String name);
30+
Boolean isValid(ProjectId projectId, String name);
2831

2932
/**
33+
* @param projectId projectId to look for database.
3034
* @param name the name of the database to provide.
3135
* @return a ready-to-use database instance, or <code>null</code> if no database could be loaded.
3236
*/
33-
IpDatabase getDatabase(String name);
37+
IpDatabase getDatabase(ProjectId projectId, String name);
3438
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/stats/GeoIpStatsTransportAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.FailedNodeException;
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.action.support.nodes.TransportNodesAction;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
1516
import org.elasticsearch.cluster.node.DiscoveryNode;
1617
import org.elasticsearch.cluster.project.ProjectResolver;
1718
import org.elasticsearch.cluster.service.ClusterService;
@@ -77,15 +78,16 @@ protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throw
7778

7879
@Override
7980
protected NodeResponse nodeOperation(NodeRequest request, Task task) {
80-
GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getTask(projectResolver.getProjectId());
81+
ProjectId projectId = projectResolver.getProjectId();
82+
GeoIpDownloader geoIpTask = geoIpDownloaderTaskExecutor.getTask(projectId);
8183
GeoIpDownloaderStats downloaderStats = geoIpTask == null || geoIpTask.getStatus() == null ? null : geoIpTask.getStatus();
8284
CacheStats cacheStats = registry.getCacheStats();
8385
return new NodeResponse(
8486
transportService.getLocalNode(),
8587
downloaderStats,
8688
cacheStats,
87-
registry.getAvailableDatabases(),
88-
registry.getFilesInTemp(),
89+
registry.getAvailableDatabases(projectId),
90+
registry.getFilesInTemp(projectId),
8991
registry.getConfigDatabases()
9092
);
9193
}

0 commit comments

Comments
 (0)