Skip to content

Commit 2ca311c

Browse files
[WIP] Add option for byte-based capacity for geoip cache
This adds the option to have a geoip cache with a byte-based rather than a count-based capacity.
1 parent 98e9514 commit 2ca311c

File tree

16 files changed

+179
-78
lines changed

16 files changed

+179
-78
lines changed

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/DatabaseNodeServiceIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ private void assertValidDatabase(DatabaseNodeService databaseNodeService, String
9696
IpDatabase database = databaseNodeService.getDatabase(databaseFileName);
9797
assertNotNull(database);
9898
assertThat(database.getDatabaseType(), equalTo(databaseType));
99-
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
99+
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry).result();
100100
assertNotNull(countryResponse);
101101
Country country = countryResponse.getCountry();
102102
assertNotNull(country);

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public void test() throws Exception {
195195

196196
private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir, ClusterService clusterService)
197197
throws IOException {
198-
GeoIpCache cache = new GeoIpCache(0);
198+
GeoIpCache cache = GeoIpCache.createGeoIpCacheWithMaxCount(0);
199199
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
200200
copyDefaultDatabases(geoIpConfigDir, configDatabases);
201201
DatabaseNodeService databaseNodeService = new DatabaseNodeService(

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ int current() {
105105

106106
@Override
107107
@Nullable
108-
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
108+
public <RESPONSE extends GeoIpCache.CacheableValue> RESPONSE getResponse(
109+
String ipAddress,
110+
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
111+
) {
109112
return cache.putIfAbsent(ipAddress, cachedDatabasePathToString, ip -> {
110113
try {
111114
return responseProvider.apply(get(), ipAddress);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010

1111
import com.maxmind.db.NodeCache;
1212

13+
import org.apache.lucene.util.RamUsageEstimator;
1314
import org.elasticsearch.common.cache.Cache;
1415
import org.elasticsearch.common.cache.CacheBuilder;
16+
import org.elasticsearch.common.unit.ByteSizeValue;
1517
import org.elasticsearch.core.TimeValue;
1618
import org.elasticsearch.ingest.geoip.stats.CacheStats;
1719

@@ -28,44 +30,71 @@
2830
*/
2931
public final class GeoIpCache {
3032

33+
public interface CacheableValue {
34+
35+
// TODO PETE: Remove this default implementation and implement in all implementing classes instead
36+
default long sizeInBytes() {
37+
return 0;
38+
}
39+
}
40+
41+
static <V> GeoIpCache createGeoIpCacheWithMaxCount(long maxSize) {
42+
if (maxSize < 0) {
43+
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
44+
}
45+
return new GeoIpCache(System::nanoTime, CacheBuilder.<CacheKey, CacheableValue>builder().setMaximumWeight(maxSize).build());
46+
}
47+
48+
// TODO PETE: Add tests for this
49+
// TODO PETE: Make plugin use this instead of the other factory method when the settings require it
50+
static GeoIpCache createGeoIpCacheWithMaxBytes(ByteSizeValue maxByteSize) {
51+
if (maxByteSize.getBytes() < 0) {
52+
throw new IllegalArgumentException("geoip max cache size in bytes must be 0 or greater");
53+
}
54+
return new GeoIpCache(
55+
System::nanoTime,
56+
CacheBuilder.<CacheKey, CacheableValue>builder()
57+
.setMaximumWeight(maxByteSize.getBytes())
58+
.weigher((key, value) -> key.sizeInBytes() + value.sizeInBytes())
59+
.build()
60+
);
61+
}
62+
63+
// package private for testing
64+
static GeoIpCache createGeoIpCacheWithMaxCountAndCustomTimeProvider(long maxSize, LongSupplier relativeNanoTimeProvider) {
65+
return new GeoIpCache(relativeNanoTimeProvider, CacheBuilder.<CacheKey, CacheableValue>builder().setMaximumWeight(maxSize).build());
66+
}
67+
3168
/**
3269
* Internal-only sentinel object for recording that a result from the geoip database was null (i.e. there was no result). By caching
3370
* this no-result we can distinguish between something not being in the cache because we haven't searched for that data yet, versus
3471
* something not being in the cache because the data doesn't exist in the database.
3572
*/
3673
// visible for testing
37-
static final Object NO_RESULT = new Object() {
74+
static final CacheableValue NO_RESULT = new CacheableValue() {
3875
@Override
3976
public String toString() {
4077
return "NO_RESULT";
4178
}
4279
};
4380

4481
private final LongSupplier relativeNanoTimeProvider;
45-
private final Cache<CacheKey, Object> cache;
82+
private final Cache<CacheKey, CacheableValue> cache;
4683
private final AtomicLong hitsTimeInNanos = new AtomicLong(0);
4784
private final AtomicLong missesTimeInNanos = new AtomicLong(0);
4885

49-
// package private for testing
50-
GeoIpCache(long maxSize, LongSupplier relativeNanoTimeProvider) {
51-
if (maxSize < 0) {
52-
throw new IllegalArgumentException("geoip max cache size must be 0 or greater");
53-
}
86+
private GeoIpCache(LongSupplier relativeNanoTimeProvider, Cache<CacheKey, CacheableValue> cache) {
5487
this.relativeNanoTimeProvider = relativeNanoTimeProvider;
55-
this.cache = CacheBuilder.<CacheKey, Object>builder().setMaximumWeight(maxSize).build();
56-
}
57-
58-
GeoIpCache(long maxSize) {
59-
this(maxSize, System::nanoTime);
88+
this.cache = cache;
6089
}
6190

6291
@SuppressWarnings("unchecked")
63-
<RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
92+
<RESPONSE extends CacheableValue> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
6493
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
6594
CacheKey cacheKey = new CacheKey(ip, databasePath);
6695
long cacheStart = relativeNanoTimeProvider.getAsLong();
6796
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
68-
Object response = cache.get(cacheKey);
97+
CacheableValue response = cache.get(cacheKey);
6998
long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart;
7099

71100
// populate the cache for this key, if necessary
@@ -135,5 +164,12 @@ public CacheStats getCacheStats() {
135164
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
136165
* IP may be in both with different values and we need to cache both.
137166
*/
138-
private record CacheKey(String ip, String databasePath) {}
167+
private record CacheKey(String ip, String databasePath) {
168+
169+
private static final long BASE_BYTES = RamUsageEstimator.shallowSizeOfInstance(CacheKey.class);
170+
171+
public long sizeInBytes() {
172+
return BASE_BYTES + RamUsageEstimator.sizeOf(ip) + RamUsageEstimator.sizeOf(databasePath);
173+
}
174+
}
139175
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
120120
ingestService.set(parameters.ingestService);
121121

122122
long cacheSize = CACHE_SIZE.get(parameters.env.settings());
123-
GeoIpCache geoIpCache = new GeoIpCache(cacheSize);
123+
GeoIpCache geoIpCache = GeoIpCache.createGeoIpCacheWithMaxCount(cacheSize);
124124
DatabaseNodeService registry = new DatabaseNodeService(
125125
parameters.env,
126126
parameters.client,

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDataLookup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ interface IpDataLookup {
3434
* as a network for which the record applies. Having a helper record prevents each individual response record from needing to
3535
* track these bits of information.
3636
*/
37-
record Result<T>(T result, String ip, String network) {}
37+
record Result<T>(T result, String ip, String network) implements GeoIpCache.CacheableValue {}
3838
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpDatabase.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,12 @@ public interface IpDatabase extends AutoCloseable {
3333
* @param ipAddress the address to lookup
3434
* @param responseProvider a method for extracting a response from a {@link Reader}, usually this will be a method reference
3535
* @return a possibly-null response
36-
* @param <RESPONSE> the type of response that will be returned
3736
*/
3837
@Nullable
39-
<RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider);
38+
<RESPONSE extends GeoIpCache.CacheableValue> RESPONSE getResponse(
39+
String ipAddress,
40+
CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider
41+
);
4042

4143
/**
4244
* Releases the current database object. Called after processing a single document. Databases should be closed or returned to a

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookups.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,23 @@ static Function<Set<Database.Property>, IpDataLookup> getMaxmindLookup(final Dat
109109
};
110110
}
111111

112-
static class AnonymousIp extends AbstractBase<AnonymousIpResponse, AnonymousIpResponse> {
112+
static class CacheableAnonymousIpResponse extends AnonymousIpResponse implements GeoIpCache.CacheableValue {
113+
114+
CacheableAnonymousIpResponse(AnonymousIpResponse response) {
115+
super(
116+
response.getIpAddress(),
117+
response.isAnonymous(),
118+
response.isAnonymousVpn(),
119+
response.isHostingProvider(),
120+
response.isPublicProxy(),
121+
response.isResidentialProxy(),
122+
response.isTorExitNode(),
123+
response.getNetwork()
124+
);
125+
}
126+
}
127+
128+
static class AnonymousIp extends AbstractBase<AnonymousIpResponse, CacheableAnonymousIpResponse> {
113129
AnonymousIp(final Set<Database.Property> properties) {
114130
super(
115131
properties,
@@ -119,12 +135,12 @@ static class AnonymousIp extends AbstractBase<AnonymousIpResponse, AnonymousIpRe
119135
}
120136

121137
@Override
122-
protected AnonymousIpResponse cacheableRecord(AnonymousIpResponse response) {
123-
return response;
138+
protected CacheableAnonymousIpResponse cacheableRecord(AnonymousIpResponse response) {
139+
return new CacheableAnonymousIpResponse(response);
124140
}
125141

126142
@Override
127-
protected Map<String, Object> transform(final AnonymousIpResponse response) {
143+
protected Map<String, Object> transform(final CacheableAnonymousIpResponse response) {
128144
boolean isHostingProvider = response.isHostingProvider();
129145
boolean isTorExitNode = response.isTorExitNode();
130146
boolean isAnonymousVpn = response.isAnonymousVpn();
@@ -160,18 +176,30 @@ protected Map<String, Object> transform(final AnonymousIpResponse response) {
160176
}
161177
}
162178

163-
static class Asn extends AbstractBase<AsnResponse, AsnResponse> {
179+
static class CacheableAsnResponse extends AsnResponse implements GeoIpCache.CacheableValue {
180+
181+
CacheableAsnResponse(AsnResponse response) {
182+
super(
183+
response.getAutonomousSystemNumber(),
184+
response.getAutonomousSystemOrganization(),
185+
response.getIpAddress(),
186+
response.getNetwork()
187+
);
188+
}
189+
}
190+
191+
static class Asn extends AbstractBase<AsnResponse, CacheableAsnResponse> {
164192
Asn(Set<Database.Property> properties) {
165193
super(properties, AsnResponse.class, (response, ipAddress, network, locales) -> new AsnResponse(response, ipAddress, network));
166194
}
167195

168196
@Override
169-
protected AsnResponse cacheableRecord(AsnResponse response) {
170-
return response;
197+
protected CacheableAsnResponse cacheableRecord(AsnResponse response) {
198+
return new CacheableAsnResponse(response);
171199
}
172200

173201
@Override
174-
protected Map<String, Object> transform(final AsnResponse response) {
202+
protected Map<String, Object> transform(final CacheableAsnResponse response) {
175203
Long asn = response.getAutonomousSystemNumber();
176204
String organizationName = response.getAutonomousSystemOrganization();
177205
Network network = response.getNetwork();
@@ -354,7 +382,14 @@ protected Map<String, Object> transform(final Result<CacheableCityResponse> resu
354382
}
355383
}
356384

357-
static class ConnectionType extends AbstractBase<ConnectionTypeResponse, ConnectionTypeResponse> {
385+
static class CacheableConnectionTypeResponse extends ConnectionTypeResponse implements GeoIpCache.CacheableValue {
386+
387+
CacheableConnectionTypeResponse(ConnectionTypeResponse response) {
388+
super(response.getConnectionType(), response.getIpAddress(), response.getNetwork());
389+
}
390+
}
391+
392+
static class ConnectionType extends AbstractBase<ConnectionTypeResponse, CacheableConnectionTypeResponse> {
358393
ConnectionType(final Set<Database.Property> properties) {
359394
super(
360395
properties,
@@ -364,12 +399,12 @@ static class ConnectionType extends AbstractBase<ConnectionTypeResponse, Connect
364399
}
365400

366401
@Override
367-
protected ConnectionTypeResponse cacheableRecord(ConnectionTypeResponse response) {
368-
return response;
402+
protected CacheableConnectionTypeResponse cacheableRecord(ConnectionTypeResponse response) {
403+
return new CacheableConnectionTypeResponse(response);
369404
}
370405

371406
@Override
372-
protected Map<String, Object> transform(final ConnectionTypeResponse response) {
407+
protected Map<String, Object> transform(final CacheableConnectionTypeResponse response) {
373408
ConnectionTypeResponse.ConnectionType connectionType = response.getConnectionType();
374409

375410
Map<String, Object> data = new HashMap<>();
@@ -479,7 +514,14 @@ protected Map<String, Object> transform(final Result<CacheableCountryResponse> r
479514
}
480515
}
481516

482-
static class Domain extends AbstractBase<DomainResponse, DomainResponse> {
517+
static class CacheableDomainResponse extends DomainResponse implements GeoIpCache.CacheableValue {
518+
519+
CacheableDomainResponse(DomainResponse response) {
520+
super(response.getDomain(), response.getIpAddress(), response.getNetwork());
521+
}
522+
}
523+
524+
static class Domain extends AbstractBase<DomainResponse, CacheableDomainResponse> {
483525
Domain(final Set<Database.Property> properties) {
484526
super(
485527
properties,
@@ -489,12 +531,12 @@ static class Domain extends AbstractBase<DomainResponse, DomainResponse> {
489531
}
490532

491533
@Override
492-
protected DomainResponse cacheableRecord(DomainResponse response) {
493-
return response;
534+
protected CacheableDomainResponse cacheableRecord(DomainResponse response) {
535+
return new CacheableDomainResponse(response);
494536
}
495537

496538
@Override
497-
protected Map<String, Object> transform(final DomainResponse response) {
539+
protected Map<String, Object> transform(final CacheableDomainResponse response) {
498540
String domain = response.getDomain();
499541

500542
Map<String, Object> data = new HashMap<>();
@@ -784,18 +826,25 @@ protected Map<String, Object> transform(final Result<CacheableEnterpriseResponse
784826
}
785827
}
786828

787-
static class Isp extends AbstractBase<IspResponse, IspResponse> {
829+
static class CacheableIspResponse extends IspResponse implements GeoIpCache.CacheableValue {
830+
831+
CacheableIspResponse(IspResponse response) {
832+
super(response, response.getIpAddress(), response.getNetwork());
833+
}
834+
}
835+
836+
static class Isp extends AbstractBase<IspResponse, CacheableIspResponse> {
788837
Isp(final Set<Database.Property> properties) {
789838
super(properties, IspResponse.class, (response, ipAddress, network, locales) -> new IspResponse(response, ipAddress, network));
790839
}
791840

792841
@Override
793-
protected IspResponse cacheableRecord(IspResponse response) {
794-
return response;
842+
protected CacheableIspResponse cacheableRecord(IspResponse response) {
843+
return new CacheableIspResponse(response);
795844
}
796845

797846
@Override
798-
protected Map<String, Object> transform(final IspResponse response) {
847+
protected Map<String, Object> transform(final CacheableIspResponse response) {
799848
String isp = response.getIsp();
800849
String ispOrganization = response.getOrganization();
801850
String mobileNetworkCode = response.getMobileNetworkCode();
@@ -867,7 +916,9 @@ private interface ResponseBuilder<RESPONSE extends AbstractResponse> {
867916
*
868917
* @param <RESPONSE> the intermediate type of {@link AbstractResponse}
869918
*/
870-
private abstract static class AbstractBase<RESPONSE extends AbstractResponse, RECORD> implements IpDataLookup {
919+
private abstract static class AbstractBase<RESPONSE extends AbstractResponse, RECORD extends GeoIpCache.CacheableValue>
920+
implements
921+
IpDataLookup {
871922

872923
protected final Set<Database.Property> properties;
873924
protected final Class<RESPONSE> clazz;

0 commit comments

Comments
 (0)