diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java
deleted file mode 100644
index 3547b3f9910ad..0000000000000
--- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorWithPipelinesIT.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the "Elastic License
- * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
- * Public License v 1"; you may not use this file except in compliance with, at
- * your election, the "Elastic License 2.0", the "GNU Affero General Public
- * License v3.0 only", or the "Server Side Public License, v 1".
- */
-
-package org.elasticsearch.plugins.internal;
-
-import org.elasticsearch.action.DocWriteRequest;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.index.mapper.MapperService;
-import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.ingest.common.IngestCommonPlugin;
-import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.xcontent.FilterXContentParserWrapper;
-import org.elasticsearch.xcontent.XContentParser;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
-import static org.hamcrest.Matchers.equalTo;
-
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
-public class XContentMeteringParserDecoratorWithPipelinesIT extends ESIntegTestCase {
-
-    private static String TEST_INDEX_NAME = "test-index-name";
-    // the assertions are done in plugin which is static and will be created by ES server.
-    // hence a static flag to make sure it is indeed used
-    public static volatile boolean hasWrappedParser;
-    public static AtomicLong providedFixedSize = new AtomicLong();
-
-    public void testDocumentIsReportedWithPipelines() throws Exception {
-        hasWrappedParser = false;
-        // pipeline adding fields, changing destination is not affecting reporting
-        putJsonPipeline("pipeline", """
-            {
-              "processors": [
-                {
-                  "set": {
-                    "field": "my-text-field",
-                    "value": "xxxx"
-                  }
-                },
-                {
-                  "set": {
-                    "field": "my-boolean-field",
-                    "value": true
-                  }
-                }
-              ]
-            }
-            """);
-
-        client().index(
-            new IndexRequest(TEST_INDEX_NAME).setPipeline("pipeline")
-                .id("1")
-                .source(jsonBuilder().startObject().field("test", "I am sam i am").endObject())
-        ).actionGet();
-        assertBusy(() -> {
-            // ingest node has used an observer that was counting #map operations
-            // and passed that info to newFixedSize observer in TransportShardBulkAction
-            assertTrue(hasWrappedParser);
-            assertThat(providedFixedSize.get(), equalTo(1L));
-        });
-    }
-
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return List.of(TestDocumentParsingProviderPlugin.class, IngestCommonPlugin.class);
-    }
-
-    public static class TestDocumentParsingProviderPlugin extends Plugin implements DocumentParsingProviderPlugin, IngestPlugin {
-
-        public TestDocumentParsingProviderPlugin() {}
-
-        @Override
-        public DocumentParsingProvider getDocumentParsingProvider() {
-            // returns a static instance, because we want to assert that the wrapping is called only once
-            return new DocumentParsingProvider() {
-                @Override
-                public XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest request) {
-                    if (request instanceof IndexRequest indexRequest && indexRequest.getNormalisedBytesParsed() > 0) {
-                        long normalisedBytesParsed = indexRequest.getNormalisedBytesParsed();
-                        providedFixedSize.set(normalisedBytesParsed);
-                        return new TestXContentMeteringParserDecorator(normalisedBytesParsed);
-                    }
-                    return new TestXContentMeteringParserDecorator(0L);
-                }
-
-                @Override
-                public DocumentSizeReporter newDocumentSizeReporter(
-                    String indexName,
-                    MapperService mapperService,
-                    DocumentSizeAccumulator documentSizeAccumulator
-                ) {
-                    return DocumentSizeReporter.EMPTY_INSTANCE;
-                }
-            };
-        }
-    }
-
-    public static class TestXContentMeteringParserDecorator implements XContentMeteringParserDecorator {
-        long mapCounter = 0;
-
-        public TestXContentMeteringParserDecorator(long mapCounter) {
-            this.mapCounter = mapCounter;
-        }
-
-        @Override
-        public XContentParser decorate(XContentParser xContentParser) {
-            hasWrappedParser = true;
-            return new FilterXContentParserWrapper(xContentParser) {
-
-                @Override
-                public Map<String, Object> map() throws IOException {
-                    mapCounter++;
-                    return super.map();
-                }
-            };
-        }
-
-        @Override
-        public ParsedDocument.DocumentSize meteredDocumentSize() {
-            return new ParsedDocument.DocumentSize(mapCounter, 0);
-        }
-    }
-
-}
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java
index f11c145f71f23..f70667b91aec8 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecoratorIT.java
@@ -9,7 +9,6 @@
 
 package org.elasticsearch.plugins.internal;
 
-import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.EngineFactory;
@@ -126,7 +125,7 @@ public TestDocumentParsingProviderPlugin() {}
         public DocumentParsingProvider getDocumentParsingProvider() {
             return new DocumentParsingProvider() {
                 @Override
-                public XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest request) {
+                public XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
                     return new TestXContentMeteringParserDecorator(0L);
                 }
 
@@ -152,8 +151,8 @@ public TestDocumentSizeReporter(String indexName) {
 
         @Override
         public void onIndexingCompleted(ParsedDocument parsedDocument) {
-            long delta = parsedDocument.getNormalizedSize().ingestedBytes();
-            if (delta > 0) {
+            long delta = parsedDocument.getNormalizedSize();
+            if (delta > XContentMeteringParserDecorator.UNKNOWN_SIZE) {
                 COUNTER.addAndGet(delta);
             }
             assertThat(indexName, equalTo(TEST_INDEX_NAME));
@@ -181,8 +180,8 @@ public Token nextToken() throws IOException {
         }
 
         @Override
-        public ParsedDocument.DocumentSize meteredDocumentSize() {
-            return new ParsedDocument.DocumentSize(counter, counter);
+        public long meteredDocumentSize() {
+            return counter;
         }
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 25bb792d827a9..3986ea4b97254 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -182,6 +182,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
     public static final TransportVersion INTRODUCE_ALL_APPLICABLE_SELECTOR = def(8_778_00_0);
     public static final TransportVersion INDEX_MODE_LOOKUP = def(8_779_00_0);
+    public static final TransportVersion INDEX_REQUEST_REMOVE_METERING = def(8_780_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,
diff --git a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
index b98f5d87ee232..d0785a60dd0f5 100644
--- a/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
+++ b/server/src/main/java/org/elasticsearch/action/index/IndexRequest.java
@@ -146,9 +146,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implements
      * rawTimestamp field is used on the coordinate node, it doesn't need to be serialised.
      */
     private Object rawTimestamp;
-    private long normalisedBytesParsed = -1;
-    private boolean originatesFromUpdateByScript;
-    private boolean originatesFromUpdateByDoc;
 
     public IndexRequest(StreamInput in) throws IOException {
         this(null, in);
@@ -183,7 +180,7 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
         dynamicTemplates = in.readMap(StreamInput::readString);
         if (in.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
             && in.getTransportVersion().before(TransportVersions.V_8_13_0)) {
-            in.readBoolean();
+            in.readBoolean(); // obsolete, prior to tracking normalisedBytesParsed
         }
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             this.listExecutedPipelines = in.readBoolean();
@@ -196,21 +193,20 @@ public IndexRequest(@Nullable ShardId shardId, StreamInput in) throws IOExceptio
         }
         if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
             requireDataStream = in.readBoolean();
-            normalisedBytesParsed = in.readZLong();
         } else {
             requireDataStream = false;
         }
 
-        if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
-            originatesFromUpdateByScript = in.readBoolean();
-        } else {
-            originatesFromUpdateByScript = false;
-        }
-
-        if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
-            originatesFromUpdateByDoc = in.readBoolean();
-        } else {
-            originatesFromUpdateByDoc = false;
+        if (in.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
+            if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
+                in.readZLong(); // obsolete normalisedBytesParsed
+            }
+            if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
+                in.readBoolean(); // obsolete originatesFromUpdateByScript
+            }
+            if (in.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
+                in.readBoolean(); // obsolete originatesFromUpdateByDoc
+            }
         }
     }
 
@@ -759,7 +755,7 @@ private void writeBody(StreamOutput out) throws IOException {
         out.writeMap(dynamicTemplates, StreamOutput::writeString);
         if (out.getTransportVersion().onOrAfter(PIPELINES_HAVE_RUN_FIELD_ADDED)
             && out.getTransportVersion().before(TransportVersions.V_8_13_0)) {
-            out.writeBoolean(normalisedBytesParsed != -1L);
+            out.writeBoolean(false); // obsolete, prior to tracking normalisedBytesParsed
         }
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
             out.writeBoolean(listExecutedPipelines);
@@ -770,15 +766,18 @@ private void writeBody(StreamOutput out) throws IOException {
 
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
             out.writeBoolean(requireDataStream);
-            out.writeZLong(normalisedBytesParsed);
-        }
-
-        if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
-            out.writeBoolean(originatesFromUpdateByScript);
         }
 
-        if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
-            out.writeBoolean(originatesFromUpdateByDoc);
+        if (out.getTransportVersion().before(TransportVersions.INDEX_REQUEST_REMOVE_METERING)) {
+            if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
+                out.writeZLong(-1); // obsolete normalisedBytesParsed
+            }
+            if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_SCRIPT_ORIGIN)) {
+                out.writeBoolean(false); // obsolete originatesFromUpdateByScript
+            }
+            if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_REQUEST_UPDATE_BY_DOC_ORIGIN)) {
+                out.writeBoolean(false); // obsolete originatesFromUpdateByDoc
+            }
         }
     }
 
@@ -928,24 +927,6 @@ public void setRawTimestamp(Object rawTimestamp) {
         this.rawTimestamp = rawTimestamp;
     }
 
-    /**
-     * Returns a number of bytes observed when parsing a document in earlier stages of ingestion (like update/ingest service)
-     * Defaults to -1 when a document size was not observed in earlier stages.
-     * @return a number of bytes observed
-     */
-    public long getNormalisedBytesParsed() {
-        return normalisedBytesParsed;
-    }
-
-    /**
-     * Sets number of bytes observed by a DocumentSizeObserver
-     * @return an index request
-     */
-    public IndexRequest setNormalisedBytesParsed(long normalisedBytesParsed) {
-        this.normalisedBytesParsed = normalisedBytesParsed;
-        return this;
-    }
-
     /**
      * Adds the pipeline to the list of executed pipelines, if listExecutedPipelines is true
     *
@@ -976,22 +957,4 @@ public List getExecutedPipelines() {
             return Collections.unmodifiableList(executedPipelines);
         }
     }
-
-    public IndexRequest setOriginatesFromUpdateByScript(boolean originatesFromUpdateByScript) {
-        this.originatesFromUpdateByScript = originatesFromUpdateByScript;
-        return this;
-    }
-
-    public boolean originatesFromUpdateByScript() {
-        return originatesFromUpdateByScript;
-    }
-
-    public boolean originatesFromUpdateByDoc() {
-        return originatesFromUpdateByDoc;
-    }
-
-    public IndexRequest setOriginatesFromUpdateByDoc(boolean originatesFromUpdateByDoc) {
-        this.originatesFromUpdateByDoc = originatesFromUpdateByDoc;
-        return this;
-    }
 }
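[Editorial note, not part of the patch] The IndexRequest hunks above retire wire fields with the usual pattern: newer nodes keep reading and discarding the value an older node still sends, and keep writing a neutral placeholder an older node still expects, both gated on a TransportVersion cut-over. A minimal hedged sketch of that shape follows; the class and the removedFieldVersion parameter are illustrative stand-ins (the real constant in this patch is INDEX_REQUEST_REMOVE_METERING), not code from the patch.

import java.io.IOException;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

// Sketch only: the read-and-discard / write-placeholder shape used by the hunks above.
class RemovedWireFieldSketch {

    private final TransportVersion removedFieldVersion; // stand-in for the real cut-over constant

    RemovedWireFieldSketch(TransportVersion removedFieldVersion) {
        this.removedFieldVersion = removedFieldVersion;
    }

    void readCompat(StreamInput in) throws IOException {
        if (in.getTransportVersion().before(removedFieldVersion)) {
            in.readZLong(); // an older node still sends the value; consume it and drop it
        }
    }

    void writeCompat(StreamOutput out) throws IOException {
        if (out.getTransportVersion().before(removedFieldVersion)) {
            out.writeZLong(-1); // an older node still expects the value; send a neutral placeholder
        }
    }
}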
diff --git a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
index 212b99ca140d3..d32e102b2e18b 100644
--- a/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
+++ b/server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java
@@ -28,8 +28,7 @@
 import org.elasticsearch.index.mapper.RoutingFieldMapper;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
-import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
+import org.elasticsearch.plugins.internal.XContentParserDecorator;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.script.UpdateCtxMap;
@@ -51,11 +50,9 @@ public class UpdateHelper {
     private static final Logger logger = LogManager.getLogger(UpdateHelper.class);
 
     private final ScriptService scriptService;
-    private final DocumentParsingProvider documentParsingProvider;
 
-    public UpdateHelper(ScriptService scriptService, DocumentParsingProvider documentParsingProvider) {
+    public UpdateHelper(ScriptService scriptService) {
         this.scriptService = scriptService;
-        this.documentParsingProvider = documentParsingProvider;
     }
 
     /**
@@ -183,14 +180,13 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
     Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, GetResult getResult, boolean detectNoop) {
         final IndexRequest currentRequest = request.doc();
         final String routing = calculateRouting(getResult, currentRequest);
-        final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(request);
         final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
         final XContentType updateSourceContentType = sourceAndContent.v1();
         final Map<String, Object> updatedSourceAsMap = sourceAndContent.v2();
 
         final boolean noop = XContentHelper.update(
             updatedSourceAsMap,
-            currentRequest.sourceAsMap(meteringParserDecorator),
+            currentRequest.sourceAsMap(XContentParserDecorator.NOOP),
             detectNoop
         ) == false;
 
@@ -228,9 +224,7 @@ Result prepareUpdateIndexRequest(IndexShard indexShard, UpdateRequest request, G
                 .setIfPrimaryTerm(getResult.getPrimaryTerm())
                 .waitForActiveShards(request.waitForActiveShards())
                 .timeout(request.timeout())
-                .setRefreshPolicy(request.getRefreshPolicy())
-                .setOriginatesFromUpdateByDoc(true);
-            finalIndexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
+                .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
         }
     }
@@ -272,8 +266,7 @@ Result prepareUpdateScriptRequest(IndexShard indexShard, UpdateRequest request,
                 .setIfPrimaryTerm(getResult.getPrimaryTerm())
                 .waitForActiveShards(request.waitForActiveShards())
                 .timeout(request.timeout())
-                .setRefreshPolicy(request.getRefreshPolicy())
-                .setOriginatesFromUpdateByScript(true);
+                .setRefreshPolicy(request.getRefreshPolicy());
             return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
         }
         case DELETE -> {
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
index 1ed0a117ddd89..bde9b0fb8a4ab 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java
@@ -80,7 +80,7 @@ public ParsedDocument parseDocument(SourceToParse source, MappingLookup mappingL
         final RootDocumentParserContext context;
         final XContentType xContentType = source.getXContentType();
 
-        XContentMeteringParserDecorator meteringParserDecorator = source.getDocumentSizeObserver();
+        XContentMeteringParserDecorator meteringParserDecorator = source.getMeteringParserDecorator();
         try (
             XContentParser parser = meteringParserDecorator.decorate(
                 XContentHelper.createParser(parserConfiguration, source.source(), xContentType)
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
index b1d882f04de54..f2ddf38fe4357 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
@@ -15,6 +15,7 @@
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.index.mapper.MapperService.MergeReason;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.xcontent.XContentType;
 
 import java.util.Collections;
@@ -24,6 +25,7 @@
  * The result of parsing a document.
 */
 public class ParsedDocument {
+
     private final Field version;
 
     private final String id;
@@ -33,7 +35,7 @@ public class ParsedDocument {
 
     private final List documents;
 
-    private final DocumentSize normalizedSize;
+    private final long normalizedSize;
 
     private BytesReference source;
     private XContentType xContentType;
@@ -61,7 +63,7 @@ public static ParsedDocument noopTombstone(String reason) {
             new BytesArray("{}"),
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
     }
 
@@ -86,7 +88,7 @@ public static ParsedDocument deleteTombstone(String id) {
             new BytesArray("{}"),
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
     }
 
@@ -99,7 +101,7 @@ public ParsedDocument(
         BytesReference source,
         XContentType xContentType,
         Mapping dynamicMappingsUpdate,
-        DocumentSize normalizedSize
+        long normalizedSize
     ) {
         this.version = version;
         this.seqID = seqID;
@@ -178,16 +180,7 @@ public String documentDescription() {
         return "id";
     }
 
-    public DocumentSize getNormalizedSize() {
+    public long getNormalizedSize() {
         return normalizedSize;
     }
-
-    /**
-     * Normalized ingested and stored size of a document.
-     * @param ingestedBytes ingest size of the document
-     * @param storedBytes stored retained size of the document
-     */
-    public record DocumentSize(long ingestedBytes, long storedBytes) {
-        public static final DocumentSize UNKNOWN = new DocumentSize(-1, -1);
-    }
 }
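[Editorial note, not part of the patch] With the DocumentSize record removed, ParsedDocument#getNormalizedSize() returns a plain long, and XContentMeteringParserDecorator.UNKNOWN_SIZE (-1) acts as the "not metered" sentinel. A minimal hedged sketch of a consumer of the new accessor, modelled on the XContentMeteringParserDecoratorIT change earlier in this diff; the class and the recordIngestedBytes helper are hypothetical names, not part of the patch.

import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;

// Sketch only: reading the long-valued normalized size after this patch.
class NormalizedSizeConsumerSketch {

    void onIndexingCompleted(ParsedDocument parsedDocument) {
        long ingestedBytes = parsedDocument.getNormalizedSize();
        // UNKNOWN_SIZE (-1) means the document was never metered, so only report real values.
        if (ingestedBytes > XContentMeteringParserDecorator.UNKNOWN_SIZE) {
            recordIngestedBytes(ingestedBytes);
        }
    }

    // Hypothetical sink; a real implementation would feed a metric or accumulator here.
    void recordIngestedBytes(long bytes) {}
}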
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java
index a8cb03c223833..879e0fe785df2 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java
@@ -91,7 +91,7 @@ public XContentType getXContentType() {
         return this.xContentType;
     }
 
-    public XContentMeteringParserDecorator getDocumentSizeObserver() {
+    public XContentMeteringParserDecorator getMeteringParserDecorator() {
         return meteringParserDecorator;
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
index 99ff44a3cd135..b5ac54b018e46 100644
--- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java
+++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java
@@ -68,8 +68,6 @@
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.node.ReportingService;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
-import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.plugins.internal.XContentParserDecorator;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.threadpool.Scheduler;
@@ -121,7 +119,6 @@ public class IngestService implements ClusterStateApplier, ReportingService<IngestInfo>
     private final MasterServiceTaskQueue<PipelineClusterStateUpdateTask> taskQueue;
     private final ClusterService clusterService;
     private final ScriptService scriptService;
-    private final DocumentParsingProvider documentParsingProvider;
     private final Map processorFactories;
     // Ideally this should be in IngestMetadata class, but we don't have the processor factories around there.
     // We know of all the processor factories when a node with all its plugin have been initialized. Also some
@@ -204,12 +201,10 @@ public IngestService(
         List ingestPlugins,
         Client client,
         MatcherWatchdog matcherWatchdog,
-        DocumentParsingProvider documentParsingProvider,
         FailureStoreMetrics failureStoreMetrics
     ) {
         this.clusterService = clusterService;
         this.scriptService = scriptService;
-        this.documentParsingProvider = documentParsingProvider;
         this.processorFactories = processorFactories(
             ingestPlugins,
             new Processor.Parameters(
@@ -238,7 +233,6 @@ public IngestService(
     IngestService(IngestService ingestService) {
         this.clusterService = ingestService.clusterService;
         this.scriptService = ingestService.scriptService;
-        this.documentParsingProvider = ingestService.documentParsingProvider;
         this.processorFactories = ingestService.processorFactories;
         this.threadPool = ingestService.threadPool;
         this.taskQueue = ingestService.taskQueue;
@@ -776,10 +770,7 @@ protected void doRun() {
                 }
                 final int slot = i;
                 final Releasable ref = refs.acquire();
-                final XContentMeteringParserDecorator meteringParserDecorator = documentParsingProvider.newMeteringParserDecorator(
-                    indexRequest
-                );
-                final IngestDocument ingestDocument = newIngestDocument(indexRequest, meteringParserDecorator);
+                final IngestDocument ingestDocument = newIngestDocument(indexRequest);
                 final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone();
                 // the document listener gives us three-way logic: a document can fail processing (1), or it can
                 // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
@@ -820,7 +811,6 @@ public void onFailure(Exception e) {
                 );
 
                 executePipelines(pipelines, indexRequest, ingestDocument, resolveFailureStore, documentListener);
-                indexRequest.setNormalisedBytesParsed(meteringParserDecorator.meteredDocumentSize().ingestedBytes());
                 assert actionRequest.index() != null;
 
                 i++;
@@ -1159,14 +1149,14 @@ static String getProcessorName(Processor processor) {
     /**
     * Builds a new ingest document from the passed-in index request.
     */
-    private static IngestDocument newIngestDocument(final IndexRequest request, XContentParserDecorator parserDecorator) {
+    private static IngestDocument newIngestDocument(final IndexRequest request) {
         return new IngestDocument(
             request.index(),
             request.id(),
             request.version(),
             request.routing(),
             request.versionType(),
-            request.sourceAsMap(parserDecorator)
+            request.sourceAsMap(XContentParserDecorator.NOOP)
         );
     }
 
diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
index 784e02059823b..0a88a202ac8d3 100644
--- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
+++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java
@@ -285,7 +285,7 @@ static NodeConstruction prepareConstruction(
 
         ScriptService scriptService = constructor.createScriptService(settingsModule, threadPool, serviceProvider);
 
-        constructor.createUpdateHelper(documentParsingProvider, scriptService);
+        constructor.createUpdateHelper(scriptService);
 
         constructor.construct(
             threadPool,
@@ -643,10 +643,10 @@ private DataStreamGlobalRetentionSettings createDataStreamServicesAndGlobalReten
         return dataStreamGlobalRetentionSettings;
     }
 
-    private UpdateHelper createUpdateHelper(DocumentParsingProvider documentParsingProvider, ScriptService scriptService) {
-        UpdateHelper updateHelper = new UpdateHelper(scriptService, documentParsingProvider);
+    private UpdateHelper createUpdateHelper(ScriptService scriptService) {
+        UpdateHelper updateHelper = new UpdateHelper(scriptService);
 
-        modules.add(b -> { b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService, documentParsingProvider)); });
+        modules.add(b -> b.bind(UpdateHelper.class).toInstance(updateHelper));
 
         return updateHelper;
     }
@@ -701,7 +701,6 @@ private void construct(
             pluginsService.filterPlugins(IngestPlugin.class).toList(),
             client,
             IngestService.createGrokThreadWatchdog(environment, threadPool),
-            documentParsingProvider,
             failureStoreMetrics
         );
 
diff --git a/server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java b/server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java
index e1613caf9deac..9df7fd4c3bd43 100644
--- a/server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java
+++ b/server/src/main/java/org/elasticsearch/plugins/internal/DocumentParsingProvider.java
@@ -9,7 +9,7 @@
 
 package org.elasticsearch.plugins.internal;
 
-import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.index.mapper.MapperService;
 
 /**
@@ -40,7 +40,7 @@ default DocumentSizeAccumulator createDocumentSizeAccumulator() {
     /**
     * @return an observer
     */
-    default XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest request) {
+    default XContentMeteringParserDecorator newMeteringParserDecorator(IndexRequest request) {
         return XContentMeteringParserDecorator.NOOP;
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecorator.java b/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecorator.java
index e3b4415edcc01..6ccdac19acb91 100644
--- a/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecorator.java
+++ b/server/src/main/java/org/elasticsearch/plugins/internal/XContentMeteringParserDecorator.java
@@ -9,17 +9,17 @@
 
 package org.elasticsearch.plugins.internal;
 
-import org.elasticsearch.index.mapper.ParsedDocument.DocumentSize;
 import org.elasticsearch.xcontent.XContentParser;
 
 public interface XContentMeteringParserDecorator extends XContentParserDecorator {
+    long UNKNOWN_SIZE = -1;
 
     /**
     * a default noop implementation
     */
     XContentMeteringParserDecorator NOOP = new XContentMeteringParserDecorator() {
         @Override
-        public DocumentSize meteredDocumentSize() {
-            return DocumentSize.UNKNOWN;
+        public long meteredDocumentSize() {
+            return UNKNOWN_SIZE;
         }
 
         @Override
@@ -28,5 +28,5 @@ public XContentParser decorate(XContentParser xContentParser) {
         }
     };
 
-    DocumentSize meteredDocumentSize();
+    long meteredDocumentSize();
 }
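[Editorial note, not part of the patch] After this change a metering decorator implements the same two methods, but meteredDocumentSize() returns a bare long instead of ParsedDocument.DocumentSize. A hedged sketch of what the removed test's decorator would look like against the new interface; the class name and the map-counting heuristic are illustrative (borrowed from the deleted integration test above), not an API requirement.

import java.io.IOException;
import java.util.Map;

import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
import org.elasticsearch.xcontent.FilterXContentParserWrapper;
import org.elasticsearch.xcontent.XContentParser;

// Sketch only: a metering decorator written against the slimmed-down interface.
class MapCountingParserDecorator implements XContentMeteringParserDecorator {

    private long mapCounter = 0;

    @Override
    public XContentParser decorate(XContentParser xContentParser) {
        return new FilterXContentParserWrapper(xContentParser) {
            @Override
            public Map<String, Object> map() throws IOException {
                mapCounter++; // count each materialised map as one unit of metered work
                return super.map();
            }
        };
    }

    @Override
    public long meteredDocumentSize() {
        return mapCounter; // previously wrapped in ParsedDocument.DocumentSize
    }
}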
diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
index 35ef892da59a2..b389e33993b9b 100644
--- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java
@@ -49,11 +49,11 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.plugins.internal.DocumentParsingProvider;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.threadpool.ThreadPool.Names;
-import org.mockito.ArgumentCaptor;
 import org.mockito.MockingDetails;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Stubbing;
@@ -114,13 +114,18 @@ public void testExecuteBulkIndexRequest() throws Exception {
         BulkItemRequest[] items = new BulkItemRequest[1];
         boolean create = randomBoolean();
-        DocWriteRequest writeRequest = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE).create(create);
+        IndexRequest writeRequest = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE).create(create);
         BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
         items[0] = primaryRequest;
         BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items);
 
         randomlySetIgnoredPrimaryResponse(primaryRequest);
 
+        DocumentParsingProvider documentParsingProvider = mock();
+        XContentMeteringParserDecorator parserDecorator = mock();
+        when(documentParsingProvider.newMeteringParserDecorator(any())).thenReturn(parserDecorator);
+        when(parserDecorator.decorate(any())).then(i -> i.getArgument(0));
+
         BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard);
         TransportShardBulkAction.executeBulkItemRequest(
             context,
@@ -129,7 +134,7 @@ public void testExecuteBulkIndexRequest() throws Exception {
             new NoopMappingUpdatePerformer(),
             (listener, mappingVersion) -> {},
             ASSERTING_DONE_LISTENER,
-            DocumentParsingProvider.EMPTY_INSTANCE
+            documentParsingProvider
         );
         assertFalse(context.hasMoreOperationsToExecute());
 
@@ -185,6 +190,8 @@ public void testExecuteBulkIndexRequest() throws Exception {
         assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
         assertThat(replicaRequest, equalTo(primaryRequest));
 
+        verify(documentParsingProvider).newMeteringParserDecorator(any());
+        verify(parserDecorator).decorate(any());
+
         // Assert that the document count is still 1
         assertDocCount(shard, 1);
 
@@ -600,9 +607,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
             .retryOnConflict(retries);
         BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
 
-        IndexRequest updateResponse = new IndexRequest("index").id("id")
-            .source(Requests.INDEX_CONTENT_TYPE, "field", "value")
-            .setNormalisedBytesParsed(0);// let's pretend this was modified by a script
+        IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
 
         DocumentParsingProvider documentParsingProvider = mock(DocumentParsingProvider.class);
         Exception err = new VersionConflictEngineException(shardId, "id", "I'm conflicted <(;_;)>");
@@ -655,11 +660,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception {
         assertThat(failure.getCause(), equalTo(err));
         assertThat(failure.getStatus(), equalTo(RestStatus.CONFLICT));
 
-        // we have set 0 value on normalisedBytesParsed on the IndexRequest, like it happens with updates by script.
-        ArgumentCaptor argument = ArgumentCaptor.forClass(IndexRequest.class);
-        verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(argument.capture());
-        IndexRequest value = argument.getValue();
-        assertThat(value.getNormalisedBytesParsed(), equalTo(0L));
+        verify(documentParsingProvider, times(retries + 1)).newMeteringParserDecorator(any());
     }
 
     @SuppressWarnings("unchecked")
@@ -668,9 +669,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
         DocWriteRequest writeRequest = new UpdateRequest("index", "id").doc(Requests.INDEX_CONTENT_TYPE, "field", "value");
         BulkItemRequest primaryRequest = new BulkItemRequest(0, writeRequest);
 
-        IndexRequest updateResponse = new IndexRequest("index").id("id")
-            .source(Requests.INDEX_CONTENT_TYPE, "field", "value")
-            .setNormalisedBytesParsed(100L);
+        IndexRequest updateResponse = new IndexRequest("index").id("id").source(Requests.INDEX_CONTENT_TYPE, "field", "value");
 
         DocumentParsingProvider documentParsingProvider = mock(DocumentParsingProvider.class);
         boolean created = randomBoolean();
@@ -721,10 +720,7 @@ public void testUpdateRequestWithSuccess() throws Exception {
         assertThat(response.status(), equalTo(created ? RestStatus.CREATED : RestStatus.OK));
         assertThat(response.getSeqNo(), equalTo(13L));
 
-        ArgumentCaptor argument = ArgumentCaptor.forClass(IndexRequest.class);
-        verify(documentParsingProvider, times(1)).newMeteringParserDecorator(argument.capture());
-        IndexRequest value = argument.getValue();
-        assertThat(value.getNormalisedBytesParsed(), equalTo(100L));
+        verify(documentParsingProvider).newMeteringParserDecorator(updateResponse);
     }
 
     public void testUpdateWithDelete() throws Exception {
diff --git a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java
index 9729b653ae3d2..331f754d437a7 100644
--- a/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java
+++ b/server/src/test/java/org/elasticsearch/action/ingest/ReservedPipelineActionTests.java
@@ -31,7 +31,6 @@
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.ingest.ProcessorInfo;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.reservedstate.TransformState;
 import org.elasticsearch.reservedstate.service.FileSettingsService;
 import org.elasticsearch.reservedstate.service.ReservedClusterStateService;
@@ -94,7 +93,6 @@ public void setup() {
             Collections.singletonList(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
         Map factories = ingestService.getProcessorFactories();
diff --git a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java
index d8960bd902ac5..0cc2dcf38e8ff 100644
--- a/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java
+++ b/server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptEngine;
@@ -121,7 +120,7 @@ public void setUp() throws Exception {
         final MockScriptEngine engine = new MockScriptEngine("mock", scripts, Collections.emptyMap());
         Map engines = Collections.singletonMap(engine.getType(), engine);
         ScriptService scriptService = new ScriptService(baseSettings, engines, ScriptModule.CORE_CONTEXTS, () -> 1L);
-        updateHelper = new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE);
+        updateHelper = new UpdateHelper(scriptService);
     }
 
     @SuppressWarnings("unchecked")
@@ -594,7 +593,7 @@ public void testNoopDetection() throws Exception {
         try (var parser = createParser(JsonXContent.jsonXContent, new BytesArray("{\"doc\": {\"body\": \"foo\"}}"))) {
             request = new UpdateRequest("test", "1").fromXContent(parser);
         }
-        UpdateHelper updateHelper = new UpdateHelper(mock(ScriptService.class), DocumentParsingProvider.EMPTY_INSTANCE);
+        UpdateHelper updateHelper = new UpdateHelper(mock(ScriptService.class));
         UpdateHelper.Result result = updateHelper.prepareUpdateIndexRequest(indexShard, request, getResult, true);
 
         assertThat(result.action(), instanceOf(UpdateResponse.class));
diff --git a/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java b/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java
index 753602e73a30a..c626be7983c46 100644
--- a/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/IndexingSlowLogTests.java
@@ -28,10 +28,10 @@
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.ParsedDocument.DocumentSize;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.XContentParseException;
 import org.elasticsearch.xcontent.XContentType;
@@ -217,7 +217,7 @@ public void testSlowLogMessageHasJsonFields() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
         Index index = new Index("foo", "123");
         // Turning off document logging doesn't log source[]
@@ -246,7 +246,7 @@ public void testSlowLogMessageHasAdditionalFields() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
         Index index = new Index("foo", "123");
         // Turning off document logging doesn't log source[]
@@ -276,7 +276,7 @@ public void testEmptyRoutingField() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
         Index index = new Index("foo", "123");
 
@@ -295,7 +295,7 @@ public void testSlowLogParsedDocumentPrinterSourceToLog() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
         Index index = new Index("foo", "123");
         // Turning off document logging doesn't log source[]
@@ -327,7 +327,7 @@ public void testSlowLogParsedDocumentPrinterSourceToLog() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
 
         final XContentParseException e = expectThrows(
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 21aefd893de70..bba1fa338559f 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -109,7 +109,6 @@
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.MappingLookup;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.ParsedDocument.DocumentSize;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.SourceFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
@@ -132,6 +131,7 @@
 import org.elasticsearch.index.translog.TranslogOperationsUtils;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.index.IndexVersionUtils;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -5522,7 +5522,7 @@ public void testSeqNoGenerator() throws IOException {
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
 
         final Engine.Index index = new Engine.Index(
diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index ccf0bbebcc354..9e7f5fbbce1a3 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -45,7 +45,6 @@
 import org.elasticsearch.index.mapper.LuceneDocument;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.ParsedDocument.DocumentSize;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.seqno.RetentionLeases;
@@ -54,6 +53,7 @@
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
@@ -567,7 +567,7 @@ private Engine.IndexResult index(String id, String testFieldValue) throws IOExce
             source,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
         Engine.Index index = new Engine.Index(uid, engine.config().getPrimaryTermSupplier().getAsLong(), doc);
         return engine.index(index);
diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index d0cabd609158b..97f49df41d099 100644
--- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -56,7 +56,6 @@
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.LuceneDocument;
 import org.elasticsearch.index.mapper.ParsedDocument;
-import org.elasticsearch.index.mapper.ParsedDocument.DocumentSize;
 import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -64,6 +63,7 @@
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog.Location;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.TransportVersionUtils;
@@ -3395,7 +3395,7 @@ public void testTranslogOpSerialization() throws Exception {
             B_1,
             XContentType.JSON,
             null,
-            DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
 
         Engine.Index eIndex = new Engine.Index(
diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
index d83fdbd5dd46b..b3ddc313eaf3a 100644
--- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java
@@ -54,10 +54,7 @@
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexVersion;
 import org.elasticsearch.index.VersionType;
-import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
-import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.script.MockScriptEngine;
 import org.elasticsearch.script.Script;
 import org.elasticsearch.script.ScriptModule;
@@ -68,7 +65,6 @@
 import org.elasticsearch.test.MockLog;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.cbor.CborXContent;
 import org.junit.Before;
@@ -157,7 +153,6 @@ public void testIngestPlugin() {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
         Map factories = ingestService.getProcessorFactories();
@@ -178,7 +173,6 @@ public void testIngestPluginDuplicate() {
                 List.of(DUMMY_PLUGIN, DUMMY_PLUGIN),
                 client,
                 null,
-                DocumentParsingProvider.EMPTY_INSTANCE,
                 FailureStoreMetrics.NOOP
             )
         );
@@ -196,7 +190,6 @@ public void testExecuteIndexPipelineDoesNotExist() {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
         final IndexRequest indexRequest = new IndexRequest("_index").id("_id")
@@ -1194,66 +1187,6 @@ public void testExecuteBulkPipelineDoesNotExist() {
         verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
     }
 
-    public void testExecuteBulkRequestCallsDocumentSizeObserver() {
-        /*
-         * This test makes sure that for both insert and upsert requests, when we call executeBulkRequest DocumentSizeObserver is
-         * called using a non-null index name.
-         */
-        AtomicInteger wrappedObserverWasUsed = new AtomicInteger(0);
-        AtomicInteger parsedValueWasUsed = new AtomicInteger(0);
-        DocumentParsingProvider documentParsingProvider = new DocumentParsingProvider() {
-            @Override
-            public XContentMeteringParserDecorator newMeteringParserDecorator(DocWriteRequest request) {
-                return new XContentMeteringParserDecorator() {
-                    @Override
-                    public ParsedDocument.DocumentSize meteredDocumentSize() {
-                        parsedValueWasUsed.incrementAndGet();
-                        return new ParsedDocument.DocumentSize(0, 0);
-                    }
-
-                    @Override
-                    public XContentParser decorate(XContentParser xContentParser) {
-                        wrappedObserverWasUsed.incrementAndGet();
-                        return xContentParser;
-                    }
-                };
-            }
-        };
-        IngestService ingestService = createWithProcessors(
-            Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor()),
-            documentParsingProvider
-        );
-
-        PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}");
-        ClusterState clusterState = ClusterState.builder(new ClusterName("_name")).build(); // Start empty
-        ClusterState previousClusterState = clusterState;
-        clusterState = executePut(putRequest, clusterState);
-        ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
-
-        BulkRequest bulkRequest = new BulkRequest();
-        UpdateRequest updateRequest = new UpdateRequest("_index", "_id1").upsert("{}", "{}");
-        updateRequest.upsertRequest().setPipeline("_id");
-        bulkRequest.add(updateRequest);
-        IndexRequest indexRequest = new IndexRequest("_index").id("_id1").source(Map.of()).setPipeline("_id1");
-        bulkRequest.add(indexRequest);
-        @SuppressWarnings("unchecked")
-        BiConsumer failureHandler = mock(BiConsumer.class);
-        @SuppressWarnings("unchecked")
-        final BiConsumer completionHandler = mock(BiConsumer.class);
-        ingestService.executeBulkRequest(
-            bulkRequest.numberOfActions(),
-            bulkRequest.requests(),
-            indexReq -> {},
-            (s) -> false,
-            (slot, targetIndex, e) -> fail("Should not be redirecting failures"),
-            failureHandler,
-            completionHandler,
-            EsExecutors.DIRECT_EXECUTOR_SERVICE
-        );
-        assertThat(wrappedObserverWasUsed.get(), equalTo(2));
-        assertThat(parsedValueWasUsed.get(), equalTo(2));
-    }
-
     public void testExecuteSuccess() {
         IngestService ingestService = createWithProcessors(
             Map.of("mock", (factories, tag, description, config) -> mockCompoundProcessor())
@@ -2271,7 +2204,6 @@ public Map getProcessors(Processor.Parameters paramet
             List.of(testPlugin),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
         ingestService.addIngestClusterStateListener(ingestClusterStateListener);
@@ -2611,7 +2543,6 @@ private void testUpdatingPipeline(String pipelineString) throws Exception {
             List.of(DUMMY_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
         ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState));
@@ -2921,13 +2852,6 @@ private static IngestService createWithProcessors() {
     }
 
     private static IngestService createWithProcessors(Map processors) {
-        return createWithProcessors(processors, DocumentParsingProvider.EMPTY_INSTANCE);
-    }
-
-    private static IngestService createWithProcessors(
-        Map processors,
-        DocumentParsingProvider documentParsingProvider
-    ) {
         Client client = mock(Client.class);
         ThreadPool threadPool = mock(ThreadPool.class);
         when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
@@ -2946,7 +2870,6 @@ public Map getProcessors(final Processor.Parameters p
             }),
             client,
             null,
-            documentParsingProvider,
             FailureStoreMetrics.NOOP
         );
         if (randomBoolean()) {
diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
index 94b3607bd7608..e8115e7266176 100644
--- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java
@@ -16,7 +16,6 @@
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.XContentType;
@@ -132,7 +131,6 @@ public Map getProcessors(final Processor.Parameters p
             List.of(ingestPlugin),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index c46d98fe1cd8b..e0363d84ea4d2 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -2405,7 +2405,6 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
                         Collections.emptyList(),
                         client,
                         null,
-                        DocumentParsingProvider.EMPTY_INSTANCE,
                         FailureStoreMetrics.NOOP
                     ),
                     mockFeatureService,
@@ -2425,7 +2424,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() {
                     threadPool,
                    shardStateAction,
                     mappingUpdatedAction,
-                    new UpdateHelper(scriptService, DocumentParsingProvider.EMPTY_INSTANCE),
+                    new UpdateHelper(scriptService),
                     actionFilters,
                     indexingMemoryLimits,
                     EmptySystemIndices.INSTANCE,
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 4713adf6cf01d..87c566d543d0f 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -100,6 +100,7 @@
 import org.elasticsearch.index.translog.TranslogDeletionPolicy;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
+import org.elasticsearch.plugins.internal.XContentMeteringParserDecorator;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
@@ -428,7 +429,7 @@ protected static ParsedDocument testParsedDocument(
             source,
             XContentType.JSON,
             mappingUpdate,
-            ParsedDocument.DocumentSize.UNKNOWN
+            XContentMeteringParserDecorator.UNKNOWN_SIZE
         );
     }
 
diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
index 7e88cad88dcec..bb973bf4359e8 100644
--- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
+++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java
@@ -26,7 +26,6 @@
 import org.elasticsearch.ingest.Processor;
 import org.elasticsearch.license.MockLicenseState;
 import org.elasticsearch.plugins.IngestPlugin;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.ml.MachineLearningField;
@@ -139,7 +138,6 @@ public void setUpVariables() {
             Collections.singletonList(SKINNY_INGEST_PLUGIN),
             client,
             null,
-            DocumentParsingProvider.EMPTY_INSTANCE,
             FailureStoreMetrics.NOOP
         );
     }
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java
index 5710b031494bf..c2e9a92e45353 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/AuthorizationServiceTests.java
@@ -115,7 +115,6 @@
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.license.MockLicenseState;
 import org.elasticsearch.license.XPackLicenseState;
-import org.elasticsearch.plugins.internal.DocumentParsingProvider;
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
@@ -1580,7 +1579,7 @@ public void testDenialErrorMessagesForBulkIngest() throws Exception {
         TransportShardBulkAction.performOnPrimary(
             request,
             indexShard,
-            new UpdateHelper(mock(ScriptService.class), DocumentParsingProvider.EMPTY_INSTANCE),
+            new UpdateHelper(mock(ScriptService.class)),
             System::currentTimeMillis,
             mappingUpdater,
             waitForMappingUpdate,