Skip to content

Commit 4b72b95

Browse files
Add project ID to key for geoip cache
In this change, the lazy loader passes in the default project ID. That will be fixed in a later PR.
1 parent 51e05a5 commit 4b72b95

File tree

3 files changed

+81
-32
lines changed

3 files changed

+81
-32
lines changed

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/DatabaseReaderLazyLoader.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import org.apache.logging.log4j.Logger;
1717
import org.apache.lucene.util.SetOnce;
1818
import org.elasticsearch.ExceptionsHelper;
19+
import org.elasticsearch.cluster.metadata.ProjectId;
1920
import org.elasticsearch.common.CheckedBiFunction;
2021
import org.elasticsearch.common.CheckedSupplier;
2122
import org.elasticsearch.core.Booleans;
23+
import org.elasticsearch.core.FixForMultiProject;
2224
import org.elasticsearch.core.IOUtils;
2325
import org.elasticsearch.core.Nullable;
2426
import org.elasticsearch.core.SuppressForbidden;
@@ -105,8 +107,9 @@ int current() {
105107

106108
@Override
107109
@Nullable
110+
@FixForMultiProject // do not use ProjectId.DEFAULT
108111
public <RESPONSE> RESPONSE getResponse(String ipAddress, CheckedBiFunction<Reader, String, RESPONSE, Exception> responseProvider) {
109-
return cache.putIfAbsent(ipAddress, cachedDatabasePathToString, ip -> {
112+
return cache.putIfAbsent(ProjectId.DEFAULT, ipAddress, cachedDatabasePathToString, ip -> {
110113
try {
111114
return responseProvider.apply(get(), ipAddress);
112115
} catch (Exception e) {
@@ -143,9 +146,10 @@ public void shutdown() throws IOException {
143146
}
144147

145148
// Visible for Testing
149+
@FixForMultiProject // do not use ProjectId.DEFAULT
146150
protected void doShutdown() throws IOException {
147151
IOUtils.close(databaseReader.get());
148-
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(databasePath);
152+
int numEntriesEvicted = cache.purgeCacheEntriesForDatabase(ProjectId.DEFAULT, databasePath);
149153
logger.info("evicted [{}] entries from cache after reloading database [{}]", numEntriesEvicted, databasePath);
150154
if (deleteDatabaseFileOnShutdown) {
151155
logger.info("deleting [{}]", databasePath);

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpCache.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import com.maxmind.db.NodeCache;
1212

13+
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.common.cache.Cache;
1415
import org.elasticsearch.common.cache.CacheBuilder;
1516
import org.elasticsearch.core.TimeValue;
@@ -60,9 +61,9 @@ public String toString() {
6061
}
6162

6263
@SuppressWarnings("unchecked")
63-
<RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
64+
<RESPONSE> RESPONSE putIfAbsent(ProjectId projectId, String ip, String databasePath, Function<String, RESPONSE> retrieveFunction) {
6465
// can't use cache.computeIfAbsent due to the elevated permissions for the jackson (run via the cache loader)
65-
CacheKey cacheKey = new CacheKey(ip, databasePath);
66+
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
6667
long cacheStart = relativeNanoTimeProvider.getAsLong();
6768
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
6869
Object response = cache.get(cacheKey);
@@ -92,16 +93,16 @@ <RESPONSE> RESPONSE putIfAbsent(String ip, String databasePath, Function<String,
9293
}
9394

9495
// only useful for testing
95-
Object get(String ip, String databasePath) {
96-
CacheKey cacheKey = new CacheKey(ip, databasePath);
96+
Object get(ProjectId projectId, String ip, String databasePath) {
97+
CacheKey cacheKey = new CacheKey(projectId, ip, databasePath);
9798
return cache.get(cacheKey);
9899
}
99100

100-
public int purgeCacheEntriesForDatabase(Path databaseFile) {
101+
public int purgeCacheEntriesForDatabase(ProjectId projectId, Path databaseFile) {
101102
String databasePath = databaseFile.toString();
102103
int counter = 0;
103104
for (CacheKey key : cache.keys()) {
104-
if (key.databasePath.equals(databasePath)) {
105+
if (key.projectId.equals(projectId) && key.databasePath.equals(databasePath)) {
105106
cache.invalidate(key);
106107
counter++;
107108
}
@@ -135,5 +136,5 @@ public CacheStats getCacheStats() {
135136
* path is needed to be included in the cache key. For example, if we only used the IP address as the key the City and ASN the same
136137
* IP may be in both with different values and we need to cache both.
137138
*/
138-
private record CacheKey(String ip, String databasePath) {}
139+
private record CacheKey(ProjectId projectId, String ip, String databasePath) {}
139140
}

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpCacheTests.java

Lines changed: 67 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111

1212
import com.maxmind.geoip2.model.AbstractResponse;
1313

14+
import org.elasticsearch.cluster.metadata.ProjectId;
1415
import org.elasticsearch.core.TimeValue;
1516
import org.elasticsearch.ingest.geoip.stats.CacheStats;
1617
import org.elasticsearch.test.ESTestCase;
1718

19+
import java.nio.file.Path;
1820
import java.util.concurrent.atomic.AtomicInteger;
1921
import java.util.concurrent.atomic.AtomicLong;
2022
import java.util.function.Function;
@@ -26,21 +28,22 @@ public class GeoIpCacheTests extends ESTestCase {
2628

2729
public void testCachesAndEvictsResults() {
2830
GeoIpCache cache = new GeoIpCache(1);
31+
ProjectId projectId = randomProjectIdOrDefault();
2932
AbstractResponse response1 = mock(AbstractResponse.class);
3033
AbstractResponse response2 = mock(AbstractResponse.class);
3134

3235
// add a key
33-
AbstractResponse cachedResponse = cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1);
36+
AbstractResponse cachedResponse = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1);
3437
assertSame(cachedResponse, response1);
35-
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> response1));
36-
assertSame(cachedResponse, cache.get("127.0.0.1", "path/to/db"));
38+
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> response1));
39+
assertSame(cachedResponse, cache.get(projectId, "127.0.0.1", "path/to/db"));
3740

3841
// evict old key by adding another value
39-
cachedResponse = cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2);
42+
cachedResponse = cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2);
4043
assertSame(cachedResponse, response2);
41-
assertSame(cachedResponse, cache.putIfAbsent("127.0.0.2", "path/to/db", ip -> response2));
42-
assertSame(cachedResponse, cache.get("127.0.0.2", "path/to/db"));
43-
assertNotSame(response1, cache.get("127.0.0.1", "path/to/db"));
44+
assertSame(cachedResponse, cache.putIfAbsent(projectId, "127.0.0.2", "path/to/db", ip -> response2));
45+
assertSame(cachedResponse, cache.get(projectId, "127.0.0.2", "path/to/db"));
46+
assertNotSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db"));
4447
}
4548

4649
public void testCachesNoResult() {
@@ -51,31 +54,47 @@ public void testCachesNoResult() {
5154
return null;
5255
};
5356

54-
AbstractResponse response = cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull);
57+
ProjectId projectId = randomProjectIdOrDefault();
58+
AbstractResponse response = cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull);
5559
assertNull(response);
56-
assertNull(cache.putIfAbsent("127.0.0.1", "path/to/db", countAndReturnNull));
60+
assertNull(cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", countAndReturnNull));
5761
assertEquals(1, count.get());
5862

5963
// the cached value is not actually *null*, it's the NO_RESULT sentinel
60-
assertSame(GeoIpCache.NO_RESULT, cache.get("127.0.0.1", "path/to/db"));
64+
assertSame(GeoIpCache.NO_RESULT, cache.get(projectId, "127.0.0.1", "path/to/db"));
6165
}
6266

63-
public void testCacheKey() {
67+
public void testCacheDoesNotCollideForDifferentDatabases() {
6468
GeoIpCache cache = new GeoIpCache(2);
6569
AbstractResponse response1 = mock(AbstractResponse.class);
6670
AbstractResponse response2 = mock(AbstractResponse.class);
6771

68-
assertSame(response1, cache.putIfAbsent("127.0.0.1", "path/to/db1", ip -> response1));
69-
assertSame(response2, cache.putIfAbsent("127.0.0.1", "path/to/db2", ip -> response2));
70-
assertSame(response1, cache.get("127.0.0.1", "path/to/db1"));
71-
assertSame(response2, cache.get("127.0.0.1", "path/to/db2"));
72+
ProjectId projectId = randomProjectIdOrDefault();
73+
assertSame(response1, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db1", ip -> response1));
74+
assertSame(response2, cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db2", ip -> response2));
75+
assertSame(response1, cache.get(projectId, "127.0.0.1", "path/to/db1"));
76+
assertSame(response2, cache.get(projectId, "127.0.0.1", "path/to/db2"));
77+
}
78+
79+
public void testCacheDoesNotCollideForDifferentProjects() {
80+
GeoIpCache cache = new GeoIpCache(2);
81+
AbstractResponse response1 = mock(AbstractResponse.class);
82+
AbstractResponse response2 = mock(AbstractResponse.class);
83+
84+
ProjectId projectId1 = randomUniqueProjectId();
85+
ProjectId projectId2 = randomUniqueProjectId();
86+
assertSame(response1, cache.putIfAbsent(projectId1, "127.0.0.1", "path/to/db1", ip -> response1));
87+
assertSame(response2, cache.putIfAbsent(projectId2, "127.0.0.1", "path/to/db1", ip -> response2));
88+
assertSame(response1, cache.get(projectId1, "127.0.0.1", "path/to/db1"));
89+
assertSame(response2, cache.get(projectId2, "127.0.0.1", "path/to/db1"));
7290
}
7391

7492
public void testThrowsFunctionsException() {
7593
GeoIpCache cache = new GeoIpCache(1);
94+
ProjectId projectId = randomProjectIdOrDefault();
7695
IllegalArgumentException ex = expectThrows(
7796
IllegalArgumentException.class,
78-
() -> cache.putIfAbsent("127.0.0.1", "path/to/db", ip -> {
97+
() -> cache.putIfAbsent(projectId, "127.0.0.1", "path/to/db", ip -> {
7998
throw new IllegalArgumentException("bad");
8099
})
81100
);
@@ -92,19 +111,20 @@ public void testGetCacheStats() {
92111
final AtomicLong testNanoTime = new AtomicLong(0);
93112
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
94113
GeoIpCache cache = new GeoIpCache(maxCacheSize, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
114+
ProjectId projectId = randomProjectIdOrDefault();
95115
AbstractResponse response = mock(AbstractResponse.class);
96116
String databasePath = "path/to/db1";
97117
String key1 = "127.0.0.1";
98118
String key2 = "127.0.0.2";
99119
String key3 = "127.0.0.3";
100120

101-
cache.putIfAbsent(key1, databasePath, ip -> response); // cache miss
102-
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss
103-
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
104-
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
105-
cache.putIfAbsent(key1, databasePath, ip -> response); // cache hit
106-
cache.putIfAbsent(key3, databasePath, ip -> response); // cache miss, key2 will be evicted
107-
cache.putIfAbsent(key2, databasePath, ip -> response); // cache miss, key1 will be evicted
121+
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache miss
122+
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss
123+
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
124+
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
125+
cache.putIfAbsent(projectId, key1, databasePath, ip -> response); // cache hit
126+
cache.putIfAbsent(projectId, key3, databasePath, ip -> response); // cache miss, key2 will be evicted
127+
cache.putIfAbsent(projectId, key2, databasePath, ip -> response); // cache miss, key1 will be evicted
108128
CacheStats cacheStats = cache.getCacheStats();
109129
assertThat(cacheStats.count(), equalTo(maxCacheSize));
110130
assertThat(cacheStats.hits(), equalTo(3L));
@@ -115,4 +135,28 @@ public void testGetCacheStats() {
115135
// There are 4 misses. Each is made up of a cache query, and a database query, each being 1ms:
116136
assertThat(cacheStats.missesTimeInMillis(), equalTo(8L));
117137
}
138+
139+
public void testPurgeCacheEntriesForDatabase() {
140+
GeoIpCache cache = new GeoIpCache(100);
141+
ProjectId projectId1 = randomUniqueProjectId();
142+
ProjectId projectId2 = randomUniqueProjectId();
143+
String databasePath1 = "path/to/db1";
144+
String databasePath2 = "path/to/db2";
145+
String ip1 = "127.0.0.1";
146+
String ip2 = "127.0.0.2";
147+
148+
AbstractResponse response = mock(AbstractResponse.class);
149+
cache.putIfAbsent(projectId1, ip1, databasePath1, ip -> response); // cache miss
150+
cache.putIfAbsent(projectId1, ip2, databasePath1, ip -> response); // cache miss
151+
cache.putIfAbsent(projectId2, ip1, databasePath1, ip -> response); // cache miss
152+
cache.putIfAbsent(projectId1, ip1, databasePath2, ip -> response); // cache miss
153+
cache.purgeCacheEntriesForDatabase(projectId1, Path.of(databasePath1));
154+
// should have purged entries for projectId1 and databasePath1...
155+
assertNull(cache.get(projectId1, ip1, databasePath1));
156+
assertNull(cache.get(projectId1, ip2, databasePath1));
157+
// ...but left the one for projectId2...
158+
assertSame(response, cache.get(projectId2, ip1, databasePath1));
159+
// ...and for databasePath2:
160+
assertSame(response, cache.get(projectId1, ip1, databasePath2));
161+
}
118162
}

0 commit comments

Comments
 (0)