Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultListExecutedPipelines,
true,
request.getXContentType(),
RestBulkAction.BulkFormat.MARKER_SUFFIX,
request.getRestApiVersion()
);

Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/135506.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 135506
summary: "Introduce a bulk format that uses a prefix length"
area: Store
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.transport.RawIndexingDataTransportRequest;
import org.elasticsearch.xcontent.XContentType;
Expand Down Expand Up @@ -257,22 +258,40 @@ public BulkRequest add(byte[] data, int from, int length, XContentType xContentT
* Adds a framed data in binary format
*/
public BulkRequest add(byte[] data, int from, int length, @Nullable String defaultIndex, XContentType xContentType) throws IOException {
return add(new BytesArray(data, from, length), defaultIndex, xContentType);
return add(new BytesArray(data, from, length), defaultIndex, xContentType, RestBulkAction.BulkFormat.MARKER_SUFFIX);
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, XContentType xContentType) throws IOException {
return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, RestApiVersion.current());
public BulkRequest add(
BytesReference data,
@Nullable String defaultIndex,
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat
) throws IOException {
return add(data, defaultIndex, null, null, null, null, null, null, true, xContentType, bulkFormat, RestApiVersion.current());
}

/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, boolean allowExplicitIndex, XContentType xContentType)
throws IOException {
return add(data, defaultIndex, null, null, null, null, null, null, allowExplicitIndex, xContentType, RestApiVersion.current());
return add(
data,
defaultIndex,
null,
null,
null,
null,
null,
null,
allowExplicitIndex,
xContentType,
RestBulkAction.BulkFormat.MARKER_SUFFIX,
RestApiVersion.current()
);

}

