Skip to content

Commit 768396d

Browse files
authored
Adding simulate ingest effective mapping (elastic#132833)
1 parent 80c9d10 commit 768396d

File tree

8 files changed

+259
-31
lines changed

8 files changed

+259
-31
lines changed

docs/changelog/132833.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 132833
2+
summary: Adding simulate ingest effective mapping
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/80_ingest_simulate.yml

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2182,3 +2182,131 @@ setup:
21822182
- match: { docs.0.doc._index: "test" }
21832183
- match: { docs.0.doc._source.foo: "bar" }
21842184
- match: { docs.0.doc.error.type: "document_parsing_exception" }
2185+
2186+
---
2187+
"Test effective mapping":
2188+
2189+
# This creates two templates, where the first reroutes to the second. Then we simulate ingesting and make sure that
2190+
# the effective_mapping is for the index where the document eventually would land. Also, the second index is really
2191+
# a data stream, so we expect to see a @timestamp field.
2192+
2193+
- skip:
2194+
features:
2195+
- headers
2196+
- allowed_warnings
2197+
2198+
- do:
2199+
headers:
2200+
Content-Type: application/json
2201+
ingest.put_pipeline:
2202+
id: "reroute-pipeline"
2203+
body: >
2204+
{
2205+
"processors": [
2206+
{
2207+
"reroute": {
2208+
"destination": "second-index"
2209+
}
2210+
}
2211+
]
2212+
}
2213+
- match: { acknowledged: true }
2214+
2215+
- do:
2216+
allowed_warnings:
2217+
- "index template [first-index-template] has index patterns [first-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [first-index-template] will take precedence during new index creation"
2218+
indices.put_index_template:
2219+
name: first-index-template
2220+
body:
2221+
index_patterns: first-index*
2222+
template:
2223+
settings:
2224+
default_pipeline: "reroute-pipeline"
2225+
mappings:
2226+
dynamic: strict
2227+
properties:
2228+
foo:
2229+
type: text
2230+
2231+
- do:
2232+
allowed_warnings:
2233+
- "index template [second-index-template] has index patterns [second-index*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [second-index-template] will take precedence during new index creation"
2234+
indices.put_index_template:
2235+
name: second-index-template
2236+
body:
2237+
index_patterns: second-index*
2238+
template:
2239+
mappings:
2240+
dynamic: strict
2241+
properties:
2242+
bar:
2243+
type: text
2244+
2245+
- do:
2246+
indices.put_index_template:
2247+
name: second-index-template
2248+
body:
2249+
index_patterns: second-index*
2250+
template:
2251+
lifecycle:
2252+
data_retention: "7d"
2253+
mappings:
2254+
dynamic: strict
2255+
properties:
2256+
bar:
2257+
type: text
2258+
data_stream: {}
2259+
2260+
- do:
2261+
indices.create_data_stream:
2262+
name: second-index
2263+
- is_true: acknowledged
2264+
2265+
- do:
2266+
cluster.health:
2267+
wait_for_status: yellow
2268+
2269+
- do:
2270+
indices.put_data_stream_mappings:
2271+
name: second-index
2272+
body:
2273+
properties:
2274+
foo:
2275+
type: boolean
2276+
2277+
- match: { data_streams.0.applied_to_data_stream: true }
2278+
2279+
# Here is the meat of the test. We simulate ingesting into first-index, knowing it will be rerouted to second-index,
2280+
# which is actually a data stream. So we expect the effective_mapping to contain the fields from second-index
2281+
# (including the implicit @timestamp field) and not first-index. Plus, it ought to include fields from the
2282+
# mapping_addition that we pass in.
2283+
- do:
2284+
headers:
2285+
Content-Type: application/json
2286+
simulate.ingest:
2287+
body: >
2288+
{
2289+
"docs": [
2290+
{
2291+
"_index": "first-index",
2292+
"_id": "id",
2293+
"_source": {
2294+
"foo": "bar"
2295+
}
2296+
}
2297+
],
2298+
"mapping_addition": {
2299+
"dynamic": "strict",
2300+
"properties": {
2301+
"baz": {
2302+
"type": "keyword"
2303+
}
2304+
}
2305+
}
2306+
}
2307+
- length: { docs: 1 }
2308+
- match: { docs.0.doc._index: "second-index" }
2309+
- not_exists: docs.0.doc.effective_mapping._doc.properties.foo
2310+
- match: { docs.0.doc.effective_mapping._doc.properties.@timestamp.type: "date" }
2311+
- match: { docs.0.doc.effective_mapping._doc.properties.bar.type: "text" }
2312+
- match: { docs.0.doc.effective_mapping._doc.properties.baz.type: "keyword" }

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ static TransportVersion def(int id) {
364364
public static final TransportVersion EXTENDED_SNAPSHOT_STATS_IN_NODE_INFO = def(9_137_0_00);
365365
public static final TransportVersion SIMULATE_INGEST_MAPPING_MERGE_TYPE = def(9_138_0_00);
366366
public static final TransportVersion ESQL_LOOKUP_JOIN_ON_MANY_FIELDS = def(9_139_0_00);
367+
public static final TransportVersion SIMULATE_INGEST_EFFECTIVE_MAPPING = def(9_140_0_00);
367368

368369
/*
369370
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.common.util.concurrent.AtomicArray;
3636
import org.elasticsearch.common.xcontent.XContentHelper;
3737
import org.elasticsearch.core.Nullable;
38-
import org.elasticsearch.core.Tuple;
3938
import org.elasticsearch.features.FeatureService;
4039
import org.elasticsearch.features.NodeFeature;
4140
import org.elasticsearch.index.IndexSettingProvider;
@@ -144,14 +143,13 @@ protected void doInternalExecute(
144143
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
145144
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
146145
IndexRequest request = (IndexRequest) docRequest;
147-
Tuple<Collection<String>, Exception> validationResult = validateMappings(
146+
ValidationResult validationResult = validateMappings(
148147
componentTemplateSubstitutions,
149148
indexTemplateSubstitutions,
150149
mappingAddition,
151150
request,
152151
mappingMergeReason
153152
);
154-
Exception mappingValidationException = validationResult.v2();
155153
responses.set(
156154
i,
157155
BulkItemResponse.success(
@@ -164,8 +162,9 @@ protected void doInternalExecute(
164162
request.source(),
165163
request.getContentType(),
166164
request.getExecutedPipelines(),
167-
validationResult.v1(),
168-
mappingValidationException
165+
validationResult.ignoredFields,
166+
validationResult.validationException,
167+
validationResult.effectiveMapping
169168
)
170169
)
171170
);
@@ -193,7 +192,7 @@ private MapperService.MergeReason getMergeReason(String mergeType) {
193192
* @return a ValidationResult containing: (1) the effective mapping that was validated against (may be null), (2) the mapping
194193
* exception if the source does not match the mappings, otherwise null, and (3) the names of any fields that would be ignored upon indexing
195194
*/
196-
private Tuple<Collection<String>, Exception> validateMappings(
195+
private ValidationResult validateMappings(
197196
Map<String, ComponentTemplate> componentTemplateSubstitutions,
198197
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
199198
Map<String, Object> mappingAddition,
@@ -211,6 +210,7 @@ private Tuple<Collection<String>, Exception> validateMappings(
211210
);
212211

213212
ProjectMetadata project = projectResolver.getProjectMetadata(clusterService.state());
213+
CompressedXContent effectiveMapping = null;
214214
Exception mappingValidationException = null;
215215
Collection<String> ignoredFields = List.of();
216216
IndexAbstraction indexAbstraction = project.getIndicesLookup().get(request.index());
@@ -222,8 +222,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
222222
*/
223223
IndexMetadata imd = project.getIndexSafe(indexAbstraction.getWriteIndex(request, project));
224224
CompressedXContent mappings = Optional.ofNullable(imd.mapping()).map(MappingMetadata::source).orElse(null);
225-
CompressedXContent mergedMappings = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
226-
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, mergedMappings, request, sourceToParse, mappingMergeReason);
225+
effectiveMapping = mappingAddition == null ? null : mergeMappings(mappings, mappingAddition);
226+
ignoredFields = validateUpdatedMappingsFromIndexMetadata(imd, effectiveMapping, request, sourceToParse, mappingMergeReason);
227227
} else {
228228
/*
229229
* The index did not exist, or we have component template substitutions, so we put together the mappings from existing
@@ -281,8 +281,8 @@ private Tuple<Collection<String>, Exception> validateMappings(
281281
indexSettingProviders
282282
);
283283
CompressedXContent mappings = template.mappings();
284-
CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
285-
ignoredFields = validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse, mappingMergeReason);
284+
effectiveMapping = mergeMappings(mappings, mappingAddition);
285+
ignoredFields = validateUpdatedMappings(mappings, effectiveMapping, request, sourceToParse, mappingMergeReason);
286286
} else {
287287
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedProjectMetadata, request.index(), false);
288288
if (matchingTemplates.isEmpty() == false) {
@@ -295,23 +295,27 @@ private Tuple<Collection<String>, Exception> validateMappings(
295295
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
296296
xContentRegistry
297297
);
298-
final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
299-
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
298+
effectiveMapping = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
299+
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
300300
} else {
301301
/*
302302
* The index matched no templates and had no mapping of its own. If there were component template substitutions
303303
* or index template substitutions, they didn't match anything. So just apply the mapping addition if it exists,
304304
* and validate.
305305
*/
306-
final CompressedXContent combinedMappings = mergeMappings(null, mappingAddition);
307-
ignoredFields = validateUpdatedMappings(null, combinedMappings, request, sourceToParse, mappingMergeReason);
306+
effectiveMapping = mergeMappings(null, mappingAddition);
307+
ignoredFields = validateUpdatedMappings(null, effectiveMapping, request, sourceToParse, mappingMergeReason);
308308
}
309309
}
310310
}
311311
} catch (Exception e) {
312312
mappingValidationException = e;
313313
}
314-
return Tuple.tuple(ignoredFields, mappingValidationException);
314+
return new ValidationResult(effectiveMapping, mappingValidationException, ignoredFields);
315+
}
316+
317+
private record ValidationResult(CompressedXContent effectiveMapping, Exception validationException, Collection<String> ignoredFields) {
318+
315319
}
316320

