Skip to content

Commit 9922d54

Browse files
authored
Standardize error code when bulk body is invalid (#114869) (#114944)
Currently the incremental and non-incremental bulk variations will return different error codes when the json body provided is invalid. This commit ensures both version return status code 400. Additionally, this renames the incremental rest tests to bulk tests and ensures that all tests work with both bulk api versions. We set these tests to randomize which version of the api we test each run.
1 parent 4be9122 commit 9922d54

File tree

3 files changed

+65
-25
lines changed

3 files changed

+65
-25
lines changed

docs/changelog/114869.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114869
2+
summary: Standardize error code when bulk body is invalid
3+
area: CRUD
4+
type: bug
5+
issues: []
Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
package org.elasticsearch.http;
1111

12+
import org.apache.http.entity.ByteArrayEntity;
13+
import org.apache.http.entity.ContentType;
1214
import org.elasticsearch.action.bulk.IncrementalBulkService;
1315
import org.elasticsearch.client.Request;
1416
import org.elasticsearch.client.Response;
@@ -19,24 +21,30 @@
1921
import org.elasticsearch.xcontent.json.JsonXContent;
2022

2123
import java.io.IOException;
24+
import java.nio.charset.StandardCharsets;
2225
import java.util.List;
2326
import java.util.Map;
2427

28+
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
2529
import static org.elasticsearch.rest.RestStatus.OK;
2630
import static org.hamcrest.CoreMatchers.containsString;
2731
import static org.hamcrest.Matchers.equalTo;
2832

2933
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
30-
public class IncrementalBulkRestIT extends HttpSmokeTestCase {
34+
public class BulkRestIT extends HttpSmokeTestCase {
3135

3236
@Override
3337
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
3438
return Settings.builder()
3539
.put(super.nodeSettings(nodeOrdinal, otherSettings))
36-
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), true)
40+
.put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), seventyFivePercentOfTheTime())
3741
.build();
3842
}
3943

44+
private static boolean seventyFivePercentOfTheTime() {
45+
return (randomBoolean() && randomBoolean()) == false;
46+
}
47+
4048
public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOException {
4149
Request request = new Request("GET", "/_capabilities?method=GET&path=%2F_bulk&capabilities=failure_store_status&pretty");
4250
Response response = getRestClient().performRequest(request);
@@ -51,6 +59,26 @@ public void testBulkMissingBody() throws IOException {
5159
assertThat(responseException.getMessage(), containsString("request body is required"));
5260
}
5361

62+
public void testBulkInvalidIndexNameString() throws IOException {
63+
Request request = new Request("POST", "/_bulk");
64+
65+
byte[] bytes1 = "{\"create\":{\"_index\":\"".getBytes(StandardCharsets.UTF_8);
66+
byte[] bytes2 = new byte[] { (byte) 0xfe, (byte) 0xfe, (byte) 0xff, (byte) 0xff };
67+
byte[] bytes3 = "\",\"_id\":\"1\"}}\n{\"field\":1}\n\r\n".getBytes(StandardCharsets.UTF_8);
68+
byte[] bulkBody = new byte[bytes1.length + bytes2.length + bytes3.length];
69+
System.arraycopy(bytes1, 0, bulkBody, 0, bytes1.length);
70+
System.arraycopy(bytes2, 0, bulkBody, bytes1.length, bytes2.length);
71+
System.arraycopy(bytes3, 0, bulkBody, bytes1.length + bytes2.length, bytes3.length);
72+
73+
request.setEntity(new ByteArrayEntity(bulkBody, ContentType.APPLICATION_JSON));
74+
75+
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
76+
assertThat(responseException.getResponse().getStatusLine().getStatusCode(), equalTo(BAD_REQUEST.getStatus()));
77+
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
78+
assertThat(responseException.getMessage(), containsString("json_parse_exception"));
79+
assertThat(responseException.getMessage(), containsString("Invalid UTF-8"));
80+
}
81+
5482
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
5583
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
5684
// missing final line of the bulk body. cannot process
@@ -61,10 +89,10 @@ public void testBulkRequestBodyImproperlyTerminated() throws IOException {
6189
);
6290
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
6391
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
64-
assertThat(responseException.getMessage(), containsString("could not parse bulk request body"));
92+
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
6593
}
6694

