Skip to content

Commit 5b7b7d5

Browse files
committed
Change
1 parent 98772ba commit 5b7b7d5

File tree

4 files changed

+72
-40
lines changed

4 files changed

+72
-40
lines changed

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/BulkRestIT.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,7 @@ public void testBulkUriMatchingDoesNotMatchBulkCapabilitiesApi() throws IOExcept
5252
}
5353

5454
public void testBulkMissingBody() throws IOException {
55-
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
56-
request.setJsonEntity("");
57-
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
58-
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
59-
assertThat(responseException.getMessage(), containsString("request body is required"));
55+
sendMissingBody();
6056
}
6157

6258
public void testBulkInvalidIndexNameString() throws IOException {
@@ -79,16 +75,7 @@ public void testBulkInvalidIndexNameString() throws IOException {
7975
}
8076

8177
public void testBulkRequestBodyImproperlyTerminated() throws IOException {
82-
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
83-
// missing final line of the bulk body. cannot process
84-
request.setJsonEntity(
85-
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
86-
+ "{\"field\":1}\n"
87-
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
88-
);
89-
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
90-
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
91-
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
78+
sendImproperlyTerminated();
9279
}
9380

9481
public void testBulkRequest() throws IOException {
@@ -156,6 +143,9 @@ public void testBulkWithIncrementalDisabled() throws IOException {
156143

157144
try {
158145
sendLargeBulk();
146+
sendMalFormedActionLine();
147+
sendImproperlyTerminated();
148+
sendMissingBody();
159149
} finally {
160150
internalCluster().getInstances(IncrementalBulkService.class).forEach(i -> i.setForTests(true));
161151
updateClusterSettings(Settings.builder().put(IncrementalBulkService.INCREMENTAL_BULK.getKey(), (String) null));
@@ -177,6 +167,31 @@ public void testMalformedActionLineBulk() throws IOException {
177167
final Response indexCreatedResponse = getRestClient().performRequest(createRequest);
178168
assertThat(indexCreatedResponse.getStatusLine().getStatusCode(), equalTo(OK.getStatus()));
179169

170+
sendMalFormedActionLine();
171+
}
172+
173+
private static void sendMissingBody() {
174+
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
175+
request.setJsonEntity("");
176+
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
177+
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
178+
assertThat(responseException.getMessage(), containsString("request body is required"));
179+
}
180+
181+
private static void sendImproperlyTerminated() {
182+
Request request = new Request(randomBoolean() ? "POST" : "PUT", "/_bulk");
183+
// missing final line of the bulk body. cannot process
184+
request.setJsonEntity(
185+
"{\"index\":{\"_index\":\"index_name\",\"_id\":\"1\"}}\n"
186+
+ "{\"field\":1}\n"
187+
+ "{\"index\":{\"_index\":\"index_name\",\"_id\":\"2\"}"
188+
);
189+
ResponseException responseException = expectThrows(ResponseException.class, () -> getRestClient().performRequest(request));
190+
assertEquals(400, responseException.getResponse().getStatusLine().getStatusCode());
191+
assertThat(responseException.getMessage(), containsString("The bulk request must be terminated by a newline"));
192+
}
193+
194+
private static void sendMalFormedActionLine() throws IOException {
180195
Request bulkRequest = new Request("POST", "/index_name/_bulk");
181196

182197
final StringBuilder bulk = new StringBuilder();

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,12 @@ public void parse(
159159
deleteRequestConsumer
160160
);
161161

162-
incrementalParser.parse(ReleasableBytesReference.wrap(data), true);
162+
incrementalParser.parse(
163+
data instanceof ReleasableBytesReference releasableBytesReference
164+
? releasableBytesReference
165+
: ReleasableBytesReference.wrap(data),
166+
true
167+
);
163168
}
164169

165170
public IncrementalParser incrementalParser(
@@ -528,8 +533,7 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
528533
private void parseAndConsumeDocumentLine(ReleasableBytesReference data, int from, int to) throws IOException {
529534
assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
530535
if (currentRequest instanceof IndexRequest indexRequest) {
531-
ReleasableBytesReference indexSource = sliceTrimmingCarriageReturn(data, from, to, xContentType, true);
532-
indexRequest.sourceContext().source(indexSource, xContentType);
536+
indexRequest.sourceContext().source(sliceTrimmingCarriageReturn(data, from, to, xContentType, true), xContentType);
533537
indexRequestConsumer.accept(indexRequest, currentType);
534538
} else if (currentRequest instanceof UpdateRequest updateRequest) {
535539
try (

server/src/main/java/org/elasticsearch/action/index/SourceContext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class SourceContext implements Writeable, Releasable {
3434
private XContentType contentType;
3535
private BytesReference source;
3636
private Releasable sourceReleasable;
37+
private boolean isClosed = false;
3738

3839
public SourceContext() {}
3940

@@ -86,6 +87,8 @@ public int byteLength() {
8687

8788
@Override
8889
public void close() {
90+
assert isClosed == false;
91+
isClosed = true;
8992
Releasables.close(sourceReleasable);
9093
}
9194

@@ -240,7 +243,8 @@ private void setSource(BytesReference source, XContentType contentType) {
240243
private void setSource(BytesReference source, XContentType contentType, Releasable sourceReleasable) {
241244
this.source = source;
242245
this.contentType = contentType;
243-
Releasables.close(sourceReleasable);
246+
Releasable toClose = this.sourceReleasable;
244247
this.sourceReleasable = sourceReleasable;
248+
Releasables.close(toClose);
245249
}
246250
}

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,10 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
113113
bulkRequest.setRefreshPolicy(request.param("refresh"));
114114
bulkRequest.includeSourceOnError(RestUtils.getIncludeSourceOnError(request));
115115
bulkRequest.requestParamsUsed(request.params().keySet());
116-
ReleasableBytesReference content = request.requiredContent();
117116

118117
try {
119118
bulkRequest.add(
120-
content,
119+
request.requiredContent(),
121120
defaultIndex,
122121
defaultRouting,
123122
defaultFetchSourceContext,
@@ -130,12 +129,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
130129
request.getRestApiVersion()
131130
);
132131
} catch (Exception e) {
132+
closeSourceContexts(bulkRequest.requests());
133133
return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
134134
}
135-
return channel -> {
136-
content.mustIncRef();
137-
client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
138-
};
135+
136+
// The request list is actually mutable so we make a copy for close.
137+
List<DocWriteRequest<?>> toClose = new ArrayList<>(bulkRequest.requests());
138+
return channel -> client.bulk(
139+
bulkRequest,
140+
ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), () -> closeSourceContexts(toClose))
141+
);
139142
} else {
140143
request.ensureContent();
141144
String waitForActiveShards = request.param("wait_for_active_shards");
@@ -217,26 +220,30 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
217220
return;
218221
}
219222

220-
final ReleasableBytesReference data;
221223
int bytesConsumed;
222224
if (chunk.length() == 0) {
223225
chunk.close();
224226
bytesConsumed = 0;
225227
} else {
228+
final ReleasableBytesReference data;
226229
try {
227230
handler.getIncrementalOperation().incrementUnparsedBytes(chunk.length());
228231
unParsedChunks.add(chunk);
229232

230233
if (unParsedChunks.size() > 1) {
231234
ReleasableBytesReference[] components = unParsedChunks.toArray(new ReleasableBytesReference[0]);
235+
for (ReleasableBytesReference reference : components) {
236+
reference.incRef();
237+
}
232238
data = new ReleasableBytesReference(CompositeBytesReference.of(components), () -> Releasables.close(components));
233239
} else {
234-
data = chunk;
240+
data = chunk.retain();
235241
}
236242

237-
bytesConsumed = parser.parse(data, isLast);
238-
handler.getIncrementalOperation().transferUnparsedBytesToParsed(bytesConsumed);
239-
243+
try (ReleasableBytesReference toClose = data) {
244+
bytesConsumed = parser.parse(data, isLast);
245+
handler.getIncrementalOperation().transferUnparsedBytesToParsed(bytesConsumed);
246+
}
240247
} catch (Exception e) {
241248
shortCircuit();
242249
new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
@@ -272,16 +279,6 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
272279
}
273280
}
274281

275-
private static void closeSourceContexts(ArrayList<DocWriteRequest<?>> requests) {
276-
// We only slice for index requests currently.
277-
for (DocWriteRequest<?> request : requests) {
278-
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(request);
279-
if (indexRequest != null) {
280-
indexRequest.sourceContext().close();
281-
}
282-
}
283-
}
284-
285282
@Override
286283
public void streamClose() {
287284
assert Transports.assertTransportThread();
@@ -294,6 +291,8 @@ private void shortCircuit() {
294291
shortCircuited = true;
295292
Releasables.close(handler);
296293
Releasables.close(unParsedChunks);
294+
closeSourceContexts(items);
295+
items.clear();
297296
unParsedChunks.clear();
298297
}
299298

@@ -309,6 +308,16 @@ private void releaseConsumeBytes(int bytesConsumed) {
309308
reference.close();
310309
}
311310
}
311+
312+
}
313+
314+
private static void closeSourceContexts(List<DocWriteRequest<?>> requests) {
315+
for (DocWriteRequest<?> request : requests) {
316+
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(request);
317+
if (indexRequest != null) {
318+
indexRequest.sourceContext().close();
319+
}
320+
}
312321
}
313322

314323
@Override

0 commit comments

Comments
 (0)