Skip to content

Commit 88fef9a

Browse files
Validate enrich index before completing policy execution (#100106) (#100158)
This PR adds a validation step to the end of an enrich policy run to ensure the integrity of the enrich index that is about to be promoted. (cherry picked from commit 225db31) Co-authored-by: Elastic Machine <[email protected]>
1 parent a121f61 commit 88fef9a

File tree

3 files changed

+139
-8
lines changed

3 files changed

+139
-8
lines changed

docs/changelog/100106.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 100106
2+
summary: Validate enrich index before completing policy execution
3+
area: Ingest Node
4+
type: bug
5+
issues: []

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunner.java

Lines changed: 59 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.elasticsearch.ElasticsearchException;
12+
import org.elasticsearch.ResourceNotFoundException;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionRequest;
1415
import org.elasticsearch.action.ActionResponse;
@@ -33,13 +34,15 @@
3334
import org.elasticsearch.client.internal.FilterClient;
3435
import org.elasticsearch.client.internal.OriginSettingClient;
3536
import org.elasticsearch.cluster.ClusterState;
37+
import org.elasticsearch.cluster.metadata.IndexMetadata;
3638
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3739
import org.elasticsearch.cluster.metadata.MappingMetadata;
3840
import org.elasticsearch.cluster.service.ClusterService;
3941
import org.elasticsearch.common.Strings;
4042
import org.elasticsearch.common.bytes.BytesArray;
4143
import org.elasticsearch.common.settings.Settings;
4244
import org.elasticsearch.core.CheckedFunction;
45+
import org.elasticsearch.index.IndexNotFoundException;
4346
import org.elasticsearch.index.mapper.MapperService;
4447
import org.elasticsearch.index.query.QueryBuilders;
4548
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@@ -123,13 +126,9 @@ public void run() {
123126
logger.debug("Policy [{}]: Checking source indices [{}]", policyName, sourceIndices);
124127
GetIndexRequest getIndexRequest = new GetIndexRequest().indices(sourceIndices);
125128
// This call does not set the origin to ensure that the user executing the policy has permission to access the source index
126-
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailure((l, getIndexResponse) -> {
127-
try {
128-
validateMappings(getIndexResponse);
129-
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
130-
} catch (Exception e) {
131-
l.onFailure(e);
132-
}
129+
client.admin().indices().getIndex(getIndexRequest, listener.delegateFailureAndWrap((l, getIndexResponse) -> {
130+
validateMappings(getIndexResponse);
131+
prepareAndCreateEnrichIndex(toMappings(getIndexResponse));
133132
}));
134133
} catch (Exception e) {
135134
listener.onFailure(e);
@@ -565,14 +564,66 @@ private void waitForIndexGreen(final String destinationIndexName) {
565564
ClusterHealthRequest request = new ClusterHealthRequest(destinationIndexName).waitForGreenStatus();
566565
enrichOriginClient().admin()
567566
.cluster()
568-
.health(request, listener.delegateFailure((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
567+
.health(request, listener.delegateFailureAndWrap((l, r) -> updateEnrichPolicyAlias(destinationIndexName)));
568+
}
569+
570+
/**
571+
* Ensures that the index we are about to promote at the end of a policy execution exists, is intact, and has not been damaged
572+
* during the policy execution. In some cases, it is possible for the index being constructed to be deleted during the policy execution
573+
* and recreated with invalid mappings/data. We validate that the mapping exists and that it contains the expected meta fields on it to
574+
* guard against accidental removal and recreation during policy execution.
575+
*/
576+
private void validateIndexBeforePromotion(String destinationIndexName, ClusterState clusterState) {
577+
IndexMetadata destinationIndex = clusterState.metadata().index(destinationIndexName);
578+
if (destinationIndex == null) {
579+
throw new IndexNotFoundException(
580+
"was not able to promote it as part of executing enrich policy [" + policyName + "]",
581+
destinationIndexName
582+
);
583+
}
584+
MappingMetadata mapping = destinationIndex.mapping();
585+
if (mapping == null) {
586+
throw new ResourceNotFoundException(
587+
"Could not locate mapping for enrich index [{}] while completing [{}] policy run",
588+
destinationIndexName,
589+
policyName
590+
);
591+
}
592+
Map<String, Object> mappingSource = mapping.sourceAsMap();
593+
Object meta = mappingSource.get("_meta");
594+
if (meta instanceof Map<?, ?> metaMap) {
595+
Object policyNameMetaField = metaMap.get(ENRICH_POLICY_NAME_FIELD_NAME);
596+
if (policyNameMetaField == null) {
597+
throw new ElasticsearchException(
598+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field missing",
599+
destinationIndexName,
600+
policyName
601+
);
602+
} else if (policyName.equals(policyNameMetaField) == false) {
603+
throw new ElasticsearchException(
604+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: policy name meta field does not "
605+
+ "match expected value of [{}], was [{}]",
606+
destinationIndexName,
607+
policyName,
608+
policyName,
609+
policyNameMetaField.toString()
610+
);
611+
}
612+
} else {
613+
throw new ElasticsearchException(
614+
"Could not verify enrich index [{}] metadata before completing [{}] policy run: mapping meta field missing",
615+
destinationIndexName,
616+
policyName
617+
);
618+
}
569619
}
570620

571621
private void updateEnrichPolicyAlias(final String destinationIndexName) {
572622
String enrichIndexBase = EnrichPolicy.getBaseName(policyName);
573623
logger.debug("Policy [{}]: Promoting new enrich index [{}] to alias [{}]", policyName, destinationIndexName, enrichIndexBase);
574624
GetAliasesRequest aliasRequest = new GetAliasesRequest(enrichIndexBase);
575625
ClusterState clusterState = clusterService.state();
626+
validateIndexBeforePromotion(destinationIndexName, clusterState);
576627
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNamesWithSystemIndexAccess(clusterState, aliasRequest);
577628
String[] aliases = aliasRequest.aliases();
578629
IndicesAliasesRequest aliasToggleRequest = new IndicesAliasesRequest();

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyRunnerTests.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
1717
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1818
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
19+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
20+
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1921
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
2022
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
2123
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@@ -1980,6 +1982,79 @@ public void testRunRangePolicyWithObjectFieldAsMatchField() throws Exception {
19801982
assertThat(e.getMessage(), equalTo("Field 'field1' has type [object] which doesn't appear to be a range type"));
19811983
}
19821984

1985+
public void testRunnerValidatesIndexIntegrity() throws Exception {
1986+
final String sourceIndex = "source-index";
1987+
IndexResponse indexRequest = client().index(new IndexRequest().index(sourceIndex).id("id").source("""
1988+
{
1989+
"field1": "value1",
1990+
"field2": 2,
1991+
"field3": "ignored",
1992+
"field4": "ignored",
1993+
"field5": "value5"
1994+
}""", XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet();
1995+
assertEquals(RestStatus.CREATED, indexRequest.status());
1996+
1997+
SearchResponse sourceSearchResponse = client().search(
1998+
new SearchRequest(sourceIndex).source(SearchSourceBuilder.searchSource().query(QueryBuilders.matchAllQuery()))
1999+
).actionGet();
2000+
assertThat(sourceSearchResponse.getHits().getTotalHits().value, equalTo(1L));
2001+
Map<String, Object> sourceDocMap = sourceSearchResponse.getHits().getAt(0).getSourceAsMap();
2002+
assertNotNull(sourceDocMap);
2003+
assertThat(sourceDocMap.get("field1"), is(equalTo("value1")));
2004+
assertThat(sourceDocMap.get("field2"), is(equalTo(2)));
2005+
assertThat(sourceDocMap.get("field3"), is(equalTo("ignored")));
2006+
assertThat(sourceDocMap.get("field4"), is(equalTo("ignored")));
2007+
assertThat(sourceDocMap.get("field5"), is(equalTo("value5")));
2008+
2009+
List<String> enrichFields = List.of("field2", "field5");
2010+
EnrichPolicy policy = new EnrichPolicy(EnrichPolicy.MATCH_TYPE, null, List.of(sourceIndex), "field1", enrichFields);
2011+
String policyName = "test1";
2012+
2013+
final long createTime = randomNonNegativeLong();
2014+
String createdEnrichIndex = ".enrich-test1-" + createTime;
2015+
final AtomicReference<Exception> exception = new AtomicReference<>();
2016+
final CountDownLatch latch = new CountDownLatch(1);
2017+
ActionListener<ExecuteEnrichPolicyStatus> listener = createTestListener(latch, exception::set);
2018+
2019+
// Wrap the client so that when we receive the reindex action, we delete the index then resume operation. This mimics an invalid
2020+
// state for the resulting index.
2021+
Client client = new FilterClient(client()) {
2022+
@Override
2023+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
2024+
ActionType<Response> action,
2025+
Request request,
2026+
ActionListener<Response> listener
2027+
) {
2028+
if (action.equals(EnrichReindexAction.INSTANCE)) {
2029+
super.doExecute(
2030+
DeleteIndexAction.INSTANCE,
2031+
new DeleteIndexRequest(createdEnrichIndex),
2032+
listener.delegateFailureAndWrap((delegate, response) -> {
2033+
if (response.isAcknowledged() == false) {
2034+
fail("Enrich index should have been deleted but was not");
2035+
}
2036+
super.doExecute(action, request, delegate);
2037+
})
2038+
);
2039+
} else {
2040+
super.doExecute(action, request, listener);
2041+
}
2042+
}
2043+
};
2044+
EnrichPolicyRunner enrichPolicyRunner = createPolicyRunner(client, policyName, policy, listener, createdEnrichIndex);
2045+
2046+
logger.info("Starting policy run");
2047+
enrichPolicyRunner.run();
2048+
latch.await();
2049+
Exception runnerException = exception.get();
2050+
if (runnerException == null) {
2051+
fail("Expected the runner to fail when the underlying index was deleted during policy execution!");
2052+
}
2053+
assertThat(runnerException, is(instanceOf(ElasticsearchException.class)));
2054+
assertThat(runnerException.getMessage(), containsString("Could not verify enrich index"));
2055+
assertThat(runnerException.getMessage(), containsString("mapping meta field missing"));
2056+
}
2057+
19832058
private EnrichPolicyRunner createPolicyRunner(
19842059
String policyName,
19852060
EnrichPolicy policy,

0 commit comments

Comments
 (0)