Skip to content

Commit 3721613

Browse files
ES-10936 Include project ID in enrich cache key (#128908)
This prevents collisions in the enrich cache for different projects. Collisions would be unlikely without this change, since the key includes the index name, and the index name includes a millisecond-precision timestamp, so a collision would only happen if two projects executed enrich policies with the same name at the same time, or if one project manipulated the alias to match the timestamp of the other project. However, collisions would have serious consequences (allowing read-across of data between projects). This change closes that gap.
1 parent 2124ea9 commit 3721613

File tree

3 files changed

+106
-22
lines changed

3 files changed

+106
-22
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.search.SearchResponse;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.common.cache.Cache;
1314
import org.elasticsearch.common.cache.CacheBuilder;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
1516
import org.elasticsearch.common.util.Maps;
16-
import org.elasticsearch.core.FixForMultiProject;
1717
import org.elasticsearch.core.TimeValue;
1818
import org.elasticsearch.search.SearchHit;
1919
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
@@ -84,14 +84,15 @@ private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction
8484
* This method notifies the given listener of the value in this cache for the given search parameters. If there is no value in the cache
8585
* for these search parameters, then the new cache value is computed using searchResponseFetcher.
8686
*
87-
* @param enrichIndex The enrich index from which the results will be retrieved
88-
* @param lookupValue The value that will be used in the search
89-
* @param maxMatches The max number of matches that the search will return
87+
* @param projectId The ID of the project
88+
* @param enrichIndex The enrich index from which the results will be retrieved
89+
* @param lookupValue The value that will be used in the search
90+
* @param maxMatches The max number of matches that the search will return
9091
* @param searchResponseFetcher The function used to compute the value to be put in the cache, if there is no value in the cache already
91-
* @param listener A listener to be notified of the value in the cache
92+
* @param listener A listener to be notified of the value in the cache
9293
*/
93-
@FixForMultiProject(description = "The enrich cache will currently leak data between projects. We need to either disable or fix it.")
9494
public void computeIfAbsent(
95+
ProjectId projectId,
9596
String enrichIndex,
9697
Object lookupValue,
9798
int maxMatches,
@@ -100,7 +101,7 @@ public void computeIfAbsent(
100101
) {
101102
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
102103
long cacheStart = relativeNanoTimeProvider.getAsLong();
103-
var cacheKey = new CacheKey(enrichIndex, lookupValue, maxMatches);
104+
var cacheKey = new CacheKey(projectId, enrichIndex, lookupValue, maxMatches);
104105
List<Map<?, ?>> response = get(cacheKey);
105106
long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart;
106107
if (response != null) {
@@ -200,11 +201,11 @@ private static Object innerDeepCopy(Object value, boolean unmodifiable) {
200201
*
201202
* @param enrichIndex The enrich <i>index</i> (i.e. not the alias, but the concrete index that the alias points to)
202203
* @param lookupValue The value that is used to find matches in the enrich index
203-
* @param maxMatches The max number of matches that the enrich lookup should return. This changes the size of the search response and
204-
* should thus be included in the cache key
204+
* @param maxMatches The max number of matches that the enrich lookup should return. This changes the size of the search response and
205+
* should thus be included in the cache key
205206
*/
206207
// Visibility for testing
207-
record CacheKey(String enrichIndex, Object lookupValue, int maxMatches) {
208+
record CacheKey(ProjectId projectId, String enrichIndex, Object lookupValue, int maxMatches) {
208209
/**
209210
* In reality, the size in bytes of the cache key is a function of the {@link CacheKey#lookupValue} field plus some constant for
210211
* the object itself, the string reference for the enrich index (but not the string itself because it's taken from the metadata),

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ private SearchRunner createSearchRunner(ProjectMetadata project, String indexAli
138138
return (value, maxMatches, reqSupplier, handler) -> {
139139
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
140140
enrichCache.computeIfAbsent(
141+
project.id(),
141142
getEnrichIndexKey(project, indexAlias),
142143
value,
143144
maxMatches,

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.enrich;
88

99
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.cluster.metadata.ProjectId;
1011
import org.elasticsearch.common.bytes.BytesReference;
1112
import org.elasticsearch.core.TimeValue;
1213
import org.elasticsearch.search.SearchHit;
@@ -38,10 +39,11 @@ public class EnrichCacheTests extends ESTestCase {
3839
public void testCaching() {
3940
// Emulated search requests that an enrich processor could generate:
4041
// (two unique searches for two enrich policies)
41-
var cacheKey1 = new EnrichCache.CacheKey("policy1-1", "1", 1);
42-
var cacheKey2 = new EnrichCache.CacheKey("policy1-1", "2", 1);
43-
var cacheKey3 = new EnrichCache.CacheKey("policy2-1", "1", 1);
44-
var cacheKey4 = new EnrichCache.CacheKey("policy2-1", "2", 1);
42+
var projectId = randomProjectIdOrDefault();
43+
var cacheKey1 = new EnrichCache.CacheKey(projectId, "policy1-1", "1", 1);
44+
var cacheKey2 = new EnrichCache.CacheKey(projectId, "policy1-1", "2", 1);
45+
var cacheKey3 = new EnrichCache.CacheKey(projectId, "policy2-1", "1", 1);
46+
var cacheKey4 = new EnrichCache.CacheKey(projectId, "policy2-1", "2", 1);
4547
// Emulated search response (content doesn't matter, since it isn't used, it's just a cache entry)
4648
EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L);
4749

@@ -75,10 +77,10 @@ public void testCaching() {
7577
assertThat(cacheStats.evictions(), equalTo(1L));
7678
assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
7779

78-
cacheKey1 = new EnrichCache.CacheKey("policy1-2", "1", 1);
79-
cacheKey2 = new EnrichCache.CacheKey("policy1-2", "2", 1);
80-
cacheKey3 = new EnrichCache.CacheKey("policy2-2", "1", 1);
81-
cacheKey4 = new EnrichCache.CacheKey("policy2-2", "2", 1);
80+
cacheKey1 = new EnrichCache.CacheKey(projectId, "policy1-2", "1", 1);
81+
cacheKey2 = new EnrichCache.CacheKey(projectId, "policy1-2", "2", 1);
82+
cacheKey3 = new EnrichCache.CacheKey(projectId, "policy2-2", "1", 1);
83+
cacheKey4 = new EnrichCache.CacheKey(projectId, "policy2-2", "2", 1);
8284

8385
// Because enrich index has changed, cache can't serve cached entries
8486
assertThat(enrichCache.get(cacheKey1), nullValue());
@@ -115,10 +117,13 @@ public void testComputeIfAbsent() throws InterruptedException {
115117
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
116118
EnrichCache enrichCache = new EnrichCache(3, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
117119

120+
ProjectId projectId = randomProjectIdOrDefault();
121+
long expectedMisses = 0L;
118122
{
123+
// Do initial computeIfAbsent, assert that it is a cache miss and the search is performed:
119124
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
120125
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
121-
enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> {
126+
enrichCache.computeIfAbsent(projectId, "policy1-1", "1", 1, (searchResponseActionListener) -> {
122127
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
123128
searchResponseActionListener.onResponse(searchResponse);
124129
searchResponse.decRef();
@@ -132,26 +137,103 @@ public void testComputeIfAbsent() throws InterruptedException {
132137
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
133138
assertThat(cacheStats.count(), equalTo(1L));
134139
assertThat(cacheStats.hits(), equalTo(0L));
135-
assertThat(cacheStats.misses(), equalTo(1L));
140+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
136141
assertThat(cacheStats.evictions(), equalTo(0L));
137142
assertThat(cacheStats.hitsTimeInMillis(), equalTo(0L));
138143
assertThat(cacheStats.missesTimeInMillis(), equalTo(2L)); // cache query and enrich query + cache put
139144
}
140145

141146
{
147+
// Do the same call, assert that it is a cache hit and no search is performed:
142148
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
143-
enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> {
149+
enrichCache.computeIfAbsent(projectId, "policy1-1", "1", 1, (searchResponseActionListener) -> {
144150
fail("Expected no call to the database because item should have been in the cache");
145151
}, assertNoFailureListener(r -> notifiedOfResultLatch.countDown()));
146152
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
147153
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
148154
assertThat(cacheStats.count(), equalTo(1L));
149155
assertThat(cacheStats.hits(), equalTo(1L));
150-
assertThat(cacheStats.misses(), equalTo(1L));
156+
assertThat(cacheStats.misses(), equalTo(expectedMisses));
151157
assertThat(cacheStats.evictions(), equalTo(0L));
152158
assertThat(cacheStats.hitsTimeInMillis(), equalTo(1L));
153159
assertThat(cacheStats.missesTimeInMillis(), equalTo(2L));
154160
}
161+
162+
{
163+
// Do a computeIfAbsent with a different index, assert that it is a cache miss and the search is performed:
164+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
165+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
166+
enrichCache.computeIfAbsent(projectId, "policy1-2", "1", 1, (searchResponseActionListener) -> {
167+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
168+
searchResponseActionListener.onResponse(searchResponse);
169+
searchResponse.decRef();
170+
queriedDatabaseLatch.countDown();
171+
}, assertNoFailureListener(response -> {
172+
assertThat(response, equalTo(searchResponseMap));
173+
notifiedOfResultLatch.countDown();
174+
}));
175+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
176+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
177+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
178+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
179+
}
180+
181+
{
182+
// Do a computeIfAbsent with a different project, assert that it is a cache miss and the search is performed:
183+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
184+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
185+
enrichCache.computeIfAbsent(randomUniqueProjectId(), "policy1-1", "1", 1, (searchResponseActionListener) -> {
186+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
187+
searchResponseActionListener.onResponse(searchResponse);
188+
searchResponse.decRef();
189+
queriedDatabaseLatch.countDown();
190+
}, assertNoFailureListener(response -> {
191+
assertThat(response, equalTo(searchResponseMap));
192+
notifiedOfResultLatch.countDown();
193+
}));
194+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
195+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
196+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
197+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
198+
}
199+
200+
{
201+
// Do a computeIfAbsent with a different lookup value, assert that it is a cache miss and the search is performed:
202+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
203+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
204+
enrichCache.computeIfAbsent(projectId, "policy1-1", "2", 1, (searchResponseActionListener) -> {
205+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
206+
searchResponseActionListener.onResponse(searchResponse);
207+
searchResponse.decRef();
208+
queriedDatabaseLatch.countDown();
209+
}, assertNoFailureListener(response -> {
210+
assertThat(response, equalTo(searchResponseMap));
211+
notifiedOfResultLatch.countDown();
212+
}));
213+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
214+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
215+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
216+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
217+
}
218+
219+
{
220+
// Do a computeIfAbsent with a different max matches, assert that it is a cache miss and the search is performed:
221+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
222+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
223+
enrichCache.computeIfAbsent(projectId, "policy1-1", "1", 3, (searchResponseActionListener) -> {
224+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
225+
searchResponseActionListener.onResponse(searchResponse);
226+
searchResponse.decRef();
227+
queriedDatabaseLatch.countDown();
228+
}, assertNoFailureListener(response -> {
229+
assertThat(response, equalTo(searchResponseMap));
230+
notifiedOfResultLatch.countDown();
231+
}));
232+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
233+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
234+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
235+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
236+
}
155237
}
156238

157239
private SearchResponse convertToSearchResponse(List<Map<String, ?>> searchResponseList) {

0 commit comments

Comments
 (0)