Skip to content

Commit 812e7e0

Browse files
committed
Change
1 parent cb21030 commit 812e7e0

File tree

5 files changed

+54
-39
lines changed

5 files changed

+54
-39
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.action.index.IndexRequest;
1515
import org.elasticsearch.action.update.UpdateRequest;
1616
import org.elasticsearch.common.bytes.BytesReference;
17+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1718
import org.elasticsearch.common.logging.DeprecationLogger;
1819
import org.elasticsearch.common.lucene.uid.Versions;
1920
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@@ -107,19 +108,20 @@ private static int findNextMarker(byte marker, int from, BytesReference data, bo
107108
* Returns the sliced {@link BytesReference}. If the {@link XContentType} is JSON, the byte preceding the marker is checked to see
108109
* if it is a carriage return and if so, the BytesReference is sliced so that the carriage return is ignored
109110
*/
110-
private static BytesReference sliceTrimmingCarriageReturn(
111-
BytesReference bytesReference,
111+
private static ReleasableBytesReference sliceTrimmingCarriageReturn(
112+
ReleasableBytesReference bytesReference,
112113
int from,
113114
int nextMarker,
114-
XContentType xContentType
115+
XContentType xContentType,
116+
boolean retain
115117
) {
116118
final int length;
117119
if (XContentType.JSON == xContentType && bytesReference.get(nextMarker - 1) == (byte) '\r') {
118120
length = nextMarker - from - 1;
119121
} else {
120122
length = nextMarker - from;
121123
}
122-
return bytesReference.slice(from, length);
124+
return retain ? bytesReference.retainedSlice(from, length) : bytesReference.slice(from, length);
123125
}
124126

125127
/**
@@ -157,7 +159,7 @@ public void parse(
157159
deleteRequestConsumer
158160
);
159161

160-
incrementalParser.parse(data, true);
162+
incrementalParser.parse(ReleasableBytesReference.wrap(data), true);
161163
}
162164

163165
public IncrementalParser incrementalParser(
@@ -251,7 +253,7 @@ private IncrementalParser(
251253
this.deleteRequestConsumer = deleteRequestConsumer;
252254
}
253255

254-
public int parse(BytesReference data, boolean lastData) throws IOException {
256+
public int parse(ReleasableBytesReference data, boolean lastData) throws IOException {
255257
if (failure != null) {
256258
assert false : failure.getMessage();
257259
throw new IllegalStateException("Parser has already encountered exception", failure);
@@ -264,7 +266,7 @@ public int parse(BytesReference data, boolean lastData) throws IOException {
264266
}
265267
}
266268

267-
private int tryParse(BytesReference data, boolean lastData) throws IOException {
269+
private int tryParse(ReleasableBytesReference data, boolean lastData) throws IOException {
268270
int from = 0;
269271
int consumed = 0;
270272

@@ -523,16 +525,17 @@ private boolean parseActionLine(BytesReference data, int from, int to) throws IO
523525
return true;
524526
}
525527

526-
private void parseAndConsumeDocumentLine(BytesReference data, int from, int to) throws IOException {
528+
private void parseAndConsumeDocumentLine(ReleasableBytesReference data, int from, int to) throws IOException {
527529
assert currentRequest != null && currentRequest instanceof DeleteRequest == false;
528530
if (currentRequest instanceof IndexRequest indexRequest) {
529-
indexRequest.source(sliceTrimmingCarriageReturn(data, from, to, xContentType), xContentType);
531+
ReleasableBytesReference indexSource = sliceTrimmingCarriageReturn(data, from, to, xContentType, true);
532+
indexRequest.sourceContext().source(indexSource, xContentType);
530533
indexRequestConsumer.accept(indexRequest, currentType);
531534
} else if (currentRequest instanceof UpdateRequest updateRequest) {
532535
try (
533536
XContentParser sliceParser = createParser(
534537
xContentType.xContent(),
535-
sliceTrimmingCarriageReturn(data, from, to, xContentType)
538+
sliceTrimmingCarriageReturn(data, from, to, xContentType, false)
536539
)
537540
) {
538541
updateRequest.fromXContent(sliceParser);

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
9595
@Nullable
9696
private String routing;
9797

98-
private SourceContext sourceContext = new SourceContext();
98+
private final SourceContext sourceContext;
9999

100100
private OpType opType = OpType.INDEX;
101101

@@ -162,10 +162,11 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
162162
routing = in.readOptionalString();
163163
boolean beforeSourceContext = in.getTransportVersion().before(TransportVersions.SOURCE_CONTEXT);
164164
BytesReference source;
165+
SourceContext localSourceContext = null;
165166
if (beforeSourceContext) {
166167
source = in.readBytesReference();
167168
} else {
168-
sourceContext = new SourceContext(in);
169+
localSourceContext = new SourceContext(in);
169170
source = null;
170171
}
171172
opType = OpType.fromId(in.readByte());
@@ -184,8 +185,9 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
184185
} else {
185186
contentType = null;
186187
}
187-
sourceContext = new SourceContext(contentType, source, () -> {});
188+
localSourceContext = new SourceContext(contentType, source);
188189
}
190+
sourceContext = Objects.requireNonNull(localSourceContext);
189191
ifSeqNo = in.readZLong();
190192
ifPrimaryTerm = in.readVLong();
191193
requireAlias = in.readBoolean();
@@ -226,6 +228,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
226228

227229
public IndexRequest() {
228230
super(NO_SHARD_ID);
231+
this.sourceContext = new SourceContext();
229232
}
230233

231234
/**
@@ -235,6 +238,7 @@ public IndexRequest() {
235238
public IndexRequest(String index) {
236239
super(NO_SHARD_ID);
237240
this.index = index;
241+
this.sourceContext = new SourceContext();
238242
}
239243

240244
private static final StringLiteralDeduplicator pipelineNameDeduplicator = new StringLiteralDeduplicator();
@@ -500,11 +504,6 @@ public IndexRequest source(XContentType xContentType, Object... source) {
500504
return this;
501505
}
502506

503-
public IndexRequest sourceContext(SourceContext sourceContext) {
504-
sourceContext = Objects.requireNonNull(sourceContext);
505-
return this;
506-
}
507-
508507
/**
509508
* Sets the document to index in bytes form.
510509
*/

server/src/main/java/org/elasticsearch/action/index/SourceContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public int byteLength() {
8686

8787
@Override
8888
public void close() {
89-
sourceReleasable.close();
89+
Releasables.close(sourceReleasable);
9090
}
9191

9292
public Map<String, Object> sourceAsMap() {

server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
import org.elasticsearch.action.bulk.BulkRequestParser;
1717
import org.elasticsearch.action.bulk.BulkShardRequest;
1818
import org.elasticsearch.action.bulk.IncrementalBulkService;
19+
import org.elasticsearch.action.bulk.TransportBulkAction;
20+
import org.elasticsearch.action.index.IndexRequest;
1921
import org.elasticsearch.action.support.ActiveShardCount;
2022
import org.elasticsearch.client.internal.node.NodeClient;
21-
import org.elasticsearch.common.bytes.BytesReference;
2223
import org.elasticsearch.common.bytes.CompositeBytesReference;
2324
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2425
import org.elasticsearch.common.settings.ClusterSettings;
2526
import org.elasticsearch.common.settings.Settings;
26-
import org.elasticsearch.core.Releasable;
2727
import org.elasticsearch.core.Releasables;
2828
import org.elasticsearch.core.TimeValue;
2929
import org.elasticsearch.rest.BaseRestHandler;
@@ -217,7 +217,7 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
217217
return;
218218
}
219219

220-
final BytesReference data;
220+
final ReleasableBytesReference data;
221221
int bytesConsumed;
222222
if (chunk.length() == 0) {
223223
chunk.close();
@@ -228,7 +228,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
228228
unParsedChunks.add(chunk);
229229

230230
if (unParsedChunks.size() > 1) {
231-
data = CompositeBytesReference.of(unParsedChunks.toArray(new ReleasableBytesReference[0]));
231+
ReleasableBytesReference[] components = unParsedChunks.toArray(new ReleasableBytesReference[0]);
232+
data = new ReleasableBytesReference(CompositeBytesReference.of(components), () -> Releasables.close(components));
232233
} else {
233234
data = chunk;
234235
}
@@ -243,7 +244,8 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
243244
}
244245
}
245246

246-
final ArrayList<Releasable> releasables = accountParsing(bytesConsumed);
247+
releaseConsumeBytes(bytesConsumed);
248+
247249
if (isLast) {
248250
assert unParsedChunks.isEmpty();
249251
if (handler.getIncrementalOperation().totalParsedBytes() == 0) {
@@ -253,24 +255,33 @@ public void handleChunk(RestChannel channel, ReleasableBytesReference chunk, boo
253255
assert channel != null;
254256
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
255257
items.clear();
256-
handler.lastItems(toPass, () -> Releasables.close(releasables), new RestRefCountedChunkedToXContentListener<>(channel));
258+
handler.lastItems(toPass, () -> closeSourceContexts(toPass), new RestRefCountedChunkedToXContentListener<>(channel));
257259
}
258260
handler.updateWaitForChunkMetrics(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTimeInNanos));
259261
totalChunkWaitTimeInNanos = 0L;
260262
} else if (items.isEmpty() == false) {
261263
ArrayList<DocWriteRequest<?>> toPass = new ArrayList<>(items);
262264
items.clear();
263-
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
265+
handler.addItems(toPass, () -> closeSourceContexts(toPass), () -> {
264266
requestNextChunkTime = System.nanoTime();
265267
request.contentStream().next();
266268
});
267269
} else {
268-
Releasables.close(releasables);
269270
requestNextChunkTime = System.nanoTime();
270271
request.contentStream().next();
271272
}
272273
}
273274

275+
private static void closeSourceContexts(ArrayList<DocWriteRequest<?>> requests) {
276+
// We only slice for index requests currently.
277+
for (DocWriteRequest<?> request : requests) {
278+
IndexRequest indexRequest = TransportBulkAction.getIndexWriteRequest(request);
279+
if (indexRequest != null) {
280+
indexRequest.sourceContext().close();
281+
}
282+
}
283+
}
284+
274285
@Override
275286
public void streamClose() {
276287
assert Transports.assertTransportThread();
@@ -286,19 +297,17 @@ private void shortCircuit() {
286297
unParsedChunks.clear();
287298
}
288299

289-
private ArrayList<Releasable> accountParsing(int bytesConsumed) {
290-
ArrayList<Releasable> releasables = new ArrayList<>(unParsedChunks.size());
300+
private void releaseConsumeBytes(int bytesConsumed) {
291301
while (bytesConsumed > 0) {
292302
ReleasableBytesReference reference = unParsedChunks.removeFirst();
293-
releasables.add(reference);
294303
if (bytesConsumed >= reference.length()) {
295304
bytesConsumed -= reference.length();
296305
} else {
297306
unParsedChunks.addFirst(reference.retainedSlice(bytesConsumed, reference.length() - bytesConsumed));
298307
bytesConsumed = 0;
299308
}
309+
reference.close();
300310
}
301-
return releasables;
302311
}
303312
}
304313

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.action.DocWriteRequest;
1313
import org.elasticsearch.action.index.IndexRequest;
1414
import org.elasticsearch.common.bytes.BytesArray;
15+
import org.elasticsearch.common.bytes.ReleasableBytesReference;
1516
import org.elasticsearch.core.RestApiVersion;
1617
import org.elasticsearch.core.UpdateForV10;
1718
import org.elasticsearch.test.ESTestCase;
@@ -34,10 +35,10 @@ public class BulkRequestParserTests extends ESTestCase {
3435
.toList();
3536

3637
public void testParserCannotBeReusedAfterFailure() {
37-
BytesArray request = new BytesArray("""
38+
ReleasableBytesReference request = ReleasableBytesReference.wrap(new BytesArray("""
3839
{ "index":{ }, "something": "unexpected" }
3940
{}
40-
""");
41+
"""));
4142

4243
BulkRequestParser parser = new BulkRequestParser(randomBoolean(), true, RestApiVersion.current());
4344
BulkRequestParser.IncrementalParser incrementalParser = parser.incrementalParser(
@@ -58,10 +59,10 @@ public void testParserCannotBeReusedAfterFailure() {
5859
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, false));
5960
assertEquals("Malformed action/metadata line [1], expected END_OBJECT but found [FIELD_NAME]", ex.getMessage());
6061

61-
BytesArray valid = new BytesArray("""
62+
ReleasableBytesReference valid = ReleasableBytesReference.wrap(new BytesArray("""
6263
{ "index":{ "_id": "bar" } }
6364
{}
64-
""");
65+
"""));
6566
expectThrows(AssertionError.class, () -> incrementalParser.parse(valid, false));
6667
}
6768

@@ -86,7 +87,7 @@ public void testIncrementalParsing() throws IOException {
8687
deleteRequests::add
8788
);
8889

89-
BytesArray request = new BytesArray("""
90+
ReleasableBytesReference request = ReleasableBytesReference.wrap(new BytesArray("""
9091
{ "index":{ "_id": "bar", "pipeline": "foo" } }
9192
{ "field": "value"}
9293
{ "index":{ "require_alias": false } }
@@ -97,7 +98,7 @@ public void testIncrementalParsing() throws IOException {
9798
{ "index": { } }
9899
{ "field": "value"}
99100
{ "delete":{ "_id": "bop" } }
100-
""");
101+
"""));
101102

102103
int consumed = 0;
103104
for (int i = 0; i < request.length() - 1; ++i) {
@@ -255,9 +256,12 @@ public void testBarfOnLackOfTrailingNewline() throws IOException {
255256
);
256257

257258
// Should not throw because not last
258-
incrementalParser.parse(request, false);
259+
incrementalParser.parse(ReleasableBytesReference.wrap(request), false);
259260

260-
IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, () -> incrementalParser.parse(request, true));
261+
IllegalArgumentException e2 = expectThrows(
262+
IllegalArgumentException.class,
263+
() -> incrementalParser.parse(ReleasableBytesReference.wrap(request), true)
264+
);
261265
assertEquals("The bulk request must be terminated by a newline [\\n]", e2.getMessage());
262266
}
263267

0 commit comments

Comments
 (0)