Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2ca311c
[WIP] Add option for byte-based capacity for geoip cache
PeteGillinElastic Jun 10, 2025
1b2afaf
Constrain the generic type parameter of `IpDataLookup.Result` to exte…
PeteGillinElastic Jun 11, 2025
ea4d2f0
Revert a change to the test code from the original commit, which is n…
PeteGillinElastic Jun 11, 2025
9f6b995
Minor cleanup
PeteGillinElastic Jun 11, 2025
3a92e98
Add unit tests for byte-capacity cache
PeteGillinElastic Jun 11, 2025
e3b48b1
Add a new setting which triggers the byte-based capacity
PeteGillinElastic Jun 11, 2025
30cb5be
fix long line
PeteGillinElastic Jun 11, 2025
0d65209
Revert changes to `Cacheable[City|Country]Response`, including making…
PeteGillinElastic Jun 12, 2025
91fa87b
correct comment
PeteGillinElastic Jun 12, 2025
e01bf83
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 18, 2025
7759a1e
minor tweaks for readability, test coverage
PeteGillinElastic Jun 18, 2025
3fe30ec
Add size of project ID to key size
PeteGillinElastic Jun 18, 2025
88869f8
[CI] Auto commit changes from spotless
Jun 18, 2025
f61726c
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 19, 2025
0f1738d
Move base and rename response interface
PeteGillinElastic Jun 19, 2025
48a4d55
Do a horrible thing as an experimental workaround for the logstash issue
PeteGillinElastic Jun 19, 2025
45741c1
Tweaking the horrible thing
PeteGillinElastic Jun 19, 2025
4adc52b
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jun 19, 2025
dcc6286
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jul 8, 2025
6f77be4
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Jul 9, 2025
eaa8388
undo the horrid thing in favour of the one-line change in the logstas…
PeteGillinElastic Aug 25, 2025
3ebd856
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Aug 25, 2025
f9d01eb
flip default
PeteGillinElastic Aug 25, 2025
f15b1cf
move sizeInBytes out of ProjectId
PeteGillinElastic Aug 25, 2025
2182564
fix broken setting default, and other tweaks
PeteGillinElastic Aug 25, 2025
eb49256
Merge remote-tracking branch 'upstream/main' into ES-7713-geoip-cache…
PeteGillinElastic Aug 25, 2025
b514fa9
fix long line
PeteGillinElastic Aug 25, 2025
270ff2b
fix javadoc
PeteGillinElastic Aug 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.Country;

