Skip to content

Commit 01c83eb

Browse files
authored
Fix enrich caches outdated value after policy run (#133680) (#133880)
1 parent dd103cb commit 01c83eb

File tree

6 files changed

+209
-20
lines changed

6 files changed

+209
-20
lines changed

docs/changelog/133680.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133680
2+
summary: Fix enrich caches outdated value after policy run
3+
area: Ingest Node
4+
type: bug
5+
issues: []
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.elasticsearch.action.index.IndexRequest;
11+
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
12+
import org.elasticsearch.action.ingest.SimulatePipelineRequest;
13+
import org.elasticsearch.action.support.WriteRequest;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.bytes.BytesArray;
16+
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
18+
import org.elasticsearch.common.settings.Settings;
19+
import org.elasticsearch.core.Strings;
20+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.reindex.ReindexPlugin;
23+
import org.elasticsearch.test.ESSingleNodeTestCase;
24+
import org.elasticsearch.xcontent.XContentType;
25+
import org.elasticsearch.xpack.core.XPackSettings;
26+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
27+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
28+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
29+
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
35+
import static org.hamcrest.Matchers.equalTo;
36+
import static org.hamcrest.Matchers.nullValue;
37+
38+
public class EnrichPolicyChangeIT extends ESSingleNodeTestCase {
39+
40+
@Override
41+
protected Collection<Class<? extends Plugin>> getPlugins() {
42+
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
43+
}
44+
45+
@Override
46+
protected Settings nodeSettings() {
47+
return Settings.builder()
48+
// TODO Change this to run with security enabled
49+
// https://github.com/elastic/elasticsearch/issues/75940
50+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
51+
.build();
52+
}
53+
54+
private final String policyName = "device-enrich-policy";
55+
private final String sourceIndexName = "devices-idx";
56+
57+
public void testEnrichCacheValuesCannotBeCorrupted() throws Exception {
58+
// create and store the enrich policy
59+
final EnrichPolicy enrichPolicy = new EnrichPolicy(
60+
EnrichPolicy.MATCH_TYPE,
61+
null,
62+
List.of(sourceIndexName),
63+
"host.ip",
64+
List.of("device.name", "host.ip")
65+
);
66+
67+
// create the source index
68+
createSourceIndices(client(), enrichPolicy);
69+
70+
// add a single document to the enrich index
71+
setEnrichDeviceName("some.device." + randomAlphaOfLength(10));
72+
73+
// store the enrich policy
74+
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
75+
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
76+
77+
// execute the policy once
78+
executeEnrichPolicy();
79+
80+
// add a low priority cluster state applier to increase the odds of a race occurring between
81+
// the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the
82+
// cluster state *listeners* having been run (which adjusts the alias and therefore the search results)
83+
final var clusterService = node().injector().getInstance(ClusterService.class);
84+
clusterService.addLowPriorityApplier((event) -> safeSleep(10));
85+
86+
// kick off some threads that just bang on _simulate in the background
87+
final var finished = new AtomicBoolean(false);
88+
for (int i = 0; i < 5; i++) {
89+
new Thread(() -> {
90+
while (finished.get() == false) {
91+
simulatePipeline();
92+
}
93+
}).start();
94+
}
95+
96+
try {
97+
for (int i = 0; i < randomIntBetween(10, 100); i++) {
98+
final String deviceName = "some.device." + randomAlphaOfLength(10);
99+
100+
// add a single document to the enrich index
101+
setEnrichDeviceName(deviceName);
102+
103+
// execute the policy
104+
executeEnrichPolicy();
105+
106+
// simulate the pipeline and confirm that we see the expected result
107+
assertBusy(() -> {
108+
SimulateDocumentBaseResult result = simulatePipeline();
109+
assertThat(result.getFailure(), nullValue());
110+
assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName));
111+
});
112+
}
113+
} finally {
114+
// we're finished, so those threads can all quit now
115+
finished.set(true);
116+
}
117+
}
118+
119+
private SimulateDocumentBaseResult simulatePipeline() {
120+
final SimulatePipelineRequest simulatePipelineRequest = jsonSimulatePipelineRequest("""
121+
{
122+
"pipeline": {
123+
"processors": [
124+
{
125+
"enrich": {
126+
"policy_name": "device-enrich-policy",
127+
"field": "host.ip",
128+
"target_field": "_tmp.device"
129+
}
130+
},
131+
{
132+
"rename" : {
133+
"field" : "_tmp.device.device.name",
134+
"target_field" : "device.name"
135+
}
136+
}
137+
]
138+
},
139+
"docs": [
140+
{
141+
"_source": {
142+
"host": {
143+
"ip": "10.151.80.8"
144+
}
145+
}
146+
}
147+
]
148+
}
149+
""");
150+
final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
151+
return (SimulateDocumentBaseResult) response.getResults().get(0);
152+
}
153+
154+
private void setEnrichDeviceName(final String deviceName) {
155+
final IndexRequest indexRequest = new IndexRequest(sourceIndexName);
156+
indexRequest.id("1"); // there's only one document, and we keep overwriting it
157+
indexRequest.source(Strings.format("""
158+
{
159+
"host": {
160+
"ip": "10.151.80.8"
161+
},
162+
"device": {
163+
"name": "%s"
164+
}
165+
}
166+
""", deviceName), XContentType.JSON);
167+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
168+
client().index(indexRequest).actionGet();
169+
}
170+
171+
private void executeEnrichPolicy() {
172+
final ExecuteEnrichPolicyAction.Request executePolicyRequest = new ExecuteEnrichPolicyAction.Request(
173+
TEST_REQUEST_TIMEOUT,
174+
policyName
175+
);
176+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
177+
}
178+
179+
private static SimulatePipelineRequest jsonSimulatePipelineRequest(String jsonString) {
180+
return jsonSimulatePipelineRequest(new BytesArray(jsonString));
181+
}
182+
183+
private static SimulatePipelineRequest jsonSimulatePipelineRequest(BytesReference jsonBytes) {
184+
return new SimulatePipelineRequest(ReleasableBytesReference.wrap(jsonBytes), XContentType.JSON);
185+
}
186+
187+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
import org.elasticsearch.ingest.IngestDocument;
1515
import org.elasticsearch.script.TemplateScript;
1616
import org.elasticsearch.search.builder.SearchSourceBuilder;
17-
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
1817

