diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java index 12f039a5a122f..73ed6a4d2ef09 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/bulk/RestNoopBulkAction.java @@ -22,6 +22,7 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; @@ -75,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC defaultListExecutedPipelines, true, request.getXContentType(), + RestBulkAction.BulkFormat.MARKER_SUFFIX, request.getRestApiVersion() ); diff --git a/docs/changelog/135506.yaml b/docs/changelog/135506.yaml new file mode 100644 index 0000000000000..be61a6832909e --- /dev/null +++ b/docs/changelog/135506.yaml @@ -0,0 +1,6 @@ +pr: 135506 +summary: "[Draft] Introduce a bulk format that uses a prefix length" +area: Store +type: enhancement +issues: + - 94319 diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkLengthPrefixedRestIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkLengthPrefixedRestIT.java new file mode 100644 index 0000000000000..04423be90ac29 --- /dev/null +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkLengthPrefixedRestIT.java @@ -0,0 +1,304 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.http; + +import org.apache.http.entity.ByteArrayEntity; +import org.elasticsearch.action.bulk.IncrementalBulkService; +import org.elasticsearch.action.bulk.XContentLengthPrefixedStreamingType; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentType; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.rest.RestStatus.OK; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) +public class BulkLengthPrefixedRestIT extends HttpSmokeTestCase { + + private final RequestOptions options; + private final XContentLengthPrefixedStreamingType xContentLengthPrefixedStreamingType; + private final XContentType xContentType; + + public BulkLengthPrefixedRestIT() { + xContentLengthPrefixedStreamingType = randomFrom(XContentLengthPrefixedStreamingType.values()); + xContentType = xContentLengthPrefixedStreamingType.xContentType(); + options = RequestOptions.DEFAULT.toBuilder() + .addHeader("Content-Type", randomFrom(xContentLengthPrefixedStreamingType.headerValues()).v1()) + .build(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime()) + .build(); + } + + private static boolean seventyFivePercentOfTheTime() { + return (randomBoolean() && randomBoolean()) == false; + } + + public void testPrefixLengthFormatCapability() throws IOException { + { + String verb = randomBoolean() ? "PUT" : "POST"; + Request request = new Request("GET", "/_capabilities?method=" + verb + "&path=_bulk&capabilities=prefix_length_format"); + Response response = getRestClient().performRequest(request); + assertEquals(200, response.getStatusLine().getStatusCode()); + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + response.getEntity().getContent(), + true + ); + assertEquals(Boolean.TRUE, responseMap.get("supported")); + } + { + String verb = randomBoolean() ? "GET" : "DELETE"; + Request request = new Request("GET", "/_capabilities?method=" + verb + "&path=_bulk&capabilities=prefix_length_format"); + Response response = getRestClient().performRequest(request); + assertEquals(200, response.getStatusLine().getStatusCode()); + Map responseMap = XContentHelper.convertToMap( + XContentType.JSON.xContent(), + response.getEntity().getContent(), + true + ); + assertEquals(Boolean.FALSE, responseMap.get("supported")); + } + } + + public void testBulkMissingBody() { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + request.setOptions(options); + request.setEntity(new ByteArrayEntity(new byte[0])); + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat(responseException.getMessage(), containsString("request body is required")); + } + + public void testBulkRequestBodyWrongLength() throws IOException { + Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk"); + request.setOptions(options); + // missing final line of the bulk body. cannot process + try (ByteArrayOutputStream bulk = new ByteArrayOutputStream(); ByteArrayOutputStream doc = new ByteArrayOutputStream()) { + DataOutput dataOutput = new DataOutputStream(bulk); + createActionDocument(doc, "index", "index_name", "1"); + writeDocToBulk(bulk, doc); + createDocument(doc, 1); + dataOutput.writeInt(doc.size() + 1); + doc.writeTo(bulk); + doc.reset(); + request.setEntity(new ByteArrayEntity(bulk.toByteArray())); + } + ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request)); + assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode()); + assertThat( + responseException.getMessage(), + containsString("Documents in the bulk request must be prefixed with the length of the document") + ); + } + + public void testBulkRequest() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); + firstBulkRequest.setOptions(options); + try (ByteArrayOutputStream bulk = new ByteArrayOutputStream(); ByteArrayOutputStream doc = new ByteArrayOutputStream()) { + createActionDocument(doc, "index", "index_name", "1"); + writeDocToBulk(bulk, doc); + createDocument(doc, 1); + writeDocToBulk(bulk, doc); + createActionDocument(doc, "index", "index_name", "2"); + writeDocToBulk(bulk, doc); + createDocument(doc, 2); + writeDocToBulk(bulk, doc); + firstBulkRequest.setEntity(new ByteArrayEntity(bulk.toByteArray())); + } + + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + sendLargeBulk(); + } + + public void testBulkWithIncrementalDisabled() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request firstBulkRequest = new Request("POST", "/index_name/_bulk"); + firstBulkRequest.setOptions(options); + try (ByteArrayOutputStream bulk = new ByteArrayOutputStream(); ByteArrayOutputStream doc = new ByteArrayOutputStream()) { + createActionDocument(doc, "index", "index_name", "1"); + writeDocToBulk(bulk, doc); + createDocument(doc, 1); + writeDocToBulk(bulk, doc); + createActionDocument(doc, "index", "index_name", "2"); + writeDocToBulk(bulk, doc); + createDocument(doc, 2); + writeDocToBulk(bulk, doc); + firstBulkRequest.setEntity(new ByteArrayEntity(bulk.toByteArray())); + } + + final Response indexSuccessFul = getRestClient().performRequest(firstBulkRequest); + assertThat(indexSuccessFul.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), false)); + + internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(false)); + + try { + sendLargeBulk(); + } finally { + internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true)); + updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null)); + } + } + + public void testMalformedActionLineBulk() throws IOException { + Request createRequest = new Request("PUT", "/index_name"); + createRequest.setJsonEntity(""" + { + "settings": { + "index": { + "number_of_shards": 1, + "number_of_replicas": 1, + "write.wait_for_active_shards": 2 + } + } + }"""); + final Response indexCreatedResponse = getRestClient().performRequest(createRequest); + assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + + Request bulkRequest = new Request("POST", "/index_name/_bulk"); + bulkRequest.setOptions(options); + + try (ByteArrayOutputStream bulk = new ByteArrayOutputStream(); ByteArrayOutputStream doc = new ByteArrayOutputStream()) { + createActionDocument(doc, "index", "index_name", null); + writeDocToBulk(bulk, doc); + createDocument(doc, 1); + writeDocToBulk(bulk, doc); + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, doc)) { + builder.startObject(); + builder.endObject(); + } + writeDocToBulk(bulk, doc); + bulkRequest.setEntity(new ByteArrayEntity(bulk.toByteArray())); + } + + expectThrows(ResponseException.class, () -> getRestClient().performRequest(bulkRequest)); + } + + @SuppressWarnings("unchecked") + private void sendLargeBulk() throws IOException { + Request bulkRequest = new Request("POST", "/index_name/_bulk"); + bulkRequest.setOptions(options); + int updates = 0; + try (ByteArrayOutputStream bulk = new ByteArrayOutputStream(); ByteArrayOutputStream doc = new ByteArrayOutputStream()) { + createActionDocument(doc, "delete", "index_name", "1"); + writeDocToBulk(bulk, doc); + for (int i = 0; i < 1000; i++) { + createActionDocument(doc, "index", "index_name", null); + writeDocToBulk(bulk, doc); + createDocument(doc, i); + writeDocToBulk(bulk, doc); + if (randomBoolean() && randomBoolean() && randomBoolean() && randomBoolean()) { + ++updates; + createActionDocument(doc, "update", "index_name", "2"); + writeDocToBulk(bulk, doc); + createUpdateDocument(doc, i); + writeDocToBulk(bulk, doc); + } + } + bulkRequest.setEntity(new ByteArrayEntity(bulk.toByteArray())); + } + + final Response bulkResponse = getRestClient().performRequest(bulkRequest); + assertThat(bulkResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus())); + Map responseMap = XContentHelper.convertToMap(xContentType.xContent(), bulkResponse.getEntity().getContent(), true); + + assertFalse((Boolean) responseMap.get("errors")); + assertThat(((List) responseMap.get("items")).size(), equalTo(1001 + updates)); + } + + private void createActionDocument(OutputStream doc, String action, String indexName, String id) throws IOException { + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, doc)) { + builder.startObject(); + builder.startObject(action); + builder.field("_index", indexName); + if (id != null) { + builder.field("_id", id); + } + builder.endObject(); + builder.endObject(); + } + } + + private void createDocument(OutputStream doc, int value) throws IOException { + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, doc)) { + builder.startObject(); + builder.field("field", value); + builder.endObject(); + } + } + + private void createUpdateDocument(OutputStream doc, int value) throws IOException { + try (XContentBuilder builder = XContentFactory.contentBuilder(xContentType, doc)) { + builder.startObject(); + builder.startObject("doc"); + builder.field("field", value); + builder.endObject(); + builder.endObject(); + } + } + + private void writeDocToBulk(OutputStream bulk, ByteArrayOutputStream doc) throws IOException { + new DataOutputStream(bulk).writeInt(doc.size()); + doc.writeTo(bulk); + doc.reset(); + } +} diff --git a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkRestIT.java b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkMarkerSuffixIRestT.java similarity index 99% rename from qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkRestIT.java rename to qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkMarkerSuffixIRestT.java index 3faa88339f0a3..4d1b1ebe1c954 100644 --- a/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkRestIT.java +++ b/qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkMarkerSuffixIRestT.java @@ -31,7 +31,7 @@ import static org.hamcrest.Matchers.equalTo; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) -public class BulkRestIT extends HttpSmokeTestCase { +public class BulkMarkerSuffixIRestT extends HttpSmokeTestCase { @Override protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 98c3eaad11dce..46874d7104fc4 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -36,6 +36,7 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.transport.RawIndexingDataTransportRequest; import org.elasticsearch.xcontent.XContentType; @@ -257,14 +258,19 @@ public BulkRequest add(byte[] data, int from, int length, XContentType xContentT * Adds a framed data in binary format */ public BulkRequest add(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType) throws IOException { - return add(new BytesArray(data, from, length), defaultIndex, xContentType); + return add(new BytesArray(data, from, length), defaultIndex, xContentType, RestBulkAction.BulkFormat.MARKER_SUFFIX); } /** * Adds a framed data in binary format */ - public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, RestApiVersion.current()); + public BulkRequest add( + BytesReference data, + @Nullable String defaultIndex, + XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat + ) throws IOException { + return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, bulkFormat, RestApiVersion.current()); } /** @@ -272,7 +278,20 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XCont */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType) throws IOException { - return add(data, defaultIndex, null, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current()); + return add( + data, + defaultIndex, + null, + null, + null, + null, + null, + null, + allowExplicitIndex, + xContentType, + RestBulkAction.BulkFormat.MARKER_SUFFIX, + RestApiVersion.current() + ); } @@ -287,6 +306,7 @@ public BulkRequest add( @Nullable Boolean defaultListExecutedPipelines, boolean allowExplicitIndex, XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat, RestApiVersion restApiVersion ) throws IOException { String routing = valueOrDefault(defaultRouting, globalRouting); @@ -304,6 +324,7 @@ public BulkRequest add( defaultListExecutedPipelines, allowExplicitIndex, xContentType, + bulkFormat, (indexRequest, type) -> internalAdd(indexRequest), this::internalAdd, this::add diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java index 2f336566953ba..dcc4f00eaf99e 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java @@ -22,6 +22,7 @@ import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xcontent.ParseField; import org.elasticsearch.xcontent.XContent; @@ -111,10 +112,13 @@ private static BytesReference sliceTrimmingCarriageReturn( BytesReference bytesReference, int from, int nextMarker, - XContentType xContentType + XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat ) { final int length; - if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') { + if (RestBulkAction.BulkFormat.MARKER_SUFFIX == bulkFormat + && XContentType.JSON == xContentType + && bytesReference.get(nextMarker - 1) == (byte) '\r') { length = nextMarker - from - 1; } else { length = nextMarker - from; @@ -138,6 +142,7 @@ public void parse( @Nullable Boolean defaultListExecutedPipelines, boolean allowExplicitIndex, XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat, BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer @@ -152,6 +157,7 @@ public void parse( defaultListExecutedPipelines, allowExplicitIndex, xContentType, + bulkFormat, indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer @@ -170,6 +176,7 @@ public IncrementalParser incrementalParser( @Nullable Boolean defaultListExecutedPipelines, boolean allowExplicitIndex, XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat, BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer @@ -184,6 +191,7 @@ public IncrementalParser incrementalParser( defaultListExecutedPipelines, allowExplicitIndex, xContentType, + bulkFormat, indexRequestConsumer, updateRequestConsumer, deleteRequestConsumer @@ -207,6 +215,7 @@ public class IncrementalParser { private final boolean allowExplicitIndex; private final XContentType xContentType; + private final RestBulkAction.BulkFormat bulkFormat; private final byte marker; private final BiConsumer indexRequestConsumer; private final Consumer updateRequestConsumer; @@ -232,6 +241,7 @@ private IncrementalParser( @Nullable Boolean defaultListExecutedPipelines, boolean allowExplicitIndex, XContentType xContentType, + RestBulkAction.BulkFormat bulkFormat, BiConsumer indexRequestConsumer, Consumer updateRequestConsumer, Consumer deleteRequestConsumer @@ -245,7 +255,12 @@ private IncrementalParser( this.defaultListExecutedPipelines = defaultListExecutedPipelines; this.allowExplicitIndex = allowExplicitIndex; this.xContentType = xContentType; - this.marker = xContentType.xContent().bulkSeparator(); + this.bulkFormat = bulkFormat; + if (bulkFormat == RestBulkAction.BulkFormat.MARKER_SUFFIX) { + this.marker = xContentType.xContent().bulkSeparator(); + } else { + this.marker = (byte) '0'; // no need of a marker for prefix length + } this.indexRequestConsumer = indexRequestConsumer; this.updateRequestConsumer = updateRequestConsumer; this.deleteRequestConsumer = deleteRequestConsumer; @@ -257,14 +272,19 @@ public int parse(BytesReference data, boolean lastData) throws IOException { throw new IllegalStateException("Parser has already encountered exception", failure); } try { - return tryParse(data, lastData); + if (bulkFormat == RestBulkAction.BulkFormat.PREFIX_LENGTH) { + return tryParseWithPrefixLength(data, lastData); + } else { + assert bulkFormat == RestBulkAction.BulkFormat.MARKER_SUFFIX; + return tryParseWithMarkSuffix(data, lastData); + } } catch (Exception e) { failure = e; throw e; } } - private int tryParse(BytesReference data, boolean lastData) throws IOException { + private int tryParseWithMarkSuffix(BytesReference data, boolean lastData) throws IOException { int from = 0; int consumed = 0; @@ -275,20 +295,7 @@ private int tryParse(BytesReference data, boolean lastData) throws IOException { break; } incrementalFromOffset = nextMarker + 1; - line++; - - if (currentRequest == null) { - if (parseActionLine(data, from, nextMarker)) { - if (currentRequest instanceof DeleteRequest deleteRequest) { - deleteRequestConsumer.accept(deleteRequest); - currentRequest = null; - } - } - } else { - parseAndConsumeDocumentLine(data, from, nextMarker); - currentRequest = null; - } - + processRequest(data, from, nextMarker); from = nextMarker + 1; consumed = from; } @@ -296,6 +303,54 @@ private int tryParse(BytesReference data, boolean lastData) throws IOException { return lastData ? from : consumed; } + private int tryParseWithPrefixLength(BytesReference data, boolean lastData) throws IOException { + int from = 0; + while (true) { + if (from == data.length()) { + break; + } + if (Integer.BYTES > data.length() - from) { + if (lastData) { + throw new IllegalArgumentException( + "Documents in the bulk request must be prefixed with the length of the document" + ); + } + break; + } + final int len = data.getInt(from); + if (len < 0) { + throw new IllegalArgumentException("Documents in the bulk request must be prefixed with the length of the document"); + } + if (len > data.length() - (from + Integer.BYTES)) { + if (lastData) { + throw new IllegalArgumentException( + "Documents in the bulk request must be prefixed with the length of the document" + ); + } + break; + } + from += Integer.BYTES; + processRequest(data, from, from + len); + from += len; + } + return from; + } + + private void processRequest(BytesReference data, int from, int to) throws IOException { + line++; + if (currentRequest == null) { + if (parseActionLine(data, from, to)) { + if (currentRequest instanceof DeleteRequest deleteRequest) { + deleteRequestConsumer.accept(deleteRequest); + currentRequest = null; + } + } + } else { + parseAndConsumeDocumentLine(data, from, to); + currentRequest = null; + } + } + private boolean parseActionLine(BytesReference data, int from, int to) throws IOException { assert currentRequest == null; @@ -526,13 +581,13 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException { assert currentRequest != null && currentRequest instanceof DeleteRequest == false; if (currentRequest instanceof IndexRequest indexRequest) { - indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType); + indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType, bulkFormat), xContentType); indexRequestConsumer.accept(indexRequest, currentType); } else if (currentRequest instanceof UpdateRequest updateRequest) { try ( XContentParser sliceParser = createParser( xContentType.xContent(), - sliceTrimmingCarriageReturn(data, from, to, xContentType) + sliceTrimmingCarriageReturn(data, from, to, xContentType, bulkFormat) ) ) { updateRequest.fromXContent(sliceParser); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/XContentLengthPrefixedStreamingType.java b/server/src/main/java/org/elasticsearch/action/bulk/XContentLengthPrefixedStreamingType.java new file mode 100644 index 0000000000000..f123559a1e988 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/bulk/XContentLengthPrefixedStreamingType.java @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.xcontent.MediaType; +import org.elasticsearch.xcontent.MediaTypeRegistry; +import org.elasticsearch.xcontent.ParsedMediaType; +import org.elasticsearch.xcontent.XContent; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xcontent.cbor.CborXContent; +import org.elasticsearch.xcontent.smile.SmileXContent; +import org.elasticsearch.xcontent.yaml.YamlXContent; + +import java.util.Map; +import java.util.Set; + +/** + * The streaming format using length prefixed format for bulk requests. + */ +public enum XContentLengthPrefixedStreamingType implements MediaType { + /** + * Json length prefixed type. + */ + JSON(SmileXContent.smileXContent) { + + @Override + public String queryParameter() { + return "stream.json"; + } + + @Override + public Set headerValues() { + return Set.of( + new HeaderValue( + XContentType.VENDOR_APPLICATION_PREFIX + "stream.json", + Map.of(COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN) + ) + ); + } + }, + /** + * SMILE length prefixed type. + */ + SMILE(SmileXContent.smileXContent) { + + @Override + public String queryParameter() { + return "stream.smile"; + } + + @Override + public Set headerValues() { + return Set.of( + new HeaderValue( + XContentType.VENDOR_APPLICATION_PREFIX + "stream.smile", + Map.of(COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN) + ) + ); + } + }, + /** + * YAML length prefixed type. + */ + YAML(YamlXContent.yamlXContent) { + + @Override + public String queryParameter() { + return "stream.yaml"; + } + + @Override + public Set headerValues() { + return Set.of( + new HeaderValue( + XContentType.VENDOR_APPLICATION_PREFIX + "stream.yaml", + Map.of(COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN) + ) + ); + } + }, + /** + * CBOR length prefixed type. + */ + CBOR(CborXContent.cborXContent) { + + @Override + public String queryParameter() { + return "stream.cbor"; + } + + @Override + public Set headerValues() { + return Set.of( + new HeaderValue( + XContentType.VENDOR_APPLICATION_PREFIX + "stream.cbor", + Map.of(COMPATIBLE_WITH_PARAMETER_NAME, VERSION_PATTERN) + ) + ); + } + }; + + private static final MediaTypeRegistry MEDIA_TYPE_REGISTRY = new MediaTypeRegistry< + XContentLengthPrefixedStreamingType>().register(XContentLengthPrefixedStreamingType.values()); + + /** + * Parses the given media type header value and returns the corresponding {@link XContentLengthPrefixedStreamingType}, + * or null if there is no match. + */ + public static XContentLengthPrefixedStreamingType fromMediaType(String mediaTypeHeaderValue) throws IllegalArgumentException { + ParsedMediaType parsedMediaType = ParsedMediaType.parseMediaType(mediaTypeHeaderValue); + if (parsedMediaType != null) { + return parsedMediaType.toMediaType(MEDIA_TYPE_REGISTRY); + } + return null; + } + + private final XContentType xContentType; + + XContentLengthPrefixedStreamingType(XContent xContent) { + this.xContentType = xContent.type(); + } + + public final XContentType xContentType() { + return xContentType; + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/RestHandler.java b/server/src/main/java/org/elasticsearch/rest/RestHandler.java index ed8263e7e83e2..73159f3f1797b 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestHandler.java +++ b/server/src/main/java/org/elasticsearch/rest/RestHandler.java @@ -107,7 +107,7 @@ default boolean allowSystemIndexAccessByDefault() { } default boolean mediaTypesValid(RestRequest request) { - return request.getXContentType() != null; + return request.hasLengthPrefixedStreamingContent() == false && request.getXContentType() != null; } default String getName() { diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 92e83fb9701a3..b6daed2e113de 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -12,6 +12,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.bulk.XContentLengthPrefixedStreamingType; import org.elasticsearch.common.Strings; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.bytes.BytesArray; @@ -92,6 +93,7 @@ public class RestRequest implements ToXContent.Params, Traceable { private final String rawPath; private final Set consumedParams = new HashSet<>(); private final SetOnce xContentType = new SetOnce<>(); + private final SetOnce xContentLengthPrefixedStreamingType = new SetOnce<>(); private final HttpChannel httpChannel; private final ParsedMediaType parsedAccept; private final ParsedMediaType parsedContentType; @@ -136,7 +138,14 @@ private RestRequest( try { this.parsedContentType = parseHeaderWithMediaType(httpRequest.getHeaders(), "Content-Type"); if (parsedContentType != null) { - this.xContentType.set(parsedContentType.toMediaType(XContentType.MEDIA_TYPE_REGISTRY)); + this.xContentLengthPrefixedStreamingType.set( + XContentLengthPrefixedStreamingType.fromMediaType(parsedContentType.mediaTypeWithoutParameters()) + ); + if (this.xContentLengthPrefixedStreamingType.get() == null) { + this.xContentType.set(parsedContentType.toMediaType(XContentType.MEDIA_TYPE_REGISTRY)); + } else { + this.xContentType.set(xContentLengthPrefixedStreamingType.get().xContentType()); + } } } catch (IllegalArgumentException e) { throw new MediaTypeHeaderException(e, "Content-Type"); @@ -332,11 +341,15 @@ public HttpBody.Stream contentStream() { public void ensureContent() { if (hasContent() == false) { throw new ElasticsearchParseException("request body is required"); - } else if (xContentType.get() == null) { + } else if (xContentType.get() == null && hasLengthPrefixedStreamingContent() == false) { throwValidationException("unknown content type"); } } + public boolean hasLengthPrefixedStreamingContent() { + return xContentLengthPrefixedStreamingType.get() != null; + } + /** * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing. * See {@link #content()}. diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index e381b1c207072..3c9bde4cd3b42 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -63,6 +63,13 @@ public class RestBulkAction extends BaseRestHandler { public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated."; public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status"; + public static final String PREFIX_LENGTH_FORMAT_CAPABILITY = "prefix_length_format"; + + public enum BulkFormat { + PREFIX_LENGTH, + MARKER_SUFFIX + } + private final boolean allowExplicitIndex; private final IncrementalBulkService bulkHandler; private final IncrementalBulkService.Enabled incrementalEnabled; @@ -71,7 +78,7 @@ public class RestBulkAction extends BaseRestHandler { public RestBulkAction(Settings settings, ClusterSettings clusterSettings, IncrementalBulkService bulkHandler) { this.allowExplicitIndex = MULTI_ALLOW_EXPLICIT_INDEX.get(settings); this.bulkHandler = bulkHandler; - this.capabilities = Set.of(FAILURE_STORE_STATUS_CAPABILITY); + this.capabilities = Set.of(FAILURE_STORE_STATUS_CAPABILITY, PREFIX_LENGTH_FORMAT_CAPABILITY); this.incrementalEnabled = new IncrementalBulkService.Enabled(clusterSettings); } @@ -128,6 +135,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC defaultListExecutedPipelines, allowExplicitIndex, request.getXContentType(), + request.hasLengthPrefixedStreamingContent() ? BulkFormat.PREFIX_LENGTH : BulkFormat.MARKER_SUFFIX, request.getRestApiVersion() ); } catch (Exception e) { @@ -189,6 +197,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer { request.paramAsBoolean("list_executed_pipelines", false), allowExplicitIndex, request.getXContentType(), + request.hasLengthPrefixedStreamingContent() ? BulkFormat.PREFIX_LENGTH : BulkFormat.MARKER_SUFFIX, (indexRequest, type) -> items.add(indexRequest), items::add, items::add @@ -310,6 +319,7 @@ public Set supportedCapabilities() { @Override public boolean mediaTypesValid(RestRequest request) { - return super.mediaTypesValid(request) && XContentType.supportsDelimitedBulkRequests(request.getXContentType()); + return (super.mediaTypesValid(request) && XContentType.supportsDelimitedBulkRequests(request.getXContentType())) + || request.hasLengthPrefixedStreamingContent(); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java index 32296da94520a..5eef916e5c1e2 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -105,6 +106,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC true, true, request.getXContentType(), + RestBulkAction.BulkFormat.MARKER_SUFFIX, request.getRestApiVersion() ); return channel -> client.execute(SimulateBulkAction.INSTANCE, bulkRequest, new SimulateIngestRestToXContentListener(channel)); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserLengthPrefixedTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserLengthPrefixedTests.java new file mode 100644 index 0000000000000..13004cba8b47b --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserLengthPrefixedTests.java @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.bulk; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.List; + +public class BulkRequestParserLengthPrefixedTests extends BulkRequestParserTestCase { + + @ParametersFactory + public static Iterable parameters() { + return List.of( + new Object[] { XContentType.JSON }, + new Object[] { XContentType.SMILE }, + new Object[] { XContentType.CBOR }, + new Object[] { XContentType.YAML } + ); + } + + private final XContentType contentType; + + public BulkRequestParserLengthPrefixedTests(XContentType contentType) { + this.contentType = contentType; + } + + protected XContentType contentType() { + return contentType; + } + + protected RestBulkAction.BulkFormat bulkFormat() { + return RestBulkAction.BulkFormat.PREFIX_LENGTH; + } + + public void testHandleWrongLength() throws IOException { + BytesArray doc1 = convertToFormat(new BytesArray(" { \"index\":{ \"require_alias\": false } }")); + BytesArray doc2 = convertToFormat(new BytesArray(" { \"field\": \"value\" }")); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + out.writeInt(doc1.length()); + out.write(doc1.array(), doc1.arrayOffset(), doc1.length()); + out.writeInt(randomValueOtherThanMany(i -> i == doc2.length(), ESTestCase::randomInt)); + out.write(doc2.array(), doc2.arrayOffset(), doc2.length()); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); + expectThrows( + IllegalArgumentException.class, + () -> parser.parse( + out.bytes(), + null, + null, + null, + null, + null, + null, + null, + false, + contentType(), + bulkFormat(), + (r, t) -> fail(), + req -> fail(), + req -> fail() + ) + ); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserMarkerSuffixTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserMarkerSuffixTests.java new file mode 100644 index 0000000000000..5fa2da2859656 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserMarkerSuffixTests.java @@ -0,0 +1,137 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.bulk; + +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.rest.action.document.RestBulkAction; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; + +public class BulkRequestParserMarkerSuffixTests extends BulkRequestParserTestCase { + + @Override + protected XContentType contentType() { + return XContentType.JSON; + } + + @Override + protected RestBulkAction.BulkFormat bulkFormat() { + return RestBulkAction.BulkFormat.MARKER_SUFFIX; + } + + public void testFailMissingCloseBrace() { + BytesArray request = new BytesArray(""" + { "index":{ } + {} + """); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(REST_API_VERSIONS_POST_V8)); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> parser.parse( + request, + null, + null, + null, + null, + null, + null, + null, + false, + contentType(), + RestBulkAction.BulkFormat.MARKER_SUFFIX, + (req, type) -> fail("expected failure before we got this far"), + req -> fail("expected failure before we got this far"), + req -> fail("expected failure before we got this far") + ) + ); + assertEquals("[1:14] Unexpected end of file", ex.getMessage()); + } + + public void testBarfOnLackOfTrailingNewline() throws IOException { + BytesArray request = new BytesArray(""" + { "index":{ "_id": "bar" } } + {}"""); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); + IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> parser.parse( + request, + "foo", + null, + null, + null, + null, + null, + null, + false, + contentType(), + RestBulkAction.BulkFormat.MARKER_SUFFIX, + (req, type) -> fail(), + req -> fail(), + req -> fail() + ) + ); + assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage()); + + BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser( + "foo", + null, + null, + null, + null, + null, + null, + false, + contentType(), + bulkFormat(), + (req, type) -> {}, + req -> {}, + req -> {} + ); + + // Should not throw because not last + incrementalParser.parse(request, false); + + IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true)); + assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage()); + } + + public void testFailContentAfterClosingBrace() { + BytesArray request = new BytesArray(""" + { "index":{ } } { "something": "unexpected" } + {} + """); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(REST_API_VERSIONS_POST_V8)); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> parser.parse( + request, + null, + null, + null, + null, + null, + null, + null, + false, + contentType(), + RestBulkAction.BulkFormat.MARKER_SUFFIX, + (req, type) -> fail("expected failure before we got this far"), + req -> fail("expected failure before we got this far"), + req -> fail("expected failure before we got this far") + ) + ); + assertEquals("Malformed action/metadata line [1], unexpected data after the closing brace", ex.getMessage()); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTestCase.java similarity index 63% rename from server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java rename to server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTestCase.java index de2dc8f9ea0b5..3e644d340fccf 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTestCase.java @@ -12,9 +12,15 @@ import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.core.UpdateForV10; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import org.hamcrest.Matchers; @@ -26,18 +32,47 @@ import static org.hamcrest.Matchers.equalTo; -public class BulkRequestParserTests extends ESTestCase { +public abstract class BulkRequestParserTestCase extends ESTestCase { @UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT) // Replace with just RestApiVersion.values() when V8 no longer exists public static final List REST_API_VERSIONS_POST_V8 = Stream.of(RestApiVersion.values()) .filter(v -> v.matches(RestApiVersion.onOrAfter(RestApiVersion.V_9))) .toList(); - public void testParserCannotBeReusedAfterFailure() { - BytesArray request = new BytesArray(""" - { "index":{ }, "something": "unexpected" } - {} - """); + protected abstract XContentType contentType(); + + protected abstract RestBulkAction.BulkFormat bulkFormat(); + + protected BytesArray buildBulk(List docs) throws IOException { + try (BytesStreamOutput out = new BytesStreamOutput()) { + for (String doc : docs) { + BytesArray convertedDoc = convertToFormat(new BytesArray(doc)); + if (bulkFormat() == RestBulkAction.BulkFormat.PREFIX_LENGTH) { + out.writeInt(convertedDoc.length()); + } + out.write(convertedDoc.array(), convertedDoc.arrayOffset(), convertedDoc.length()); + if (bulkFormat() == RestBulkAction.BulkFormat.MARKER_SUFFIX) { + out.write(contentType().xContent().bulkSeparator()); + } + } + return new BytesArray(out.bytes().toBytesRef()); + } + + } + + protected BytesArray convertToFormat(BytesArray array) throws IOException { + XContentParser parser = XContentType.JSON.xContent() + .createParser(XContentParserConfiguration.EMPTY, array.array(), 0, array.length()); + try (BytesStreamOutput out = new BytesStreamOutput()) { + try (XContentBuilder builder = XContentFactory.contentBuilder(contentType(), out)) { + builder.copyCurrentStructure(parser); + } + return new BytesArray(out.bytes().toBytesRef()); + } + } + + public void testParserCannotBeReusedAfterFailure() throws IOException { + BytesArray request = buildBulk(List.of("{ \"index\":{ }, \"something\": \"unexpected\" }", "{}")); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser( @@ -49,7 +84,8 @@ public void testParserCannotBeReusedAfterFailure() { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (req, type) -> fail("expected failure before we got this far"), req -> fail("expected failure before we got this far"), req -> fail("expected failure before we got this far") @@ -58,10 +94,7 @@ public void testParserCannotBeReusedAfterFailure() { IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, false)); assertEquals("Malformed action/metadata line [1], expected END_OBJECT but found [FIELD_NAME]", ex.getMessage()); - BytesArray valid = new BytesArray(""" - { "index":{ "_id": "bar" } } - {} - """); + BytesArray valid = buildBulk(List.of("{ \"index\":{ \"_id\": \"bar\" }}", "{}")); expectThrows(AssertionError.class, () -> incrementalParser.parse(valid, false)); } @@ -80,24 +113,32 @@ public void testIncrementalParsing() throws IOException { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (r, t) -> indexRequests.add(r), updateRequests::add, deleteRequests::add ); - BytesArray request = new BytesArray(""" - { "index":{ "_id": "bar", "pipeline": "foo" } } + BytesArray request = buildBulk(List.of("{ \"index\":{ \"_id\": \"bar\", \"pipeline\": \"foo\" } }", """ { "field": "value"} + """, """ { "index":{ "require_alias": false } } + """, """ { "field": "value" } + """, """ { "update":{ "_id": "bus", "require_alias": true } } + """, """ { "doc": {"field": "value" }} + """, """ { "delete":{ "_id": "baz" } } + """, """ { "index": { } } + """, """ { "field": "value"} + """, """ { "delete":{ "_id": "bop" } } - """); + """)); int consumed = 0; for (int i = 0; i < request.length() - 1; ++i) { @@ -112,13 +153,13 @@ public void testIncrementalParsing() throws IOException { } public void testIndexRequest() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "index":{ "_id": "bar" } } - {} - """); + """, "{}")); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, null, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); @@ -127,31 +168,30 @@ public void testIndexRequest() throws IOException { }, req -> fail(), req -> fail()); assertTrue(parsed.get()); - parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, true, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "index":{ "_id": "bar", "require_alias": true } } - {} - """); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + """, "{}")); + + parser.parse(request, "foo", null, null, null, null, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "index":{ "_id": "bar", "require_alias": false } } - {} - """); - parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (indexRequest, type) -> { + """, " {}")); + parser.parse(request, "foo", null, null, null, true, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertFalse(indexRequest.isRequireAlias()); }, req -> fail(), req -> fail()); } public void testDeleteRequest() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "delete":{ "_id": "bar" } } - """); + """)); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); final AtomicBoolean parsed = new AtomicBoolean(); parser.parse( @@ -164,7 +204,8 @@ public void testDeleteRequest() throws IOException { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (req, type) -> fail(), req -> fail(), deleteRequest -> { @@ -178,94 +219,110 @@ public void testDeleteRequest() throws IOException { } public void testUpdateRequest() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "update":{ "_id": "bar" } } - {} - """); + """, "{}")); + BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { - assertFalse(parsed.get()); - assertEquals("foo", updateRequest.index()); - assertEquals("bar", updateRequest.id()); - assertFalse(updateRequest.isRequireAlias()); - parsed.set(true); - }, req -> fail()); + parser.parse( + request, + "foo", + null, + null, + null, + null, + null, + null, + false, + contentType(), + bulkFormat(), + (req, type) -> fail(), + updateRequest -> { + assertFalse(parsed.get()); + assertEquals("foo", updateRequest.index()); + assertEquals("bar", updateRequest.id()); + assertFalse(updateRequest.isRequireAlias()); + parsed.set(true); + }, + req -> fail() + ); assertTrue(parsed.get()); - parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { - assertTrue(updateRequest.isRequireAlias()); - }, req -> fail()); + parser.parse( + request, + "foo", + null, + null, + null, + true, + null, + null, + false, + contentType(), + bulkFormat(), + (req, type) -> fail(), + updateRequest -> { + assertTrue(updateRequest.isRequireAlias()); + }, + req -> fail() + ); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "update":{ "_id": "bar", "require_alias": true } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { - assertTrue(updateRequest.isRequireAlias()); - }, req -> fail()); + """)); + parser.parse( + request, + "foo", + null, + null, + null, + null, + null, + null, + false, + contentType(), + bulkFormat(), + (req, type) -> fail(), + updateRequest -> { + assertTrue(updateRequest.isRequireAlias()); + }, + req -> fail() + ); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "update":{ "_id": "bar", "require_alias": false } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, true, null, null, false, XContentType.JSON, (req, type) -> fail(), updateRequest -> { - assertFalse(updateRequest.isRequireAlias()); - }, req -> fail()); - } - - public void testBarfOnLackOfTrailingNewline() throws IOException { - BytesArray request = new BytesArray(""" - { "index":{ "_id": "bar" } } - {}"""); - BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> parser.parse( - request, - "foo", - null, - null, - null, - null, - null, - null, - false, - XContentType.JSON, - (req, type) -> fail(), - req -> fail(), - req -> fail() - ) - ); - assertEquals("The bulk request must be terminated by a newline [\\n]", e.getMessage()); - - BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser( + """)); + parser.parse( + request, "foo", null, null, null, - null, + true, null, null, false, - XContentType.JSON, - (req, type) -> {}, - req -> {}, - req -> {} + contentType(), + bulkFormat(), + (req, type) -> fail(), + updateRequest -> { + assertFalse(updateRequest.isRequireAlias()); + }, + req -> fail() ); - - // Should not throw because not last - incrementalParser.parse(request, false); - - IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true)); - assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage()); } - public void testFailOnExplicitIndex() { - BytesArray request = new BytesArray(""" + public void testFailOnExplicitIndex() throws IOException { + BytesArray request = buildBulk(List.of(""" { "index":{ "_index": "foo", "_id": "bar" } } + """, """ {} - """); + """)); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); IllegalArgumentException ex = expectThrows( @@ -280,7 +337,8 @@ public void testFailOnExplicitIndex() { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (req, type) -> fail(), req -> fail(), req -> fail() @@ -290,13 +348,14 @@ public void testFailOnExplicitIndex() { } public void testTypesStillParsedForBulkMonitoring() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "index":{ "_type": "quux", "_id": "bar" } } + """, """ {} - """); + """)); BulkRequestParser parser = new BulkRequestParser(false, true, RestApiVersion.current()); final AtomicBoolean parsed = new AtomicBoolean(); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, null, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertFalse(parsed.get()); assertEquals("foo", indexRequest.index()); assertEquals("bar", indexRequest.id()); @@ -307,12 +366,11 @@ public void testTypesStillParsedForBulkMonitoring() throws IOException { } public void testParseDeduplicatesParameterStrings() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub"} } - {} + """, "{}", """ { "index":{ "_index": "bar", "pipeline": "foo", "routing": "blub" } } - {} - """); + """, "{}")); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); final List indexRequests = new ArrayList<>(); parser.parse( @@ -325,7 +383,8 @@ public void testParseDeduplicatesParameterStrings() throws IOException { null, null, true, - XContentType.JSON, + contentType(), + bulkFormat(), (indexRequest, type) -> indexRequests.add(indexRequest), req -> fail(), req -> fail() @@ -338,11 +397,12 @@ public void testParseDeduplicatesParameterStrings() throws IOException { assertSame(first.routing(), second.routing()); } - public void testFailOnInvalidAction() { - BytesArray request = new BytesArray(""" + public void testFailOnInvalidAction() throws IOException { + BytesArray request = buildBulk(List.of(""" { "invalidaction":{ } } + """, """ {} - """); + """)); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(RestApiVersion.values())); IllegalArgumentException ex = expectThrows( @@ -357,7 +417,8 @@ public void testFailOnInvalidAction() { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (req, type) -> fail(), req -> fail(), req -> fail() @@ -369,39 +430,12 @@ public void testFailOnInvalidAction() { ); } - public void testFailMissingCloseBrace() { - BytesArray request = new BytesArray(""" - { "index":{ } - {} - """); - BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(REST_API_VERSIONS_POST_V8)); - - IllegalArgumentException ex = expectThrows( - IllegalArgumentException.class, - () -> parser.parse( - request, - null, - null, - null, - null, - null, - null, - null, - false, - XContentType.JSON, - (req, type) -> fail("expected failure before we got this far"), - req -> fail("expected failure before we got this far"), - req -> fail("expected failure before we got this far") - ) - ); - assertEquals("[1:14] Unexpected end of file", ex.getMessage()); - } - - public void testFailExtraKeys() { - BytesArray request = new BytesArray(""" + public void testFailExtraKeys() throws IOException { + BytesArray request = buildBulk(List.of(""" { "index":{ }, "something": "unexpected" } + """, """ {} - """); + """)); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(REST_API_VERSIONS_POST_V8)); IllegalArgumentException ex = expectThrows( @@ -416,7 +450,8 @@ public void testFailExtraKeys() { null, null, false, - XContentType.JSON, + contentType(), + bulkFormat(), (req, type) -> fail("expected failure before we got this far"), req -> fail("expected failure before we got this far"), req -> fail("expected failure before we got this far") @@ -425,79 +460,55 @@ public void testFailExtraKeys() { assertEquals("Malformed action/metadata line [1], expected END_OBJECT but found [FIELD_NAME]", ex.getMessage()); } - public void testFailContentAfterClosingBrace() { - BytesArray request = new BytesArray(""" - { "index":{ } } { "something": "unexpected" } - {} - """); - BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, randomFrom(REST_API_VERSIONS_POST_V8)); - - IllegalArgumentException ex = expectThrows( - IllegalArgumentException.class, - () -> parser.parse( - request, - null, - null, - null, - null, - null, - null, - null, - false, - XContentType.JSON, - (req, type) -> fail("expected failure before we got this far"), - req -> fail("expected failure before we got this far"), - req -> fail("expected failure before we got this far") - ) - ); - assertEquals("Malformed action/metadata line [1], unexpected data after the closing brace", ex.getMessage()); - } - public void testListExecutedPipelines() throws IOException { - BytesArray request = new BytesArray(""" + BytesArray request = buildBulk(List.of(""" { "index":{ "_id": "bar" } } + """, """ {} - """); + """)); BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current()); - parser.parse(request, "foo", null, null, null, null, null, null, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, null, null, null, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertFalse(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); - parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> { + parser.parse(request, "foo", null, null, null, null, null, true, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "index":{ "_id": "bar", "op_type": "create" } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> { + """)); + parser.parse(request, "foo", null, null, null, null, null, true, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "create":{ "_id": "bar" } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> { + """)); + parser.parse(request, "foo", null, null, null, null, null, true, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "index":{ "_id": "bar", "list_executed_pipelines": "true" } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, null, null, false, false, XContentType.JSON, (indexRequest, type) -> { + """)); + parser.parse(request, "foo", null, null, null, null, null, false, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertTrue(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); - request = new BytesArray(""" + request = buildBulk(List.of(""" { "index":{ "_id": "bar", "list_executed_pipelines": "false" } } + """, """ {} - """); - parser.parse(request, "foo", null, null, null, null, null, true, false, XContentType.JSON, (indexRequest, type) -> { + """)); + parser.parse(request, "foo", null, null, null, null, null, true, false, contentType(), bulkFormat(), (indexRequest, type) -> { assertFalse(indexRequest.getListExecutedPipelines()); }, req -> fail(), req -> fail()); } - } diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java index b36b3af1ddb86..7609f1ba23b84 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRequestTests.java @@ -21,6 +21,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.script.Script; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentBuilder; @@ -303,7 +304,7 @@ public void testSmileIsSupported() throws IOException { } BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(data, null, xContentType); + bulkRequest.add(data, null, xContentType, RestBulkAction.BulkFormat.MARKER_SUFFIX); assertEquals(1, bulkRequest.requests().size()); DocWriteRequest docWriteRequest = bulkRequest.requests().get(0); assertEquals(DocWriteRequest.OpType.INDEX, docWriteRequest.opType()); @@ -344,7 +345,7 @@ public void testToValidateUpsertRequestAndCASInBulkRequest() throws IOException data = out.bytes(); } BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.add(data, null, xContentType); + bulkRequest.add(data, null, xContentType, RestBulkAction.BulkFormat.MARKER_SUFFIX); assertThat(bulkRequest.validate().validationErrors(), contains("upsert requests don't support `if_seq_no` and `if_primary_term`")); } @@ -380,7 +381,7 @@ public void testDynamicTemplates() throws Exception { { "index" : {"dynamic_templates":{}}} { "field1" : "value3" } """); - BulkRequest bulkRequest = new BulkRequest().add(data, null, XContentType.JSON); + BulkRequest bulkRequest = new BulkRequest().add(data, null, XContentType.JSON, RestBulkAction.BulkFormat.MARKER_SUFFIX); assertThat(bulkRequest.requests, hasSize(5)); assertThat(((IndexRequest) bulkRequest.requests.get(0)).getDynamicTemplates(), equalTo(Map.of("baz", "t1", "foo.bar", "t2"))); assertThat(((IndexRequest) bulkRequest.requests.get(2)).getDynamicTemplates(), equalTo(Map.of("bar", "t1"))); @@ -394,7 +395,7 @@ public void testInvalidDynamicTemplates() { """); IllegalArgumentException error = expectThrows( IllegalArgumentException.class, - () -> new BulkRequest().add(deleteWithDynamicTemplates, null, XContentType.JSON) + () -> new BulkRequest().add(deleteWithDynamicTemplates, null, XContentType.JSON, RestBulkAction.BulkFormat.MARKER_SUFFIX) ); assertThat(error.getMessage(), equalTo("Delete request in line [1] does not accept dynamic_templates")); @@ -404,7 +405,7 @@ public void testInvalidDynamicTemplates() { """); error = expectThrows( IllegalArgumentException.class, - () -> new BulkRequest().add(updateWithDynamicTemplates, null, XContentType.JSON) + () -> new BulkRequest().add(updateWithDynamicTemplates, null, XContentType.JSON, RestBulkAction.BulkFormat.MARKER_SUFFIX) ); assertThat(error.getMessage(), equalTo("Update request in line [1] does not accept dynamic_templates")); @@ -412,7 +413,10 @@ public void testInvalidDynamicTemplates() { { "index":{"_index":"test","dynamic_templates":[]} { "field1" : "value1" } """); - error = expectThrows(IllegalArgumentException.class, () -> new BulkRequest().add(invalidDynamicTemplates, null, XContentType.JSON)); + error = expectThrows( + IllegalArgumentException.class, + () -> new BulkRequest().add(invalidDynamicTemplates, null, XContentType.JSON, RestBulkAction.BulkFormat.MARKER_SUFFIX) + ); assertThat( error.getMessage(), equalTo( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java index 4b65bc3350080..469e93d3b3c0d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/monitoring/action/MonitoringBulkRequest.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.RestApiVersion; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; @@ -94,6 +95,7 @@ public MonitoringBulkRequest add( null, true, xContentType, + RestBulkAction.BulkFormat.MARKER_SUFFIX, (indexRequest, type) -> { // we no longer accept non-timestamped indexes from Kibana, LS, or Beats because we do not use the data // and it was duplicated anyway; by simply dropping it, we allow BWC for older clients that still send it diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java index 56b0483e07c78..7c55c5c461629 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/IndexingStateProcessor.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -142,7 +143,7 @@ void findAppropriateIndexOrAliasAndPersist(BytesReference bytes) throws IOExcept void persist(String indexOrAlias, BytesReference bytes) throws IOException { BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .requireAlias(AnomalyDetectorsIndex.jobStateIndexWriteAlias().equals(indexOrAlias)); - bulkRequest.add(bytes, indexOrAlias, XContentType.JSON); + bulkRequest.add(bytes, indexOrAlias, XContentType.JSON, RestBulkAction.BulkFormat.MARKER_SUFFIX); if (bulkRequest.numberOfActions() > 0) { LOGGER.trace("[{}] Persisting job state document: index [{}], length [{}]", jobId, indexOrAlias, bytes.length()); try { diff --git a/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java b/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java index 5250a1f764e5c..f1fac68bc4840 100644 --- a/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java +++ b/x-pack/plugin/monitoring/src/internalClusterTest/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporterIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.license.TestUtils; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.http.MockRequest; @@ -882,7 +883,8 @@ private void assertBulkRequest(String requestBody, int numberOfActions) throws E BulkRequest bulkRequest = new BulkRequest().add( new BytesArray(requestBody.getBytes(StandardCharsets.UTF_8)), null, - XContentType.JSON + XContentType.JSON, + RestBulkAction.BulkFormat.MARKER_SUFFIX ); assertThat(bulkRequest.numberOfActions(), equalTo(numberOfActions)); for (DocWriteRequest actionRequest : bulkRequest.requests()) { diff --git a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java index 51c9e35c95a3d..60731ad5a9215 100644 --- a/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java +++ b/x-pack/qa/runtime-fields/src/main/java/org/elasticsearch/xpack/runtimefields/test/CoreTestTranslater.java @@ -17,6 +17,7 @@ import org.elasticsearch.index.mapper.IpFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; import org.elasticsearch.index.mapper.NumberFieldMapper.NumberType; +import org.elasticsearch.rest.action.document.RestBulkAction; import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; import org.elasticsearch.test.rest.yaml.ClientYamlTestExecutionContext; import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse; @@ -377,6 +378,7 @@ private boolean handleBulk(ApiCallSection bulk) { null, true, XContentType.JSON, + RestBulkAction.BulkFormat.MARKER_SUFFIX, (index, type) -> indexRequests.add(index), u -> {}, d -> {}