Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/112957.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 112957
summary: Making changes to `BulkRequest` to enable component template substitutions
in the simulate ingest API
area: Ingest Node
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ static TransportVersion def(int id) {
public static final TransportVersion CCS_TELEMETRY_STATS = def(8_739_00_0);
public static final TransportVersion GLOBAL_RETENTION_TELEMETRY = def(8_740_00_0);
public static final TransportVersion ROUTING_TABLE_VERSION_REMOVED = def(8_741_00_0);
public static final TransportVersion SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS = def(8_742_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -38,6 +39,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

Expand Down Expand Up @@ -475,4 +477,23 @@ public Set<String> getIndices() {
public boolean isSimulated() {
return false; // Always false, but may be overridden by a subclass
}

/*
* Returns any component template substitutions that are to be used as part of this bulk request. We would likely only have
* substitutions in the event of a simulated request.
*/
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() throws IOException {
return Map.of();
}

/*
* This copies this bulk request, but without all of its inner requests
*/
public BulkRequest shallowClone() {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
bulkRequest.timeout(timeout());
return bulkRequest;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BulkRequest has many other things than only the refresh policy, active shards, and timeout. Are we going to copy the rest of them also? Should the javadoc reflect what's being copied?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went his way because this is what BulkRequestModifier did (this code was moved out of there). I wavered on whether I ought to add the other fields or not, and wound up just repeating and testing the previous behavior. Any thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just documenting it in the javadoc is fine

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wound up adding the rest of the fields just in case (I don't think they'd intentionally been left out -- I think they were just added after the BulkRequestModifier method had been written).

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,7 @@ BulkRequest getBulkRequest() {
if (itemResponses.isEmpty()) {
return bulkRequest;
} else {
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.setRefreshPolicy(bulkRequest.getRefreshPolicy());
modifiedBulkRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
modifiedBulkRequest.timeout(bulkRequest.timeout());
BulkRequest modifiedBulkRequest = bulkRequest.shallowClone();

int slot = 0;
List<DocWriteRequest<?>> requests = bulkRequest.requests();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,24 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
* This extends BulkRequest with support for providing substitute pipeline definitions. In a user request, the pipeline substitutions
* will look something like this:
* This extends BulkRequest with support for providing substitute pipeline definitions and component template definitions. In a user
* request, the substitutions will look something like this:
*
* "pipeline_substitutions": {
* "my-pipeline-1": {
Expand All @@ -45,6 +53,27 @@
* }
* ]
* }
* },
* "component_template_substitutions": {
* "my-template-1": {
* "settings": {
* "number_of_shards": 1
* },
* "mappings": {
* "_source": {
* "enabled": false
* },
* "properties": {
* "host_name": {
* "type": "keyword"
* },
* "created_at": {
* "type": "date",
* "format": "EEE MMM dd HH:mm:ss Z yyyy"
* }
* }
* }
* }
* }
*
* The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys
Expand All @@ -53,27 +82,42 @@
*/
public class SimulateBulkRequest extends BulkRequest {
private final Map<String, Map<String, Object>> pipelineSubstitutions;
private final Map<String, Map<String, Object>> componentTemplateSubstitutions;

/**
* @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
* the same pipelineId. The key of the map is the pipelineId, and the value the pipeline definition as
* parsed by XContentHelper.convertToMap().
* @param componentTemplateSubstitutions The component template definitions that are to be used in place of any pre-existing
* component template definitions with the same name.
*/
public SimulateBulkRequest(@Nullable Map<String, Map<String, Object>> pipelineSubstitutions) {
public SimulateBulkRequest(
@Nullable Map<String, Map<String, Object>> pipelineSubstitutions,
@Nullable Map<String, Map<String, Object>> componentTemplateSubstitutions
) {
super();
this.pipelineSubstitutions = pipelineSubstitutions;
this.componentTemplateSubstitutions = componentTemplateSubstitutions;
}

@SuppressWarnings("unchecked")
public SimulateBulkRequest(StreamInput in) throws IOException {
super(in);
this.pipelineSubstitutions = (Map<String, Map<String, Object>>) in.readGenericValue();
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) {
this.componentTemplateSubstitutions = (Map<String, Map<String, Object>>) in.readGenericValue();
} else {
componentTemplateSubstitutions = Map.of();
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeGenericValue(pipelineSubstitutions);
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_COMPONENT_TEMPLATES_SUBSTITUTIONS)) {
out.writeGenericValue(componentTemplateSubstitutions);
}
}

public Map<String, Map<String, Object>> getPipelineSubstitutions() {
Expand All @@ -84,4 +128,41 @@ public Map<String, Map<String, Object>> getPipelineSubstitutions() {
public boolean isSimulated() {
return true;
}

@Override
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() throws IOException {
if (componentTemplateSubstitutions == null) {
return Map.of();
}
Map<String, ComponentTemplate> result = new HashMap<>(componentTemplateSubstitutions.size());
for (Map.Entry<String, Map<String, Object>> rawEntry : componentTemplateSubstitutions.entrySet()) {
result.put(rawEntry.getKey(), convertRawTemplateToComponentTemplate(rawEntry.getValue()));
}
return result;
}

@SuppressWarnings("unchecked")
private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) throws IOException {
Settings settings = null;
CompressedXContent mappings = null;
if (rawTemplate.containsKey("mappings")) {
mappings = new CompressedXContent((Map<String, Object>) rawTemplate.get("mappings"));
}
if (rawTemplate.containsKey("settings")) {
settings = Settings.builder().loadFromMap((Map<String, ?>) rawTemplate.get("settings")).build();
}
Map<String, AliasMetadata> aliases = null;
DataStreamLifecycle lifecycle = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to cause problems with it having unconsumed xcontent? Should we be throwing an exception for all other keys other than mappings and settings to show the user that those cannot be simulated? Or should we keep the leniency?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For example, if we're ignoring eventual data_stream contents, then eventually from #98877 which wants to:

Figure out how to specify tsdb settings in component templates. For example index.routing_path can be specified in a composable index template if data stream template' index_mode is set to time_series.

Then we may need the whole thing for validation? This may be too premature, I'm just curious right now. Would it make sense to just load the whole thing into a Template from the JSON?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I guess there's no harm in just using it all. Plus it simplifies my code. At this point though, I'm working with a Map -- think I ought to just pass it through all the way as xcontent? It's not really performance-sensitive since this is simulate code. And it's convenient for the sake of testing to have these overrides as a Map.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benefit is that if ComponentTemplate.parse ever changes to do some kind of validation, this would pick that up at least

Template template = new Template(settings, mappings, aliases, lifecycle);
return new ComponentTemplate(template, null, null);
}

@Override
public BulkRequest shallowClone() {
BulkRequest bulkRequest = new SimulateBulkRequest(pipelineSubstitutions, componentTemplateSubstitutions);
bulkRequest.setRefreshPolicy(getRefreshPolicy());
bulkRequest.waitForActiveShards(waitForActiveShards());
bulkRequest.timeout(timeout());
return bulkRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
SimulateBulkRequest bulkRequest = new SimulateBulkRequest(
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions")
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions"),
(Map<String, Map<String, Object>>) sourceMap.remove("component_template_substitutions")
);
BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap);
bulkRequest.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,4 +475,16 @@ public void testUnsupportedAction() {
allOf(containsString("Malformed action/metadata line [1]"), containsString("found [get"))
);
}

public void testShallowClone() {
BulkRequest simulateBulkRequest = new BulkRequest();
simulateBulkRequest.setRefreshPolicy(randomFrom(RefreshPolicy.values()));
simulateBulkRequest.waitForActiveShards(randomIntBetween(1, 10));
simulateBulkRequest.timeout(randomTimeValue());
BulkRequest shallowCopy = simulateBulkRequest.shallowClone();
assertThat(shallowCopy.requests, equalTo(List.of()));
assertThat(shallowCopy.getRefreshPolicy(), equalTo(simulateBulkRequest.getRefreshPolicy()));
assertThat(shallowCopy.waitForActiveShards(), equalTo(simulateBulkRequest.waitForActiveShards()));
assertThat(shallowCopy.timeout(), equalTo(simulateBulkRequest.timeout()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,36 @@

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;

public class SimulateBulkRequestTests extends ESTestCase {

public void testSerialization() throws Exception {
testSerialization(getTestPipelineSubstitutions());
testSerialization(null);
testSerialization(Map.of());
testSerialization(getTestPipelineSubstitutions(), getTestTemplateSubstitutions());
testSerialization(getTestPipelineSubstitutions(), null);
testSerialization(null, getTestTemplateSubstitutions());
testSerialization(null, null);
testSerialization(Map.of(), Map.of());
}

private void testSerialization(Map<String, Map<String, Object>> pipelineSubstitutions) throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions);
private void testSerialization(
Map<String, Map<String, Object>> pipelineSubstitutions,
Map<String, Map<String, Object>> templateSubstitutions
) throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, templateSubstitutions);
/*
* Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a
* Writable
Expand All @@ -35,6 +47,77 @@ private void testSerialization(Map<String, Map<String, Object>> pipelineSubstitu
assertThat(copy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions()));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void testGetComponentTemplateSubstitutions() throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of());
assertThat(simulateBulkRequest.getComponentTemplateSubstitutions(), equalTo(Map.of()));
String substituteComponentTemplatesString = """
{
"mappings_template": {
"mappings": {
"dynamic": "true",
"properties": {
"foo": {
"type": "keyword"
}
}
}
},
"settings_template": {
"settings": {
"index": {
"default_pipeline": "bar-pipeline"
}
}
}
}
""";

Map tempMap = XContentHelper.convertToMap(
new BytesArray(substituteComponentTemplatesString.getBytes(StandardCharsets.UTF_8)),
randomBoolean(),
XContentType.JSON
).v2();
Map<String, Map<String, Object>> substituteComponentTemplates = (Map<String, Map<String, Object>>) tempMap;
simulateBulkRequest = new SimulateBulkRequest(Map.of(), substituteComponentTemplates);
Map<String, ComponentTemplate> componentTemplateSubstitutions = simulateBulkRequest.getComponentTemplateSubstitutions();
assertThat(componentTemplateSubstitutions.size(), equalTo(2));
assertThat(
XContentHelper.convertToMap(
componentTemplateSubstitutions.get("mappings_template").template().mappings().uncompressed(),
randomBoolean(),
XContentType.JSON
).v2(),
equalTo(substituteComponentTemplates.get("mappings_template").get("mappings"))
);
assertNull(componentTemplateSubstitutions.get("mappings_template").template().settings());
assertNull(componentTemplateSubstitutions.get("settings_template").template().mappings());
assertThat(componentTemplateSubstitutions.get("settings_template").template().settings().size(), equalTo(1));
assertThat(
componentTemplateSubstitutions.get("settings_template").template().settings().get("index.default_pipeline"),
equalTo("bar-pipeline")
);
}

public void testShallowClone() throws IOException {
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(getTestPipelineSubstitutions(), getTestTemplateSubstitutions());
simulateBulkRequest.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()));
simulateBulkRequest.waitForActiveShards(randomIntBetween(1, 10));
simulateBulkRequest.timeout(randomTimeValue());
BulkRequest shallowCopy = simulateBulkRequest.shallowClone();
assertThat(shallowCopy, instanceOf(SimulateBulkRequest.class));
SimulateBulkRequest simulateBulkRequestCopy = (SimulateBulkRequest) shallowCopy;
assertThat(simulateBulkRequestCopy.requests, equalTo(List.of()));
assertThat(
simulateBulkRequestCopy.getComponentTemplateSubstitutions(),
equalTo(simulateBulkRequest.getComponentTemplateSubstitutions())
);
assertThat(simulateBulkRequestCopy.getPipelineSubstitutions(), equalTo(simulateBulkRequest.getPipelineSubstitutions()));
assertThat(simulateBulkRequestCopy.getRefreshPolicy(), equalTo(simulateBulkRequest.getRefreshPolicy()));
assertThat(simulateBulkRequestCopy.waitForActiveShards(), equalTo(simulateBulkRequest.waitForActiveShards()));
assertThat(simulateBulkRequestCopy.timeout(), equalTo(simulateBulkRequest.timeout()));
}

private static Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
return Map.of(
"pipeline1",
Expand All @@ -43,4 +126,13 @@ private static Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
Map.of("processors", List.of(Map.of("processor3", Map.of())))
);
}

private static Map<String, Map<String, Object>> getTestTemplateSubstitutions() {
return Map.of(
"template1",
Map.of("mappings", Map.of("_source", Map.of("enabled", false), "properties", Map.of()), "settings", Map.of()),
"template2",
Map.of("mappings", Map.of(), "settings", Map.of())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void tearDown() throws Exception {

public void testIndexData() {
Task task = mock(Task.class); // unused
BulkRequest bulkRequest = new SimulateBulkRequest((Map<String, Map<String, Object>>) null);
BulkRequest bulkRequest = new SimulateBulkRequest(null, null);
int bulkItemCount = randomIntBetween(0, 200);
for (int i = 0; i < bulkItemCount; i++) {
Map<String, ?> source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5));
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testIndexDataWithValidation() throws IOException {
* (7) An indexing request to a nonexistent index that matches no templates
*/
Task task = mock(Task.class); // unused
BulkRequest bulkRequest = new SimulateBulkRequest((Map<String, Map<String, Object>>) null);
BulkRequest bulkRequest = new SimulateBulkRequest(null, null);
int bulkItemCount = randomIntBetween(0, 200);
Map<String, IndexMetadata> indicesMap = new HashMap<>();
Map<String, IndexTemplateMetadata> v1Templates = new HashMap<>();
Expand Down
Loading