Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultListExecutedPipelines,
true,
request.getXContentType(),
RestBulkAction.BulkFormat.MARKER_SUFFIX,
request.getRestApiVersion()
);

Expand Down
6 changes: 6 additions & 0 deletions docs/changelog/135506.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 135506
summary: "[Draft] Introduce a bulk format that uses a prefix length"
area: Store
type: enhancement
issues:
- 94319
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.http.entity.ContentType;
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -31,7 +32,9 @@
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
public class BulkRestIT extends HttpSmokeTestCase {
public class BulkRestMarkerSuffixIT extends HttpSmokeTestCase {

private final RequestOptions options = RequestOptions.DEFAULT.toBuilder().addHeader("X-Bulk-Format", "marker-suffix").build();

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
Expand All @@ -53,6 +56,9 @@ public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOExcept

public void testBulkMissingBody() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
if (randomBoolean()) {
request.setOptions(options);
}
request.setJsonEntity("");
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
Expand All @@ -61,6 +67,9 @@ public void testBulkMissingBody() throws IOException {

public void testBulkInvalidIndexNameString() throws IOException {
Request request = new Request("POST", "/_bulk");
if (randomBoolean()) {
request.setOptions(options);
}

byte[] bytes1 = "{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8);
byte[] bytes2 = new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff };
Expand All @@ -80,6 +89,9 @@ public void testBulkInvalidIndexNameString() throws IOException {

public void testBulkRequestBodyImproperlyTerminated() throws IOException {
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
if (randomBoolean()) {
request.setOptions(options);
}
// missing final line of the bulk body. cannot process
request.setJsonEntity(
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
Expand Down Expand Up @@ -107,7 +119,9 @@ public void testBulkRequest() throws IOException {
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");

if (randomBoolean()) {
firstBulkRequest.setOptions(options);
}
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
Expand Down Expand Up @@ -138,6 +152,9 @@ public void testBulkWithIncrementalDisabled() throws IOException {
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
if (randomBoolean()) {
firstBulkRequest.setOptions(options);
}

String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
+ "{\"field\":1}\n"
Expand Down Expand Up @@ -178,6 +195,9 @@ public void testMalformedActionLineBulk() throws IOException {
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));

Request bulkRequest = new Request("POST", "/index_name/_bulk");
if (randomBoolean()) {
bulkRequest.setOptions(options);
}

final StringBuilder bulk = new StringBuilder();
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
Expand All @@ -191,9 +211,11 @@ public void testMalformedActionLineBulk() throws IOException {
}

@SuppressWarnings("unchecked")
private static void sendLargeBulk() throws IOException {
private void sendLargeBulk() throws IOException {
Request bulkRequest = new Request("POST", "/index_name/_bulk");

if (randomBoolean()) {
bulkRequest.setOptions(options);
}
final StringBuilder bulk = new StringBuilder();
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
int updates = 0;
Expand Down
Loading