1918
import java.util.ArrayList;
2019
import java.util.List;
2120
import java.util.Map;
2221
import java.util.function.BiConsumer;
23-
import java.util.function.Supplier;
22+
import java.util.function.Function;
2423

2524
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
2625

@@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
3231
private final boolean overrideEnabled;
3332
protected final String matchField;
3433
protected final int maxMatches;
35-
private final String indexAlias;
3634

3735
protected AbstractEnrichProcessor(
3836
String tag,
@@ -55,8 +53,6 @@ protected AbstractEnrichProcessor(
5553
this.overrideEnabled = overrideEnabled;
5654
this.matchField = matchField;
5755
this.maxMatches = maxMatches;
58-
// note: since the policyName determines the indexAlias, we can calculate this once
59-
this.indexAlias = EnrichPolicy.getBaseName(policyName);
6056
}
6157

6258
public abstract QueryBuilder getQueryBuilder(Object fieldValue);
@@ -72,7 +68,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
7268
return;
7369
}
7470

75-
Supplier<SearchRequest> searchRequestSupplier = () -> {
71+
final Function<String, SearchRequest> searchRequestBuilder = (concreteEnrichIndex) -> {
7672
QueryBuilder queryBuilder = getQueryBuilder(value);
7773
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
7874
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
@@ -82,13 +78,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
8278
searchBuilder.fetchSource(true);
8379
searchBuilder.query(constantScore);
8480
SearchRequest req = new SearchRequest();
85-
req.indices(indexAlias);
81+
req.indices(concreteEnrichIndex);
8682
req.preference(Preference.LOCAL.type());
8783
req.source(searchBuilder);
8884
return req;
8985
};
9086

91-
searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
87+
searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> {
9288
if (e != null) {
9389
handler.accept(null, e);
9490
return;

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.util.Objects;
3131
import java.util.function.BiConsumer;
3232
import java.util.function.Consumer;
33-
import java.util.function.Supplier;
33+
import java.util.function.Function;
3434

3535
import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;
3636

@@ -125,17 +125,18 @@ public void accept(ClusterState state) {
125125
metadata = state.getMetadata();
126126
}
127127

128-
private SearchRunner createSearchRunner(String indexAlias, Client client, EnrichCache enrichCache) {
129-
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
128+
private SearchRunner createSearchRunner(final String indexAlias, final Client client, final EnrichCache enrichCache) {
129+
final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
130130
return (value, maxMatches, reqSupplier, handler) -> {
131+
final String concreteEnrichIndex = getEnrichIndexKey(indexAlias);
131132
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
132133
enrichCache.computeIfAbsent(
133-
getEnrichIndexKey(indexAlias),
134+
concreteEnrichIndex,
134135
value,
135136
maxMatches,
136137
(searchResponseActionListener) -> originClient.execute(
137138
EnrichCoordinatorProxyAction.INSTANCE,
138-
reqSupplier.get(),
139+
reqSupplier.apply(concreteEnrichIndex),
139140
searchResponseActionListener
140141
),
141142
ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
@@ -155,7 +156,7 @@ public interface SearchRunner {
155156
void accept(
156157
Object value,
157158
int maxMatches,
158-
Supplier<SearchRequest> searchRequestSupplier,
159+
Function<String, SearchRequest> searchRequestBuilder,
159160
BiConsumer<List<Map<?, ?>>, Exception> handler
160161
);
161162
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.function.BiConsumer;
31-
import java.util.function.Supplier;
31+
import java.util.function.Function;
3232

3333
import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
3434
import static org.hamcrest.Matchers.emptyArray;
@@ -162,10 +162,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
162162
public void accept(
163163
Object value,
164164
int maxMatches,
165-
Supplier<SearchRequest> searchRequestSupplier,
165+
Function<String, SearchRequest> searchRequestBuilder,
166166
BiConsumer<List<Map<?, ?>>, Exception> handler
167167
) {
168-
capturedRequest.set(searchRequestSupplier.get());
168+
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
169169
if (exception != null) {
170170
handler.accept(null, exception);
171171
} else {

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.function.BiConsumer;
28-
import java.util.function.Supplier;
28+
import java.util.function.Function;
2929

3030
import static org.hamcrest.Matchers.emptyArray;
3131
import static org.hamcrest.Matchers.equalTo;
@@ -405,10 +405,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
405405
public void accept(
406406
Object value,
407407
int maxMatches,
408-
Supplier<SearchRequest> searchRequestSupplier,
408+
Function<String, SearchRequest> searchRequestBuilder,
409409
BiConsumer<List<Map<?, ?>>, Exception> handler
410410
) {
411-
capturedRequest.set(searchRequestSupplier.get());
411+
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
412412
if (exception != null) {
413413
handler.accept(null, exception);
414414
} else {

0 commit comments

Comments
 (0)