Skip to content

Commit 6e810fb

Browse files
authored
Handle missing content in incremental bulk (#133685) (#133856)
(cherry picked from commit 36de99b)
1 parent cae6afa commit 6e810fb

File tree

4 files changed

+48
-22
lines changed

4 files changed

+48
-22
lines changed

server/src/main/java/org/elasticsearch/rest/RestRequest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -329,16 +329,20 @@ public HttpBody.Stream contentStream() {
329329
return httpRequest.body().asStream();
330330
}
331331

332-
/**
333-
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
334-
* See {@link #content()}.
335-
*/
336-
public ReleasableBytesReference requiredContent() {
332+
public void ensureContent() {
337333
if (hasContent() == false) {
338334
throw new ElasticsearchParseException("request body is required");
339335
} else if (xContentType.get() == null) {
340336
throwValidationException("unknown content type");
341337
}
338+
}
339+
340+
/**
341+
* Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
342+
* See {@link #content()}.
343+
*/
344+
public ReleasableBytesReference requiredContent() {
345+
ensureContent();
342346
return content();
343347
}
344348

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
137137
client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
138138
};
139139
} else {
140+
request.ensureContent();
140141
String waitForActiveShards = request.param("wait_for_active_shards");
141142
TimeValue timeout = request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT);
142143
String refresh = request.param("refresh");

server/src/test/java/org/elasticsearch/rest/action/document/RestBulkActionTests.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.rest.action.document;
1111

1212
import org.apache.lucene.util.SetOnce;
13+
import org.elasticsearch.ElasticsearchParseException;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.DocWriteRequest;
1516
import org.elasticsearch.action.bulk.BulkRequest;
@@ -18,19 +19,20 @@
1819
import org.elasticsearch.action.index.IndexRequest;
1920
import org.elasticsearch.action.update.UpdateRequest;
2021
import org.elasticsearch.client.internal.Client;
22+
import org.elasticsearch.client.internal.node.NodeClient;
2123
import org.elasticsearch.common.bytes.BytesArray;
2224
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2325
import org.elasticsearch.common.settings.ClusterSettings;
2426
import org.elasticsearch.common.settings.Settings;
2527
import org.elasticsearch.core.Releasable;
26-
import org.elasticsearch.http.HttpBody;
2728
import org.elasticsearch.index.IndexVersion;
2829
import org.elasticsearch.index.IndexingPressure;
2930
import org.elasticsearch.rest.RestChannel;
3031
import org.elasticsearch.rest.RestRequest;
3132
import org.elasticsearch.telemetry.metric.MeterRegistry;
3233
import org.elasticsearch.test.ESTestCase;
3334
import org.elasticsearch.test.client.NoOpNodeClient;
35+
import org.elasticsearch.test.rest.FakeHttpBodyStream;
3436
import org.elasticsearch.test.rest.FakeRestChannel;
3537
import org.elasticsearch.test.rest.FakeRestRequest;
3638
import org.elasticsearch.xcontent.XContentType;
@@ -201,28 +203,32 @@ public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
201203
}
202204
}
203205

206+
public void testIncrementalBulkMissingContent() {
207+
assertThrows(
208+
ElasticsearchParseException.class,
209+
() -> new RestBulkAction(
210+
Settings.EMPTY,
211+
ClusterSettings.createBuiltInClusterSettings(),
212+
new IncrementalBulkService(mock(Client.class), mock(IndexingPressure.class), MeterRegistry.NOOP)
213+
).handleRequest(
214+
new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
215+
.withContentLength(0)
216+
.withBody(new FakeHttpBodyStream())
217+
.build(),
218+
mock(RestChannel.class),
219+
mock(NodeClient.class)
220+
)
221+
);
222+
}
223+
204224
public void testIncrementalParsing() {
205225
ArrayList<DocWriteRequest<?>> docs = new ArrayList<>();
206226
AtomicBoolean isLast = new AtomicBoolean(false);
207227
AtomicBoolean next = new AtomicBoolean(false);
208228

209229
FakeRestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("my_index/_bulk")
210230
.withMethod(RestRequest.Method.POST)
211-
.withBody(new HttpBody.Stream() {
212-
@Override
213-
public void close() {}
214-
215-
@Override
216-
public ChunkHandler handler() {
217-
return null;
218-
}
219-
220-
@Override
221-
public void addTracingHandler(ChunkHandler chunkHandler) {}
222-
223-
@Override
224-
public void setHandler(ChunkHandler chunkHandler) {}
225-
231+
.withBody(new FakeHttpBodyStream() {
226232
@Override
227233
public void next() {
228234
next.set(true);

test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,19 @@ public HttpRequest removeHeader(String header) {
117117
return new FakeHttpRequest(method, uri, body, filteredHeaders, inboundException);
118118
}
119119

120+
public int contentLength() {
121+
return switch (body) {
122+
case HttpBody.Full f -> f.bytes().length();
123+
case HttpBody.Stream s -> {
124+
var len = header("Content-Length");
125+
yield len == null ? 0 : Integer.parseInt(len);
126+
}
127+
};
128+
}
129+
120130
@Override
121131
public boolean hasContent() {
122-
return body.isEmpty() == false;
132+
return contentLength() > 0;
123133
}
124134

125135
@Override
@@ -237,6 +247,11 @@ public Builder withBody(HttpBody body) {
237247
return this;
238248
}
239249

250+
public Builder withContentLength(int length) {
251+
headers.put("Content-Length", List.of(String.valueOf(length)));
252+
return this;
253+
}
254+
240255
public Builder withPath(String path) {
241256
this.path = path;
242257
return this;

0 commit comments

Comments
 (0)