67-
public void testIncrementalBulk() throws IOException {
95+
public void testBulkRequest() throws IOException {
6896
Request createRequest = new Request("PUT", "/index_name");
6997
createRequest.setJsonEntity("""
7098
{
@@ -81,7 +109,6 @@ public void testIncrementalBulk() throws IOException {
81109

82110
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
83111

84-
// index documents for the rollup job
85112
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
86113
+ "{\"field\":1}\n"
87114
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
@@ -113,7 +140,6 @@ public void testBulkWithIncrementalDisabled() throws IOException {
113140

114141
Request firstBulkRequest = new Request("POST", "/index_name/_bulk");
115142

116-
// index documents for the rollup job
117143
String bulkBody = "{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
118144
+ "{\"field\":1}\n"
119145
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}}\n"
@@ -137,7 +163,7 @@ public void testBulkWithIncrementalDisabled() throws IOException {
137163
}
138164
}
139165

140-
public void testIncrementalMalformed() throws IOException {
166+
public void testMalformedActionLineBulk() throws IOException {
141167
Request createRequest = new Request("PUT", "/index_name");
142168
createRequest.setJsonEntity("""
143169
{
@@ -154,7 +180,6 @@ public void testIncrementalMalformed() throws IOException {
154180

155181
Request bulkRequest = new Request("POST", "/index_name/_bulk");
156182

157-
// index documents for the rollup job
158183
final StringBuilder bulk = new StringBuilder();
159184
bulk.append("{\"index\":{\"_index\":\"index_name\"}}\n");
160185
bulk.append("{\"field\":1}\n");
@@ -170,7 +195,6 @@ public void testIncrementalMalformed() throws IOException {
170195
private static void sendLargeBulk() throws IOException {
171196
Request bulkRequest = new Request("POST", "/index_name/_bulk");
172197

173-
// index documents for the rollup job
174198
final StringBuilder bulk = new StringBuilder();
175199
bulk.append("{\"delete\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n");
176200
int updates = 0;

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,23 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
110110
boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
111111
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
112112
bulkRequest.setRefreshPolicy(request.param("refresh"));
113-
bulkRequest.add(
114-
request.requiredContent(),
115-
defaultIndex,
116-
defaultRouting,
117-
defaultFetchSourceContext,
118-
defaultPipeline,
119-
defaultRequireAlias,
120-
defaultRequireDataStream,
121-
defaultListExecutedPipelines,
122-
allowExplicitIndex,
123-
request.getXContentType(),
124-
request.getRestApiVersion()
125-
);
113+
try {
114+
bulkRequest.add(
115+
request.requiredContent(),
116+
defaultIndex,
117+
defaultRouting,
118+
defaultFetchSourceContext,
119+
defaultPipeline,
120+
defaultRequireAlias,
121+
defaultRequireDataStream,
122+
defaultListExecutedPipelines,
123+
allowExplicitIndex,
124+
request.getXContentType(),
125+
request.getRestApiVersion()
126+
);
127+
} catch (Exception e) {
128+
return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
129+
}
126130

127131
return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
128132
} else {
@@ -137,6 +141,15 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
137141
}
138142
}
139143

144+
private static Exception parseFailureException(Exception e) {
145+
if (e instanceof IllegalArgumentException) {
146+
return e;
147+
} else {
148+
// TODO: Maybe improve in follow-up to be XContentParseException and include line number and column
149+
return new ElasticsearchParseException("could not parse bulk request body", e);
150+
}
151+
}
152+
140153
static class ChunkHandler implements BaseRestHandler.RequestBodyChunkConsumer {
141154

142155
private final boolean allowExplicitIndex;
@@ -229,9 +242,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
229242

230243
} catch (Exception e) {
231244
shortCircuit();
232-
new RestToXContentListener<>(channel).onFailure(
233-
new ElasticsearchParseException("could not parse bulk request body", e)
234-
);
245+
new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
235246
return;
236247
}
237248
}

0 commit comments

Comments
 (0)