317321
/*

server/src/main/java/org/elasticsearch/action/ingest/SimulateIndexResponse.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
1515
import org.elasticsearch.action.index.IndexResponse;
1616
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.compress.CompressedXContent;
1718
import org.elasticsearch.common.io.stream.StreamInput;
1819
import org.elasticsearch.common.io.stream.StreamOutput;
1920
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -26,6 +27,7 @@
2627
import java.io.IOException;
2728
import java.util.Collection;
2829
import java.util.List;
30+
import java.util.Map;
2931

3032
/**
3133
* This is an IndexResponse that is specifically for simulate requests. Unlike typical IndexResponses, we need to include the original
@@ -37,6 +39,7 @@ public class SimulateIndexResponse extends IndexResponse {
3739
private final XContentType sourceXContentType;
3840
private final Collection<String> ignoredFields;
3941
private final Exception exception;
42+
private final CompressedXContent effectiveMapping;
4043

4144
@SuppressWarnings("this-escape")
4245
public SimulateIndexResponse(StreamInput in) throws IOException {
@@ -54,6 +57,15 @@ public SimulateIndexResponse(StreamInput in) throws IOException {
5457
} else {
5558
this.ignoredFields = List.of();
5659
}
60+
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
61+
if (in.readBoolean()) {
62+
this.effectiveMapping = CompressedXContent.readCompressedString(in);
63+
} else {
64+
this.effectiveMapping = null;
65+
}
66+
} else {
67+
effectiveMapping = null;
68+
}
5769
}
5870

5971
@SuppressWarnings("this-escape")
@@ -65,7 +77,8 @@ public SimulateIndexResponse(
6577
XContentType sourceXContentType,
6678
List<String> pipelines,
6779
Collection<String> ignoredFields,
68-
@Nullable Exception exception
80+
@Nullable Exception exception,
81+
@Nullable CompressedXContent effectiveMapping
6982
) {
7083
// We don't actually care about most of the IndexResponse fields:
7184
super(
@@ -83,6 +96,7 @@ public SimulateIndexResponse(
8396
setShardInfo(ShardInfo.EMPTY);
8497
this.ignoredFields = ignoredFields;
8598
this.exception = exception;
99+
this.effectiveMapping = effectiveMapping;
86100
}
87101

88102
@Override
@@ -108,6 +122,14 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
108122
ElasticsearchException.generateThrowableXContent(builder, params, exception);
109123
builder.endObject();
110124
}
125+
if (effectiveMapping == null) {
126+
builder.field("effective_mapping", Map.of());
127+
} else {
128+
builder.field(
129+
"effective_mapping",
130+
XContentHelper.convertToMap(effectiveMapping.uncompressed(), true, builder.contentType()).v2()
131+
);
132+
}
111133
return builder;
112134
}
113135

@@ -127,6 +149,12 @@ public void writeTo(StreamOutput out) throws IOException {
127149
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_IGNORED_FIELDS)) {
128150
out.writeStringCollection(ignoredFields);
129151
}
152+
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INGEST_EFFECTIVE_MAPPING)) {
153+
out.writeBoolean(effectiveMapping != null);
154+
if (effectiveMapping != null) {
155+
effectiveMapping.writeTo(out);
156+
}
157+
}
130158
}
131159

132160
public Exception getException() {

server/src/test/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionTests.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public void onResponse(BulkResponse response) {
185185
"_index": "%s",
186186
"_version": -3,
187187
"_source": %s,
188-
"executed_pipelines": [%s]
188+
"executed_pipelines": [%s],
189+
"effective_mapping":{}
189190
}""",
190191
indexRequest.id(),
191192
indexRequest.index(),
@@ -319,7 +320,8 @@ public void onResponse(BulkResponse response) {
319320
"_version": -3,
320321
"_source": %s,
321322
"executed_pipelines": [%s],
322-
"error":{"type":"exception","reason":"invalid mapping"}
323+
"error":{"type":"exception","reason":"invalid mapping"},
324+
"effective_mapping":{"_doc":{"dynamic":"strict"}}
323325
}""",
324326
indexRequest.id(),
325327
indexName,
@@ -346,7 +348,8 @@ public void onResponse(BulkResponse response) {
346348
"_index": "%s",
347349
"_version": -3,
348350
"_source": %s,
349-
"executed_pipelines": [%s]
351+
"executed_pipelines": [%s],
352+
"effective_mapping":{"_doc":{"dynamic":"strict"}}
350353
}""",
351354
indexRequest.id(),
352355
indexName,
@@ -373,7 +376,9 @@ public void onFailure(Exception e) {
373376
};
374377
when(indicesService.withTempIndexService(any(), any())).thenAnswer((Answer<?>) invocation -> {
375378
IndexMetadata imd = invocation.getArgument(0);
376-
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())) {
379+
if (indicesWithInvalidMappings.contains(imd.getIndex().getName())
380+
// We only want to throw exceptions inside TransportSimulateBulkAction:
381+
&& invocation.getArgument(1).getClass().getSimpleName().contains(TransportSimulateBulkAction.class.getSimpleName())) {
377382
throw new ElasticsearchException("invalid mapping");
378383
} else {
379384
// we don't actually care what is returned, as long as no exception is thrown the request is considered valid:

0 commit comments

Comments
 (0)