diff --git a/server/src/main/java/org/elasticsearch/rest/RestRequest.java b/server/src/main/java/org/elasticsearch/rest/RestRequest.java index 54ad500a9144d..068d4879603e0 100644 --- a/server/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/server/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -329,16 +329,20 @@ public HttpBody.Stream contentStream() { return httpRequest.body().asStream(); } - /** - * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing. - * See {@link #content()}. - */ - public ReleasableBytesReference requiredContent() { + public void ensureContent() { if (hasContent() == false) { throw new ElasticsearchParseException("request body is required"); } else if (xContentType.get() == null) { throwValidationException("unknown content type"); } + } + + /** + * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing. + * See {@link #content()}. + */ + public ReleasableBytesReference requiredContent() { + ensureContent(); return content(); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java index f638367b85e76..293c8128dc8d9 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java @@ -137,6 +137,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content)); }; } else { + request.ensureContent(); String waitForActiveShards = request.param("wait_for_active_shards"); TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT); String refresh = request.param("refresh"); diff --git a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java index f4d601c7ad3b4..0b3e50974a827 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.rest.action.document; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.bulk.BulkRequest; @@ -18,12 +19,12 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Releasable; -import org.elasticsearch.http.HttpBody; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.rest.RestChannel; @@ -31,6 +32,7 @@ import org.elasticsearch.telemetry.metric.MeterRegistry; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpNodeClient; +import org.elasticsearch.test.rest.FakeHttpBodyStream; import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xcontent.XContentType; @@ -201,6 +203,24 @@ public void bulk(BulkRequest request, ActionListener listener) { } } + public void testIncrementalBulkMissingContent() { + assertThrows( + ElasticsearchParseException.class, + () -> new RestBulkAction( + Settings.EMPTY, + ClusterSettings.createBuiltInClusterSettings(), + new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP) + ).handleRequest( + new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") + .withContentLength(0) + .withBody(new FakeHttpBodyStream()) + .build(), + mock(RestChannel.class), + mock(NodeClient.class) + ) + ); + } + public void testIncrementalParsing() { ArrayList> docs = new ArrayList<>(); AtomicBoolean isLast = new AtomicBoolean(false); @@ -208,21 +228,7 @@ public void testIncrementalParsing() { FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk") .withMethod(RestRequest.Method.POST) - .withBody(new HttpBody.Stream() { - @Override - public void close() {} - - @Override - public ChunkHandler handler() { - return null; - } - - @Override - public void addTracingHandler(ChunkHandler chunkHandler) {} - - @Override - public void setHandler(ChunkHandler chunkHandler) {} - + .withBody(new FakeHttpBodyStream() { @Override public void next() { next.set(true); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 3cf9255f19803..e55387a715d97 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -117,9 +117,19 @@ public HttpRequest removeHeader(String header) { return new FakeHttpRequest(method, uri, body, filteredHeaders, inboundException); } + public int contentLength() { + return switch (body) { + case HttpBody.Full f -> f.bytes().length(); + case HttpBody.Stream s -> { + var len = header("Content-Length"); + yield len == null ? 0 : Integer.parseInt(len); + } + }; + } + @Override public boolean hasContent() { - return body.isEmpty() == false; + return contentLength() > 0; } @Override @@ -237,6 +247,11 @@ public Builder withBody(HttpBody body) { return this; } + public Builder withContentLength(int length) { + headers.put("Content-Length", List.of(String.valueOf(length))); + return this; + } + public Builder withPath(String path) { this.path = path; return this;