diff --git a/docs/changelog/133752.yaml b/docs/changelog/133752.yaml new file mode 100644 index 0000000000000..784d6e2afa370 --- /dev/null +++ b/docs/changelog/133752.yaml @@ -0,0 +1,5 @@ +pr: 133752 +summary: Avoid stale enrich results after policy execution +area: Ingest Node +type: bug +issues: [] diff --git a/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichSourceDataChangeIT.java b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichSourceDataChangeIT.java new file mode 100644 index 0000000000000..74fe6315a1ae2 --- /dev/null +++ b/x-pack/plugin/enrich/src/internalClusterTest/java/org/elasticsearch/xpack/enrich/EnrichSourceDataChangeIT.java @@ -0,0 +1,167 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.enrich; + +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.Strings; +import org.elasticsearch.ingest.common.IngestCommonPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.reindex.ReindexPlugin; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.core.XPackSettings; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction; +import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xpack.enrich.AbstractEnrichTestCase.createSourceIndices; +import static org.hamcrest.Matchers.equalTo; + +public class EnrichSourceDataChangeIT extends ESSingleNodeTestCase { + + @Override + protected Collection> getPlugins() { + return List.of(LocalStateEnrich.class, ReindexPlugin.class, IngestCommonPlugin.class); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + // TODO Change this to run with security enabled + // https://github.com/elastic/elasticsearch/issues/75940 + .put(XPackSettings.SECURITY_ENABLED.getKey(), false) + .build(); + } + + private final String policyName = "device-enrich-policy"; + private final String sourceIndexName = "devices-idx"; + + public void testChangesToTheSourceIndexTakeEffectOnPolicyExecution() throws Exception { + // create and store the enrich policy + final var enrichPolicy = new EnrichPolicy( + EnrichPolicy.MATCH_TYPE, + null, + List.of(sourceIndexName), + "host.ip", + List.of("device.name", "host.ip") + ); + + // create the source index + createSourceIndices(client(), enrichPolicy); + + final String initialDeviceName = "some.device." + randomAlphaOfLength(10); + + // add a single document to the enrich index + setEnrichDeviceName(initialDeviceName); + + // store the enrich policy and execute it + var putPolicyRequest = new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName, enrichPolicy); + client().execute(PutEnrichPolicyAction.INSTANCE, putPolicyRequest).actionGet(); + executeEnrichPolicy(); + + // create an honest to goodness pipeline for repeated executions (we're not running any _simulate requests here) + final String pipelineName = "my-pipeline"; + putJsonPipeline(pipelineName, """ + { + "processors": [ + { + "enrich": { + "policy_name": "device-enrich-policy", + "field": "host.ip", + "target_field": "_tmp.device" + } + }, + { + "rename" : { + "field" : "_tmp.device.device.name", + "target_field" : "device.name" + } + }, + { + "remove" : { + "field" : "_tmp" + } + } + ] + }"""); + + { + final var indexRequest = new IndexRequest(sourceIndexName); + indexRequest.id("1"); + indexRequest.setPipeline("my-pipeline"); + indexRequest.source(""" + { + "host": { + "ip": "10.151.80.8" + } + } + """, XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + + final var response = client().get(new GetRequest(sourceIndexName).id("1")).actionGet(); + assertThat(response.getSource().get("device"), equalTo(Map.of("name", initialDeviceName))); + } + + // add different document to the enrich index + final String changedDeviceName = "some.device." + randomAlphaOfLength(10); + setEnrichDeviceName(changedDeviceName); + + // execute the policy to pick up the change + executeEnrichPolicy(); + + // it can take a moment for the execution to take effect, so assertBusy + assertBusy(() -> { + final var indexRequest = new IndexRequest(sourceIndexName); + indexRequest.id("2"); + indexRequest.setPipeline("my-pipeline"); + indexRequest.source(""" + { + "host": { + "ip": "10.151.80.8" + } + } + """, XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + + final var response = client().get(new GetRequest(sourceIndexName).id("2")).actionGet(); + assertThat(response.getSource().get("device"), equalTo(Map.of("name", changedDeviceName))); + }); + } + + private void setEnrichDeviceName(final String deviceName) { + final var indexRequest = new IndexRequest(sourceIndexName); + indexRequest.id("1"); // there's only one document, and we keep overwriting it + indexRequest.source(Strings.format(""" + { + "host": { + "ip": "10.151.80.8" + }, + "device": { + "name": "%s" + } + } + """, deviceName), XContentType.JSON); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + client().index(indexRequest).actionGet(); + } + + private void executeEnrichPolicy() { + final var executePolicyRequest = new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policyName); + client().execute(ExecuteEnrichPolicyAction.INSTANCE, executePolicyRequest).actionGet(); + } + +} diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java index ee685a7b86d2b..9daf4b75ed614 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichProcessorFactory.java @@ -15,7 +15,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.common.geo.Orientation; import org.elasticsearch.common.geo.ShapeRelation; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -64,14 +63,18 @@ public Processor create( if (metadata == null) { throw new IllegalStateException("enrich processor factory has not yet been initialized with cluster state"); } - final var project = metadata.getProject(projectId); - IndexAbstraction indexAbstraction = project.getIndicesLookup().get(indexAlias); - if (indexAbstraction == null) { - throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]"); + + final IndexMetadata imd; + { + final var project = metadata.getProject(projectId); + IndexAbstraction indexAbstraction = project.getIndicesLookup().get(indexAlias); + if (indexAbstraction == null) { + throw new IllegalArgumentException("no enrich index exists for policy with name [" + policyName + "]"); + } + assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS; + assert indexAbstraction.getIndices().size() == 1; + imd = project.index(indexAbstraction.getIndices().get(0)); } - assert indexAbstraction.getType() == IndexAbstraction.Type.ALIAS; - assert indexAbstraction.getIndices().size() == 1; - IndexMetadata imd = project.index(indexAbstraction.getIndices().get(0)); Map mappingAsMap = imd.mapping().sourceAsMap(); String policyType = (String) XContentMapValues.extractValue( @@ -88,7 +91,7 @@ public Processor create( if (maxMatches < 1 || maxMatches > 128) { throw ConfigurationUtils.newConfigurationException(TYPE, tag, "max_matches", "should be between 1 and 128"); } - var searchRunner = createSearchRunner(project, indexAlias); + var searchRunner = createSearchRunner(projectId, indexAlias); switch (policyType) { case EnrichPolicy.MATCH_TYPE: case EnrichPolicy.RANGE_TYPE: @@ -133,13 +136,13 @@ public void accept(ClusterState state) { metadata = state.getMetadata(); } - private SearchRunner createSearchRunner(ProjectMetadata project, String indexAlias) { + private SearchRunner createSearchRunner(final ProjectId projectId, final String indexAlias) { Client originClient = new OriginSettingClient(client, ENRICH_ORIGIN); return (value, maxMatches, reqSupplier, handler) -> { // intentionally non-locking for simplicity...it's OK if we re-put the same key/value in the cache during a race condition. enrichCache.computeIfAbsent( - project.id(), - getEnrichIndexKey(project, indexAlias), + projectId, + getEnrichIndexKey(projectId, indexAlias), value, maxMatches, (searchResponseActionListener) -> originClient.execute( @@ -152,8 +155,8 @@ private SearchRunner createSearchRunner(ProjectMetadata project, String indexAli }; } - private String getEnrichIndexKey(ProjectMetadata project, String indexAlias) { - IndexAbstraction ia = project.getIndicesLookup().get(indexAlias); + private String getEnrichIndexKey(final ProjectId projectId, final String indexAlias) { + IndexAbstraction ia = metadata.getProject(projectId).getIndicesLookup().get(indexAlias); if (ia == null) { throw new IndexNotFoundException("no generated enrich index [" + indexAlias + "]"); }