Skip to content

Commit 5f59477

Browse files
authored
Adding a putIfAbsent() method on EnrichCache (#107499)
1 parent eb6af0e commit 5f59477

File tree

3 files changed

+156
-10
lines changed

3 files changed

+156
-10
lines changed

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichCache.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.enrich;
99

10+
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.search.SearchRequest;
1112
import org.elasticsearch.action.search.SearchResponse;
1213
import org.elasticsearch.cluster.metadata.IndexAbstraction;
@@ -24,6 +25,7 @@
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Objects;
28+
import java.util.function.BiConsumer;
2729

2830
/**
2931
* A simple cache for enrich that uses {@link Cache}. There is one instance of this cache and
@@ -52,6 +54,32 @@ public final class EnrichCache {
5254
this.cache = CacheBuilder.<CacheKey, List<Map<?, ?>>>builder().setMaximumWeight(maxSize).build();
5355
}
5456

57+
/**
58+
* This method notifies the given listener of the value in this cache for the given searchRequest. If there is no value in the cache
59+
* for the searchRequest, then the new cache value is computed using searchResponseFetcher.
60+
* @param searchRequest The key for the cache request
61+
* @param searchResponseFetcher The function used to compute the value to be put in the cache, if there is no value in the cache already
62+
* @param listener A listener to be notified of the value in the cache
63+
*/
64+
public void computeIfAbsent(
65+
SearchRequest searchRequest,
66+
BiConsumer<SearchRequest, ActionListener<SearchResponse>> searchResponseFetcher,
67+
ActionListener<List<Map<?, ?>>> listener
68+
) {
69+
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
70+
List<Map<?, ?>> response = get(searchRequest);
71+
if (response != null) {
72+
listener.onResponse(response);
73+
} else {
74+
searchResponseFetcher.accept(searchRequest, ActionListener.wrap(resp -> {
75+
List<Map<?, ?>> value = toCacheValue(resp);
76+
put(searchRequest, value);
77+
listener.onResponse(deepCopy(value, false));
78+
}, listener::onFailure));
79+
}
80+
}
81+
82+
// non-private for unit testing only
5583
List<Map<?, ?>> get(SearchRequest searchRequest) {
5684
String enrichIndex = getEnrichIndexKey(searchRequest);
5785
CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);
@@ -64,6 +92,7 @@ public final class EnrichCache {
6492
}
6593
}
6694

