Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2ca311c
[WIP] Add option for byte-based capacity for geoip cache
PeteGillinElastic Jun 10, 2025
1b2afaf
Constrain the generic type parameter of `IpDataLookup.Result` to exte…
PeteGillinElastic Jun 11, 2025
ea4d2f0
Revert a change to the test code from the original commit, which is n…
PeteGillinElastic Jun 11, 2025
9f6b995
Minor cleanup
PeteGillinElastic Jun 11, 2025
3a92e98
Add unit tests for byte-capacity cache
PeteGillinElastic Jun 11, 2025
e3b48b1
Add a new setting which triggers the byte-based capacity
PeteGillinElastic Jun 11, 2025
30cb5be
fix long line
PeteGillinElastic Jun 11, 2025
0d65209
Revert changes to `Cacheable[City|Country]Response`, including making…
PeteGillinElastic Jun 12, 2025
91fa87b
correct comment
PeteGillinElastic Jun 12, 2025
e01bf83
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 18, 2025
7759a1e
minor tweaks for readability, test coverage
PeteGillinElastic Jun 18, 2025
3fe30ec
Add size of project ID to key size
PeteGillinElastic Jun 18, 2025
88869f8
[CI] Auto commit changes from spotless
Jun 18, 2025
f61726c
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 19, 2025
0f1738d
Move base and rename response interface
PeteGillinElastic Jun 19, 2025
48a4d55
Do a horrible thing as an experimental workaround for the logstash issue
PeteGillinElastic Jun 19, 2025
45741c1
Tweaking the horrible thing
PeteGillinElastic Jun 19, 2025
4adc52b
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 19, 2025
dcc6286
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jul 8, 2025
6f77be4
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jul 9, 2025
eb90023
Do a horrible ugly thing. This throws away some type safety, and requ…
PeteGillinElastic Jul 10, 2025
24601ac
Try a third version
PeteGillinElastic Jul 10, 2025
b70b110
[CI] Auto commit changes from spotless
Jul 10, 2025
376f681
Include file I forgot to add
PeteGillinElastic Jul 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.Country;

