Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/132210.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 132210
summary: Adding a `merge_type` parameter to the ingest simulate API
area: Ingest Node
type: enhancement
issues:
- 131608
Original file line number Diff line number Diff line change
Expand Up @@ -1931,3 +1931,81 @@ setup:
- match: { docs.0.doc._index: "simple-data-stream1" }
- match: { docs.0.doc._source.bar: "baz" }
- match: { docs.0.doc.error.type: "document_parsing_exception" }

---
"Test ingest simulate with mapping addition on subobjects":

- skip:
features:
- headers
- allowed_warnings

- do:
indices.put_index_template:
name: subobject-template
body:
index_patterns: subobject-index*
template:
mappings:
properties:
a.b:
type: match_only_text

- do:
headers:
Content-Type: application/json
simulate.ingest:
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"mapping_addition": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- match: { docs.0.doc.error.type: "mapper_parsing_exception" }

# Here we provide a mapping_substitution to the subobject, and make sure that it is applied rather than throwing an
# exception.
- do:
headers:
Content-Type: application/json
simulate.ingest:
merge_type: "template"
body: >
{
"docs": [
{
"_index": "subobject-index-1",
"_id": "AZgsHA0B41JjTOmNiBKC",
"_source": {
"a.b": "some text"
}
}
],
"mapping_addition": {
"properties": {
"a.b": {
"type": "keyword"
}
}
}
Comment on lines +2000 to +2006
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is it worth testing with mapping in index_template_substitutions or component_template_substitutions?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it probably is.

}
- length: { docs: 1 }
- match: { docs.0.doc._index: "subobject-index-1" }
- match: { docs.0.doc._source.a\.b: "some text" }
- not_exists: docs.0.doc.error
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
"pipeline":{
"type":"string",
"description":"The pipeline id to preprocess incoming documents with if no pipeline is given for a particular document"
},
"merge_type":{
"type":"string",
"description":"The mapping merge type if mapping overrides are being provided in mapping_addition, index_template_substitutions, or component_template_substitutions. The allowed values are one of index or template. The index option merges mappings the way they would be merged into an existing index. The template option merges mappings the way they would be merged into a template.",
"default": "mapping_update"
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void testMappingValidationIndexExists() {
}
""";
indicesAdmin().create(new CreateIndexRequest(indexName).mapping(mapping)).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -163,7 +163,7 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
""", XContentType.JSON).id(randomUUID());
{
// First we use the original component template, and expect a failure in the second document:
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
Expand Down Expand Up @@ -197,7 +197,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
)
),
Map.of(),
Map.of()
Map.of(),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand Down Expand Up @@ -235,7 +236,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
indexTemplateName,
Map.of("index_patterns", List.of(indexName), "composed_of", List.of("test-component-template-2"))
),
Map.of()
Map.of(),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand All @@ -258,7 +260,8 @@ private void assertMappingsUpdatedFromSubstitutions(String indexName, String ind
Map.of(
"_doc",
Map.of("dynamic", "strict", "properties", Map.of("foo1", Map.of("type", "text"), "foo3", Map.of("type", "text")))
)
),
null
);
bulkRequest.add(indexRequest1);
bulkRequest.add(indexRequest2);
Expand All @@ -277,7 +280,7 @@ public void testMappingValidationIndexDoesNotExistsNoTemplate() {
* mapping-less "random-index-template" created by the parent class), so we expect no mapping validation failure.
*/
String indexName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -324,7 +327,7 @@ public void testMappingValidationIndexDoesNotExistsV2Template() throws IOExcepti
request.indexTemplate(composableIndexTemplate);

