Skip to content

Commit 2d538c7

Browse files
authored
Backport transport changes from elastic#114895 to 8.x (elastic#115909)
1 parent eeca7aa commit 2d538c7

File tree

6 files changed

+28
-290
lines changed

6 files changed

+28
-290
lines changed

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java

Lines changed: 0 additions & 142 deletions
This file was deleted.

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 22 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
146146
* rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
147147
*/
148148
private Object rawTimestamp;
149-
private long normalisedBytesParsed = -1;
150-
private boolean originatesFromUpdateByScript;
151-
private boolean originatesFromUpdateByDoc;
152149

153150
public IndexRequest(StreamInput in) throws IOException {
154151
this(null, in);
@@ -183,7 +180,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
183180
dynamicTemplates = in.readMap(StreamInput::readString);
184181
if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
185182
&& in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
186-
in.readBoolean();
183+
in.readBoolean(); // obsolete, prior to tracking normalisedBytesParsed
187184
}
188185
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
189186
this.listExecutedPipelines = in.readBoolean();
@@ -196,21 +193,20 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
196193
}
197194
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
198195
requireDataStream = in.readBoolean();
199-
normalisedBytesParsed = in.readZLong();
200196
} else {
201197
requireDataStream = false;
202198
}
203199

204-
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
205-
originatesFromUpdateByScript = in.readBoolean();
206-
} else {
207-
originatesFromUpdateByScript = false;
208-
}
209-
210-
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
211-
originatesFromUpdateByDoc = in.readBoolean();
212-
} else {
213-
originatesFromUpdateByDoc = false;
200+
if (in.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
201+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
202+
in.readZLong(); // obsolete normalisedBytesParsed
203+
}
204+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
205+
in.readBoolean(); // obsolete originatesFromUpdateByScript
206+
}
207+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
208+
in.readBoolean(); // obsolete originatesFromUpdateByDoc
209+
}
214210
}
215211
}
216212

@@ -759,7 +755,7 @@ private void writeBody(StreamOutput out) throws IOException {
759755
out.writeMap(dynamicTemplates, StreamOutput::writeString);
760756
if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
761757
&& out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
762-
out.writeBoolean(normalisedBytesParsed != -1L);
758+
out.writeBoolean(false); // obsolete, prior to tracking normalisedBytesParsed
763759
}
764760
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
765761
out.writeBoolean(listExecutedPipelines);
@@ -770,15 +766,18 @@ private void writeBody(StreamOutput out) throws IOException {
770766

771767
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
772768
out.writeBoolean(requireDataStream);
773-
out.writeZLong(normalisedBytesParsed);
774-
}
775-
776-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
777-
out.writeBoolean(originatesFromUpdateByScript);
778769
}
779770

780-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
781-
out.writeBoolean(originatesFromUpdateByDoc);
771+
if (out.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
772+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
773+
out.writeZLong(-1); // obsolete normalisedBytesParsed
774+
}
775+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
776+
out.writeBoolean(false); // obsolete originatesFromUpdateByScript
777+
}
778+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
779+
out.writeBoolean(false); // obsolete originatesFromUpdateByDoc
780+
}
782781
}
783782
}
784783

@@ -928,24 +927,6 @@ public void setRawTimestamp(Object rawTimestamp) {
928927
this.rawTimestamp = rawTimestamp;
929928
}
930929

