Skip to content

Commit 3a7f712

Browse files
authored
Avoid stale enrich results after policy execution (elastic#133752)
1 parent a3438e1 commit 3a7f712

File tree

3 files changed

+189
-14
lines changed

3 files changed

+189
-14
lines changed

docs/changelog/133752.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 133752
2+
summary: Avoid stale enrich results after policy execution
3+
area: Ingest Node
4+
type: bug
5+
issues: []
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.elasticsearch.action.get.GetRequest;
11+
import org.elasticsearch.action.index.IndexRequest;
12+
import org.elasticsearch.action.support.WriteRequest;
13+
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.core.Strings;
15+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
16+
import org.elasticsearch.plugins.Plugin;
17+
import org.elasticsearch.reindex.ReindexPlugin;
18+
import org.elasticsearch.test.ESSingleNodeTestCase;
19+
import org.elasticsearch.xcontent.XContentType;
20+
import org.elasticsearch.xpack.core.XPackSettings;
21+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
22+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
23+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
24+
25+
import java.util.Collection;
26+
import java.util.List;
27+
import java.util.Map;
28+
29+
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
30+
import static org.hamcrest.Matchers.equalTo;
31+
32+
public class EnrichSourceDataChangeIT extends ESSingleNodeTestCase {
33+
34+
@Override
35+
protected Collection<Class<? extends Plugin>> getPlugins() {
36+
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
37+
}
38+
39+
@Override
40+
protected Settings nodeSettings() {
41+
return Settings.builder()
42+
// TODO Change this to run with security enabled
43+
// https://github.com/elastic/elasticsearch/issues/75940
44+
.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
45+
.build();
46+
}
47+
48+
private final String policyName = "device-enrich-policy";
49+
private final String sourceIndexName = "devices-idx";
50+
51+
public void testChangesToTheSourceIndexTakeEffectOnPolicyExecution() throws Exception {
52+
// create and store the enrich policy
53+
final var enrichPolicy = new EnrichPolicy(
54+
EnrichPolicy.MATCH_TYPE,
55+
null,
56+
List.of(sourceIndexName),
57+
"host.ip",
58+
List.of("device.name", "host.ip")
59+
);
60+
61+
// create the source index
62+
createSourceIndices(client(), enrichPolicy);
63+
64+
final String initialDeviceName = "some.device." + randomAlphaOfLength(10);
65+
66+
// add a single document to the enrich index
67+
setEnrichDeviceName(initialDeviceName);
68+
69+
// store the enrich policy and execute it
70+
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
71+
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();
72+
executeEnrichPolicy();
73+
74+
// create an honest to goodness pipeline for repeated executions (we're not running any _simulate requests here)
75+
final String pipelineName = "my-pipeline";
76+
putJsonPipeline(pipelineName, """
77+
{
78+
"processors": [
79+
{
80+
"enrich": {
81+
"policy_name": "device-enrich-policy",
82+
"field": "host.ip",
83+
"target_field": "_tmp.device"
84+
}
85+
},
86+
{
87+
"rename" : {
88+
"field" : "_tmp.device.device.name",
89+
"target_field" : "device.name"
90+
}
91+
},
92+
{
93+
"remove" : {
94+
"field" : "_tmp"
95+
}
96+
}
97+
]
98+
}""");
99+
100+
{
101+
final var indexRequest = new IndexRequest(sourceIndexName);
102+
indexRequest.id("1");
103+
indexRequest.setPipeline("my-pipeline");
104+
indexRequest.source("""
105+
{
106+
"host": {
107+
"ip": "10.151.80.8"
108+
}
109+
}
110+
""", XContentType.JSON);
111+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
112+
client().index(indexRequest).actionGet();
113+
114+
final var response = client().get(new GetRequest(sourceIndexName).id("1")).actionGet();
115+
assertThat(response.getSource().get("device"), equalTo(Map.of("name", initialDeviceName)));
116+
}
117+
118+
// add different document to the enrich index
119+
final String changedDeviceName = "some.device." + randomAlphaOfLength(10);
120+
setEnrichDeviceName(changedDeviceName);
121+
122+
// execute the policy to pick up the change
123+
executeEnrichPolicy();
124+
125+
// it can take a moment for the execution to take effect, so assertBusy
126+
assertBusy(() -> {
127+
final var indexRequest = new IndexRequest(sourceIndexName);
128+
indexRequest.id("2");
129+
indexRequest.setPipeline("my-pipeline");
130+
indexRequest.source("""
131+
{
132+
"host": {
133+
"ip": "10.151.80.8"
134+
}
135+
}
136+
""", XContentType.JSON);
137+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
138+
client().index(indexRequest).actionGet();
139+
140+
final var response = client().get(new GetRequest(sourceIndexName).id("2")).actionGet();
141+
assertThat(response.getSource().get("device"), equalTo(Map.of("name", changedDeviceName)));
142+
});
143+
}
144+
145+
private void setEnrichDeviceName(final String deviceName) {
146+
final var indexRequest = new IndexRequest(sourceIndexName);
147+
indexRequest.id("1"); // there's only one document, and we keep overwriting it
148+
indexRequest.source(Strings.format("""
149+
{
150+
"host": {
151+
"ip": "10.151.80.8"
152+
},
153+
"device": {
154+
"name": "%s"
155+
}
156+
}
157+
""", deviceName), XContentType.JSON);
158+
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
159+
client().index(indexRequest).actionGet();
160+
}
161+
162+
private void executeEnrichPolicy() {
163+
final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
164+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
165+
}
166+
167+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.cluster.metadata.IndexMetadata;
1616
import org.elasticsearch.cluster.metadata.Metadata;
1717
import org.elasticsearch.cluster.metadata.ProjectId;
18-
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1918
import org.elasticsearch.common.geo.Orientation;
2019
import org.elasticsearch.common.geo.ShapeRelation;
2120
import org.elasticsearch.common.xcontent.support.XContentMapValues;
@@ -64,14 +63,18 @@ public Processor create(
6463
if (metadata == null) {
6564
throw new IllegalStateException("enrich processor factory has not yet been initialized with cluster state");
6665
}
67-
final var project = metadata.getProject(projectId);
68-
IndexAbstraction indexAbstraction = project.getIndicesLookup().get(indexAlias);
69-
if (indexAbstraction == null) {
70-
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
66+
67+
final IndexMetadata imd;
68+
{
69+
final var project = metadata.getProject(projectId);
70+
IndexAbstraction indexAbstraction = project.getIndicesLookup().get(indexAlias);
71+
if (indexAbstraction == null) {
72+
throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]");
73+
}
74+
assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS;
75+
assert indexAbstraction.getIndices().size() == 1;
76+
imd = project.index(indexAbstraction.getIndices().get(0));
7177
}
72-
assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS;
73-
assert indexAbstraction.getIndices().size() == 1;
74-
IndexMetadata imd = project.index(indexAbstraction.getIndices().get(0));
7578

7679
Map<String, Object> mappingAsMap = imd.mapping().sourceAsMap();
7780
String policyType = (String) XContentMapValues.extractValue(
@@ -88,7 +91,7 @@ public Processor create(
8891
if (maxMatches < 1 || maxMatches > 128) {
8992
throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128");
9093
}
91-
var searchRunner = createSearchRunner(project, indexAlias);
94+
var searchRunner = createSearchRunner(projectId, indexAlias);
9295
switch (policyType) {
9396
case EnrichPolicy.MATCH_TYPE:
9497
case EnrichPolicy.RANGE_TYPE:
@@ -133,13 +136,13 @@ public void accept(ClusterState state) {
133136
metadata = state.getMetadata();
134137
}
135138

136-
private SearchRunner createSearchRunner(ProjectMetadata project, String indexAlias) {
139+
private SearchRunner createSearchRunner(final ProjectId projectId, final String indexAlias) {
137140
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
138141
return (value, maxMatches, reqSupplier, handler) -> {
139142
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
140143
enrichCache.computeIfAbsent(
141-
project.id(),
142-
getEnrichIndexKey(project, indexAlias),
144+
projectId,
145+
getEnrichIndexKey(projectId, indexAlias),
143146
value,
144147
maxMatches,
145148
(searchResponseActionListener) -> originClient.execute(
@@ -152,8 +155,8 @@ private SearchRunner createSearchRunner(ProjectMetadata project, String indexAli
152155
};
153156
}
154157

155-
private String getEnrichIndexKey(ProjectMetadata project, String indexAlias) {
156-
IndexAbstraction ia = project.getIndicesLookup().get(indexAlias);
158+
private String getEnrichIndexKey(final ProjectId projectId, final String indexAlias) {
159+
IndexAbstraction ia = metadata.getProject(projectId).getIndicesLookup().get(indexAlias);
157160
if (ia == null) {
158161
throw new IndexNotFoundException("no generated enrich index [" + indexAlias + "]");
159162
}

0 commit comments

Comments
 (0)