Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

package org.elasticsearch.plugins.internal;

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
Expand Down Expand Up @@ -126,7 +125,7 @@ public TestDocumentParsingProviderPlugin() {}
public DocumentParsingProvider getDocumentParsingProvider() {
return new DocumentParsingProvider() {
@Override
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest<T> request) {
public <T> XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
return new TestXContentMeteringParserDecorator(0L);
}

Expand All @@ -152,8 +151,8 @@ public TestDocumentSizeReporter(String indexName) {

@Override
public void onIndexingCompleted(ParsedDocument parsedDocument) {
long delta = parsedDocument.getNormalizedSize().ingestedBytes();
if (delta > 0) {
long delta = parsedDocument.getNormalizedSize();
if (delta > XContentMeteringParserDecorator.UNKNOWN_SIZE) {
COUNTER.addAndGet(delta);
}
assertThat(indexName, equalTo(TEST_INDEX_NAME));
Expand Down Expand Up @@ -181,8 +180,8 @@ public Token nextToken() throws IOException {
}

@Override
public ParsedDocument.DocumentSize meteredDocumentSize() {
return new ParsedDocument.DocumentSize(counter, counter);
public long meteredDocumentSize() {
return counter;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ = def(8_776_00_0);
public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_778_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
* rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
*/
private Object rawTimestamp;
private long normalisedBytesParsed = -1;
private boolean originatesFromUpdateByScript;
private boolean originatesFromUpdateByDoc;

public IndexRequest(StreamInput in) throws IOException {
this(null, in);
Expand Down Expand Up @@ -183,7 +180,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
dynamicTemplates = in.readMap(StreamInput::readString);
if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
&& in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
in.readBoolean();
in.readBoolean(); // obsolete, prior to tracking normalisedBytesParsed
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
this.listExecutedPipelines = in.readBoolean();
Expand All @@ -196,21 +193,20 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
}
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
requireDataStream = in.readBoolean();
normalisedBytesParsed = in.readZLong();
} else {
requireDataStream = false;
}

if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
originatesFromUpdateByScript = in.readBoolean();
} else {
originatesFromUpdateByScript = false;
}

if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
originatesFromUpdateByDoc = in.readBoolean();
} else {
originatesFromUpdateByDoc = false;
if (in.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
in.readZLong(); // obsolete normalisedBytesParsed
}
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
in.readBoolean(); // obsolete originatesFromUpdateByScript
}
if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
in.readBoolean(); // obsolete originatesFromUpdateByDoc
}
}
}

Expand Down Expand Up @@ -759,7 +755,7 @@ private void writeBody(StreamOutput out) throws IOException {
out.writeMap(dynamicTemplates, StreamOutput::writeString);
if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
&& out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
out.writeBoolean(normalisedBytesParsed != -1L);
out.writeBoolean(false); // obsolete, prior to tracking normalisedBytesParsed
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(listExecutedPipelines);
Expand All @@ -770,15 +766,18 @@ private void writeBody(StreamOutput out) throws IOException {

if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
out.writeBoolean(requireDataStream);
out.writeZLong(normalisedBytesParsed);
}

if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
out.writeBoolean(originatesFromUpdateByScript);
}

if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
out.writeBoolean(originatesFromUpdateByDoc);
if (out.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
out.writeZLong(-1); // obsolete normalisedBytesParsed
}
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
out.writeBoolean(false); // obsolete originatesFromUpdateByScript
}
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
out.writeBoolean(false); // obsolete originatesFromUpdateByDoc
}
}
}

Expand Down Expand Up @@ -928,24 +927,6 @@ public void setRawTimestamp(Object rawTimestamp) {
this.rawTimestamp = rawTimestamp;
}

/**
* Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
* Defaults to -1 when a document size was not observed in earlier stages.
* @return a number of bytes observed
*/
public long getNormalisedBytesParsed() {
return normalisedBytesParsed;
}

/**
* Sets number of bytes observed by a <code>DocumentSizeObserver</code>
* @return an index request
*/
public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
this.normalisedBytesParsed = normalisedBytesParsed;
return this;
}

/**
* Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
*
Expand Down Expand Up @@ -976,22 +957,4 @@ public List<String> getExecutedPipelines() {
return Collections.unmodifiableList(executedPipelines);
}
}

public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdateByScript) {
this.originatesFromUpdateByScript = originatesFromUpdateByScript;
return this;
}

public boolean originatesFromUpdateByScript() {
return originatesFromUpdateByScript;
}

public boolean originatesFromUpdateByDoc() {
return originatesFromUpdateByDoc;
}

public IndexRequest setOriginatesFromUpdateByDoc(boolean originatesFromUpdateByDoc) {
this.originatesFromUpdateByDoc = originatesFromUpdateByDoc;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
import org.elasticsearch.index.mapper.RoutingFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.internal.DocumentParsingProvider;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
import org.elasticsearch.plugins.internal.XContentParserDecorator;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.UpdateCtxMap;
Expand All @@ -51,11 +50,9 @@ public class UpdateHelper {
private static final Logger logger = LogManager.getLogger(UpdateHelper.class);

private final ScriptService scriptService;
private final DocumentParsingProvider documentParsingProvider;

public UpdateHelper(ScriptService scriptService, DocumentParsingProvider documentParsingProvider) {
public UpdateHelper(ScriptService scriptService) {
this.scriptService = scriptService;
this.documentParsingProvider = documentParsingProvider;
}

/**
Expand Down Expand Up @@ -183,14 +180,13 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, GetResult getResult, boolean detectNoop) {
final IndexRequest currentRequest = request.doc();
final String routing = calculateRouting(getResult, currentRequest);
final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
final XContentType updateSourceContentType = sourceAndContent.v1();
final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();

final boolean noop = XContentHelper.update(
updatedSourceAsMap,
currentRequest.sourceAsMap(meteringParserDecorator),
currentRequest.sourceAsMap(XContentParserDecorator.NOOP),
detectNoop
) == false;

Expand Down Expand Up @@ -228,9 +224,7 @@ Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, G
.setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards())
.timeout(request.timeout())
.setRefreshPolicy(request.getRefreshPolicy())
.setOriginatesFromUpdateByDoc(true);
finalIndexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
.setRefreshPolicy(request.getRefreshPolicy());
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
}
}
Expand Down Expand Up @@ -272,8 +266,7 @@ Result prepareUpdateScriptRequest(IndexShard indexShard, UpdateRequest request,
.setIfPrimaryTerm(getResult.getPrimaryTerm())
.waitForActiveShards(request.waitForActiveShards())
.timeout(request.timeout())
.setRefreshPolicy(request.getRefreshPolicy())
.setOriginatesFromUpdateByScript(true);
.setRefreshPolicy(request.getRefreshPolicy());
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
}
case DELETE -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
final RootDocumentParserContext context;
final XContentType xContentType = source.getXContentType();

XContentMeteringParserDecorator meteringParserDecorator = source.getDocumentSizeObserver();
XContentMeteringParserDecorator meteringParserDecorator = source.getMeteringParserDecorator();
try (
XContentParser parser = meteringParserDecorator.decorate(
XContentHelper.createParser(parserConfiguration, source.source(), xContentType)
Expand Down
Loading
Loading