import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -102,11 +99,9 @@ private void assertValidDatabase(DatabaseNodeService databaseNodeService, String
IpDatabase database = databaseNodeService.getDatabase(projectId, databaseFileName);
assertNotNull(database);
assertThat(database.getDatabaseType(), equalTo(databaseType));
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
GeoIpTestUtils.SimpleCountry countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
assertNotNull(countryResponse);
Country country = countryResponse.getCountry();
assertNotNull(country);
assertThat(country.getName(), equalTo("Sweden"));
assertThat(countryResponse.countryName(), equalTo("Sweden"));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private static DatabaseNodeService createRegistry(
ClusterService clusterService,
ProjectResolver projectResolver
) throws IOException {
GeoIpCache cache = new GeoIpCache(0);
GeoIpCache cache = GeoIpCache.createGeoIpCacheWithMaxCount(0);
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
copyDefaultDatabases(geoIpConfigDir, configDatabases);
DatabaseNodeService databaseNodeService = new DatabaseNodeService(
Expand All @@ -242,10 +242,12 @@ private static DatabaseNodeService createRegistry(
private static void lazyLoadReaders(ProjectId projectId, DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get(projectId, "GeoLite2-City.mmdb") != null) {
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
IpDatabase ipDatabase = databaseNodeService.get(projectId, "GeoLite2-City.mmdb");
ipDatabase.getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
IpDatabase ipDatabase = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb");
ipDatabase.getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.db.Reader;

import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.core.Nullable;

/**
 * An {@link IpDatabase} that additionally exposes a lookup whose response type is constrained to {@link GeoIpCache.Response}.
 * <p>
 * {@code IpDataLookup#databaseLookup} checks for this interface and, when present, calls
 * {@link #getCacheableResponse} instead of {@link IpDatabase#getResponse}.
 */
public interface CacheingIpDatabase extends IpDatabase {

/**
 * Looks up a response for the given IP address.
 * NOTE(review): implementations are presumably expected to serve the result through a {@link GeoIpCache}, as
 * {@code DatabaseReaderLazyLoader} does — confirm for any other implementers.
 *
 * @param ipAddress the IP address to look up
 * @param responseProvider a function from the database reader and the IP address to the response
 * @param <RESPONSE> the response type, which must implement {@link GeoIpCache.Response}
 * @return the response, which may be {@code null}
 */
@Nullable
<RESPONSE extends GeoIpCache.Response> RESPONSE getCacheableResponse(
String ipAddress,
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* Facilitates lazy loading of the database reader, so that when the geoip plugin is installed, but not used,
* no memory is being wasted on the database reader.
*/
public class DatabaseReaderLazyLoader implements IpDatabase {
public class DatabaseReaderLazyLoader implements CacheingIpDatabase {

private static final boolean LOAD_DATABASE_ON_HEAP = Booleans.parseBoolean(System.getProperty("es.geoip.load_db_on_heap", "false"));

Expand Down Expand Up @@ -116,6 +116,15 @@ int current() {
/**
 * Not supported by this class. Lookups must go through {@link #getCacheableResponse(String, CheckedBiFunction)}, whose response
 * type is constrained to {@link GeoIpCache.Response}.
 *
 * @param ipAddress ignored
 * @param responseProvider ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
@Nullable
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
    // TODO(pete): Improve the documentation here.
    throw new UnsupportedOperationException("This is a CacheingIpDatabase, callers should use getCacheableResponse instead");
}

@Nullable
public <RESPONSE extends GeoIpCache.Response> RESPONSE getCacheableResponse(
String ipAddress,
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
) {
return cache.putIfAbsent(projectId, ipAddress, cachedDatabasePathToString, ip -> {
try {
return responseProvider.apply(get(), ipAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import com.maxmind.db.NodeCache;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.geoip.stats.CacheStats;

Expand All @@ -29,44 +31,66 @@
*/
public final class GeoIpCache {

/**
 * Creates a cache bounded by {@code maxSize}, timed with {@link System#nanoTime()}.
 * No weigher is configured here, so the bound relies on the cache builder's default weighting (presumably one unit per entry).
 *
 * @param maxSize the maximum weight handed to the cache builder; must not be negative
 * @return the new cache
 * @throws IllegalArgumentException if {@code maxSize} is negative
 */
static GeoIpCache createGeoIpCacheWithMaxCount(long maxSize) {
    if (maxSize < 0) {
        throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
    }
    final Cache<CacheKey, Response> countBoundedCache = CacheBuilder.<CacheKey, Response>builder()
        .setMaximumWeight(maxSize)
        .build();
    return new GeoIpCache(System::nanoTime, countBoundedCache);
}

/**
 * Creates a cache bounded by a number of bytes, timed with {@link System#nanoTime()}.
 * Each entry is weighed as the sum of its key's and its value's {@code sizeInBytes()}.
 *
 * @param maxByteSize the maximum total weight, in bytes; must not be negative
 * @return the new cache
 * @throws IllegalArgumentException if {@code maxByteSize} represents a negative number of bytes
 */
static GeoIpCache createGeoIpCacheWithMaxBytes(ByteSizeValue maxByteSize) {
    final long maxBytes = maxByteSize.getBytes();
    if (maxBytes < 0) {
        throw new IllegalArgumentException("geoip max cache size in bytes must be 0 or greater");
    }
    final Cache<CacheKey, Response> byteBoundedCache = CacheBuilder.<CacheKey, Response>builder()
        .setMaximumWeight(maxBytes)
        .weigher((cacheKey, response) -> cacheKey.sizeInBytes() + response.sizeInBytes())
        .build();
    return new GeoIpCache(System::nanoTime, byteBoundedCache);
}

/**
 * Creates a cache bounded by {@code maxSize} that measures elapsed time with the supplied provider instead of
 * {@link System#nanoTime()}. Validation mirrors {@link #createGeoIpCacheWithMaxCount(long)} so that both count-based
 * factories reject a negative size with the same message.
 *
 * @param maxSize the maximum weight handed to the cache builder; must not be negative
 * @param relativeNanoTimeProvider supplies relative timestamps in nanoseconds, used for the hit/miss timing statistics
 * @return the new cache
 * @throws IllegalArgumentException if {@code maxSize} is negative
 */
// package private for testing
static GeoIpCache createGeoIpCacheWithMaxCountAndCustomTimeProvider(long maxSize, LongSupplier relativeNanoTimeProvider) {
    if (maxSize < 0) {
        throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
    }
    return new GeoIpCache(relativeNanoTimeProvider, CacheBuilder.<CacheKey, Response>builder().setMaximumWeight(maxSize).build());
}

/**
* Internal-only sentinel object for recording that a result from the geoip database was null (i.e. there was no result). By caching
* this no-result we can distinguish between something not being in the cache because we haven't searched for that data yet, versus
* something not being in the cache because the data doesn't exist in the database.
*/
// visible for testing
static final Object NO_RESULT = new Object() {
static final Response NO_RESULT = new Response() {
@Override
public String toString() {
return "NO_RESULT";
}
};

private final LongSupplier relativeNanoTimeProvider;
private final Cache<CacheKey, Object> cache;
private final Cache<CacheKey, Response> cache;
private final AtomicLong hitsTimeInNanos = new AtomicLong(0);
private final AtomicLong missesTimeInNanos = new AtomicLong(0);

// package private for testing
GeoIpCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
private GeoIpCache(LongSupplier relativeNanoTimeProvider, Cache<CacheKey, Response> cache) {
this.relativeNanoTimeProvider = relativeNanoTimeProvider;
this.cache = CacheBuilder.<CacheKey, Object>builder().setMaximumWeight(maxSize).build();
}

GeoIpCache(long maxSize) {
this(maxSize, System::nanoTime);
this.cache = cache;
}

@SuppressWarnings("unchecked")
<RESPONSE> RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
<RESPONSE extends Response> RESPONSE putIfAbsent(
ProjectId projectId,
String ip,
String databasePath,
Function<String, RESPONSE> retrieveFunction
) {
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
long cacheStart = relativeNanoTimeProvider.getAsLong();
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
Object response = cache.get(cacheKey);
Response response = cache.get(cacheKey);
long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart;

// populate the cache for this key, if necessary
Expand Down Expand Up @@ -131,10 +155,30 @@ public CacheStats getCacheStats() {
);
}

/**
 * The type of every value stored in this cache. The cache itself, the {@code NO_RESULT} sentinel and
 * {@code putIfAbsent} are all typed against this interface.
 */
public interface Response {

// TODO PETE: Remove this default implementation and implement in all implementing classes instead
/**
 * Returns the size of this response in bytes. The byte-capacity cache adds this to the key's size to weigh an entry.
 * The default of 0 means an implementation that does not override it contributes only its key's size to that weight.
 */
default long sizeInBytes() {
return 0;
}
}

/**
* The key to use for the cache. Since this cache can span multiple geoip processors that all use different databases, the database
* path needs to be included in the cache key. For example, if we only used the IP address as the key, the same IP could be present
* in both the City and ASN databases with different values, and we need to cache both.
*/
private record CacheKey(ProjectId projectId, String ip, String databasePath) {}
private record CacheKey(ProjectId projectId, String ip, String databasePath) {

private static final long BASE_BYTES = RamUsageEstimator.shallowSizeOfInstance(CacheKey.class);

private long sizeInBytes() {
return keySizeInBytes(projectId, ip, databasePath);
}
}

/**
 * Computes the size in bytes attributed to a cache key with the given components: the shallow size of a
 * {@code CacheKey} instance plus the sizes of the project ID and of the two strings.
 */
// visible for testing
static long keySizeInBytes(ProjectId projectId, String ip, String databasePath) {
    long totalBytes = CacheKey.BASE_BYTES;
    totalBytes += projectId.sizeInBytes();
    totalBytes += RamUsageEstimator.sizeOf(ip);
    totalBytes += RamUsageEstimator.sizeOf(databasePath);
    return totalBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Strings;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
Expand Down Expand Up @@ -86,7 +88,19 @@ public class IngestGeoIpPlugin extends Plugin
PersistentTaskPlugin,
ActionPlugin,
ReloadablePlugin {
public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

/**
 * Node-scoped setting {@code ingest.geoip.cache_size}: the count-based capacity of the geoip cache (default 1000, minimum 0).
 * {@code createGeoIpCache} uses this value, explicit or default, whenever {@code ingest.geoip.cache_memory_size} is not set,
 * and rejects configurations in which both settings are set.
 */
private static final Setting<Long> CACHE_SIZE_COUNT = Setting.longSetting(
"ingest.geoip.cache_size",
1000,
0,
Setting.Property.NodeScope
);

/**
 * Node-scoped setting {@code ingest.geoip.cache_memory_size}: the byte-based capacity of the geoip cache.
 * The {@link ByteSizeValue#MINUS_ONE} default is never used as a capacity, because {@code createGeoIpCache} only reads this
 * setting when it has been given an explicit value.
 */
private static final Setting<ByteSizeValue> CACHE_SIZE_BYTES = Setting.byteSizeSetting(
"ingest.geoip.cache_memory_size",
ByteSizeValue.MINUS_ONE,
Setting.Property.NodeScope
);
private static final int GEOIP_INDEX_MAPPINGS_VERSION = 1;
/**
* No longer used for determining the age of mappings, but system index descriptor
Expand All @@ -105,7 +119,8 @@ public class IngestGeoIpPlugin extends Plugin
@Override
public List<Setting<?>> getSettings() {
return List.of(
CACHE_SIZE,
CACHE_SIZE_COUNT,
CACHE_SIZE_BYTES,
GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING,
GeoIpDownloaderTaskExecutor.ENABLED_SETTING,
GeoIpDownloader.ENDPOINT_SETTING,
Expand All @@ -119,8 +134,7 @@ public List<Setting<?>> getSettings() {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
ingestService.set(parameters.ingestService);

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
GeoIpCache geoIpCache = createGeoIpCache(parameters.env.settings());
DatabaseNodeService registry = new DatabaseNodeService(
parameters.env,
parameters.client,
Expand All @@ -137,6 +151,30 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
);
}

/**
 * Builds the geoip cache from the node settings.
 * <ul>
 * <li>Byte-based setting absent: a count-bounded cache using the explicit or default value of the count setting.</li>
 * <li>Only the byte-based setting present: a byte-bounded cache.</li>
 * <li>Both present: rejected, since the two capacity models are mutually exclusive.</li>
 * </ul>
 *
 * @param settings the node settings
 * @return the configured cache
 * @throws IllegalArgumentException if both the count-based and the byte-based settings are set
 */
private static GeoIpCache createGeoIpCache(Settings settings) {
    final boolean byteCapacityConfigured = settings.hasValue(CACHE_SIZE_BYTES.getKey());
    if (byteCapacityConfigured == false) {
        // No byte capacity given, so fall back to the count-based capacity (explicit or default).
        return GeoIpCache.createGeoIpCacheWithMaxCount(CACHE_SIZE_COUNT.get(settings));
    }

    if (settings.hasValue(CACHE_SIZE_COUNT.getKey())) {
        // Both capacities were given, which is ambiguous and therefore an error.
        final String countKey = CACHE_SIZE_COUNT.getKey();
        final String bytesKey = CACHE_SIZE_BYTES.getKey();
        throw new IllegalArgumentException(
            Strings.format(
                "Both %s and %s are set: "
                    + "please use either %s to set a size based on count or %s to set a size based on bytes of memory",
                countKey,
                bytesKey,
                countKey,
                bytesKey
            )
        );
    }

    // Only the byte capacity was given.
    return GeoIpCache.createGeoIpCacheWithMaxBytes(CACHE_SIZE_BYTES.get(settings));
}

@Override
public Collection<?> createComponents(PluginServices services) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.db.Reader;

import org.elasticsearch.common.CheckedBiFunction;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
Expand All @@ -24,6 +28,18 @@ interface IpDataLookup {
*/
Map<String, Object> getData(IpDatabase ipDatabase, String ip) throws IOException;

/**
 * Looks up a record for the given IP address in the given database, choosing the entry point by the database's type:
 * a {@link CacheingIpDatabase} is queried through {@code getCacheableResponse}, any other {@link IpDatabase} through
 * {@code getResponse}.
 *
 * @param ipDatabase the database to query
 * @param ipAddress the IP address to look up
 * @param responseProvider a function from the database reader and the IP address to the record
 * @param <RECORD> the record type, which must implement {@link GeoIpCache.Response}
 * @return whatever the chosen database method returns
 */
default <RECORD extends GeoIpCache.Response> RECORD databaseLookup(
    IpDatabase ipDatabase,
    String ipAddress,
    CheckedBiFunction<Reader, String, RECORD, Exception> responseProvider
) {
    if (ipDatabase instanceof CacheingIpDatabase == false) {
        return ipDatabase.getResponse(ipAddress, responseProvider);
    }
    final CacheingIpDatabase cacheAwareDatabase = (CacheingIpDatabase) ipDatabase;
    return cacheAwareDatabase.getCacheableResponse(ipAddress, responseProvider);
}

/**
* @return the set of properties this lookup will provide
*/
Expand All @@ -34,5 +50,5 @@ interface IpDataLookup {
* as a network for which the record applies. Having a helper record prevents each individual response record from needing to
* track these bits of information.
*/
record Result<T>(T result, String ip, String network) {}
record Result<T extends GeoIpCache.Response>(T result, String ip, String network) implements GeoIpCache.Response {}
}
Loading