Skip to content

Commit ec50aaa

Browse files
authored
Making transport changes to enable component template substitutions in the simulate ingest API (elastic#113063)
1 parent 88a94c9 commit ec50aaa

File tree

13 files changed

+364
-51
lines changed

13 files changed

+364
-51
lines changed

server/src/internalClusterTest/java/org/elasticsearch/action/bulk/TransportSimulateBulkActionIT.java

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
1616
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
1717
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
18+
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
1819
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
1920
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
2021
import org.elasticsearch.action.index.IndexRequest;
2122
import org.elasticsearch.action.ingest.SimulateIndexResponse;
2223
import org.elasticsearch.action.search.SearchRequest;
2324
import org.elasticsearch.action.search.SearchResponse;
25+
import org.elasticsearch.cluster.metadata.ComponentTemplate;
2426
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2527
import org.elasticsearch.cluster.metadata.Template;
2628
import org.elasticsearch.common.compress.CompressedXContent;
@@ -57,7 +59,7 @@ public void testMappingValidationIndexExists() {
5759
}
5860
""";
5961
indicesAdmin().create(new CreateIndexRequest(indexName).mapping(mapping)).actionGet();
60-
BulkRequest bulkRequest = new BulkRequest();
62+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
6163
bulkRequest.add(new IndexRequest(indexName).source("""
6264
{
6365
"foo1": "baz"
@@ -87,13 +89,125 @@ public void testMappingValidationIndexExists() {
8789
assertThat(fields.size(), equalTo(1));
8890
}
8991

92+
@SuppressWarnings("unchecked")
93+
public void testMappingValidationIndexExistsWithComponentTemplate() throws IOException {
94+
/*
95+
* This test simulates a BulkRequest of two documents into an existing index. Then we make sure the index contains no documents, and
96+
* that the index's mapping in the cluster state has not been updated with the two new field. With the mapping from the template
97+
* that was used to create the index, we would expect the second document to throw an exception because it uses a field that does
98+
* not exist. But we substitute a new version of the component template named "test-component-template" that allows for the new
99+
* field.
100+
*/
101+
String originalComponentTemplateMappingString = """
102+
{
103+
"_doc":{
104+
"dynamic":"strict",
105+
"properties":{
106+
"foo1":{
107+
"type":"text"
108+
}
109+
}
110+
}
111+
}
112+
""";
113+
CompressedXContent mapping = CompressedXContent.fromJSON(originalComponentTemplateMappingString);
114+
Template template = new Template(Settings.EMPTY, mapping, null);
115+
PutComponentTemplateAction.Request componentTemplateActionRequest = new PutComponentTemplateAction.Request(
116+
"test-component-template"
117+
);
118+
ComponentTemplate componentTemplate = new ComponentTemplate(template, null, null);
119+
componentTemplateActionRequest.componentTemplate(componentTemplate);
120+
client().execute(PutComponentTemplateAction.INSTANCE, componentTemplateActionRequest).actionGet();
121+
ComposableIndexTemplate composableIndexTemplate = ComposableIndexTemplate.builder()
122+
.indexPatterns(List.of("my-index-*"))
123+
.componentTemplates(List.of("test-component-template"))
124+
.build();
125+
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request("test");
126+
request.indexTemplate(composableIndexTemplate);
127+
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
128+
129+
String indexName = "my-index-1";
130+
// First, run before the index is created:
131+
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName);
132+
// Now, create the index and make sure the component template substitutions work the same:
133+
indicesAdmin().create(new CreateIndexRequest(indexName)).actionGet();
134+
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName);
135+
// Now make sure nothing was actually changed:
136+
indicesAdmin().refresh(new RefreshRequest(indexName)).actionGet();
137+
SearchResponse searchResponse = client().search(new SearchRequest(indexName)).actionGet();
138+
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
139+
searchResponse.decRef();
140+
ClusterStateResponse clusterStateResponse = admin().cluster().state(new ClusterStateRequest(TEST_REQUEST_TIMEOUT)).actionGet();
141+
Map<String, Object> indexMapping = clusterStateResponse.getState().metadata().index(indexName).mapping().sourceAsMap();
142+
Map<String, Object> fields = (Map<String, Object>) indexMapping.get("properties");
143+
assertThat(fields.size(), equalTo(1));
144+
}
145+
146+
private void assertMappingsUpdatedFromComponentTemplateSubstitutions(String indexName) {
147+
IndexRequest indexRequest1 = new IndexRequest(indexName).source("""
148+
{
149+
"foo1": "baz"
150+
}
151+
""", XContentType.JSON).id(randomUUID());
152+
IndexRequest indexRequest2 = new IndexRequest(indexName).source("""
153+
{
154+
"foo3": "baz"
155+
}
156+
""", XContentType.JSON).id(randomUUID());
157+
{
158+
// First we use the original component template, and expect a failure in the second document:
159+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
160+
bulkRequest.add(indexRequest1);
161+
bulkRequest.add(indexRequest2);
162+
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
163+
assertThat(response.getItems().length, equalTo(2));
164+
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
165+
assertNull(((SimulateIndexResponse) response.getItems()[0].getResponse()).getException());
166+
assertThat(response.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
167+
assertThat(
168+
((SimulateIndexResponse) response.getItems()[1].getResponse()).getException().getMessage(),
169+
containsString("mapping set to strict, dynamic introduction of")
170+
);
171+
}
172+
173+
{
174+
// Now we substitute a "test-component-template" that defines both fields, so we expect no exception:
175+
BulkRequest bulkRequest = new SimulateBulkRequest(
176+
Map.of(),
177+
Map.of(
178+
"test-component-template",
179+
Map.of(
180+
"template",
181+
Map.of(
182+
"mappings",
183+
Map.of(
184+
"dynamic",
185+
"strict",
186+
"properties",
187+
Map.of("foo1", Map.of("type", "text"), "foo3", Map.of("type", "text"))
188+
)
189+
)
190+
)
191+
)
192+
);
193+
bulkRequest.add(indexRequest1);
194+
bulkRequest.add(indexRequest2);
195+
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
196+
assertThat(response.getItems().length, equalTo(2));
197+
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
198+
assertNull(((SimulateIndexResponse) response.getItems()[0].getResponse()).getException());
199+
assertThat(response.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
200+
assertNull(((SimulateIndexResponse) response.getItems()[1].getResponse()).getException());
201+
}
202+
}
203+
90204
public void testMappingValidationIndexDoesNotExistsNoTemplate() {
91205
/*
92206
* This test simulates a BulkRequest of two documents into an index that does not exist. There is no template (other than the
93207
* mapping-less "random-index-template" created by the parent class), so we expect no mapping validation failure.
94208
*/
95209
String indexName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
96-
BulkRequest bulkRequest = new BulkRequest();
210+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
97211
bulkRequest.add(new IndexRequest(indexName).source("""
98212
{
99213
"foo1": "baz"
@@ -140,7 +254,7 @@ public void testMappingValidationIndexDoesNotExistsV2Template() throws IOExcepti
140254
request.indexTemplate(composableIndexTemplate);
141255

142256
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
143-
BulkRequest bulkRequest = new BulkRequest();
257+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
144258
bulkRequest.add(new IndexRequest(indexName).source("""
145259
{
146260
"foo1": "baz"
@@ -172,7 +286,7 @@ public void testMappingValidationIndexDoesNotExistsV1Template() {
172286
indicesAdmin().putTemplate(
173287
new PutIndexTemplateRequest("test-template").patterns(List.of("my-index-*")).mapping("foo1", "type=integer")
174288
).actionGet();
175-
BulkRequest bulkRequest = new BulkRequest();
289+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
176290
bulkRequest.add(new IndexRequest(indexName).source("""
177291
{
178292
"foo1": "baz"
@@ -226,7 +340,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
226340
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
227341
{
228342
// First, try with no @timestamp to make sure we're picking up data-stream-specific templates
229-
BulkRequest bulkRequest = new BulkRequest();
343+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
230344
bulkRequest.add(new IndexRequest(indexName).source("""
231345
{
232346
"foo1": "baz"
@@ -252,7 +366,7 @@ public void testMappingValidationIndexDoesNotExistsDataStream() throws IOExcepti
252366
}
253367
{
254368
// Now with @timestamp
255-
BulkRequest bulkRequest = new BulkRequest();
369+
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
256370
bulkRequest.add(new IndexRequest(indexName).source("""
257371
{
258372
"@timestamp": "2024-08-27",

server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateIndexTemplateAction.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.block.ClusterBlockException;
1717
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1818
import org.elasticsearch.cluster.metadata.AliasMetadata;
19+
import org.elasticsearch.cluster.metadata.ComponentTemplate;
1920
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
2021
import org.elasticsearch.cluster.metadata.DataStream;
2122
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
@@ -156,7 +157,8 @@ protected void masterOperation(
156157
xContentRegistry,
157158
indicesService,
158159
systemIndices,
159-
indexSettingProviders
160+
indexSettingProviders,
161+
Map.of()
160162
);
161163

162164
final Map<String, List<String>> overlapping = new HashMap<>();
@@ -233,7 +235,8 @@ public static Template resolveTemplate(
233235
final NamedXContentRegistry xContentRegistry,
234236
final IndicesService indicesService,
235237
final SystemIndices systemIndices,
236-
Set<IndexSettingProvider> indexSettingProviders
238+
Set<IndexSettingProvider> indexSettingProviders,
239+
Map<String, ComponentTemplate> componentTemplateSubstitutions
237240
) throws Exception {
238241
var metadata = simulatedState.getMetadata();
239242
Settings templateSettings = resolveSettings(simulatedState.metadata(), matchingTemplate);
@@ -263,6 +266,7 @@ public static Template resolveTemplate(
263266
null, // empty request mapping as the user can't specify any explicit mappings via the simulate api
264267
simulatedState,
265268
matchingTemplate,
269+
componentTemplateSubstitutions,
266270
xContentRegistry,
267271
simulatedIndexName
268272
);

server/src/main/java/org/elasticsearch/action/admin/indices/template/post/TransportSimulateTemplateAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,8 @@ protected void masterOperation(
170170
xContentRegistry,
171171
indicesService,
172172
systemIndices,
173-
indexSettingProviders
173+
indexSettingProviders,
174+
Map.of()
174175
);
175176
if (request.includeDefaults()) {
176177
listener.onResponse(

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.cluster.ClusterStateObserver;
2525
import org.elasticsearch.cluster.block.ClusterBlockException;
2626
import org.elasticsearch.cluster.block.ClusterBlockLevel;
27+
import org.elasticsearch.cluster.metadata.ComponentTemplate;
2728
import org.elasticsearch.cluster.metadata.Metadata;
2829
import org.elasticsearch.cluster.service.ClusterService;
2930
import org.elasticsearch.common.io.stream.Writeable;
@@ -39,6 +40,8 @@
3940
import org.elasticsearch.threadpool.ThreadPool;
4041
import org.elasticsearch.transport.TransportService;
4142

43+
import java.io.IOException;
44+
import java.util.Map;
4245
import java.util.Objects;
4346
import java.util.concurrent.Executor;
4447
import java.util.concurrent.TimeUnit;
@@ -168,19 +171,21 @@ public void onTimeout(TimeValue timeout) {
168171
private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
169172
executor.execute(new ActionRunnable<>(releasingListener) {
170173
@Override
171-
protected void doRun() {
174+
protected void doRun() throws IOException {
172175
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
173176
}
174177
});
175178
}
176179

177-
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener) {
180+
private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
181+
throws IOException {
178182
boolean hasIndexRequestsWithPipelines = false;
179183
final Metadata metadata = clusterService.state().getMetadata();
184+
Map<String, ComponentTemplate> templateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
180185
for (DocWriteRequest<?> actionRequest : bulkRequest.requests) {
181186
IndexRequest indexRequest = getIndexWriteRequest(actionRequest);
182187
if (indexRequest != null) {
183-
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata);
188+
IngestService.resolvePipelinesAndUpdateIndexRequest(actionRequest, indexRequest, metadata, templateSubstitutions);
184189
hasIndexRequestsWithPipelines |= IngestService.hasPipeline(indexRequest);
185190
}
186191

@@ -250,7 +255,7 @@ private void processBulkIndexIngestRequest(
250255
} else {
251256
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
252257
@Override
253-
protected void doRun() {
258+
protected void doRun() throws IOException {
254259
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
255260
}
256261

@@ -328,7 +333,7 @@ private void applyPipelinesAndDoInternalExecute(
328333
BulkRequest bulkRequest,
329334
Executor executor,
330335
ActionListener<BulkResponse> listener
331-
) {
336+
) throws IOException {
332337
final long relativeStartTimeNanos = relativeTimeNanos();
333338
if (applyPipelines(task, bulkRequest, executor, listener) == false) {
334339
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
@@ -349,6 +354,6 @@ protected abstract void doInternalExecute(
349354
Executor executor,
350355
ActionListener<BulkResponse> listener,
351356
long relativeStartTimeNanos
352-
);
357+
) throws IOException;
353358

354359
}

server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.threadpool.ThreadPool;
5454
import org.elasticsearch.transport.TransportService;
5555

56+
import java.io.IOException;
5657
import java.util.HashMap;
5758
import java.util.HashSet;
5859
import java.util.Map;
@@ -204,7 +205,11 @@ protected void doInternalExecute(
204205
Executor executor,
205206
ActionListener<BulkResponse> listener,
206207
long relativeStartTimeNanos
207-
) {
208+
) throws IOException {
209+
assert (bulkRequest instanceof SimulateBulkRequest) == false
210+
: "TransportBulkAction should never be called with a SimulateBulkRequest";
211+
assert bulkRequest.getComponentTemplateSubstitutions().isEmpty()
212+
: "Component template substitutions are not allowed in a non-simulated bulk";
208213
trackIndexRequests(bulkRequest);
209214

210215
Map<String, CreateIndexRequest> indicesToAutoCreate = new HashMap<>();

0 commit comments

Comments
 (0)