Expand All @@ -287,6 +306,7 @@ public BulkRequest add(
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat,
RestApiVersion restApiVersion
) throws IOException {
String routing = valueOrDefault(defaultRouting, globalRouting);
Expand All @@ -304,6 +324,7 @@ public BulkRequest add(
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
bulkFormat,
(indexRequest, type) -> internalAdd(indexRequest),
this::internalAdd,
this::add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContent;
Expand Down Expand Up @@ -111,10 +112,13 @@ private static BytesReference sliceTrimmingCarriageReturn(
BytesReference bytesReference,
int from,
int nextMarker,
XContentType xContentType
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat
) {
final int length;
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
if (RestBulkAction.BulkFormat.MARKER_SUFFIX == bulkFormat
&& XContentType.JSON == xContentType
&& bytesReference.get(nextMarker - 1) == (byte) '\r') {
length = nextMarker - from - 1;
} else {
length = nextMarker - from;
Expand All @@ -138,6 +142,7 @@ public void parse(
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
Expand All @@ -152,6 +157,7 @@ public void parse(
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
bulkFormat,
indexRequestConsumer,
updateRequestConsumer,
deleteRequestConsumer
Expand All @@ -170,6 +176,7 @@ public IncrementalParser incrementalParser(
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
Expand All @@ -184,6 +191,7 @@ public IncrementalParser incrementalParser(
defaultListExecutedPipelines,
allowExplicitIndex,
xContentType,
bulkFormat,
indexRequestConsumer,
updateRequestConsumer,
deleteRequestConsumer
Expand All @@ -207,6 +215,7 @@ public class IncrementalParser {
private final boolean allowExplicitIndex;

private final XContentType xContentType;
private final RestBulkAction.BulkFormat bulkFormat;
private final byte marker;
private final BiConsumer<IndexRequest, String> indexRequestConsumer;
private final Consumer<UpdateRequest> updateRequestConsumer;
Expand All @@ -232,6 +241,7 @@ private IncrementalParser(
@Nullable Boolean defaultListExecutedPipelines,
boolean allowExplicitIndex,
XContentType xContentType,
RestBulkAction.BulkFormat bulkFormat,
BiConsumer<IndexRequest, String> indexRequestConsumer,
Consumer<UpdateRequest> updateRequestConsumer,
Consumer<DeleteRequest> deleteRequestConsumer
Expand All @@ -245,7 +255,12 @@ private IncrementalParser(
this.defaultListExecutedPipelines = defaultListExecutedPipelines;
this.allowExplicitIndex = allowExplicitIndex;
this.xContentType = xContentType;
this.marker = xContentType.xContent().bulkSeparator();
this.bulkFormat = bulkFormat;
if (bulkFormat == RestBulkAction.BulkFormat.MARKER_SUFFIX) {
this.marker = xContentType.xContent().bulkSeparator();
} else {
this.marker = (byte) '0'; // no need of a marker for prefix length
}
this.indexRequestConsumer = indexRequestConsumer;
this.updateRequestConsumer = updateRequestConsumer;
this.deleteRequestConsumer = deleteRequestConsumer;
Expand All @@ -257,14 +272,19 @@ public int parse(BytesReference data, boolean lastData) throws IOException {
throw new IllegalStateException("Parser has already encountered exception", failure);
}
try {
return tryParse(data, lastData);
if (bulkFormat == RestBulkAction.BulkFormat.PREFIX_LENGTH) {
return tryParseWithPrefixLength(data, lastData);
} else {
assert bulkFormat == RestBulkAction.BulkFormat.MARKER_SUFFIX;
return tryParseWithMarkSuffix(data, lastData);
}
} catch (Exception e) {
failure = e;
throw e;
}
}

private int tryParse(BytesReference data, boolean lastData) throws IOException {
private int tryParseWithMarkSuffix(BytesReference data, boolean lastData) throws IOException {
int from = 0;
int consumed = 0;

Expand All @@ -275,27 +295,59 @@ private int tryParse(BytesReference data, boolean lastData) throws IOException {
break;
}
incrementalFromOffset = nextMarker + 1;
line++;

if (currentRequest == null) {
if (parseActionLine(data, from, nextMarker)) {
if (currentRequest instanceof DeleteRequest deleteRequest) {
deleteRequestConsumer.accept(deleteRequest);
currentRequest = null;
}
}
} else {
parseAndConsumeDocumentLine(data, from, nextMarker);
currentRequest = null;
}

processRequest(data, from, nextMarker);
from = nextMarker + 1;
consumed = from;
}

return lastData ? from : consumed;
}

private int tryParseWithPrefixLength(BytesReference data, boolean lastData) throws IOException {
int from = 0;
while (true) {
if (from == data.length()) {
break;
}
if (Integer.BYTES > data.length() - from) {
if (lastData) {
throw new IllegalArgumentException(
"Documents in the bulk request must be prefixed with the length of the document"
);
}
break;
}
final int len = data.getInt(from);
if (len > data.length() - (from + Integer.BYTES)) {
if (lastData) {
throw new IllegalArgumentException(
"Documents in the bulk request must be prefixed with the length of the document"
);
}
break;
}
from += Integer.BYTES;
processRequest(data, from, from + len);
from += len;
}
return from;
}

private void processRequest(BytesReference data, int from, int to) throws IOException {
line++;
if (currentRequest == null) {
if (parseActionLine(data, from, to)) {
if (currentRequest instanceof DeleteRequest deleteRequest) {
deleteRequestConsumer.accept(deleteRequest);
currentRequest = null;
}
}
} else {
parseAndConsumeDocumentLine(data, from, to);
currentRequest = null;
}
}

private boolean parseActionLine(BytesReference data, int from, int to) throws IOException {
assert currentRequest == null;

Expand Down Expand Up @@ -526,13 +578,13 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException {
assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
if (currentRequest instanceof IndexRequest indexRequest) {
indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType);
indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType, bulkFormat), xContentType);
indexRequestConsumer.accept(indexRequest, currentType);
} else if (currentRequest instanceof UpdateRequest updateRequest) {
try (
XContentParser sliceParser = createParser(
xContentType.xContent(),
sliceTrimmingCarriageReturn(data, from, to, xContentType)
sliceTrimmingCarriageReturn(data, from, to, xContentType, bulkFormat)
)
) {
updateRequest.fromXContent(sliceParser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.action.bulk.IncrementalBulkService;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand All @@ -41,6 +42,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -62,6 +64,28 @@ public class RestBulkAction extends BaseRestHandler {

public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in bulk requests is deprecated.";
public static final String FAILURE_STORE_STATUS_CAPABILITY = "failure_store_status";

private static final String BULK_FORMAT_HEADER = "Bulk-Format";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have been debating on if this should be called X-Bulk-Format, but this RFC (https://datatracker.ietf.org/doc/html/rfc6648) indicates that we shouldn't rely on that convention.

I am fine with either.

I defer to @elastic/es-core-infra on this decision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to X-Bulk-Format but I am fine either way.


public enum BulkFormat {
PREFIX_LENGTH,
MARKER_SUFFIX;

static BulkFormat parse(String bulkFormat) {
if (Strings.hasText(bulkFormat)) {
if ("marker-suffix".equalsIgnoreCase(bulkFormat)) {
return MARKER_SUFFIX;
} else if ("prefix-length".equalsIgnoreCase(bulkFormat)) {
return PREFIX_LENGTH;
} else {
throw new IllegalArgumentException("Unknown bulk format: " + bulkFormat);
}
} else {
throw new IllegalArgumentException("Header [" + BULK_FORMAT_HEADER + "] cannot be empty.");
}
}
}

private final boolean allowExplicitIndex;
private final IncrementalBulkService bulkHandler;
private final IncrementalBulkService.Enabled incrementalEnabled;
Expand Down Expand Up @@ -127,6 +151,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
defaultListExecutedPipelines,
allowExplicitIndex,
request.getXContentType(),
parseBulkFormatHeader(request.getHeaders()),
request.getRestApiVersion()
);
} catch (Exception e) {
Expand All @@ -149,6 +174,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}
}

private static BulkFormat parseBulkFormatHeader(Map<String, List<String>> headers) {
List<String> header = headers.get(BULK_FORMAT_HEADER);
if (header == null || header.isEmpty()) {
return BulkFormat.MARKER_SUFFIX; // default bulk format
} else if (header.size() > 1) {
throw new IllegalArgumentException("Incorrect header [" + BULK_FORMAT_HEADER + "]. Only one value should be provided");
}
return BulkFormat.parse(header.get(0));
}

private static Exception parseFailureException(Exception e) {
if (e instanceof IllegalArgumentException) {
return e;
Expand Down Expand Up @@ -188,6 +223,7 @@ static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
request.paramAsBoolean("list_executed_pipelines", false),
allowExplicitIndex,
request.getXContentType(),
parseBulkFormatHeader(request.getHeaders()),
(indexRequest, type) -> items.add(indexRequest),
items::add,
items::add
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.document.RestBulkAction;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -105,6 +106,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
true,
true,
request.getXContentType(),
RestBulkAction.BulkFormat.MARKER_SUFFIX,
request.getRestApiVersion()
);
return channel -> client.execute(SimulateBulkAction.INSTANCE, bulkRequest, new SimulateIngestRestToXContentListener(channel));
Expand Down
Loading