Skip to content

Commit dc57b3e

Browse files
authored
Fix enrich caches outdated value after policy run (#133680)
1 parent 284574d commit dc57b3e

File tree

6 files changed

+194
-19
lines changed

6 files changed

+194
-19
lines changed

docs/changelog/133680.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133680
2+
summary: Fix enrich caches outdated value after policy run
3+
area: Ingest Node
4+
type: bug
5+
issues: []
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.cluster.service.ClusterService;
14+
import org.elasticsearch.common.settings.Settings;
15+
import org.elasticsearch.core.Strings;
16+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
17+
import org.elasticsearch.plugins.Plugin;
18+
import org.elasticsearch.reindex.ReindexPlugin;
19+
import org.elasticsearch.test.ESSingleNodeTestCase;
20+
import org.elasticsearch.xcontent.XContentType;
21+
import org.elasticsearch.xpack.core.XPackSettings;
22+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
23+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
24+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
25+
26+
import java.util.Collection;
27+
import java.util.List;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
30+
import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
31+
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
32+
import static org.hamcrest.Matchers.equalTo;
33+
import static org.hamcrest.Matchers.nullValue;
34+
35+
public class EnrichPolicyChangeIT extends ESSingleNodeTestCase {
36+
37+
@Override
38+
protected Collection<Class<? extends Plugin>> getPlugins() {
39+
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
40+
}
41+
42+
@Override
43+
protected Settings nodeSettings() {
44+
return Settings.builder()
45+
// TODO Change this to run with security enabled
46+
// https://github.com/elastic/elasticsearch/issues/75940
47+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
48+
.build();
49+
}
50+
51+
private final String policyName = "device-enrich-policy";
52+
private final String sourceIndexName = "devices-idx";
53+
54+
public void testEnrichCacheValuesCannotBeCorrupted() throws Exception {
55+
// create and store the enrich policy
56+
final var enrichPolicy = new EnrichPolicy(
57+
EnrichPolicy.MATCH_TYPE,
58+
null,
59+
List.of(sourceIndexName),
60+
"host.ip",
61+
List.of("device.name", "host.ip")
62+
);
63+
64+
// create the source index
65+
createSourceIndices(client(), enrichPolicy);
66+
67+
// add a single document to the enrich index
68+
setEnrichDeviceName("some.device." + randomAlphaOfLength(10));
69+
70+
// store the enrich policy
71+
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
72+
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
73+
74+
// execute the policy once
75+
executeEnrichPolicy();
76+
77+
// add a low priority cluster state applier to increase the odds of a race occurring between
78+
// the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the
79+
// cluster state *listeners* having been run (which adjusts the alias and therefore the search results)
80+
final var clusterService = node().injector().getInstance(ClusterService.class);
81+
clusterService.addLowPriorityApplier((event) -> safeSleep(10));
82+
83+
// kick off some threads that just bang on _simulate in the background
84+
final var finished = new AtomicBoolean(false);
85+
for (int i = 0; i < 5; i++) {
86+
new Thread(() -> {
87+
while (finished.get() == false) {
88+
simulatePipeline();
89+
}
90+
}).start();
91+
}
92+
93+
try {
94+
for (int i = 0; i < randomIntBetween(10, 100); i++) {
95+
final String deviceName = "some.device." + randomAlphaOfLength(10);
96+
97+
// add a single document to the enrich index
98+
setEnrichDeviceName(deviceName);
99+
100+
// execute the policy
101+
executeEnrichPolicy();
102+
103+
// simulate the pipeline and confirm that we see the expected result
104+
assertBusy(() -> {
105+
var result = simulatePipeline();
106+
assertThat(result.getFailure(), nullValue());
107+
assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName));
108+
});
109+
}
110+
} finally {
111+
// we're finished, so those threads can all quit now
112+
finished.set(true);
113+
}
114+
}
115+
116+
private SimulateDocumentBaseResult simulatePipeline() {
117+
final var simulatePipelineRequest = jsonSimulatePipelineRequest("""
118+
{
119+
"pipeline": {
120+
"processors": [
121+
{
122+
"enrich": {
123+
"policy_name": "device-enrich-policy",
124+
"field": "host.ip",
125+
"target_field": "_tmp.device"
126+
}
127+
},
128+
{
129+
"rename" : {
130+
"field" : "_tmp.device.device.name",
131+
"target_field" : "device.name"
132+
}
133+
}
134+
]
135+
},
136+
"docs": [
137+
{
138+
"_source": {
139+
"host": {
140+
"ip": "10.151.80.8"
141+
}
142+
}
143+
}
144+
]
145+
}
146+
""");
147+
final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
148+
return (SimulateDocumentBaseResult) response.getResults().getFirst();
149+
}
150+
151+
private void setEnrichDeviceName(final String deviceName) {
152+
final var indexRequest = new IndexRequest(sourceIndexName);
153+
indexRequest.id("1"); // there's only one document, and we keep overwriting it
154+
indexRequest.source(Strings.format("""
155+
{
156+
"host": {
157+
"ip": "10.151.80.8"
158+
},
159+
"device": {
160+
"name": "%s"
161+
}
162+
}
163+
""", deviceName), XContentType.JSON);
164+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
165+
client().index(indexRequest).actionGet();
166+
}
167+
168+
private void executeEnrichPolicy() {
169+
final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
170+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
171+
}
172+
173+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
import org.elasticsearch.ingest.IngestDocument;
1515
import org.elasticsearch.script.TemplateScript;
1616
import org.elasticsearch.search.builder.SearchSourceBuilder;
17-
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1817

