Skip to content

Commit 36292d3

Browse files
Improve memory aspects of enrich cache (#120256)
This commit reduces the heap space occupied by the enrich cache and corrects inaccuracies in how that heap usage is tracked (the tracked value is used to enforce the cache size limit). Co-authored-by: Joe Gallo <[email protected]>
1 parent c7d8862 commit 36292d3

File tree

10 files changed

+348
-235
lines changed

10 files changed

+348
-235
lines changed

docs/changelog/120256.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 120256
2+
summary: Improve memory aspects of enrich cache
3+
area: Ingest Node
4+
type: enhancement
5+
issues:
6+
- 96050
7+
- 120021
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
15+
import org.elasticsearch.plugins.Plugin;
16+
import org.elasticsearch.reindex.ReindexPlugin;
17+
import org.elasticsearch.test.ESSingleNodeTestCase;
18+
import org.elasticsearch.xcontent.XContentType;
19+
import org.elasticsearch.xpack.core.XPackSettings;
20+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
21+
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
22+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
23+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
24+
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
30+
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
31+
import static org.hamcrest.Matchers.containsInAnyOrder;
32+
import static org.hamcrest.Matchers.containsString;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.hasSize;
35+
import static org.hamcrest.Matchers.nullValue;
36+
37+
/**
 * Single-node integration test for the interaction between the enrich cache and the {@code max_matches}
 * option of the enrich processor.
 * <p>
 * It guards against a cached result produced for one {@code max_matches} value being served to a processor
 * that uses the same policy and lookup value but a different {@code max_matches}.
 */
public class EnrichProcessorMaxMatchesIT extends ESSingleNodeTestCase {
38+
39+
/**
 * Returns the plugin classes installed on the test node: {@code LocalStateEnrich}, {@code ReindexPlugin}
 * and {@code IngestCommonPlugin}.
 */
@Override
40+
protected Collection<Class<? extends Plugin>> getPlugins() {
41+
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
42+
}
43+
44+
/**
 * Returns the node settings for this test; the only setting applied here explicitly disables security
 * (see the TODO inside the builder chain).
 */
@Override
45+
protected Settings nodeSettings() {
46+
return Settings.builder()
47+
// TODO Change this to run with security enabled
48+
// https://github.com/elastic/elasticsearch/issues/75940
49+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
50+
.build();
51+
}
52+
53+
/**
 * Verifies that enrich results cached for the implicit {@code max_matches} of 1 are not reused for a
 * processor configured with {@code max_matches: 8}.
 * <p>
 * Steps: assert the cache starts empty; index two source documents sharing the key {@code k1}; store and
 * execute the {@code kv} policy; simulate a pipeline with a single enrich processor to populate the cache;
 * simulate a second pipeline with two enrich processors that differ only in target field and
 * {@code max_matches}; finally assert that the multi-match processor saw both values and that the cache
 * holds two entries.
 */
public void testEnrichCacheValuesAndMaxMatches() {
54+
// this test is meant to be much less ignorable than a mere comment in the code, since the behavior here is tricky.
55+
56+
// there's an interesting edge case where two processors could be using the same policy and search, etc,
57+
// but with a different number of max_matches -- if we're not careful about how we implement caching,
58+
// then we could miss that edge case and return the wrong results from the cache.
59+
60+
// Ensure enrich cache is empty
61+
var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT);
62+
var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
63+
// NOTE(review): presumably one cache-stats entry is reported per node, hence exactly one here -- confirm
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
64+
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L));
65+
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(0L));
66+
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L));
67+
68+
// the policy and its source index deliberately share the same name
String policyName = "kv";
69+
String sourceIndexName = "kv";
70+
71+
var enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
72+
73+
// Create source index and add two documents:
74+
createSourceIndices(client(), enrichPolicy);
75+
{
76+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
77+
indexRequest.create(true);
78+
indexRequest.source("""
79+
{
80+
"key": "k1",
81+
"value": "v1"
82+
}
83+
""", XContentType.JSON);
84+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
85+
client().index(indexRequest).actionGet();
86+
}
87+
// second document: same key "k1" but a different value, so a lookup on "k1" can match two documents
{
88+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
89+
indexRequest.create(true);
90+
indexRequest.source("""
91+
{
92+
"key": "k1",
93+
"value": "v2"
94+
}
95+
""", XContentType.JSON);
96+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
97+
client().index(indexRequest).actionGet();
98+
}
99+
100+
// Store policy and execute it:
101+
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
102+
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
103+
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
104+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
105+
106+
{
107+
// run a single enrich processor to fill the cache, note that the default max_matches is 1 (so it's not given explicitly here)
108+
var simulatePipelineRequest = jsonSimulatePipelineRequest("""
109+
{
110+
"pipeline": {
111+
"processors" : [
112+
{
113+
"enrich": {
114+
"policy_name": "kv",
115+
"field": "key",
116+
"target_field": "result"
117+
}
118+
}
119+
]
120+
},
121+
"docs": [
122+
{
123+
"_source": {
124+
"key": "k1"
125+
}
126+
}
127+
]
128+
}
129+
""");
130+
var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
131+
var result = (SimulateDocumentBaseResult) response.getResults().get(0);
132+
assertThat(result.getFailure(), nullValue());
133+
// it's not actually important in this specific test whether the result is v1 or v2
134+
assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v"));
135+
}
136+
137+
{
138+
// run two enrich processors with different max_matches, and see if we still get the right behavior
139+
var simulatePipelineRequest = jsonSimulatePipelineRequest("""
140+
{
141+
"pipeline": {
142+
"processors" : [
143+
{
144+
"enrich": {
145+
"policy_name": "kv",
146+
"field": "key",
147+
"target_field": "result"
148+
}
149+
},
150+
{
151+
"enrich": {
152+
"policy_name": "kv",
153+
"field": "key",
154+
"target_field": "results",
155+
"max_matches": 8
156+
}
157+
}
158+
]
159+
},
160+
"docs": [
161+
{
162+
"_source": {
163+
"key": "k1"
164+
}
165+
}
166+
]
167+
}
168+
""");
169+
var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
170+
var result = (SimulateDocumentBaseResult) response.getResults().get(0);
171+
assertThat(result.getFailure(), nullValue());
172+
// it's not actually important in this specific test whether the result is v1 or v2
173+
assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v"));
174+
175+
// this is the important part of the test -- did the max_matches=1 case pollute the cache for the max_matches=8 case?
176+
@SuppressWarnings("unchecked")
177+
List<Map<String, String>> results = (List<Map<String, String>>) result.getIngestDocument().getSource().get("results");
178+
List<String> values = results.stream().map(m -> m.get("value")).toList();
179+
// if these assertions fail, it probably means you were fussing about with the EnrichCache.CacheKey and tried removing
180+
// the max_matches accounting from it
181+
assertThat(values, containsInAnyOrder("v1", "v2"));
182+
assertThat(values, hasSize(2));
183+
}
184+
185+
// re-read the cache stats now that both pipelines have run
statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
186+
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
187+
// there are two items in the cache, the single result from max_matches 1 (implied), and the multi-result from max_matches 8
188+
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(2L));
189+
}
190+
191+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,24 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.function.BiConsumer;
23+
import java.util.function.Supplier;
2324

