Skip to content

Commit 269c884

Browse files
committed
handle missing content in incremental bulk
1 parent 0814e2f commit 269c884

File tree

4 files changed

+47
-18
lines changed

4 files changed

+47
-18
lines changed

server/src/main/java/org/elasticsearch/http/HttpBody.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,26 @@ static Full empty() {
2727
return new ByteRefHttpBody(ReleasableBytesReference.empty());
2828
}
2929

30+
class NoopStream implements Stream {
31+
32+
@Override
33+
public ChunkHandler handler() {
34+
return null;
35+
}
36+
37+
@Override
38+
public void addTracingHandler(ChunkHandler chunkHandler) {}
39+
40+
@Override
41+
public void setHandler(ChunkHandler chunkHandler) {}
42+
43+
@Override
44+
public void next() {}
45+
46+
@Override
47+
public void close() {}
48+
}
49+
3050
default boolean isFull() {
3151
return this instanceof Full;
3252
}

server/src/main/java/org/elasticsearch/rest/RestRequest.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,14 +329,19 @@ public HttpBody.Stream contentStream() {
329329
return httpRequest.body().asStream();
330330
}
331331

332+
public void ensureContent() {
333+
if (hasContent() == false) {
334+
throw new ElasticsearchParseException("request body is required");
335+
}
336+
}
337+
332338
/**
333339
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
334340
* See {@link #content()}.
335341
*/
336342
public ReleasableBytesReference requiredContent() {
337-
if (hasContent() == false) {
338-
throw new ElasticsearchParseException("request body is required");
339-
} else if (xContentType.get() == null) {
343+
ensureContent();
344+
if (xContentType.get() == null) {
340345
throwValidationException("unknown content type");
341346
}
342347
return content();

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
137137
client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
138138
};
139139
} else {
140+
request.ensureContent();
140141
String waitForActiveShards = request.param("wait_for_active_shards");
141142
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
142143
String refresh = request.param("refresh");

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.rest.action.document;
1111

1212
import org.apache.lucene.util.SetOnce;
13+
import org.elasticsearch.ElasticsearchParseException;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.DocWriteRequest;
1516
import org.elasticsearch.action.bulk.BulkRequest;
@@ -18,6 +19,7 @@
1819
import org.elasticsearch.action.index.IndexRequest;
1920
import org.elasticsearch.action.update.UpdateRequest;
2021
import org.elasticsearch.client.internal.Client;
22+
import org.elasticsearch.client.internal.node.NodeClient;
2123
import org.elasticsearch.common.bytes.BytesArray;
2224
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2325
import org.elasticsearch.common.settings.ClusterSettings;
@@ -201,28 +203,29 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
201203
}
202204
}
203205

206+
public void testIncrementalBulkMissingContent() {
207+
assertThrows(
208+
ElasticsearchParseException.class,
209+
() -> new RestBulkAction(
210+
Settings.EMPTY,
211+
ClusterSettings.createBuiltInClusterSettings(),
212+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
213+
).handleRequest(
214+
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk").withBody(new HttpBody.NoopStream()).build(),
215+
mock(RestChannel.class),
216+
mock(NodeClient.class)
217+
)
218+
);
219+
}
220+
204221
public void testIncrementalParsing() {
205222
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
206223
AtomicBoolean isLast = new AtomicBoolean(false);
207224
AtomicBoolean next = new AtomicBoolean(false);
208225

209226
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
210227
.withMethod(RestRequest.Method.POST)
211-
.withBody(new HttpBody.Stream() {
212-
@Override
213-
public void close() {}
214-
215-
@Override
216-
public ChunkHandler handler() {
217-
return null;
218-
}
219-
220-
@Override
221-
public void addTracingHandler(ChunkHandler chunkHandler) {}
222-
223-
@Override
224-
public void setHandler(ChunkHandler chunkHandler) {}
225-
228+
.withBody(new HttpBody.NoopStream() {
226229
@Override
227230
public void next() {
228231
next.set(true);

0 commit comments

Comments
 (0)