import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -102,11 +99,9 @@ private void assertValidDatabase(DatabaseNodeService databaseNodeService, String
IpDatabase database = databaseNodeService.getDatabase(projectId, databaseFileName);
assertNotNull(database);
assertThat(database.getDatabaseType(), equalTo(databaseType));
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
GeoIpTestUtils.SimpleCountry countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
assertNotNull(countryResponse);
Country country = countryResponse.getCountry();
assertNotNull(country);
assertThat(country.getName(), equalTo("Sweden"));
assertThat(countryResponse.countryName(), equalTo("Sweden"));
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ int current() {

@Override
@Nullable
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
public <RESPONSE extends Response> RESPONSE getResponse(
String ipAddress,
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
) {
return cache.putIfAbsent(projectId, ipAddress, cachedDatabasePathToString, ip -> {
try {
return responseProvider.apply(get(), ipAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import com.maxmind.db.NodeCache;

import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.cache.Cache;
import org.elasticsearch.common.cache.CacheBuilder;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.geoip.stats.CacheStats;

Expand All @@ -29,44 +31,69 @@
*/
public final class GeoIpCache {

static GeoIpCache createGeoIpCacheWithMaxCount(long maxSize) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
return new GeoIpCache(System::nanoTime, CacheBuilder.<CacheKey, IpDatabase.Response>builder().setMaximumWeight(maxSize).build());
}

static GeoIpCache createGeoIpCacheWithMaxBytes(ByteSizeValue maxByteSize) {
if (maxByteSize.getBytes() < 0) {
throw new IllegalArgumentException("geoip max cache size in bytes must be 0 or greater");
}
return new GeoIpCache(
System::nanoTime,
CacheBuilder.<CacheKey, IpDatabase.Response>builder()
.setMaximumWeight(maxByteSize.getBytes())
.weigher((key, value) -> key.sizeInBytes() + value.sizeInBytes())
.build()
);
}

// package private for testing
static GeoIpCache createGeoIpCacheWithMaxCountAndCustomTimeProvider(long maxSize, LongSupplier relativeNanoTimeProvider) {
return new GeoIpCache(
relativeNanoTimeProvider,
CacheBuilder.<CacheKey, IpDatabase.Response>builder().setMaximumWeight(maxSize).build()
);
}

/**
* Internal-only sentinel object for recording that a result from the geoip database was null (i.e. there was no result). By caching
* this no-result we can distinguish between something not being in the cache because we haven't searched for that data yet, versus
* something not being in the cache because the data doesn't exist in the database.
*/
// visible for testing
static final Object NO_RESULT = new Object() {
static final IpDatabase.Response NO_RESULT = new IpDatabase.Response() {
@Override
public String toString() {
return "NO_RESULT";
}
};

private final Cache<CacheKey, Object> cache;
private final Cache<CacheKey, IpDatabase.Response> cache;
private final LongSupplier relativeNanoTimeProvider;
private final LongAdder hitsTimeInNanos = new LongAdder();
private final LongAdder missesTimeInNanos = new LongAdder();

// package private for testing
GeoIpCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
if (maxSize < 0) {
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
}
private GeoIpCache(LongSupplier relativeNanoTimeProvider, Cache<CacheKey, IpDatabase.Response> cache) {
this.relativeNanoTimeProvider = relativeNanoTimeProvider;
this.cache = CacheBuilder.<CacheKey, Object>builder().setMaximumWeight(maxSize).build();
}

GeoIpCache(long maxSize) {
this(maxSize, System::nanoTime);
this.cache = cache;
}

@SuppressWarnings("unchecked")
<RESPONSE> RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
<RESPONSE extends IpDatabase.Response> RESPONSE putIfAbsent(
ProjectId projectId,
String ip,
String databasePath,
Function<String, RESPONSE> retrieveFunction
) {
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
long cacheStart = relativeNanoTimeProvider.getAsLong();
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
Object response = cache.get(cacheKey);
IpDatabase.Response response = cache.get(cacheKey);
long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart;

// populate the cache for this key, if necessary
Expand Down Expand Up @@ -136,5 +163,21 @@ public CacheStats getCacheStats() {
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
* IP may be in both with different values and we need to cache both.
*/
private record CacheKey(ProjectId projectId, String ip, String databasePath) {}
private record CacheKey(ProjectId projectId, String ip, String databasePath) {

private static final long BASE_BYTES = RamUsageEstimator.shallowSizeOfInstance(CacheKey.class);
private static final long PROJECT_ID_BASE_BYTES = RamUsageEstimator.shallowSizeOfInstance(ProjectId.class);

private long sizeInBytes() {
return keySizeInBytes(projectId, ip, databasePath);
}
}

// visible for testing
static long keySizeInBytes(ProjectId projectId, String ip, String databasePath) {
// TODO: Check this size computation before merging:
return CacheKey.BASE_BYTES + CacheKey.PROJECT_ID_BASE_BYTES + RamUsageEstimator.sizeOf(projectId.id()) + RamUsageEstimator.sizeOf(
ip
) + RamUsageEstimator.sizeOf(databasePath);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Strings;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.ingest.EnterpriseGeoIpTask.EnterpriseGeoIpTaskParams;
Expand All @@ -43,6 +45,7 @@
import org.elasticsearch.ingest.geoip.stats.GeoIpStatsAction;
import org.elasticsearch.ingest.geoip.stats.GeoIpStatsTransportAction;
import org.elasticsearch.ingest.geoip.stats.RestGeoIpStatsAction;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.persistent.PersistentTaskParams;
import org.elasticsearch.persistent.PersistentTaskState;
import org.elasticsearch.persistent.PersistentTasksExecutor;
Expand Down Expand Up @@ -86,7 +89,20 @@ public class IngestGeoIpPlugin extends Plugin
PersistentTaskPlugin,
ActionPlugin,
ReloadablePlugin {
public static final Setting<Long> CACHE_SIZE = Setting.longSetting("ingest.geoip.cache_size", 1000, 0, Setting.Property.NodeScope);

private static final Setting<Long> CACHE_SIZE_COUNT = Setting.longSetting(
"ingest.geoip.cache_size",
0, // default is never used, we use CACHE_SIZE_BYTES if this is not set
0,
Setting.Property.NodeScope
);

private static final Setting<ByteSizeValue> CACHE_SIZE_BYTES = Setting.byteSizeSetting(
"ingest.geoip.cache_memory_size",
// TODO: Think more carefully about this default before merging:
ByteSizeValue.ofBytes((long) (0.01 * JvmInfo.jvmInfo().getConfiguredMaxHeapSize())),
Setting.Property.NodeScope
);
private static final int GEOIP_INDEX_MAPPINGS_VERSION = 1;
/**
* No longer used for determining the age of mappings, but system index descriptor
Expand All @@ -105,7 +121,8 @@ public class IngestGeoIpPlugin extends Plugin
@Override
public List<Setting<?>> getSettings() {
return List.of(
CACHE_SIZE,
CACHE_SIZE_COUNT,
CACHE_SIZE_BYTES,
GeoIpDownloaderTaskExecutor.EAGER_DOWNLOAD_SETTING,
GeoIpDownloaderTaskExecutor.ENABLED_SETTING,
GeoIpDownloader.ENDPOINT_SETTING,
Expand All @@ -119,8 +136,7 @@ public List<Setting<?>> getSettings() {
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
ingestService.set(parameters.ingestService);

long cacheSize = CACHE_SIZE.get(parameters.env.settings());
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
GeoIpCache geoIpCache = createGeoIpCache(parameters.env.settings());
DatabaseNodeService registry = new DatabaseNodeService(
parameters.env,
parameters.client,
Expand All @@ -137,6 +153,30 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
);
}

private static GeoIpCache createGeoIpCache(Settings settings) {
if (settings.hasValue(CACHE_SIZE_COUNT.getKey())) {
if (settings.hasValue(CACHE_SIZE_BYTES.getKey())) {
// Both CACHE_SIZE_COUNT and CACHE_SIZE_BYTES are set, which is an error:
throw new IllegalArgumentException(
Strings.format(
"Both %s and %s are set: "
+ "please use either %s to set a size based on count or %s to set a size based on bytes of memory",
CACHE_SIZE_COUNT.getKey(),
CACHE_SIZE_BYTES.getKey(),
CACHE_SIZE_COUNT.getKey(),
CACHE_SIZE_BYTES.getKey()
)
);
} else {
// Only CACHE_SIZE_COUNT is set, so use that:
return GeoIpCache.createGeoIpCacheWithMaxCount(CACHE_SIZE_COUNT.get(settings));
}
} else {
// CACHE_SIZE_COUNT is not set, so use either the explicit or default value of CACHE_SIZE_BYTES:
return GeoIpCache.createGeoIpCacheWithMaxBytes(CACHE_SIZE_BYTES.get(settings));
}
}

@Override
public Collection<?> createComponents(PluginServices services) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ interface IpDataLookup {
* as a network for which the record applies. Having a helper record prevents each individual response record from needing to
* track these bits of information.
*/
record Result<T>(T result, String ip, String network) {}
record Result<T extends IpDatabase.Response>(T result, String ip, String network) implements IpDatabase.Response {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ public interface IpDatabase extends AutoCloseable {
* @param <RESPONSE> the type of response that will be returned
*/
@Nullable
<RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider);
// TODO: This change requires a one-line change to an implementation in a logstash filter. Coordinate with that team before merging.
// See repo https://github.com/elastic/logstash-filter-elastic_integration,
// class co.elastic.logstash.filters.elasticintegration.geoip.IpDatabaseAdapter.
<RESPONSE extends Response> RESPONSE getResponse(
String ipAddress,
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
);

/**
* Releases the current database object. Called after processing a single document. Databases should be closed or returned to a
Expand All @@ -46,4 +52,12 @@ public interface IpDatabase extends AutoCloseable {
*/
@Override
void close() throws IOException;

interface Response {

// TODO: Remove this default implementation and implement in all implementing classes instead before merging:
default long sizeInBytes() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public record AsnResult(
String domain,
String name,
@Nullable String type // not present in the free asn database
) {
) implements IpDatabase.Response {
@SuppressWarnings("checkstyle:RedundantModifier")
@MaxMindDbConstructor
public AsnResult(
Expand All @@ -210,20 +210,14 @@ public record CountryResult(
@MaxMindDbParameter(name = "continent_name") String continentName,
@MaxMindDbParameter(name = "country") String country,
@MaxMindDbParameter(name = "country_name") String countryName
) {
) implements IpDatabase.Response {
@MaxMindDbConstructor
public CountryResult {}
}

public record GeolocationResult(
String city,
String country,
Double lat,
Double lng,
String postalCode,
String region,
String timezone
) {
public record GeolocationResult(String city, String country, Double lat, Double lng, String postalCode, String region, String timezone)
implements
IpDatabase.Response {
@SuppressWarnings("checkstyle:RedundantModifier")
@MaxMindDbConstructor
public GeolocationResult(
Expand All @@ -241,7 +235,9 @@ public GeolocationResult(
}
}

public record PrivacyDetectionResult(Boolean hosting, Boolean proxy, Boolean relay, String service, Boolean tor, Boolean vpn) {
public record PrivacyDetectionResult(Boolean hosting, Boolean proxy, Boolean relay, String service, Boolean tor, Boolean vpn)
implements
IpDatabase.Response {
@SuppressWarnings("checkstyle:RedundantModifier")
@MaxMindDbConstructor
public PrivacyDetectionResult(
Expand Down Expand Up @@ -466,7 +462,7 @@ protected Map<String, Object> transform(final Result<PrivacyDetectionResult> res
*
* @param <RESPONSE> the record type that will be wrapped and returned
*/
private abstract static class AbstractBase<RESPONSE> implements IpDataLookup {
private abstract static class AbstractBase<RESPONSE extends IpDatabase.Response> implements IpDataLookup {

protected final Set<Database.Property> properties;
protected final Class<RESPONSE> clazz;
Expand Down
Loading