Skip to content

Commit a7c4467

Browse files
Include project ID in enrich cache key
This prevents collisions in the enrich cache between different projects. Collisions would be unlikely even without this change, since the key includes the index name, and the index name includes a millisecond-precision timestamp; a collision would only happen if two projects executed enrich policies with the same name at the same time, or if one project manipulated the alias to match the timestamp of the other project. However, collisions would have serious consequences (allowing read-across of data between projects). This change closes that gap.
1 parent 43a9e6f commit a7c4467

File tree

3 files changed

+99
-15
lines changed

3 files changed

+99
-15
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.search.SearchResponse;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.common.cache.Cache;
1314
import org.elasticsearch.common.cache.CacheBuilder;
1415
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -90,17 +91,17 @@ private Cache<CacheKey, CacheValue> createCache(long maxWeight, ToLongBiFunction
9091
* @param searchResponseFetcher The function used to compute the value to be put in the cache, if there is no value in the cache already
9192
* @param listener A listener to be notified of the value in the cache
9293
*/
93-
@FixForMultiProject(description = "The enrich cache will currently leak data between projects. We need to either disable or fix it.")
9494
public void computeIfAbsent(
9595
String enrichIndex,
96+
ProjectId projectId,
9697
Object lookupValue,
9798
int maxMatches,
9899
Consumer<ActionListener<SearchResponse>> searchResponseFetcher,
99100
ActionListener<List<Map<?, ?>>> listener
100101
) {
101102
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
102103
long cacheStart = relativeNanoTimeProvider.getAsLong();
103-
var cacheKey = new CacheKey(enrichIndex, lookupValue, maxMatches);
104+
var cacheKey = new CacheKey(enrichIndex, projectId, lookupValue, maxMatches);
104105
List<Map<?, ?>> response = get(cacheKey);
105106
long cacheRequestTime = relativeNanoTimeProvider.getAsLong() - cacheStart;
106107
if (response != null) {
@@ -204,7 +205,7 @@ private static Object innerDeepCopy(Object value, boolean unmodifiable) {
204205
* should thus be included in the cache key
205206
*/
206207
// Visibility for testing
207-
record CacheKey(String enrichIndex, Object lookupValue, int maxMatches) {
208+
record CacheKey(String enrichIndex, ProjectId projectId, Object lookupValue, int maxMatches) {
208209
/**
209210
* In reality, the size in bytes of the cache key is a function of the {@link CacheKey#lookupValue} field plus some constant for
210211
* the object itself, the string reference for the enrich index (but not the string itself because it's taken from the metadata),

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ private SearchRunner createSearchRunner(ProjectMetadata project, String indexAli
139139
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
140140
enrichCache.computeIfAbsent(
141141
getEnrichIndexKey(project, indexAlias),
142+
project.id(),
142143
value,
143144
maxMatches,
144145
(searchResponseActionListener) -> originClient.execute(

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.enrich;
88

99
import org.elasticsearch.action.search.SearchResponse;
10+
import org.elasticsearch.cluster.metadata.ProjectId;
1011
import org.elasticsearch.common.bytes.BytesReference;
1112
import org.elasticsearch.core.TimeValue;
1213
import org.elasticsearch.search.SearchHit;
@@ -38,10 +39,11 @@ public class EnrichCacheTests extends ESTestCase {
3839
public void testCaching() {
3940
// Emulated search requests that an enrich processor could generate:
4041
// (two unique searches for two enrich policies)
41-
var cacheKey1 = new EnrichCache.CacheKey("policy1-1", "1", 1);
42-
var cacheKey2 = new EnrichCache.CacheKey("policy1-1", "2", 1);
43-
var cacheKey3 = new EnrichCache.CacheKey("policy2-1", "1", 1);
44-
var cacheKey4 = new EnrichCache.CacheKey("policy2-1", "2", 1);
42+
var projectId = randomProjectIdOrDefault();
43+
var cacheKey1 = new EnrichCache.CacheKey("policy1-1", projectId, "1", 1);
44+
var cacheKey2 = new EnrichCache.CacheKey("policy1-1", projectId, "2", 1);
45+
var cacheKey3 = new EnrichCache.CacheKey("policy2-1", projectId, "1", 1);
46+
var cacheKey4 = new EnrichCache.CacheKey("policy2-1", projectId, "2", 1);
4547
// Emulated search response (content doesn't matter, since it isn't used, it's just a cache entry)
4648
EnrichCache.CacheValue searchResponse = new EnrichCache.CacheValue(List.of(Map.of("test", "entry")), 1L);
4749

@@ -75,10 +77,10 @@ public void testCaching() {
7577
assertThat(cacheStats.evictions(), equalTo(1L));
7678
assertThat(cacheStats.cacheSizeInBytes(), equalTo(3L));
7779

78-
cacheKey1 = new EnrichCache.CacheKey("policy1-2", "1", 1);
79-
cacheKey2 = new EnrichCache.CacheKey("policy1-2", "2", 1);
80-
cacheKey3 = new EnrichCache.CacheKey("policy2-2", "1", 1);
81-
cacheKey4 = new EnrichCache.CacheKey("policy2-2", "2", 1);
80+
cacheKey1 = new EnrichCache.CacheKey("policy1-2", projectId, "1", 1);
81+
cacheKey2 = new EnrichCache.CacheKey("policy1-2", projectId, "2", 1);
82+
cacheKey3 = new EnrichCache.CacheKey("policy2-2", projectId, "1", 1);
83+
cacheKey4 = new EnrichCache.CacheKey("policy2-2", projectId, "2", 1);
8284

8385
// Because enrich index has changed, cache can't serve cached entries
8486
assertThat(enrichCache.get(cacheKey1), nullValue());
@@ -115,10 +117,13 @@ public void testComputeIfAbsent() throws InterruptedException {
115117
// We use a relative time provider that increments 1ms every time it is called. So each operation appears to take 1ms
116118
EnrichCache enrichCache = new EnrichCache(3, () -> testNanoTime.addAndGet(TimeValue.timeValueMillis(1).getNanos()));
117119

120+
ProjectId projectId = randomProjectIdOrDefault();
121+
long expectedMisses = 0L;
118122
{
123+
// Do initial computeIfAbsent, assert that it is a cache miss and the search is performed:
119124
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
120125
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
121-
enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> {
126+
enrichCache.computeIfAbsent("policy1-1", projectId, "1", 1, (searchResponseActionListener) -> {
122127
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
123128
searchResponseActionListener.onResponse(searchResponse);
124129
searchResponse.decRef();
@@ -132,26 +137,103 @@ public void testComputeIfAbsent() throws InterruptedException {
132137
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
133138
assertThat(cacheStats.count(), equalTo(1L));
134139
assertThat(cacheStats.hits(), equalTo(0L));
135-
assertThat(cacheStats.misses(), equalTo(1L));
140+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
136141
assertThat(cacheStats.evictions(), equalTo(0L));
137142
assertThat(cacheStats.hitsTimeInMillis(), equalTo(0L));
138143
assertThat(cacheStats.missesTimeInMillis(), equalTo(2L)); // cache query and enrich query + cache put
139144
}
140145

141146
{
147+
// Do the same call, assert that it is a cache hit and no search is performed:
142148
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
143-
enrichCache.computeIfAbsent("policy1-1", "1", 1, (searchResponseActionListener) -> {
149+
enrichCache.computeIfAbsent("policy1-1", projectId, "1", 1, (searchResponseActionListener) -> {
144150
fail("Expected no call to the database because item should have been in the cache");
145151
}, assertNoFailureListener(r -> notifiedOfResultLatch.countDown()));
146152
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
147153
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
148154
assertThat(cacheStats.count(), equalTo(1L));
149155
assertThat(cacheStats.hits(), equalTo(1L));
150-
assertThat(cacheStats.misses(), equalTo(1L));
156+
assertThat(cacheStats.misses(), equalTo(expectedMisses));
151157
assertThat(cacheStats.evictions(), equalTo(0L));
152158
assertThat(cacheStats.hitsTimeInMillis(), equalTo(1L));
153159
assertThat(cacheStats.missesTimeInMillis(), equalTo(2L));
154160
}
161+
162+
{
163+
// Do a computeIfAbsent with a different index, assert that it is a cache miss and the search is performed:
164+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
165+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
166+
enrichCache.computeIfAbsent("policy1-2", projectId, "1", 1, (searchResponseActionListener) -> {
167+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
168+
searchResponseActionListener.onResponse(searchResponse);
169+
searchResponse.decRef();
170+
queriedDatabaseLatch.countDown();
171+
}, assertNoFailureListener(response -> {
172+
assertThat(response, equalTo(searchResponseMap));
173+
notifiedOfResultLatch.countDown();
174+
}));
175+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
176+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
177+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
178+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
179+
}
180+
181+
{
182+
// Do a computeIfAbsent with a different project, assert that it is a cache miss and the search is performed:
183+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
184+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
185+
enrichCache.computeIfAbsent("policy1-1", randomUniqueProjectId(), "1", 1, (searchResponseActionListener) -> {
186+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
187+
searchResponseActionListener.onResponse(searchResponse);
188+
searchResponse.decRef();
189+
queriedDatabaseLatch.countDown();
190+
}, assertNoFailureListener(response -> {
191+
assertThat(response, equalTo(searchResponseMap));
192+
notifiedOfResultLatch.countDown();
193+
}));
194+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
195+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
196+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
197+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
198+
}
199+
200+
{
201+
// Do a computeIfAbsent with a different lookup value, assert that it is a cache miss and the search is performed:
202+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
203+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
204+
enrichCache.computeIfAbsent("policy1-1", projectId, "2", 1, (searchResponseActionListener) -> {
205+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
206+
searchResponseActionListener.onResponse(searchResponse);
207+
searchResponse.decRef();
208+
queriedDatabaseLatch.countDown();
209+
}, assertNoFailureListener(response -> {
210+
assertThat(response, equalTo(searchResponseMap));
211+
notifiedOfResultLatch.countDown();
212+
}));
213+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
214+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
215+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
216+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
217+
}
218+
219+
{
220+
// Do a computeIfAbsent with a different max matches, assert that it is a cache miss and the search is performed:
221+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
222+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
223+
enrichCache.computeIfAbsent("policy1-1", projectId, "1", 3, (searchResponseActionListener) -> {
224+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
225+
searchResponseActionListener.onResponse(searchResponse);
226+
searchResponse.decRef();
227+
queriedDatabaseLatch.countDown();
228+
}, assertNoFailureListener(response -> {
229+
assertThat(response, equalTo(searchResponseMap));
230+
notifiedOfResultLatch.countDown();
231+
}));
232+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
233+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
234+
EnrichStatsAction.Response.CacheStats cacheStats = enrichCache.getStats(randomAlphaOfLength(10));
235+
assertThat(cacheStats.misses(), equalTo(++expectedMisses));
236+
}
155237
}
156238

157239
private SearchResponse convertToSearchResponse(List<Map<String, ?>> searchResponseList) {

0 commit comments

Comments
 (0)