Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/133680.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 133680
summary: Fix enrich cache containing outdated value after policy execution
area: Ingest Node
type: bug
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.SimulateDocumentBaseResult;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Strings;
import org.elasticsearch.ingest.common.IngestCommonPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.reindex.ReindexPlugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest;
import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class EnrichPolicyChangeIT extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class);
}

@Override
protected Settings nodeSettings() {
return Settings.builder()
// TODO Change this to run with security enabled
// https://github.com/elastic/elasticsearch/issues/75940
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already added this new test to #75940.

.put(XPackSettings.SECURITY_ENABLED.getKey(), false)
.build();
}

private final String policyName = "device-enrich-policy";
private final String sourceIndexName = "devices-idx";

public void testEnrichCacheValuesCannotBeCorrupted() throws Exception {
// create and store the enrich policy
final var enrichPolicy = new EnrichPolicy(
EnrichPolicy.MATCH_TYPE,
null,
List.of(sourceIndexName),
"host.ip",
List.of("device.name", "host.ip")
);

// create the source index
createSourceIndices(client(), enrichPolicy);

// add a single document to the enrich index
setEnrichDeviceName("some.device." + randomAlphaOfLength(10));

// store the enrich policy
var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy);
client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet();

// execute the policy once
executeEnrichPolicy();

// add a low priority cluster state applier to increase the odds of a race occurring between
// the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the
// cluster state *listeners* having been run (which adjusts the alias and therefore the search results)
final var clusterService = node().injector().getInstance(ClusterService.class);
clusterService.addLowPriorityApplier((event) -> safeSleep(10));

// kick off some threads that just bang on _simulate in the background
final var finished = new AtomicBoolean(false);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (finished.get() == false) {
simulatePipeline();
}
}).start();
}

try {
for (int i = 0; i < randomIntBetween(10, 100); i++) {
final String deviceName = "some.device." + randomAlphaOfLength(10);

// add a single document to the enrich index
setEnrichDeviceName(deviceName);

// execute the policy
executeEnrichPolicy();

// simulate the pipeline and confirm that we see the expected result
assertBusy(() -> {
var result = simulatePipeline();
assertThat(result.getFailure(), nullValue());
assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName));
});
}
} finally {
// we're finished, so those threads can all quit now
finished.set(true);
}
}

private SimulateDocumentBaseResult simulatePipeline() {
final var simulatePipelineRequest = jsonSimulatePipelineRequest("""
{
"pipeline": {
"processors": [
{
"enrich": {
"policy_name": "device-enrich-policy",
"field": "host.ip",
"target_field": "_tmp.device"
}
},
{
"rename" : {
"field" : "_tmp.device.device.name",
"target_field" : "device.name"
}
}
]
},
"docs": [
{
"_source": {
"host": {
"ip": "10.151.80.8"
}
}
}
]
}
""");
final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet();
return (SimulateDocumentBaseResult) response.getResults().getFirst();
}

private void setEnrichDeviceName(final String deviceName) {
final var indexRequest = new IndexRequest(sourceIndexName);
indexRequest.id("1"); // there's only one document, and we keep overwriting it
indexRequest.source(Strings.format("""
{
"host": {
"ip": "10.151.80.8"
},
"device": {
"name": "%s"
}
}
""", deviceName), XContentType.JSON);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(indexRequest).actionGet();
}

private void executeEnrichPolicy() {
final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName);
client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.script.TemplateScript;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

public abstract class AbstractEnrichProcessor extends AbstractProcessor {

Expand All @@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor {
private final boolean overrideEnabled;
protected final String matchField;
protected final int maxMatches;
private final String indexAlias;

protected AbstractEnrichProcessor(
String tag,
Expand All @@ -55,8 +53,6 @@ protected AbstractEnrichProcessor(
this.overrideEnabled = overrideEnabled;
this.matchField = matchField;
this.maxMatches = maxMatches;
// note: since the policyName determines the indexAlias, we can calculate this once
this.indexAlias = EnrichPolicy.getBaseName(policyName);
}

public abstract QueryBuilder getQueryBuilder(Object fieldValue);
Expand All @@ -72,7 +68,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
return;
}

Supplier<SearchRequest> searchRequestSupplier = () -> {
final Function<String, SearchRequest> searchRequestBuilder = (concreteEnrichIndex) -> {
QueryBuilder queryBuilder = getQueryBuilder(value);
ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder);
SearchSourceBuilder searchBuilder = new SearchSourceBuilder();
Expand All @@ -82,13 +78,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer<IngestDocument, Ex
searchBuilder.fetchSource(true);
searchBuilder.query(constantScore);
SearchRequest req = new SearchRequest();
req.indices(indexAlias);
req.indices(concreteEnrichIndex);
req.preference(Preference.LOCAL.type());
req.source(searchBuilder);
return req;
};

searchRunner.accept(value, maxMatches, searchRequestSupplier, (searchHits, e) -> {
searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> {
if (e != null) {
handler.accept(null, e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN;

Expand Down Expand Up @@ -133,18 +133,19 @@ public void accept(ClusterState state) {
metadata = state.getMetadata();
}

private SearchRunner createSearchRunner(ProjectMetadata project, String indexAlias) {
Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
private SearchRunner createSearchRunner(final ProjectMetadata project, final String indexAlias) {
final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN);
return (value, maxMatches, reqSupplier, handler) -> {
final String concreteEnrichIndex = getEnrichIndexKey(project, indexAlias);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We capture this enrich index name here when we create the processor, but how will this get updated if we execute an enrich policy and a new index is created?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's all lunatic higher order functions (I say with the highest compliments) -- we capture the client at createSearchRunner invocation time, that is, when a processor is created. But the search runner itself doesn't run until it's invoked for some document in AbstractEnrichProcessor#execute and that's when the concreteEnrichIndex is finally set (and of course the handler is another functional argument so this is all clear as mud when one reads the code).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This search runner captures the project metadata to extract that concrete enrich index name, but it only does this at processor creation time. Is there a possibility that when we execute an enrich policy, thus updating the concrete index to use, but this processor isn't recreated when that happens? It looks like we only create a processor if it's configuration changes after a cluster state update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeahhhhhhhhhhhh... to be clear, that problem existed before this, though, right? I mean, it seems like it does work... but I must admit I'm not sure I see how it does work just yet.

Copy link
Contributor Author

@joegallo joegallo Aug 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my! #133752 (edit: merged!)

// intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition.
enrichCache.computeIfAbsent(
project.id(),
getEnrichIndexKey(project, indexAlias),
concreteEnrichIndex,
value,
maxMatches,
(searchResponseActionListener) -> originClient.execute(
EnrichCoordinatorProxyAction.INSTANCE,
reqSupplier.get(),
reqSupplier.apply(concreteEnrichIndex),
searchResponseActionListener
),
ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e))
Expand All @@ -164,7 +165,7 @@ public interface SearchRunner {
void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str;
import static org.hamcrest.Matchers.emptyArray;
Expand Down Expand Up @@ -162,10 +162,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
public void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
) {
capturedRequest.set(searchRequestSupplier.get());
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
if (exception != null) {
handler.accept(null, exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.function.Function;

import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -405,10 +405,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory.
public void accept(
Object value,
int maxMatches,
Supplier<SearchRequest> searchRequestSupplier,
Function<String, SearchRequest> searchRequestBuilder,
BiConsumer<List<Map<?, ?>>, Exception> handler
) {
capturedRequest.set(searchRequestSupplier.get());
capturedRequest.set(searchRequestBuilder.apply(".enrich-_name"));
if (exception != null) {
handler.accept(null, exception);
} else {
Expand Down