Skip to content

Commit 85cc749

Browse files
authored
Add source context to index request (#134474)
Add a source context to manage lifecycle of source. This commit is only a first step adding the context.
1 parent 948f661 commit 85cc749

File tree

15 files changed

+376
-137
lines changed

15 files changed

+376
-137
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,7 @@ static TransportVersion def(int id) {
324324
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
325325
public static final TransportVersion INFERENCE_API_EIS_DIAGNOSTICS = def(9_156_0_00);
326326
public static final TransportVersion ML_INFERENCE_ENDPOINT_CACHE = def(9_157_0_00);
327+
public static final TransportVersion INDEX_SOURCE = def(9_158_0_00);
327328

328329
/*
329330
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/DocWriteRequest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.elasticsearch.action;
1010

1111
import org.apache.lucene.util.Accountable;
12+
import org.elasticsearch.action.bulk.TransportAbstractBulkAction;
1213
import org.elasticsearch.action.delete.DeleteRequest;
1314
import org.elasticsearch.action.index.IndexRequest;
1415
import org.elasticsearch.action.support.IndicesOptions;
@@ -20,6 +21,7 @@
2021
import org.elasticsearch.common.io.stream.StreamOutput;
2122
import org.elasticsearch.common.lucene.uid.Versions;
2223
import org.elasticsearch.core.Nullable;
24+
import org.elasticsearch.core.Releasable;
2325
import org.elasticsearch.index.Index;
2426
import org.elasticsearch.index.VersionType;
2527
import org.elasticsearch.index.shard.ShardId;
@@ -37,7 +39,7 @@
3739
* Generic interface to group ActionRequest, which perform writes to a single document
3840
* Action requests implementing this can be part of {@link org.elasticsearch.action.bulk.BulkRequest}
3941
*/
40-
public interface DocWriteRequest<T> extends IndicesRequest, Accountable {
42+
public interface DocWriteRequest<T> extends IndicesRequest, Accountable, Releasable {
4143

4244
// Flag set for disallowing index auto creation for an individual write request.
4345
String REQUIRE_ALIAS = "require_alias";
@@ -346,4 +348,12 @@ static ActionRequestValidationException validateDocIdLength(String id, ActionReq
346348
}
347349
return validationException;
348350
}
351+
352+
@Override
353+
default void close() {
354+
IndexRequest indexRequest = TransportAbstractBulkAction.getIndexWriteRequest(this);
355+
if (indexRequest != null) {
356+
indexRequest.indexSource().close();
357+
}
358+
}
349359
}

server/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ BulkRequest internalAdd(IndexRequest request) {
182182

183183
requests.add(request);
184184
// lack of source is validated in validate() method
185-
sizeInBytes += (request.source() != null ? request.source().length() : 0) + REQUEST_OVERHEAD;
185+
sizeInBytes += request.indexSource().byteLength() + REQUEST_OVERHEAD;
186186
indices.add(request.index());
187187
return this;
188188
}
@@ -200,10 +200,10 @@ BulkRequest internalAdd(UpdateRequest request) {
200200

201201
requests.add(request);
202202
if (request.doc() != null) {
203-
sizeInBytes += request.doc().source().length();
203+
sizeInBytes += request.doc().indexSource().byteLength();
204204
}
205205
if (request.upsertRequest() != null) {
206-
sizeInBytes += request.upsertRequest().source().length();
206+
sizeInBytes += request.upsertRequest().indexSource().byteLength();
207207
}
208208
if (request.script() != null) {
209209
sizeInBytes += request.script().getIdOrCode().length() * 2;

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestModifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public void markItemForFailureStore(int slot, String targetIndexName, Exception
296296
() -> "Encountered exception while attempting to redirect a failed ingest operation: index ["
297297
+ targetIndexName
298298
+ "], source: ["
299-
+ indexRequest.source().utf8ToString()
299+
+ indexRequest.indexSource().bytes().utf8ToString()
300300
+ "]",
301301
ioException
302302
);

server/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,11 @@ public long totalSizeInBytes() {
9090
for (int i = 0; i < items.length; i++) {
9191
DocWriteRequest<?> request = items[i].request();
9292
if (request instanceof IndexRequest) {
93-
if (((IndexRequest) request).source() != null) {
94-
totalSizeInBytes += ((IndexRequest) request).source().length();
95-
}
93+
totalSizeInBytes += ((IndexRequest) request).indexSource().byteLength();
9694
} else if (request instanceof UpdateRequest) {
9795
IndexRequest doc = ((UpdateRequest) request).doc();
98-
if (doc != null && doc.source() != null) {
99-
totalSizeInBytes += ((UpdateRequest) request).doc().source().length();
96+
if (doc != null) {
97+
totalSizeInBytes += ((UpdateRequest) request).doc().indexSource().byteLength();
10098
}
10199
}
102100
}
@@ -108,13 +106,11 @@ public long maxOperationSizeInBytes() {
108106
for (int i = 0; i < items.length; i++) {
109107
DocWriteRequest<?> request = items[i].request();
110108
if (request instanceof IndexRequest) {
111-
if (((IndexRequest) request).source() != null) {
112-
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).source().length());
113-
}
109+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((IndexRequest) request).indexSource().byteLength());
114110
} else if (request instanceof UpdateRequest) {
115111
IndexRequest doc = ((UpdateRequest) request).doc();
116-
if (doc != null && doc.source() != null) {
117-
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().source().length());
112+
if (doc != null) {
113+
maxOperationSizeInBytes = Math.max(maxOperationSizeInBytes, ((UpdateRequest) request).doc().indexSource().byteLength());
118114
}
119115
}
120116
}

0 commit comments

Comments
 (0)