931-
/**
932-
* Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
933-
* Defaults to -1 when a document size was not observed in earlier stages.
934-
* @return a number of bytes observed
935-
*/
936-
public long getNormalisedBytesParsed() {
937-
return normalisedBytesParsed;
938-
}
939-
940-
/**
941-
* Sets number of bytes observed by a <code>DocumentSizeObserver</code>
942-
* @return an index request
943-
*/
944-
public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
945-
this.normalisedBytesParsed = normalisedBytesParsed;
946-
return this;
947-
}
948-
949930
/**
950931
* Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
951932
*
@@ -976,22 +957,4 @@ public List<String> getExecutedPipelines() {
976957
return Collections.unmodifiableList(executedPipelines);
977958
}
978959
}
979-
980-
public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdateByScript) {
981-
this.originatesFromUpdateByScript = originatesFromUpdateByScript;
982-
return this;
983-
}
984-
985-
public boolean originatesFromUpdateByScript() {
986-
return originatesFromUpdateByScript;
987-
}
988-
989-
public boolean originatesFromUpdateByDoc() {
990-
return originatesFromUpdateByDoc;
991-
}
992-
993-
public IndexRequest setOriginatesFromUpdateByDoc(boolean originatesFromUpdateByDoc) {
994-
this.originatesFromUpdateByDoc = originatesFromUpdateByDoc;
995-
return this;
996-
}
997960
}

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,7 @@ Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, G
228228
.setIfPrimaryTerm(getResult.getPrimaryTerm())
229229
.waitForActiveShards(request.waitForActiveShards())
230230
.timeout(request.timeout())
231-
.setRefreshPolicy(request.getRefreshPolicy())
232-
.setOriginatesFromUpdateByDoc(true);
233-
finalIndexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
231+
.setRefreshPolicy(request.getRefreshPolicy());
234232
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
235233
}
236234
}
@@ -272,8 +270,7 @@ Result prepareUpdateScriptRequest(IndexShard indexShard, UpdateRequest request,
272270
.setIfPrimaryTerm(getResult.getPrimaryTerm())
273271
.waitForActiveShards(request.waitForActiveShards())
274272
.timeout(request.timeout())
275-
.setRefreshPolicy(request.getRefreshPolicy())
276-
.setOriginatesFromUpdateByScript(true);
273+
.setRefreshPolicy(request.getRefreshPolicy());
277274
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
278275
}
279276
case DELETE -> {

server/src/main/java/org/elasticsearch/ingest/IngestService.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -806,7 +806,6 @@ public void onFailure(Exception e) {
806806
);
807807

808808
executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
809-
indexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
810809
assert actionRequest.index() != null;
811810

812811
i++;

server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.elasticsearch.threadpool.TestThreadPool;
5454
import org.elasticsearch.threadpool.ThreadPool;
5555
import org.elasticsearch.threadpool.ThreadPool.Names;
56-
import org.mockito.ArgumentCaptor;
5756
import org.mockito.MockingDetails;
5857
import org.mockito.Mockito;
5958
import org.mockito.stubbing.Stubbing;
@@ -600,9 +599,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
600599
.retryOnConflict(retries);
601600
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
602601

603-
IndexRequest updateResponse = new IndexRequest("index").id("id")
604-
.source(Requests.INDEX_CONTENT_TYPE, "field", "value")
605-
.setNormalisedBytesParsed(0);// let's pretend this was modified by a script
602+
IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
606603
DocumentParsingProvider documentParsingProvider = mock(DocumentParsingProvider.class);
607604

608605
Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
@@ -655,11 +652,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
655652
assertThat(failure.getCause(), equalTo(err));
656653
assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
657654

658-
// we have set 0 value on normalisedBytesParsed on the IndexRequest, like it happens with updates by script.
659-
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
660-
verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(argument.capture());
661-
IndexRequest value = argument.getValue();
662-
assertThat(value.getNormalisedBytesParsed(), equalTo(0L));
655+
verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any());
663656
}
664657

665658
@SuppressWarnings("unchecked")
@@ -668,9 +661,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
668661
DocWriteRequest<UpdateRequest> writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
669662
BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
670663

671-
IndexRequest updateResponse = new IndexRequest("index").id("id")
672-
.source(Requests.INDEX_CONTENT_TYPE, "field", "value")
673-
.setNormalisedBytesParsed(100L);
664+
IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
674665
DocumentParsingProvider documentParsingProvider = mock(DocumentParsingProvider.class);
675666

676667
boolean created = randomBoolean();
@@ -721,10 +712,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
721712
assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
722713
assertThat(response.getSeqNo(), equalTo(13L));
723714

724-
ArgumentCaptor<IndexRequest> argument = ArgumentCaptor.forClass(IndexRequest.class);
725-
verify(documentParsingProvider, times(1)).newMeteringParserDecorator(argument.capture());
726-
IndexRequest value = argument.getValue();
727-
assertThat(value.getNormalisedBytesParsed(), equalTo(100L));
715+
verify(documentParsingProvider).newMeteringParserDecorator(updateResponse);
728716
}
729717

730718
public void testUpdateWithDelete() throws Exception {

0 commit comments

Comments
 (0)