diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 9183f77448a2f..f7460dd3de47d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -42,6 +42,7 @@ import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -90,6 +91,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction protected final Executor systemCoordinationExecutor; private final ActionType bulkAction; protected final FeatureService featureService; + protected final SamplingService samplingService; public TransportAbstractBulkAction( ActionType action, @@ -103,7 +105,8 @@ public TransportAbstractBulkAction( SystemIndices systemIndices, ProjectResolver projectResolver, LongSupplier relativeTimeNanosProvider, - FeatureService featureService + FeatureService featureService, + SamplingService samplingService ) { super(action.name(), transportService, actionFilters, requestReader, EsExecutors.DIRECT_EXECUTOR_SERVICE); this.threadPool = threadPool; @@ -119,6 +122,7 @@ public TransportAbstractBulkAction( clusterService.addStateApplier(this.ingestForwarder); this.relativeTimeNanosProvider = relativeTimeNanosProvider; this.bulkAction = action; + this.samplingService = samplingService; } @Override @@ -204,13 +208,18 @@ private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executo executor.execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() throws IOException { - 
applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener, false); } }); } - private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener listener) - throws IOException { + private boolean applyPipelines( + Task task, + BulkRequest bulkRequest, + Executor executor, + ActionListener listener, + boolean haveRunIngestService + ) throws IOException { boolean hasIndexRequestsWithPipelines = false; ClusterState state = clusterService.state(); ProjectId projectId = projectResolver.getProjectId(); @@ -303,6 +312,16 @@ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor exec } }); return true; + } else if (haveRunIngestService == false && samplingService != null && samplingService.atLeastOneSampleConfigured()) { + /* + * Else sample only if this request has not passed through IngestService::executeBulkRequest. Otherwise, some request within the + * bulk had pipelines and we sampled in IngestService already. 
+ */ + for (DocWriteRequest actionRequest : bulkRequest.requests) { + if (actionRequest instanceof IndexRequest ir) { + samplingService.maybeSample(project, ir); + } + } } return false; } @@ -338,7 +357,7 @@ private void processBulkIndexIngestRequest( ActionRunnable runnable = new ActionRunnable<>(actionListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener, true); } @Override @@ -416,7 +435,8 @@ private void applyPipelinesAndDoInternalExecute( Task task, BulkRequest bulkRequest, Executor executor, - ActionListener listener + ActionListener listener, + boolean haveRunIngestService ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); @@ -434,7 +454,7 @@ private void applyPipelinesAndDoInternalExecute( var wrappedListener = bulkRequestModifier.wrapActionListenerIfNeeded(listener); - if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener) == false) { + if (applyPipelines(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, haveRunIngestService) == false) { doInternalExecute(task, bulkRequestModifier.getBulkRequest(), executor, wrappedListener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 7e443e055cc90..81d60886b7bab 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.injection.guice.Inject; import 
org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -101,7 +102,8 @@ public TransportBulkAction( ProjectResolver projectResolver, FailureStoreMetrics failureStoreMetrics, DataStreamFailureStoreSettings dataStreamFailureStoreSettings, - FeatureService featureService + FeatureService featureService, + SamplingService samplingService ) { this( threadPool, @@ -117,7 +119,8 @@ public TransportBulkAction( threadPool::relativeTimeInNanos, failureStoreMetrics, dataStreamFailureStoreSettings, - featureService + featureService, + samplingService ); } @@ -135,7 +138,8 @@ public TransportBulkAction( LongSupplier relativeTimeProvider, FailureStoreMetrics failureStoreMetrics, DataStreamFailureStoreSettings dataStreamFailureStoreSettings, - FeatureService featureService + FeatureService featureService, + SamplingService samplingService ) { this( TYPE, @@ -153,7 +157,8 @@ public TransportBulkAction( relativeTimeProvider, failureStoreMetrics, dataStreamFailureStoreSettings, - featureService + featureService, + samplingService ); } @@ -173,7 +178,8 @@ public TransportBulkAction( LongSupplier relativeTimeProvider, FailureStoreMetrics failureStoreMetrics, DataStreamFailureStoreSettings dataStreamFailureStoreSettings, - FeatureService featureService + FeatureService featureService, + SamplingService samplingService ) { super( bulkAction, @@ -187,7 +193,8 @@ public TransportBulkAction( systemIndices, projectResolver, relativeTimeProvider, - featureService + featureService, + samplingService ); this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; Objects.requireNonNull(relativeTimeProvider); diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java index b52f5447b9311..6712920b3bf85 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/bulk/TransportSimulateBulkAction.java @@ -117,7 +117,8 @@ public TransportSimulateBulkAction( systemIndices, projectResolver, threadPool::relativeTimeInNanos, - featureService + featureService, + null ); this.indicesService = indicesService; this.xContentRegistry = xContentRegistry; diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 2c0359e367086..2265a0343576c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -52,6 +52,7 @@ import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.logging.DeprecationCategory; import org.elasticsearch.common.logging.DeprecationLogger; @@ -77,6 +78,7 @@ import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.node.ReportingService; import org.elasticsearch.plugins.IngestPlugin; +import org.elasticsearch.script.Metadata; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -98,6 +100,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -151,6 +154,7 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) { private volatile ClusterState state; private final ProjectResolver projectResolver; private final FeatureService featureService; + private final SamplingService samplingService; private final 
Consumer> nodeInfoListener; private static BiFunction createScheduler(ThreadPool threadPool) { @@ -252,6 +256,7 @@ public IngestService( FailureStoreMetrics failureStoreMetrics, ProjectResolver projectResolver, FeatureService featureService, + SamplingService samplingService, Consumer> nodeInfoListener ) { this.clusterService = clusterService; @@ -276,6 +281,7 @@ public IngestService( this.failureStoreMetrics = failureStoreMetrics; this.projectResolver = projectResolver; this.featureService = featureService; + this.samplingService = samplingService; this.nodeInfoListener = nodeInfoListener; } @@ -290,7 +296,8 @@ public IngestService( MatcherWatchdog matcherWatchdog, FailureStoreMetrics failureStoreMetrics, ProjectResolver projectResolver, - FeatureService featureService + FeatureService featureService, + SamplingService samplingService ) { this( clusterService, @@ -304,6 +311,7 @@ public IngestService( failureStoreMetrics, projectResolver, featureService, + samplingService, createNodeInfoListener(client) ); } @@ -324,6 +332,7 @@ public IngestService( this.failureStoreMetrics = ingestService.failureStoreMetrics; this.projectResolver = ingestService.projectResolver; this.featureService = ingestService.featureService; + this.samplingService = ingestService.samplingService; this.nodeInfoListener = ingestService.nodeInfoListener; } @@ -971,6 +980,7 @@ protected void doRun() { Pipeline firstPipeline = pipelines.peekFirst(); if (pipelines.hasNext() == false) { i++; + samplingService.maybeSample(state.metadata().projects().get(pipelines.projectId()), indexRequest); continue; } @@ -983,7 +993,7 @@ protected void doRun() { final int slot = i; final Releasable ref = refs.acquire(); final IngestDocument ingestDocument = newIngestDocument(indexRequest); - final org.elasticsearch.script.Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone(); + final Metadata originalDocumentMetadata = ingestDocument.getMetadata().clone(); // the document listener gives us 
three-way logic: a document can fail processing (1), or it can // be successfully processed. a successfully processed document can be kept (2) or dropped (3). final ActionListener documentListener = ActionListener.runAfter( @@ -1030,7 +1040,14 @@ public void onFailure(Exception e) { } ); - executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener); + executePipelines( + pipelines, + indexRequest, + ingestDocument, + adaptedResolveFailureStore, + documentListener, + originalDocumentMetadata + ); assert actionRequest.index() != null; i++; @@ -1149,7 +1166,8 @@ private void executePipelines( final IndexRequest indexRequest, final IngestDocument ingestDocument, final Function resolveFailureStore, - final ActionListener listener + final ActionListener listener, + final Metadata originalDocumentMetadata ) { assert pipelines.hasNext(); PipelineSlot slot = pipelines.next(); @@ -1180,12 +1198,12 @@ private void executePipelines( listener.onFailure(e); } }; - + AtomicBoolean haveAttemptedSampling = new AtomicBoolean(false); + final var project = state.metadata().projects().get(pipelines.projectId()); try { if (pipeline == null) { throw new IllegalArgumentException("pipeline with id [" + pipelineId + "] does not exist"); } - final var project = state.metadata().projects().get(pipelines.projectId()); if (project == null) { throw new IllegalArgumentException("project with id [" + pipelines.projectId() + "] does not exist"); } @@ -1335,15 +1353,24 @@ private void executePipelines( } if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener); + executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata); } else { - // update the index request's source and (potentially) cache the timestamp for TSDB + /* + * At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the 
results. + * This is our chance to sample with both the original document and all changes. + */ + haveAttemptedSampling.set(true); + attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata); updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded! } }); } catch (Exception e) { + if (haveAttemptedSampling.get() == false) { + // It is possible that an exception happened after we sampled. We do not want to sample the same document twice. + attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata); + } logger.debug( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), e @@ -1352,6 +1379,56 @@ private void executePipelines( } } + private void attemptToSampleData( + ProjectMetadata projectMetadata, + IndexRequest indexRequest, + IngestDocument ingestDocument, + Metadata originalDocumentMetadata + ) { + if (samplingService != null && samplingService.atLeastOneSampleConfigured()) { + /* + * We need both the original document and the fully updated document for sampling, so we make a copy of the original + * before overwriting it here. We can discard it after sampling. + */ + samplingService.maybeSample(projectMetadata, indexRequest.index(), () -> { + IndexRequest original = copyIndexRequestForSampling(indexRequest); + updateIndexRequestMetadata(original, originalDocumentMetadata); + return original; + }, ingestDocument); + + } + } + + /** + * Creates a copy of an IndexRequest to be used by random sampling. 
+ * @param original The IndexRequest to be copied + * @return A copy of the IndexRequest + */ + private IndexRequest copyIndexRequestForSampling(IndexRequest original) { + IndexRequest clonedRequest = new IndexRequest(original.index()); + clonedRequest.id(original.id()); + clonedRequest.routing(original.routing()); + clonedRequest.version(original.version()); + clonedRequest.versionType(original.versionType()); + clonedRequest.setPipeline(original.getPipeline()); + clonedRequest.setFinalPipeline(original.getFinalPipeline()); + clonedRequest.setIfSeqNo(original.ifSeqNo()); + clonedRequest.setIfPrimaryTerm(original.ifPrimaryTerm()); + clonedRequest.setRefreshPolicy(original.getRefreshPolicy()); + clonedRequest.waitForActiveShards(original.waitForActiveShards()); + clonedRequest.timeout(original.timeout()); + clonedRequest.opType(original.opType()); + clonedRequest.setParentTask(original.getParentTask()); + clonedRequest.setRequireDataStream(original.isRequireDataStream()); + clonedRequest.setRequireAlias(original.isRequireAlias()); + clonedRequest.setIncludeSourceOnError(original.getIncludeSourceOnError()); + BytesReference source = original.source(); + if (source != null) { + clonedRequest.source(source, original.getContentType()); + } + return clonedRequest; + } + private static void executePipeline( final IngestDocument ingestDocument, final Pipeline pipeline, diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java new file mode 100644 index 0000000000000..477ef12a5c042 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.xcontent.XContentParseException; + +import java.util.Map; +import java.util.function.Supplier; + +public class SamplingService implements ClusterStateListener { + private static final Logger logger = LogManager.getLogger(SamplingService.class); + private final ScriptService scriptService; + private final ClusterService clusterService; + + public SamplingService(ScriptService scriptService, ClusterService clusterService) { + this.scriptService = scriptService; + this.clusterService = clusterService; + } + + /** + * Potentially samples the given indexRequest, depending on the existing sampling configuration. 
+ * @param projectMetadata Used to get the sampling configuration + * @param indexRequest The raw request to potentially sample + */ + public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest) { + maybeSample(projectMetadata, indexRequest.index(), () -> indexRequest, () -> { + Map sourceAsMap; + try { + sourceAsMap = indexRequest.sourceAsMap(); + } catch (XContentParseException e) { + sourceAsMap = Map.of(); + logger.trace("Invalid index request source, attempting to sample anyway"); + } + return new IngestDocument( + indexRequest.index(), + indexRequest.id(), + indexRequest.version(), + indexRequest.routing(), + indexRequest.versionType(), + sourceAsMap + ); + }); + } + + /** + * + * @param projectMetadata Used to get the sampling configuration + * @param indexRequestSupplier A supplier for the raw request to potentially sample + * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration + */ + public void maybeSample( + ProjectMetadata projectMetadata, + String indexName, + Supplier indexRequestSupplier, + IngestDocument ingestDocument + ) { + maybeSample(projectMetadata, indexName, indexRequestSupplier, () -> ingestDocument); + } + + private void maybeSample( + ProjectMetadata projectMetadata, + String indexName, + Supplier indexRequest, + Supplier ingestDocumentSupplier + ) { + // TODO Sampling logic to go here in the near future + } + + public boolean atLeastOneSampleConfigured() { + return false; // TODO Return true if there is at least one sample in the cluster state + } + + @Override + public void clusterChanged(ClusterChangedEvent event) { + // TODO: React to sampling config changes + } + +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 72c48c303ffa9..57f3dda579819 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ 
b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -142,6 +142,7 @@ import org.elasticsearch.indices.recovery.plan.RecoveryPlannerService; import org.elasticsearch.indices.recovery.plan.ShardSnapshotsService; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.injection.guice.Injector; import org.elasticsearch.injection.guice.Key; import org.elasticsearch.injection.guice.Module; @@ -716,6 +717,10 @@ private void construct( FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class)); + SamplingService samplingService = new SamplingService(scriptService, clusterService); + modules.bindToInstance(SamplingService.class, samplingService); + clusterService.addListener(samplingService); + FailureStoreMetrics failureStoreMetrics = new FailureStoreMetrics(telemetryProvider.getMeterRegistry()); final IngestService ingestService = new IngestService( clusterService, @@ -728,7 +733,8 @@ private void construct( IngestService.createGrokThreadWatchdog(environment, threadPool), failureStoreMetrics, projectResolver, - featureService + featureService, + samplingService ); SystemIndices systemIndices = createSystemIndices(settings); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a54cd08c3738a..119385319e52f 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.TestIndexNameExpressionResolver; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.tasks.Task; import 
org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -172,10 +173,17 @@ class TestTransportBulkAction extends TransportBulkAction { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + initializeSamplingService() ); } + private static SamplingService initializeSamplingService() { + SamplingService samplingService = mock(SamplingService.class); + when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + return samplingService; + } + @Override void executeBulk( Task task, diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 481fdf5ea3530..4242c44e29808 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -63,6 +63,7 @@ import org.elasticsearch.index.VersionType; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.index.IndexVersionUtils; @@ -81,6 +82,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -92,6 +94,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class TransportBulkActionTests extends ESTestCase { @@ -152,10 +156,17 @@ 
public ProjectId getProjectId() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + initializeSamplingService() ); } + private static SamplingService initializeSamplingService() { + SamplingService samplingService = mock(SamplingService.class); + when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + return samplingService; + } + @Override void createIndex(CreateIndexRequest createIndexRequest, ActionListener listener) { indexCreated = true; @@ -733,6 +744,19 @@ public void testFailuresDuringPrerequisiteActions() throws InterruptedException assertNull(bulkRequest.requests.get(2)); } + public void testSampling() throws ExecutionException, InterruptedException { + // This test makes sure that the sampling service is called once per IndexRequest + BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id1").source(Collections.emptyMap())) + .add(new IndexRequest("index").id("id2").source(Collections.emptyMap())) + .add(new DeleteRequest("index2").id("id3")); + PlainActionFuture future = new PlainActionFuture<>(); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.get(); + assertTrue(bulkAction.indexCreated); + // We expect 2 sampling calls since there are 2 index requests: + verify(bulkAction.samplingService, times(2)).maybeSample(any(), any()); + } + private BulkRequest buildBulkRequest(List indices) { BulkRequest request = new BulkRequest(); for (String index : indices) { diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java index 0077f739bf7b6..fb2e8963614a3 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTookTests.java @@ -39,6 +39,7 @@ import 
org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.indices.EmptySystemIndices; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -66,6 +67,7 @@ import static org.elasticsearch.test.StreamsUtils.copyToStringFromClasspath; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; public class TransportBulkActionTookTests extends ESTestCase { @@ -267,7 +269,8 @@ static class TestTransportBulkAction extends TransportBulkAction { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); } } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index df3211331c7f5..395407c897a63 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -171,7 +171,8 @@ public void testIngestPlugin() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -198,7 +199,8 @@ public void testIngestPluginDuplicate() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); @@ -222,7 +224,8 @@ public void testExecuteIndexPipelineDoesNotExist() { public 
boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(Map.of()) @@ -243,11 +246,18 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { assertThat(status, equalTo(fsStatus)); }; + // This is due to a quirk of IngestService. It uses a cluster state from the most recent cluster change event: + ProjectId projectId = randomProjectIdOrDefault(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .build(); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); + @SuppressWarnings("unchecked") final ActionListener listener = mock(ActionListener.class); ingestService.executeBulkRequest( - randomProjectIdOrDefault(), + projectId, 1, List.of(indexRequest), indexReq -> {}, @@ -1833,7 +1843,8 @@ public void testFailureRedirectionWithoutNodeFeatureEnabled() throws Exception { "set", (factories, tag, description, config, projectId) -> new FakeProcessor("set", "", "", (ingestDocument) -> fail()) ), - Predicates.never() + Predicates.never(), + mock(SamplingService.class) ); PutPipelineRequest putRequest1 = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); // given that set -> fail() above, it's a failure if a document executes against this pipeline @@ -2567,7 +2578,8 @@ public Map getProcessors(Processor.Parameters paramet public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); ingestService.addIngestClusterStateListener(ingestClusterStateListener); @@ -3075,6 +3087,7 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return 
DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } }, + mock(SamplingService.class), consumer ); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); @@ -3337,6 +3350,64 @@ public void testResolvePipelinesWithNonePipeline() { } } + public void testSampling() { + SamplingService samplingService = mock(SamplingService.class); + IngestService ingestService = createWithProcessors( + Map.of("mock", (factories, tag, description, config, projectId) -> mockCompoundProcessor()), + Predicates.never(), + samplingService + ); + when(samplingService.atLeastOneSampleConfigured()).thenReturn(true); + PutPipelineRequest putRequest = putJsonPipelineRequest("_id", "{\"processors\": [{\"mock\" : {}}]}"); + var projectId = randomProjectIdOrDefault(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .build(); + ClusterState previousClusterState = clusterState; + clusterState = executePut(projectId, putRequest, clusterState); + ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); + + BulkRequest bulkRequest = new BulkRequest(); + + IndexRequest indexRequest1 = new IndexRequest("_index").id("_id1").source(Map.of()).setPipeline("_none").setFinalPipeline("_none"); + bulkRequest.add(indexRequest1); + IndexRequest indexRequest2 = new IndexRequest("_index").id("_id2").source(Map.of()).setPipeline("_id").setFinalPipeline("_none"); + bulkRequest.add(indexRequest2); + IndexRequest indexRequest3 = new IndexRequest("_index").id("_id3") + .source(Map.of()) + .setPipeline("does_not_exist") + .setFinalPipeline("_none"); + bulkRequest.add(indexRequest3); + @SuppressWarnings("unchecked") + TriConsumer failureHandler = mock(TriConsumer.class); + @SuppressWarnings("unchecked") + final ActionListener listener = mock(ActionListener.class); + + Boolean noRedirect = randomBoolean() ? 
false : null; + IndexDocFailureStoreStatus fsStatus = IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN; + + ingestService.executeBulkRequest( + projectId, + bulkRequest.numberOfActions(), + bulkRequest.requests(), + indexReq -> {}, + (s) -> noRedirect, + (slot, targetIndex, e) -> fail("Should not be redirecting failures"), + failureHandler, + listener + ); + verify(failureHandler, times(1)).apply( + argThat(item -> item == 2), + argThat(iae -> "pipeline with id [does_not_exist] does not exist".equals(iae.getMessage())), + argThat(fsStatus::equals) + ); + verify(listener, times(1)).onResponse(null); + // In the case where there is a pipeline, or there is a pipeline failure, there will be an IngestDocument so this verion is called: + verify(samplingService, times(2)).maybeSample(any(), any(), any(), any()); + // When there is no pipeline, we have no IngestDocument, and the maybeSample that does not require an IngestDocument is called: + verify(samplingService, times(1)).maybeSample(any(), any()); + } + private static Tuple randomMapEntry() { return tuple(randomAlphaOfLength(5), randomObject()); } @@ -3377,10 +3448,14 @@ private static IngestService createWithProcessors() { } private static IngestService createWithProcessors(Map processors) { - return createWithProcessors(processors, DataStream.DATA_STREAM_FAILURE_STORE_FEATURE::equals); + return createWithProcessors(processors, DataStream.DATA_STREAM_FAILURE_STORE_FEATURE::equals, mock(SamplingService.class)); } - private static IngestService createWithProcessors(Map processors, Predicate featureTest) { + private static IngestService createWithProcessors( + Map processors, + Predicate featureTest, + SamplingService samplingService + ) { Client client = mock(Client.class); ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); @@ -3406,7 +3481,8 @@ public Map getProcessors(final Processor.Parameters p public boolean clusterHasFeature(ClusterState 
state, NodeFeature feature) { return featureTest.test(feature); } - } + }, + samplingService ); if (randomBoolean()) { /* diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java index 6733ff4c44e6e..79b4e76932fb3 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -173,7 +173,8 @@ public Map getProcessors(final Processor.Parameters p public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 6a459cab07328..d6a57ba4587c5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -166,6 +166,7 @@ import org.elasticsearch.indices.recovery.SnapshotFilesProvider; import org.elasticsearch.indices.recovery.plan.PeerOnlyRecoveryPlannerService; import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; @@ -2666,7 +2667,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ), client, actionFilters, @@ -2681,7 +2683,8 @@ public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { public boolean clusterHasFeature(ClusterState 
state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ) ); final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction( diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 06ba7ba113d4e..85f0aa5c40a35 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.ingest.Processor; +import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.license.MockLicenseState; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.test.ESTestCase; @@ -159,7 +160,8 @@ public void setUpVariables() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + mock(SamplingService.class) ); }