1918
import java.util.ArrayList;
2019
import java.util.List;
2120
import java.util.Map;
2221
import java.util.function.BiConsumer;
23-
import java.util.function.Supplier;
22+
import java.util.function.Function;
2423

2524
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
2625

@@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
3231
private final boolean overrideEnabled;
3332
protected final String matchField;
3433
protected final int maxMatches;
35-
private final String indexAlias;
3634

3735
protected AbstractEnrichProcessor(
3836
String tag,
@@ -55,8 +53,6 @@ protected AbstractEnrichProcessor(
5553
this.overrideEnabled = overrideEnabled;
5654
this.matchField = matchField;
5755
this.maxMatches = maxMatches;
58-
// note: since the policyName determines the indexAlias, we can calculate this once
59-
this.indexAlias = EnrichPolicy.getBaseName(policyName);
6056
}
6157

6258
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
@@ -72,7 +68,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
7268
return;
7369
}
7470

75-
Supplier<SearchRequest> searchRequestSupplier = () -> {
71+
final Function<String, SearchRequest> searchRequestBuilder = (concreteEnrichIndex) -> {
7672
QueryBuilder queryBuilder = getQueryBuilder(value);
7773
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
7874
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
@@ -82,13 +78,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
8278
searchBuilder.fetchSource(true);
8379
searchBuilder.query(constantScore);
8480
SearchRequest req = new SearchRequest();
85-
req.indices(indexAlias);
81+
req.indices(concreteEnrichIndex);
8682
req.preference(Preference.LOCAL.type());
8783
req.source(searchBuilder);
8884
return req;
8985
};
9086

91-
searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
87+
searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> {
9288
if (e != null) {
9389
handler.accept(null, e);
9490
return;

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.util.Objects;
3232
import java.util.function.BiConsumer;
3333
import java.util.function.Consumer;
34-
import java.util.function.Supplier;
34+
import java.util.function.Function;
3535

3636
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
3737

@@ -137,17 +137,18 @@ public void accept(ClusterState state) {
137137
}
138138

139139
private SearchRunner createSearchRunner(final ProjectId projectId, final String indexAlias) {
140-
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
140+
final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
141141
return (value, maxMatches, reqSupplier, handler) -> {
142+
final String concreteEnrichIndex = getEnrichIndexKey(projectId, indexAlias);
142143
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
143144
enrichCache.computeIfAbsent(
144145
projectId,
145-
getEnrichIndexKey(projectId, indexAlias),
146+
concreteEnrichIndex,
146147
value,
147148
maxMatches,
148149
(searchResponseActionListener) -> originClient.execute(
149150
EnrichCoordinatorProxyAction.INSTANCE,
150-
reqSupplier.get(),
151+
reqSupplier.apply(concreteEnrichIndex),
151152
searchResponseActionListener
152153
),
153154
ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
@@ -167,7 +168,7 @@ public interface SearchRunner {
167168
void accept(
168169
Object value,
169170
int maxMatches,
170-
Supplier<SearchRequest> searchRequestSupplier,
171+
Function<String, SearchRequest> searchRequestBuilder,
171172
BiConsumer<List<Map<?, ?>>, Exception> handler
172173
);
173174
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.function.BiConsumer;
31-
import java.util.function.Supplier;
31+
import java.util.function.Function;
3232

3333
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
3434
import static org.hamcrest.Matchers.emptyArray;
@@ -162,10 +162,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
162162
public void accept(
163163
Object value,
164164
int maxMatches,
165-
Supplier<SearchRequest> searchRequestSupplier,
165+
Function<String, SearchRequest> searchRequestBuilder,
166166
BiConsumer<List<Map<?, ?>>, Exception> handler
167167
) {
168-
capturedRequest.set(searchRequestSupplier.get());
168+
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
169169
if (exception != null) {
170170
handler.accept(null, exception);
171171
} else {

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.function.BiConsumer;
28-
import java.util.function.Supplier;
28+
import java.util.function.Function;
2929

3030
import static org.hamcrest.Matchers.emptyArray;
3131
import static org.hamcrest.Matchers.equalTo;
@@ -405,10 +405,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
405405
public void accept(
406406
Object value,
407407
int maxMatches,
408-
Supplier<SearchRequest> searchRequestSupplier,
408+
Function<String, SearchRequest> searchRequestBuilder,
409409
BiConsumer<List<Map<?, ?>>, Exception> handler
410410
) {
411-
capturedRequest.set(searchRequestSupplier.get());
411+
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
412412
if (exception != null) {
413413
handler.accept(null, exception);
414414
} else {

0 commit comments

Comments
 (0)