Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133680.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133680
summary: Fix enrich caches outdated value after policy run
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class EnrichPolicyChangeIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder()
// TODO Change this to run with security enabled
// https://github.com/elastic/elasticsearch/issues/75940
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already added this new test to #75940.

.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
.build();
}

private final String policyName = "device-enrich-policy";
private final String sourceIndexName = "devices-idx";

public void testEnrichCacheValuesCannotBeCorrupted() throws Exception {
// create and store the enrich policy
final var enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
List.of(sourceIndexName),
"host.ip",
List.of("device.name", "host.ip")
);

// create the source index
createSourceIndices(client(), enrichPolicy);

// add a single document to the enrich index
setEnrichDeviceName("some.device." + randomAlphaOfLength(10));

// store the enrich policy
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();

// execute the policy once
executeEnrichPolicy();

// add a low priority cluster state applier to increase the odds of a race occurring between
// the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the
// cluster state *listeners* having been run (which adjusts the alias and therefore the search results)
final var clusterService = node().injector().getInstance(ClusterService.class);
clusterService.addLowPriorityApplier((event) -> safeSleep(10));

// kick off some threads that just bang on _simulate in the background
final var finished = new AtomicBoolean(false);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (finished.get() == false) {
simulatePipeline();
}
}).start();
}

try {
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final String deviceName = "some.device." + randomAlphaOfLength(10);

// add a single document to the enrich index
setEnrichDeviceName(deviceName);

// execute the policy
executeEnrichPolicy();

// simulate the pipeline and confirm that we see the expected result
assertBusy(() -> {
var result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName));
});
}
} finally {
// we're finished, so those threads can all quit now
finished.set(true);
}
}

private SimulateDocumentBaseResult simulatePipeline() {
final var simulatePipelineRequest = jsonSimulatePipelineRequest("""
{
"pipeline": {
"processors": [
{
"enrich": {
"policy_name": "device-enrich-policy",
"field": "host.ip",
"target_field": "_tmp.device"
}
},
{
"rename" : {
"field" : "_tmp.device.device.name",
"target_field" : "device.name"
}
}
]
},
"docs": [
{
"_source": {
"host": {
"ip": "10.151.80.8"
}
}
}
]
}
""");
final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
return (SimulateDocumentBaseResult) response.getResults().getFirst();
}

private void setEnrichDeviceName(final String deviceName) {
final var indexRequest = new IndexRequest(sourceIndexName);
indexRequest.id("1"); // there's only one document, and we keep overwriting it
indexRequest.source(Strings.format("""
{
"host": {
"ip": "10.151.80.8"
},
"device": {
"name": "%s"
}
}
""", deviceName), XContentType.JSON);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(indexRequest).actionGet();
}

private void executeEnrichPolicy() {
final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

public abstract class AbstractEnrichProcessor extends AbstractProcessor {

Expand All @@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
private final boolean overrideEnabled;
protected final String matchField;
protected final int maxMatches;
private final String indexAlias;

protected AbstractEnrichProcessor(
String tag,
Expand All @@ -55,8 +53,6 @@ protected AbstractEnrichProcessor(
this.overrideEnabled = overrideEnabled;
this.matchField = matchField;
this.maxMatches = maxMatches;
// note: since the policyName determines the indexAlias, we can calculate this once
this.indexAlias = EnrichPolicy.getBaseName(policyName);
}

public abstract QueryBuilder getQueryBuilder(Object fieldValue);
Expand All @@ -72,7 +68,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
return;
}

Supplier<SearchRequest> searchRequestSupplier = () -> {
final Function<String, SearchRequest> searchRequestBuilder = (concreteEnrichIndex) -> {
QueryBuilder queryBuilder = getQueryBuilder(value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
Expand All @@ -82,13 +78,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
req.indices(indexAlias);
req.indices(concreteEnrichIndex);
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);
return req;
};

searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> {
if (e != null) {
handler.accept(null, e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -137,17 +137,18 @@ public void accept(ClusterState state) {
}

private SearchRunner createSearchRunner(final ProjectId projectId, final String indexAlias) {
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
return (value, maxMatches, reqSupplier, handler) -> {
final String concreteEnrichIndex = getEnrichIndexKey(projectId, indexAlias);
// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
enrichCache.computeIfAbsent(
projectId,
getEnrichIndexKey(projectId, indexAlias),
concreteEnrichIndex,
value,
maxMatches,
(searchResponseActionListener) -> originClient.execute(
EnrichCoordinatorProxyAction.INSTANCE,
reqSupplier.get(),
reqSupplier.apply(concreteEnrichIndex),
searchResponseActionListener
),
ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
Expand All @@ -167,7 +168,7 @@ public interface SearchRunner {
void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
import static org.hamcrest.Matchers.emptyArray;
Expand Down Expand Up @@ -162,10 +162,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
public void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
) {
capturedRequest.set(searchRequestSupplier.get());
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
if (exception != null) {
handler.accept(null, exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -405,10 +405,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
public void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
) {
capturedRequest.set(searchRequestSupplier.get());
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
if (exception != null) {
handler.accept(null, exception);
} else {
Expand Down