Skip to content

Commit bf9dbbf

Browse files
[8.17] Improve memory aspects of enrich cache (elastic#120256) (elastic#120765)
* Improve memory aspects of enrich cache (elastic#120256)

  This commit reduces the heap space occupied by the enrich cache and corrects inaccuracies in how that occupied heap space is tracked (the tracked size is what the cache size limit is enforced against).

  ---------

  Co-authored-by: Joe Gallo <[email protected]>

* Fix compilation

---------

Co-authored-by: Joe Gallo <[email protected]>
1 parent a268269 commit bf9dbbf

File tree

10 files changed

+349
-235
lines changed

10 files changed

+349
-235
lines changed

docs/changelog/120256.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
pr: 120256
2+
summary: Improve memory aspects of enrich cache
3+
area: Ingest Node
4+
type: enhancement
5+
issues:
6+
- 96050
7+
- 120021
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
12+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
13+
import org.elasticsearch.action.support.WriteRequest;
14+
import org.elasticsearch.common.bytes.BytesArray;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.reindex.ReindexPlugin;
19+
import org.elasticsearch.test.ESSingleNodeTestCase;
20+
import org.elasticsearch.xcontent.XContentType;
21+
import org.elasticsearch.xpack.core.XPackSettings;
22+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
23+
import org.elasticsearch.xpack.core.enrich.action.EnrichStatsAction;
24+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
25+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
26+
27+
import java.util.Collection;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
32+
import static org.hamcrest.Matchers.containsInAnyOrder;
33+
import static org.hamcrest.Matchers.containsString;
34+
import static org.hamcrest.Matchers.equalTo;
35+
import static org.hamcrest.Matchers.hasSize;
36+
import static org.hamcrest.Matchers.nullValue;
37+
38+
public class EnrichProcessorMaxMatchesIT extends ESSingleNodeTestCase {
39+
40+
@Override
41+
protected Collection<Class<? extends Plugin>> getPlugins() {
42+
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
43+
}
44+
45+
@Override
46+
protected Settings nodeSettings() {
47+
return Settings.builder()
48+
// TODO Change this to run with security enabled
49+
// https://github.com/elastic/elasticsearch/issues/75940
50+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
51+
.build();
52+
}
53+
54+
public void testEnrichCacheValuesAndMaxMatches() {
55+
// this test is meant to be much less ignorable than a mere comment in the code, since the behavior here is tricky.
56+
57+
// there's an interesting edge case where two processors could be using the same policy and search, etc,
58+
// but that they have a different number of max_matches -- if we're not careful about how we implement caching,
59+
// then we could miss that edge case and return the wrong results from the cache.
60+
61+
// Ensure enrich cache is empty
62+
var statsRequest = new EnrichStatsAction.Request(TEST_REQUEST_TIMEOUT);
63+
var statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
64+
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
65+
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(0L));
66+
assertThat(statsResponse.getCacheStats().get(0).misses(), equalTo(0L));
67+
assertThat(statsResponse.getCacheStats().get(0).hits(), equalTo(0L));
68+
69+
String policyName = "kv";
70+
String sourceIndexName = "kv";
71+
72+
var enrichPolicy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndexName), "key", List.of("value"));
73+
74+
// Create source index and add two documents:
75+
createSourceIndices(client(), enrichPolicy);
76+
{
77+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
78+
indexRequest.create(true);
79+
indexRequest.source("""
80+
{
81+
"key": "k1",
82+
"value": "v1"
83+
}
84+
""", XContentType.JSON);
85+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
86+
client().index(indexRequest).actionGet();
87+
}
88+
{
89+
IndexRequest indexRequest = new IndexRequest(sourceIndexName);
90+
indexRequest.create(true);
91+
indexRequest.source("""
92+
{
93+
"key": "k1",
94+
"value": "v2"
95+
}
96+
""", XContentType.JSON);
97+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
98+
client().index(indexRequest).actionGet();
99+
}
100+
101+
// Store policy and execute it:
102+
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
103+
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
104+
var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
105+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
106+
107+
{
108+
// run a single enrich processor to fill the cache, note that the default max_matches is 1 (so it's not given explicitly here)
109+
var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
110+
{
111+
"pipeline": {
112+
"processors" : [
113+
{
114+
"enrich": {
115+
"policy_name": "kv",
116+
"field": "key",
117+
"target_field": "result"
118+
}
119+
}
120+
]
121+
},
122+
"docs": [
123+
{
124+
"_source": {
125+
"key": "k1"
126+
}
127+
}
128+
]
129+
}
130+
"""), XContentType.JSON);
131+
var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
132+
var result = (SimulateDocumentBaseResult) response.getResults().get(0);
133+
assertThat(result.getFailure(), nullValue());
134+
// it's not actually important in this specific test whether the result is v1 or v2
135+
assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v"));
136+
}
137+
138+
{
139+
// run two enrich processors with different max_matches, and see if we still get the right behavior
140+
var simulatePipelineRequest = new SimulatePipelineRequest(new BytesArray("""
141+
{
142+
"pipeline": {
143+
"processors" : [
144+
{
145+
"enrich": {
146+
"policy_name": "kv",
147+
"field": "key",
148+
"target_field": "result"
149+
}
150+
},
151+
{
152+
"enrich": {
153+
"policy_name": "kv",
154+
"field": "key",
155+
"target_field": "results",
156+
"max_matches": 8
157+
}
158+
}
159+
]
160+
},
161+
"docs": [
162+
{
163+
"_source": {
164+
"key": "k1"
165+
}
166+
}
167+
]
168+
}
169+
"""), XContentType.JSON);
170+
var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
171+
var result = (SimulateDocumentBaseResult) response.getResults().get(0);
172+
assertThat(result.getFailure(), nullValue());
173+
// it's not actually important in this specific test whether the result is v1 or v2
174+
assertThat(result.getIngestDocument().getFieldValue("result.value", String.class), containsString("v"));
175+
176+
// this is the important part of the test -- did the max_matches=1 case pollute the cache for the max_matches=8 case?
177+
@SuppressWarnings("unchecked")
178+
List<Map<String, String>> results = (List<Map<String, String>>) result.getIngestDocument().getSource().get("results");
179+
List<String> values = results.stream().map(m -> m.get("value")).toList();
180+
// if these assertions fail, it probably means you were fussing about with the EnrichCache.CacheKey and tried removing
181+
// the max_matches accounting from it
182+
assertThat(values, containsInAnyOrder("v1", "v2"));
183+
assertThat(values, hasSize(2));
184+
}
185+
186+
statsResponse = client().execute(EnrichStatsAction.INSTANCE, statsRequest).actionGet();
187+
assertThat(statsResponse.getCacheStats().size(), equalTo(1));
188+
// there are two items in the cache, the single result from max_matches 1 (implied), and the multi-result from max_matches 8
189+
assertThat(statsResponse.getCacheStats().get(0).count(), equalTo(2L));
190+
}
191+
192+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,24 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.function.BiConsumer;
23+
import java.util.function.Supplier;
2324

2425
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
2526

2627
private final String policyName;
27-
private final BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner;
28+
private final EnrichProcessorFactory.SearchRunner searchRunner;
2829
private final TemplateScript.Factory field;
2930
private final TemplateScript.Factory targetField;
3031
private final boolean ignoreMissing;
3132
private final boolean overrideEnabled;
3233
protected final String matchField;
3334
protected final int maxMatches;
35+
private final String indexAlias;
3436

3537
protected AbstractEnrichProcessor(
3638
String tag,
3739
String description,
38-
BiConsumer<SearchRequest, BiConsumer<List<Map<?, ?>>, Exception>> searchRunner,
40+
EnrichProcessorFactory.SearchRunner searchRunner,
3941
String policyName,
4042
TemplateScript.Factory field,
4143
TemplateScript.Factory targetField,
@@ -53,6 +55,8 @@ protected AbstractEnrichProcessor(
5355
this.overrideEnabled = overrideEnabled;
5456
this.matchField = matchField;
5557
this.maxMatches = maxMatches;
58+
// note: since the policyName determines the indexAlias, we can calculate this once
59+
this.indexAlias = EnrichPolicy.getBaseName(policyName);
5660
}
5761

5862
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
@@ -68,20 +72,23 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
6872
return;
6973
}
7074

71-
QueryBuilder queryBuilder = getQueryBuilder(value);
72-
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
73-
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
74-
searchBuilder.from(0);
75-
searchBuilder.size(maxMatches);
76-
searchBuilder.trackScores(false);
77-
searchBuilder.fetchSource(true);
78-
searchBuilder.query(constantScore);
79-
SearchRequest req = new SearchRequest();
80-
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
81-
req.preference(Preference.LOCAL.type());
82-
req.source(searchBuilder);
83-
84-
searchRunner.accept(req, (searchHits, e) -> {
75+
Supplier<SearchRequest> searchRequestSupplier = () -> {
76+
QueryBuilder queryBuilder = getQueryBuilder(value);
77+
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
78+
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
79+
searchBuilder.from(0);
80+
searchBuilder.size(maxMatches);
81+
searchBuilder.trackScores(false);
82+
searchBuilder.fetchSource(true);
83+
searchBuilder.query(constantScore);
84+
SearchRequest req = new SearchRequest();
85+
req.indices(indexAlias);
86+
req.preference(Preference.LOCAL.type());
87+
req.source(searchBuilder);
88+
return req;
89+
};
90+
91+
searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
8592
if (e != null) {
8693
handler.accept(null, e);
8794
return;

0 commit comments

Comments
 (0)