95+
// non-private for unit testing only
6796
void put(SearchRequest searchRequest, List<Map<?, ?>> response) {
6897
String enrichIndex = getEnrichIndexKey(searchRequest);
6998
CacheKey cacheKey = new CacheKey(enrichIndex, searchRequest);

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,15 @@ public void accept(ClusterState state) {
131131
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
132132
return (req, handler) -> {
133133
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
134-
List<Map<?, ?>> response = enrichCache.get(req);
135-
if (response != null) {
136-
handler.accept(response, null);
137-
} else {
138-
originClient.execute(EnrichCoordinatorProxyAction.INSTANCE, req, ActionListener.wrap(resp -> {
139-
List<Map<?, ?>> value = EnrichCache.toCacheValue(resp);
140-
enrichCache.put(req, value);
141-
handler.accept(EnrichCache.deepCopy(value, false), null);
142-
}, e -> { handler.accept(null, e); }));
143-
}
134+
enrichCache.computeIfAbsent(
135+
req,
136+
(searchRequest, searchResponseActionListener) -> originClient.execute(
137+
EnrichCoordinatorProxyAction.INSTANCE,
138+
searchRequest,
139+
searchResponseActionListener
140+
),
141+
ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
142+
);
144143
};
145144
}
146145
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichCacheTests.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,31 @@
66
*/
77
package org.elasticsearch.xpack.enrich;
88

9+
import org.elasticsearch.action.ActionListener;
910
import org.elasticsearch.action.search.SearchRequest;
11+
import org.elasticsearch.action.search.SearchResponse;
1012
import org.elasticsearch.cluster.metadata.AliasMetadata;
1113
import org.elasticsearch.cluster.metadata.IndexMetadata;
1214
import org.elasticsearch.cluster.metadata.Metadata;
15+
import org.elasticsearch.common.bytes.BytesReference;
1316
import org.elasticsearch.index.IndexNotFoundException;
1417
import org.elasticsearch.index.IndexVersion;
1518
import org.elasticsearch.index.query.MatchQueryBuilder;
19+
import org.elasticsearch.search.SearchHit;
20+
import org.elasticsearch.search.SearchHits;
1621
import org.elasticsearch.search.builder.SearchSourceBuilder;
1722
import org.elasticsearch.test.ESTestCase;
23+
import org.elasticsearch.xcontent.XContentBuilder;
24+
import org.elasticsearch.xcontent.json.JsonXContent;
1825
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1926

27+
import java.io.IOException;
2028
import java.util.ArrayList;
2129
import java.util.HashMap;
2230
import java.util.List;
2331
import java.util.Map;
32+
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.TimeUnit;
2434

2535
import static org.hamcrest.Matchers.containsString;
2636
import static org.hamcrest.Matchers.equalTo;
@@ -138,6 +148,114 @@ public void testCaching() {
138148
assertThat(cacheStats.getEvictions(), equalTo(4L));
139149
}
140150

151+
public void testPutIfAbsent() throws InterruptedException {
152+
// Emulate cluster metadata:
153+
// (two enrich indices with corresponding alias entries)
154+
var metadata = Metadata.builder()
155+
.put(
156+
IndexMetadata.builder(EnrichPolicy.getBaseName("policy1") + "-1")
157+
.settings(settings(IndexVersion.current()))
158+
.numberOfShards(1)
159+
.numberOfReplicas(0)
160+
.putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy1")).build())
161+
)
162+
.put(
163+
IndexMetadata.builder(EnrichPolicy.getBaseName("policy2") + "-1")
164+
.settings(settings(IndexVersion.current()))
165+
.numberOfShards(1)
166+
.numberOfReplicas(0)
167+
.putAlias(AliasMetadata.builder(EnrichPolicy.getBaseName("policy2")).build())
168+
)
169+
.build();
170+
171+
// Emulated search requests that an enrich processor could generate:
172+
// (two unique searches for two enrich policies)
173+
var searchRequest1 = new SearchRequest(EnrichPolicy.getBaseName("policy1")).source(
174+
new SearchSourceBuilder().query(new MatchQueryBuilder("match_field", "1"))
175+
);
176+
final List<Map<String, ?>> searchResponseMap = List.of(
177+
Map.of("key1", "value1", "key2", "value2"),
178+
Map.of("key3", "value3", "key4", "value4")
179+
);
180+
EnrichCache enrichCache = new EnrichCache(3);
181+
enrichCache.setMetadata(metadata);
182+
183+
{
184+
CountDownLatch queriedDatabaseLatch = new CountDownLatch(1);
185+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
186+
enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> {
187+
SearchResponse searchResponse = convertToSearchResponse(searchResponseMap);
188+
searchResponseActionListener.onResponse(searchResponse);
189+
searchResponse.decRef();
190+
queriedDatabaseLatch.countDown();
191+
}, new ActionListener<>() {
192+
@Override
193+
public void onResponse(List<Map<?, ?>> response) {
194+
assertThat(response, equalTo(searchResponseMap));
195+
notifiedOfResultLatch.countDown();
196+
}
197+
198+
@Override
199+
public void onFailure(Exception e) {
200+
fail(e);
201+
}
202+
});
203+
assertThat(queriedDatabaseLatch.await(5, TimeUnit.SECONDS), equalTo(true));
204+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
205+
}
206+
207+
{
208+
CountDownLatch notifiedOfResultLatch = new CountDownLatch(1);
209+
enrichCache.computeIfAbsent(searchRequest1, (searchRequest, searchResponseActionListener) -> {
210+
fail("Expected no call to the database because item should have been in the cache");
211+
}, new ActionListener<>() {
212+
@Override
213+
public void onResponse(List<Map<?, ?>> maps) {
214+
notifiedOfResultLatch.countDown();
215+
}
216+
217+
@Override
218+
public void onFailure(Exception e) {
219+
fail(e);
220+
}
221+
});
222+
assertThat(notifiedOfResultLatch.await(5, TimeUnit.SECONDS), equalTo(true));
223+
}
224+
}
225+
226+
private SearchResponse convertToSearchResponse(List<Map<String, ?>> searchResponseList) {
227+
SearchHit[] hitArray = searchResponseList.stream().map(map -> {
228+
try {
229+
return SearchHit.unpooled(0, "id").sourceRef(convertMapToJson(map));
230+
} catch (IOException e) {
231+
throw new RuntimeException(e);
232+
}
233+
}).toArray(SearchHit[]::new);
234+
SearchHits hits = SearchHits.unpooled(hitArray, null, 0);
235+
return new SearchResponse(
236+
hits,
237+
null,
238+
null,
239+
false,
240+
false,
241+
null,
242+
1,
243+
null,
244+
5,
245+
4,
246+
0,
247+
randomLong(),
248+
null,
249+
SearchResponse.Clusters.EMPTY
250+
);
251+
}
252+
253+
private BytesReference convertMapToJson(Map<String, ?> simpleMap) throws IOException {
254+
try (XContentBuilder builder = JsonXContent.contentBuilder().map(simpleMap)) {
255+
return BytesReference.bytes(builder);
256+
}
257+
}
258+
141259
public void testDeepCopy() {
142260
Map<String, Object> original = new HashMap<>();
143261
{

0 commit comments

Comments
 (0)