Skip to content

Commit d7bd22e

Browse files
moschejfreden
authored andcommitted
Remove metering from ingest service to occur afterwards when parsing the final document (elastic#114895)
1 parent 87af141 commit d7bd22e

File tree

25 files changed

+87
-375
lines changed

25 files changed

+87
-375
lines changed

modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java

Lines changed: 0 additions & 137 deletions
This file was deleted.

server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.plugins.internal;
1111

12-
import org.elasticsearch.action.DocWriteRequest;
1312
import org.elasticsearch.action.index.IndexRequest;
1413
import org.elasticsearch.index.IndexSettings;
1514
import org.elasticsearch.index.engine.EngineFactory;
@@ -126,7 +125,7 @@ public TestDocumentParsingProviderPlugin() {}
126125
public DocumentParsingProvider getDocumentParsingProvider() {
127126
return new DocumentParsingProvider() {
128127
@Override
129-
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest<T> request) {
128+
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
130129
return new TestXContentMeteringParserDecorator(0L);
131130
}
132131

@@ -152,8 +151,8 @@ public TestDocumentSizeReporter(String indexName) {
152151

153152
@Override
154153
public void onIndexingCompleted(ParsedDocument parsedDocument) {
155-
long delta = parsedDocument.getNormalizedSize().ingestedBytes();
156-
if (delta > 0) {
154+
long delta = parsedDocument.getNormalizedSize();
155+
if (delta > XContentMeteringParserDecorator.UNKNOWN_SIZE) {
157156
COUNTER.addAndGet(delta);
158157
}
159158
assertThat(indexName, equalTo(TEST_INDEX_NAME));
@@ -181,8 +180,8 @@ public Token nextToken() throws IOException {
181180
}
182181

183182
@Override
184-
public ParsedDocument.DocumentSize meteredDocumentSize() {
185-
return new ParsedDocument.DocumentSize(counter, counter);
183+
public long meteredDocumentSize() {
184+
return counter;
186185
}
187186
}
188187
}

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ static TransportVersion def(int id) {
182182
public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
183183
public static final TransportVersion INTRODUCE_ALL_APPLICABLE_SELECTOR = def(8_778_00_0);
184184
public static final TransportVersion INDEX_MODE_LOOKUP = def(8_779_00_0);
185+
public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0);
185186

186187
/*
187188
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 22 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
146146
* rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
147147
*/
148148
private Object rawTimestamp;
149-
private long normalisedBytesParsed = -1;
150-
private boolean originatesFromUpdateByScript;
151-
private boolean originatesFromUpdateByDoc;
152149

153150
public IndexRequest(StreamInput in) throws IOException {
154151
this(null, in);
@@ -183,7 +180,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
183180
dynamicTemplates = in.readMap(StreamInput::readString);
184181
if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
185182
&& in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
186-
in.readBoolean();
183+
in.readBoolean(); // obsolete, prior to tracking normalisedBytesParsed
187184
}
188185
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
189186
this.listExecutedPipelines = in.readBoolean();
@@ -196,21 +193,20 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
196193
}
197194
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
198195
requireDataStream = in.readBoolean();
199-
normalisedBytesParsed = in.readZLong();
200196
} else {
201197
requireDataStream = false;
202198
}
203199

204-
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
205-
originatesFromUpdateByScript = in.readBoolean();
206-
} else {
207-
originatesFromUpdateByScript = false;
208-
}
209-
210-
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
211-
originatesFromUpdateByDoc = in.readBoolean();
212-
} else {
213-
originatesFromUpdateByDoc = false;
200+
if (in.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
201+
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
202+
in.readZLong(); // obsolete normalisedBytesParsed
203+
}
204+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
205+
in.readBoolean(); // obsolete originatesFromUpdateByScript
206+
}
207+
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
208+
in.readBoolean(); // obsolete originatesFromUpdateByDoc
209+
}
214210
}
215211
}
216212

@@ -759,7 +755,7 @@ private void writeBody(StreamOutput out) throws IOException {
759755
out.writeMap(dynamicTemplates, StreamOutput::writeString);
760756
if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
761757
&& out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
762-
out.writeBoolean(normalisedBytesParsed != -1L);
758+
out.writeBoolean(false); // obsolete, prior to tracking normalisedBytesParsed
763759
}
764760
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
765761
out.writeBoolean(listExecutedPipelines);
@@ -770,15 +766,18 @@ private void writeBody(StreamOutput out) throws IOException {
770766

771767
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
772768
out.writeBoolean(requireDataStream);
773-
out.writeZLong(normalisedBytesParsed);
774-
}
775-
776-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
777-
out.writeBoolean(originatesFromUpdateByScript);
778769
}
779770

780-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
781-
out.writeBoolean(originatesFromUpdateByDoc);
771+
if (out.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
772+
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
773+
out.writeZLong(-1); // obsolete normalisedBytesParsed
774+
}
775+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
776+
out.writeBoolean(false); // obsolete originatesFromUpdateByScript
777+
}
778+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
779+
out.writeBoolean(false); // obsolete originatesFromUpdateByDoc
780+
}
782781
}
783782
}
784783