2425
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
2526

2627
private final String policyName;
27-
private final BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner;
28+
private final EnrichProcessorFactory.SearchRunner searchRunner;
2829
private final TemplateScript.Factory field;
2930
private final TemplateScript.Factory targetField;
3031
private final boolean ignoreMissing;
3132
private final boolean overrideEnabled;
3233
protected final String matchField;
3334
protected final int maxMatches;
35+
private final String indexAlias;
3436

3537
protected AbstractEnrichProcessor(
3638
String tag,
3739
String description,
38-
BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner,
40+
EnrichProcessorFactory.SearchRunner searchRunner,
3941
String policyName,
4042
TemplateScript.Factory field,
4143
TemplateScript.Factory targetField,
@@ -53,6 +55,8 @@ protected AbstractEnrichProcessor(
5355
this.overrideEnabled = overrideEnabled;
5456
this.matchField = matchField;
5557
this.maxMatches = maxMatches;
58+
// note: since the policyName determines the indexAlias, we can calculate this once
59+
this.indexAlias = EnrichPolicy.getBaseName(policyName);
5660
}
5761

5862
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
@@ -68,20 +72,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
6872
return;
6973
}
7074

71-
QueryBuilder queryBuilder = getQueryBuilder(value);
72-
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
73-
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
74-
searchBuilder.from(0);
75-
searchBuilder.size(maxMatches);
76-
searchBuilder.trackScores(false);
77-
searchBuilder.fetchSource(true);
78-
searchBuilder.query(constantScore);
79-
SearchRequest req = new SearchRequest();
80-
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
81-
req.preference(Preference.LOCAL.type());
82-
req.source(searchBuilder);
83-
84-
searchRunner.accept(req, (searchHits, e) -> {
75+
Supplier<SearchRequest> searchRequestSupplier = () -> {
76+
QueryBuilder queryBuilder = getQueryBuilder(value);
77+
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
78+
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
79+
searchBuilder.from(0);
80+
searchBuilder.size(maxMatches);
81+
searchBuilder.trackScores(false);
82+
searchBuilder.fetchSource(true);
83+
searchBuilder.query(constantScore);
84+
SearchRequest req = new SearchRequest();
85+
req.indices(indexAlias);
86+
req.preference(Preference.LOCAL.type());
87+
req.source(searchBuilder);
88+
return req;
89+
};
90+
91+
searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
8592
if (e != null) {
8693
handler.accept(null, e);
8794
return;

0 commit comments

Comments
 (0)