From c3156f39c4e5c00e85336b84db7ba83faa9b43d7 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 26 Aug 2025 10:24:02 -0400 Subject: [PATCH 1/7] Add a simple (passing) test loop --- .../xpack/enrich/EnrichPolicyChangeIT.java | 150 ++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java new file mode 100644 index 0000000000000..cd16fdad1d592 --- /dev/null +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.ingest.common.IngestCommonPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; + +import java.util.Collection; +import java.util.List; + +import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest; +import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class EnrichPolicyChangeIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + // TODO Change this to run with security enabled + // https://github.com/elastic/elasticsearch/issues/75940 + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .build(); + } + + private final String policyName = "device-enrich-policy"; + private final String sourceIndexName = "devices-idx"; + + public void testEnrichCacheValuesCannotBeCorrupted() throws Exception { + // create and store the enrich policy + final var enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(sourceIndexName), + "host.ip", + List.of("device.name", "host.ip") + ); + + // create the source index + createSourceIndices(client(), enrichPolicy); + + // add a single document to the enrich index + setEnrichDeviceName("some.device." + randomAlphaOfLength(10)); + + // store the enrich policy + var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet(); + + // execute the policy once + executeEnrichPolicy(); + + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final String deviceName = "some.device." + randomAlphaOfLength(10); + + // add a single document to the enrich index + setEnrichDeviceName(deviceName); + + // execute the policy + executeEnrichPolicy(); + + // simulate the pipeline and confirm that we see the expected result + assertBusy(() -> { + var result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName)); + }); + } + } + + private SimulateDocumentBaseResult simulatePipeline() { + final var simulatePipelineRequest = jsonSimulatePipelineRequest(""" + { + "pipeline": { + "processors": [ + { + "enrich": { + "policy_name": "device-enrich-policy", + "field": "host.ip", + "target_field": "_tmp.device" + } + }, + { + "rename" : { + "field" : "_tmp.device.device.name", + "target_field" : "device.name" + } + } + ] + }, + "docs": [ + { + "_source": { + "host": { + "ip": "10.151.80.8" + } + } + } + ] + } + """); + final var response = clusterAdmin().simulatePipeline(simulatePipelineRequest).actionGet(); + return (SimulateDocumentBaseResult) response.getResults().getFirst(); + } + + private void setEnrichDeviceName(final String deviceName) { + final var indexRequest = new IndexRequest(sourceIndexName); + indexRequest.id("1"); // there's only one document, and we keep overwriting it + indexRequest.source(Strings.format(""" + { + "host": { + "ip": "10.151.80.8" + }, + "device": { + "name": "%s" + } + } + """, deviceName), XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + } + + private void executeEnrichPolicy() { + final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); + } + +} From fe70ef2b3745971fbd6ec3b0e310040d718f8218 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 26 Aug 2025 11:04:26 -0400 Subject: [PATCH 2/7] Make the test fail --- .../xpack/enrich/EnrichPolicyChangeIT.java | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java index cd16fdad1d592..27c11e4300437 100644 --- a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichPolicyChangeIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.SimulateDocumentBaseResult; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Strings; import org.elasticsearch.ingest.common.IngestCommonPlugin; @@ -24,6 +25,7 @@ import java.util.Collection; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.ingest.IngestPipelineTestUtils.jsonSimulatePipelineRequest; import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices; @@ -72,21 +74,42 @@ public void testEnrichCacheValuesCannotBeCorrupted() throws Exception { // execute the policy once executeEnrichPolicy(); - for (int i = 0; i < randomIntBetween(10, 100); i++) { - final String deviceName = "some.device." + randomAlphaOfLength(10); + // add a low priority cluster state applier to increase the odds of a race occurring between + // the cluster state *appliers* having been run (this adjusts the enrich index pointer) and the + // cluster state *listeners* having been run (which adjusts the alias and therefore the search results) + final var clusterService = node().injector().getInstance(ClusterService.class); + clusterService.addLowPriorityApplier((event) -> safeSleep(10)); + + // kick off some threads that just bang on _simulate in the background + final var finished = new AtomicBoolean(false); + for (int i = 0; i < 5; i++) { + new Thread(() -> { + while (finished.get() == false) { + simulatePipeline(); + } + }).start(); + } - // add a single document to the enrich index - setEnrichDeviceName(deviceName); + try { + for (int i = 0; i < randomIntBetween(10, 100); i++) { + final String deviceName = "some.device." + randomAlphaOfLength(10); - // execute the policy - executeEnrichPolicy(); + // add a single document to the enrich index + setEnrichDeviceName(deviceName); - // simulate the pipeline and confirm that we see the expected result - assertBusy(() -> { - var result = simulatePipeline(); - assertThat(result.getFailure(), nullValue()); - assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName)); - }); + // execute the policy + executeEnrichPolicy(); + + // simulate the pipeline and confirm that we see the expected result + assertBusy(() -> { + var result = simulatePipeline(); + assertThat(result.getFailure(), nullValue()); + assertThat(result.getIngestDocument().getFieldValue("device.name", String.class), equalTo(deviceName)); + }); + } + } finally { + // we're finished, so those threads can all quit now + finished.set(true); } } From 6bbf4610b91c43ed41a9770f0be8408bd391ebbb Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 26 Aug 2025 11:08:56 -0400 Subject: [PATCH 3/7] Make these variables final --- .../elasticsearch/xpack/enrich/EnrichProcessorFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index ee685a7b86d2b..37de8e0e0d9b7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -133,8 +133,8 @@ public void accept(ClusterState state) { metadata = state.getMetadata(); } - private SearchRunner createSearchRunner(ProjectMetadata project, String indexAlias) { - Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN); + private SearchRunner createSearchRunner(final ProjectMetadata project, final String indexAlias) { + final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN); return (value, maxMatches, reqSupplier, handler) -> { // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. enrichCache.computeIfAbsent( From 2686739eb36ae6e20544dfbb34d2dbef1016e1b5 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 26 Aug 2025 11:09:36 -0400 Subject: [PATCH 4/7] Extract a variable for the concrete enrich index --- .../org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 37de8e0e0d9b7..2bcc4337b0c3f 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -136,10 +136,11 @@ public void accept(ClusterState state) { private SearchRunner createSearchRunner(final ProjectMetadata project, final String indexAlias) { final Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN); return (value, maxMatches, reqSupplier, handler) -> { + final String concreteEnrichIndex = getEnrichIndexKey(project, indexAlias); // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. enrichCache.computeIfAbsent( project.id(), - getEnrichIndexKey(project, indexAlias), + concreteEnrichIndex, value, maxMatches, (searchResponseActionListener) -> originClient.execute( From 2e71031238741470d6492e264d94e078a4fea15c Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Tue, 26 Aug 2025 11:13:55 -0400 Subject: [PATCH 5/7] Do not search through the alias --- .../xpack/enrich/AbstractEnrichProcessor.java | 12 ++++-------- .../xpack/enrich/EnrichProcessorFactory.java | 6 +++--- .../xpack/enrich/GeoMatchProcessorTests.java | 6 +++--- .../xpack/enrich/MatchProcessorTests.java | 6 +++--- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java index c2bcc67184958..4903643795f18 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/AbstractEnrichProcessor.java @@ -14,13 +14,12 @@ import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.script.TemplateScript; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Supplier; +import java.util.function.Function; public abstract class AbstractEnrichProcessor extends AbstractProcessor { @@ -32,7 +31,6 @@ public abstract class AbstractEnrichProcessor extends AbstractProcessor { private final boolean overrideEnabled; protected final String matchField; protected final int maxMatches; - private final String indexAlias; protected AbstractEnrichProcessor( String tag, @@ -55,8 +53,6 @@ protected AbstractEnrichProcessor( this.overrideEnabled = overrideEnabled; this.matchField = matchField; this.maxMatches = maxMatches; - // note: since the policyName determines the indexAlias, we can calculate this once - this.indexAlias = EnrichPolicy.getBaseName(policyName); } public abstract QueryBuilder getQueryBuilder(Object fieldValue); @@ -72,7 +68,7 @@ public void execute(IngestDocument ingestDocument, BiConsumer searchRequestSupplier = () -> { + final Function searchRequestBuilder = (concreteEnrichIndex) -> { QueryBuilder queryBuilder = getQueryBuilder(value); ConstantScoreQueryBuilder constantScore = new ConstantScoreQueryBuilder(queryBuilder); SearchSourceBuilder searchBuilder = new SearchSourceBuilder(); @@ -82,13 +78,13 @@ public void execute(IngestDocument ingestDocument, BiConsumer { + searchRunner.accept(value, maxMatches, searchRequestBuilder, (searchHits, e) -> { if (e != null) { handler.accept(null, e); return; diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index 2bcc4337b0c3f..110fc32cd1388 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -32,7 +32,7 @@ import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; -import java.util.function.Supplier; +import java.util.function.Function; import static org.elasticsearch.xpack.core.ClientHelper.ENRICH_ORIGIN; @@ -145,7 +145,7 @@ private SearchRunner createSearchRunner(final ProjectMetadata project, final Str maxMatches, (searchResponseActionListener) -> originClient.execute( EnrichCoordinatorProxyAction.INSTANCE, - reqSupplier.get(), + reqSupplier.apply(concreteEnrichIndex), searchResponseActionListener ), ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e)) @@ -165,7 +165,7 @@ public interface SearchRunner { void accept( Object value, int maxMatches, - Supplier searchRequestSupplier, + Function searchRequestBuilder, BiConsumer>, Exception> handler ); } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java index f122e34db5488..e0179f6b0c4a0 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/GeoMatchProcessorTests.java @@ -28,7 +28,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Supplier; +import java.util.function.Function; import static org.elasticsearch.xpack.enrich.MatchProcessorTests.str; import static org.hamcrest.Matchers.emptyArray; @@ -162,10 +162,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory. public void accept( Object value, int maxMatches, - Supplier searchRequestSupplier, + Function searchRequestBuilder, BiConsumer>, Exception> handler ) { - capturedRequest.set(searchRequestSupplier.get()); + capturedRequest.set(searchRequestBuilder.apply(".enrich-_name")); if (exception != null) { handler.accept(null, exception); } else { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java index 4e3496e1a5838..3bf0fae2db0e5 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/MatchProcessorTests.java @@ -25,7 +25,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiConsumer; -import java.util.function.Supplier; +import java.util.function.Function; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; @@ -405,10 +405,10 @@ private static final class MockSearchFunction implements EnrichProcessorFactory. public void accept( Object value, int maxMatches, - Supplier searchRequestSupplier, + Function searchRequestBuilder, BiConsumer>, Exception> handler ) { - capturedRequest.set(searchRequestSupplier.get()); + capturedRequest.set(searchRequestBuilder.apply(".enrich-_name")); if (exception != null) { handler.accept(null, exception); } else { From 9674e4c832f3aa6e3fcf9e6983cd106acf22eb10 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Wed, 27 Aug 2025 15:23:24 -0400 Subject: [PATCH 6/7] Update docs/changelog/133680.yaml --- docs/changelog/133680.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/133680.yaml diff --git a/docs/changelog/133680.yaml b/docs/changelog/133680.yaml new file mode 100644 index 0000000000000..e0e2df81cdaa2 --- /dev/null +++ b/docs/changelog/133680.yaml @@ -0,0 +1,5 @@ +pr: 133680 +summary: Fix enrich cache containing outdated value after policy execution +area: Ingest Node +type: bug +issues: [] From 4cf61e224dcfce550ceddf91870fe0269ad518f8 Mon Sep 17 00:00:00 2001 From: Joe Gallo Date: Thu, 28 Aug 2025 08:48:54 -0400 Subject: [PATCH 7/7] Update changelog --- docs/changelog/133680.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/changelog/133680.yaml b/docs/changelog/133680.yaml index e0e2df81cdaa2..c99beb5a1040a 100644 --- a/docs/changelog/133680.yaml +++ b/docs/changelog/133680.yaml @@ -1,5 +1,5 @@ pr: 133680 -summary: Fix enrich cache containing outdated value after policy execution +summary: Fix enrich caches outdated value after policy run area: Ingest Node type: bug issues: []