@@ -928,24 +927,6 @@ public void setRawTimestamp(Object rawTimestamp) {
928927
this.rawTimestamp = rawTimestamp;
929928
}
930929

931-
/**
932-
* Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
933-
* Defaults to -1 when a document size was not observed in earlier stages.
934-
* @return a number of bytes observed
935-
*/
936-
public long getNormalisedBytesParsed() {
937-
return normalisedBytesParsed;
938-
}
939-
940-
/**
941-
* Sets number of bytes observed by a <code>DocumentSizeObserver</code>
942-
* @return an index request
943-
*/
944-
public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
945-
this.normalisedBytesParsed = normalisedBytesParsed;
946-
return this;
947-
}
948-
949930
/**
950931
* Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
951932
*
@@ -976,22 +957,4 @@ public List<String> getExecutedPipelines() {
976957
return Collections.unmodifiableList(executedPipelines);
977958
}
978959
}
979-
980-
public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdateByScript) {
981-
this.originatesFromUpdateByScript = originatesFromUpdateByScript;
982-
return this;
983-
}
984-
985-
public boolean originatesFromUpdateByScript() {
986-
return originatesFromUpdateByScript;
987-
}
988-
989-
public boolean originatesFromUpdateByDoc() {
990-
return originatesFromUpdateByDoc;
991-
}
992-
993-
public IndexRequest setOriginatesFromUpdateByDoc(boolean originatesFromUpdateByDoc) {
994-
this.originatesFromUpdateByDoc = originatesFromUpdateByDoc;
995-
return this;
996-
}
997960
}

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
import org.elasticsearch.index.mapper.RoutingFieldMapper;
2929
import org.elasticsearch.index.shard.IndexShard;
3030
import org.elasticsearch.index.shard.ShardId;
31-
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
32-
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
31+
import org.elasticsearch.plugins.internal.XContentParserDecorator;
3332
import org.elasticsearch.script.Script;
3433
import org.elasticsearch.script.ScriptService;
3534
import org.elasticsearch.script.UpdateCtxMap;
@@ -51,11 +50,9 @@ public class UpdateHelper {
5150
private static final Logger logger = LogManager.getLogger(UpdateHelper.class);
5251

5352
private final ScriptService scriptService;
54-
private final DocumentParsingProvider documentParsingProvider;
5553

56-
public UpdateHelper(ScriptService scriptService, DocumentParsingProvider documentParsingProvider) {
54+
public UpdateHelper(ScriptService scriptService) {
5755
this.scriptService = scriptService;
58-
this.documentParsingProvider = documentParsingProvider;
5956
}
6057

6158
/**
@@ -183,14 +180,13 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
183180
Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, GetResult getResult, boolean detectNoop) {
184181
final IndexRequest currentRequest = request.doc();
185182
final String routing = calculateRouting(getResult, currentRequest);
186-
final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
187183
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
188184
final XContentType updateSourceContentType = sourceAndContent.v1();
189185
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
190186

191187
final boolean noop = XContentHelper.update(
192188
updatedSourceAsMap,
193-
currentRequest.sourceAsMap(meteringParserDecorator),
189+
currentRequest.sourceAsMap(XContentParserDecorator.NOOP),
194190
detectNoop
195191
) == false;
196192

@@ -228,9 +224,7 @@ Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, G
228224
.setIfPrimaryTerm(getResult.getPrimaryTerm())
229225
.waitForActiveShards(request.waitForActiveShards())
230226
.timeout(request.timeout())
231-
.setRefreshPolicy(request.getRefreshPolicy())
232-
.setOriginatesFromUpdateByDoc(true);
233-
finalIndexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
227+
.setRefreshPolicy(request.getRefreshPolicy());
234228
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
235229
}
236230
}
@@ -272,8 +266,7 @@ Result prepareUpdateScriptRequest(IndexShard indexShard, UpdateRequest request,
272266
.setIfPrimaryTerm(getResult.getPrimaryTerm())
273267
.waitForActiveShards(request.waitForActiveShards())
274268
.timeout(request.timeout())
275-
.setRefreshPolicy(request.getRefreshPolicy())
276-
.setOriginatesFromUpdateByScript(true);
269+
.setRefreshPolicy(request.getRefreshPolicy());
277270
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
278271
}
279272
case DELETE -> {

server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
8080
final RootDocumentParserContext context;
8181
final XContentType xContentType = source.getXContentType();
8282

83-
XContentMeteringParserDecorator meteringParserDecorator = source.getDocumentSizeObserver();
83+
XContentMeteringParserDecorator meteringParserDecorator = source.getMeteringParserDecorator();
8484
try (
8585
XContentParser parser = meteringParserDecorator.decorate(
8686
XContentHelper.createParser(parserConfiguration, source.source(), xContentType)

0 commit comments

Comments
 (0)