client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -356,7 +359,7 @@ public void testMappingValidationIndexDoesNotExistsV1Template() {
indicesAdmin().putTemplate(
new PutIndexTemplateRequest("test-template").patterns(List.of("my-index-*")).mapping("foo1", "type=integer")
).actionGet();
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand Down Expand Up @@ -410,7 +413,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
{
// First, try with no @timestamp to make sure we're picking up data-stream-specific templates
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"foo1": "baz"
Expand All @@ -437,7 +440,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
}
{
// Now with @timestamp
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of(), null);
bulkRequest.add(new IndexRequest(indexName).source("""
{
"@timestamp": "2024-08-27",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PIPELINE_TRACKING_INFO = def(9_131_0_00);
public static final TransportVersion COMPONENT_TEMPLATE_TRACKING_INFO = def(9_132_0_00);
public static final TransportVersion TO_CHILD_BLOCK_JOIN_QUERY = def(9_133_0_00);
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_134_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public class SimulateBulkRequest extends BulkRequest {
private final Map<String, Map<String, Object>> componentTemplateSubstitutions;
private final Map<String, Map<String, Object>> indexTemplateSubstitutions;
private final Map<String, Object> mappingAddition;
private final String mappingMergeType;

/**
* @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
Expand All @@ -118,7 +119,8 @@ public SimulateBulkRequest(
Map<String, Map<String, Object>> pipelineSubstitutions,
Map<String, Map<String, Object>> componentTemplateSubstitutions,
Map<String, Map<String, Object>> indexTemplateSubstitutions,
Map<String, Object> mappingAddition
Map<String, Object> mappingAddition,
String mappingMergeType
) {
super();
Objects.requireNonNull(pipelineSubstitutions);
Expand All @@ -129,6 +131,7 @@ public SimulateBulkRequest(
this.componentTemplateSubstitutions = componentTemplateSubstitutions;
this.indexTemplateSubstitutions = indexTemplateSubstitutions;
this.mappingAddition = mappingAddition;
this.mappingMergeType = mappingMergeType;
}

@SuppressWarnings("unchecked")
Expand All @@ -147,6 +150,11 @@ public SimulateBulkRequest(StreamInput in) throws IOException {
} else {
mappingAddition = Map.of();
}
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_MAPPING_MERGE_TYPE)) {
mappingMergeType = in.readOptionalString();
} else {
mappingMergeType = null;
}
}

@Override
Expand All @@ -160,6 +168,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_17_0)) {
out.writeGenericValue(mappingAddition);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_MAPPING_MERGE_TYPE)) {
out.writeOptionalString(mappingMergeType);
}
}

public Map<String, Map<String, Object>> getPipelineSubstitutions() {
Expand Down Expand Up @@ -189,6 +200,10 @@ public Map<String, Object> getMappingAddition() {
return mappingAddition;
}

public String getMappingMergeType() {
return mappingMergeType;
}

private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) {
ComponentTemplate componentTemplate;
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) {
Expand All @@ -215,7 +230,8 @@ public BulkRequest shallowClone() {
pipelineSubstitutions,
componentTemplateSubstitutions,
indexTemplateSubstitutions,
mappingAddition
mappingAddition,
mappingMergeType
);
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ protected void doInternalExecute(
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = bulkRequest.getIndexTemplateSubstitutions();
Map<String, Object> mappingAddition = ((SimulateBulkRequest) bulkRequest).getMappingAddition();
MapperService.MergeReason mappingMergeReason = getMergeReason(((SimulateBulkRequest) bulkRequest).getMappingMergeType());
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
Expand All @@ -144,7 +145,8 @@ protected void doInternalExecute(
componentTemplateSubstitutions,
indexTemplateSubstitutions,
mappingAddition,
request
request,
mappingMergeReason
);
Exception mappingValidationException = validationResult.v2();
responses.set(
Expand All @@ -170,6 +172,14 @@ protected void doInternalExecute(
);
}

private MapperService.MergeReason getMergeReason(String mergeType) {
return Optional.ofNullable(mergeType).map(type -> switch (type) {
case "index" -> MapperService.MergeReason.MAPPING_UPDATE;
case "template" -> MapperService.MergeReason.INDEX_TEMPLATE;
default -> throw new IllegalArgumentException("Unsupported merge type " + mergeType);
}).orElse(MapperService.MergeReason.MAPPING_UPDATE);
}

/**
* This creates a temporary index with the mappings of the index in the request, and then attempts to index the source from the request
* into it. If there is a mapping exception, that exception is returned. On success the returned exception is null.
Expand All @@ -182,7 +192,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
Map<String, ComponentTemplate> componentTemplateSubstitutions,
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
Map<String, Object> mappingAddition,
IndexRequest request
IndexRequest request,
MapperService.MergeReason mappingMergeReason
) {
final SourceToParse sourceToParse = new SourceToParse(
request.id(),
Expand All @@ -207,7 +218,7 @@ private Tuple<Collection<String>, Exception> validateMappings(
IndexMetadata imd = project.getIndexSafe(indexAbstraction.getWriteIndex(request, project));
CompressedXContent mappings = Optional.ofNullable(imd.mapping()).map(MappingMetadata::source).orElse(null);
CompressedXContent mergedMappings = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse);
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index did not exist, or we have component template substitutions, so we put together the mappings from existing
Expand Down Expand Up @@ -265,7 +276,7 @@ private Tuple<Collection<String>, Exception> validateMappings(
);
CompressedXContent mappings = template.mappings();
CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse);
ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse, mappingMergeReason);
} else {
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedProjectMetadata, request.index(), false);
if (matchingTemplates.isEmpty() == false) {
Expand All @@ -279,15 +290,15 @@ private Tuple<Collection<String>, Exception> validateMappings(
xContentRegistry
);
final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
} else {
/*
* The index matched no templates and had no mapping of its own. If there were component template substitutions
* or index template substitutions, they didn't match anything. So just apply the mapping addition if it exists,
* and validate.
*/
final CompressedXContent combinedMappings = mergeMappings(null, mappingAddition);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse);
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
}
}
}
Expand All @@ -305,7 +316,8 @@ private Collection<String> validateUpdatedMappings(
@Nullable CompressedXContent originalMappings,
@Nullable CompressedXContent updatedMappings,
IndexRequest request,
SourceToParse sourceToParse
SourceToParse sourceToParse,
MapperService.MergeReason mappingMergeReason
) throws IOException {
Settings dummySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
Expand All @@ -318,14 +330,15 @@ private Collection<String> validateUpdatedMappings(
originalIndexMetadataBuilder.putMapping(new MappingMetadata(originalMappings));
}
final IndexMetadata originalIndexMetadata = originalIndexMetadataBuilder.build();
return validateUpdatedMappingsFromIndexMetadata(originalIndexMetadata, updatedMappings, request, sourceToParse);
return validateUpdatedMappingsFromIndexMetadata(originalIndexMetadata, updatedMappings, request, sourceToParse, mappingMergeReason);
}

private Collection<String> validateUpdatedMappingsFromIndexMetadata(
IndexMetadata originalIndexMetadata,
@Nullable CompressedXContent updatedMappings,
IndexRequest request,
SourceToParse sourceToParse
SourceToParse sourceToParse,
MapperService.MergeReason mappingMergeReason
) throws IOException {
if (updatedMappings == null) {
return List.of(); // no validation to do
Expand All @@ -335,7 +348,7 @@ private Collection<String> validateUpdatedMappingsFromIndexMetadata(
.putMapping(new MappingMetadata(updatedMappings))
.build();
Engine.Index result = indicesService.withTempIndexService(originalIndexMetadata, indexService -> {
indexService.mapperService().merge(updatedIndexMetadata, MapperService.MergeReason.MAPPING_UPDATE);
indexService.mapperService().merge(updatedIndexMetadata, mappingMergeReason);
return IndexShard.prepareIndex(
indexService.mapperService(),
sourceToParse,
Expand Down
Loading