5252import org .elasticsearch .cluster .service .MasterServiceTaskQueue ;
5353import org .elasticsearch .common .Priority ;
5454import org .elasticsearch .common .TriConsumer ;
55+ import org .elasticsearch .common .bytes .BytesReference ;
5556import org .elasticsearch .common .collect .ImmutableOpenMap ;
5657import org .elasticsearch .common .logging .DeprecationCategory ;
5758import org .elasticsearch .common .logging .DeprecationLogger ;
7778import org .elasticsearch .index .analysis .AnalysisRegistry ;
7879import org .elasticsearch .node .ReportingService ;
7980import org .elasticsearch .plugins .IngestPlugin ;
81+ import org .elasticsearch .script .Metadata ;
8082import org .elasticsearch .script .ScriptService ;
8183import org .elasticsearch .threadpool .Scheduler ;
8284import org .elasticsearch .threadpool .ThreadPool ;
98100import java .util .Set ;
99101import java .util .TreeMap ;
100102import java .util .concurrent .CopyOnWriteArrayList ;
103+ import java .util .concurrent .atomic .AtomicBoolean ;
101104import java .util .function .BiConsumer ;
102105import java .util .function .BiFunction ;
103106import java .util .function .Consumer ;
@@ -151,6 +154,7 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) {
151154 private volatile ClusterState state ;
152155 private final ProjectResolver projectResolver ;
153156 private final FeatureService featureService ;
157+ private final SamplingService samplingService ;
154158 private final Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener ;
155159
156160 private static BiFunction <Long , Runnable , Scheduler .ScheduledCancellable > createScheduler (ThreadPool threadPool ) {
@@ -252,6 +256,7 @@ public IngestService(
252256 FailureStoreMetrics failureStoreMetrics ,
253257 ProjectResolver projectResolver ,
254258 FeatureService featureService ,
259+ SamplingService samplingService ,
255260 Consumer <ActionListener <NodesInfoResponse >> nodeInfoListener
256261 ) {
257262 this .clusterService = clusterService ;
@@ -276,6 +281,7 @@ public IngestService(
276281 this .failureStoreMetrics = failureStoreMetrics ;
277282 this .projectResolver = projectResolver ;
278283 this .featureService = featureService ;
284+ this .samplingService = samplingService ;
279285 this .nodeInfoListener = nodeInfoListener ;
280286 }
281287
@@ -290,7 +296,8 @@ public IngestService(
290296 MatcherWatchdog matcherWatchdog ,
291297 FailureStoreMetrics failureStoreMetrics ,
292298 ProjectResolver projectResolver ,
293- FeatureService featureService
299+ FeatureService featureService ,
300+ SamplingService samplingService
294301 ) {
295302 this (
296303 clusterService ,
@@ -304,6 +311,7 @@ public IngestService(
304311 failureStoreMetrics ,
305312 projectResolver ,
306313 featureService ,
314+ samplingService ,
307315 createNodeInfoListener (client )
308316 );
309317 }
@@ -324,6 +332,7 @@ public IngestService(
324332 this .failureStoreMetrics = ingestService .failureStoreMetrics ;
325333 this .projectResolver = ingestService .projectResolver ;
326334 this .featureService = ingestService .featureService ;
335+ this .samplingService = ingestService .samplingService ;
327336 this .nodeInfoListener = ingestService .nodeInfoListener ;
328337 }
329338
@@ -971,6 +980,7 @@ protected void doRun() {
971980 Pipeline firstPipeline = pipelines .peekFirst ();
972981 if (pipelines .hasNext () == false ) {
973982 i ++;
983+ samplingService .maybeSample (state .metadata ().projects ().get (pipelines .projectId ()), indexRequest );
974984 continue ;
975985 }
976986
@@ -983,7 +993,7 @@ protected void doRun() {
983993 final int slot = i ;
984994 final Releasable ref = refs .acquire ();
985995 final IngestDocument ingestDocument = newIngestDocument (indexRequest );
986- final org . elasticsearch . script . Metadata originalDocumentMetadata = ingestDocument .getMetadata ().clone ();
996+ final Metadata originalDocumentMetadata = ingestDocument .getMetadata ().clone ();
987997 // the document listener gives us three-way logic: a document can fail processing (1), or it can
988998 // be successfully processed. a successfully processed document can be kept (2) or dropped (3).
989999 final ActionListener <IngestPipelinesExecutionResult > documentListener = ActionListener .runAfter (
@@ -1030,7 +1040,14 @@ public void onFailure(Exception e) {
10301040 }
10311041 );
10321042
1033- executePipelines (pipelines , indexRequest , ingestDocument , adaptedResolveFailureStore , documentListener );
1043+ executePipelines (
1044+ pipelines ,
1045+ indexRequest ,
1046+ ingestDocument ,
1047+ adaptedResolveFailureStore ,
1048+ documentListener ,
1049+ originalDocumentMetadata
1050+ );
10341051 assert actionRequest .index () != null ;
10351052
10361053 i ++;
@@ -1149,7 +1166,8 @@ private void executePipelines(
11491166 final IndexRequest indexRequest ,
11501167 final IngestDocument ingestDocument ,
11511168 final Function <String , Boolean > resolveFailureStore ,
1152- final ActionListener <IngestPipelinesExecutionResult > listener
1169+ final ActionListener <IngestPipelinesExecutionResult > listener ,
1170+ final Metadata originalDocumentMetadata
11531171 ) {
11541172 assert pipelines .hasNext ();
11551173 PipelineSlot slot = pipelines .next ();
@@ -1180,12 +1198,12 @@ private void executePipelines(
11801198 listener .onFailure (e );
11811199 }
11821200 };
1183-
1201+ AtomicBoolean haveAttemptedSampling = new AtomicBoolean (false );
1202+ final var project = state .metadata ().projects ().get (pipelines .projectId ());
11841203 try {
11851204 if (pipeline == null ) {
11861205 throw new IllegalArgumentException ("pipeline with id [" + pipelineId + "] does not exist" );
11871206 }
1188- final var project = state .metadata ().projects ().get (pipelines .projectId ());
11891207 if (project == null ) {
11901208 throw new IllegalArgumentException ("project with id [" + pipelines .projectId () + "] does not exist" );
11911209 }
@@ -1335,15 +1353,24 @@ private void executePipelines(
13351353 }
13361354
13371355 if (newPipelines .hasNext ()) {
1338- executePipelines (newPipelines , indexRequest , ingestDocument , resolveFailureStore , listener );
1356+ executePipelines (newPipelines , indexRequest , ingestDocument , resolveFailureStore , listener , originalDocumentMetadata );
13391357 } else {
1340- // update the index request's source and (potentially) cache the timestamp for TSDB
1358+ /*
1359+ * At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results.
1360+ * This is our chance to sample with both the original document and all changes.
1361+ */
1362+ haveAttemptedSampling .set (true );
1363+ attemptToSampleData (project , indexRequest , ingestDocument , originalDocumentMetadata );
13411364 updateIndexRequestSource (indexRequest , ingestDocument );
13421365 cacheRawTimestamp (indexRequest , ingestDocument );
13431366 listener .onResponse (IngestPipelinesExecutionResult .SUCCESSFUL_RESULT ); // document succeeded!
13441367 }
13451368 });
13461369 } catch (Exception e ) {
1370+ if (haveAttemptedSampling .get () == false ) {
1371+ // It is possible that an exception happened after we sampled. We do not want to sample the same document twice.
1372+ attemptToSampleData (project , indexRequest , ingestDocument , originalDocumentMetadata );
1373+ }
13471374 logger .debug (
13481375 () -> format ("failed to execute pipeline [%s] for document [%s/%s]" , pipelineId , indexRequest .index (), indexRequest .id ()),
13491376 e
@@ -1352,6 +1379,56 @@ private void executePipelines(
13521379 }
13531380 }
13541381
1382+ private void attemptToSampleData (
1383+ ProjectMetadata projectMetadata ,
1384+ IndexRequest indexRequest ,
1385+ IngestDocument ingestDocument ,
1386+ Metadata originalDocumentMetadata
1387+ ) {
1388+ if (samplingService != null && samplingService .atLeastOneSampleConfigured ()) {
1389+ /*
1390+ * We need both the original document and the fully updated document for sampling, so we make a copy of the original
1391+ * before overwriting it here. We can discard it after sampling.
1392+ */
1393+ samplingService .maybeSample (projectMetadata , indexRequest .index (), () -> {
1394+ IndexRequest original = copyIndexRequestForSampling (indexRequest );
1395+ updateIndexRequestMetadata (original , originalDocumentMetadata );
1396+ return original ;
1397+ }, ingestDocument );
1398+
1399+ }
1400+ }
1401+
1402+ /**
1403+ * Creates a copy of an IndexRequest to be used by random sampling.
1404+ * @param original The IndexRequest to be copied
1405+ * @return A copy of the IndexRequest
1406+ */
1407+ private IndexRequest copyIndexRequestForSampling (IndexRequest original ) {
1408+ IndexRequest clonedRequest = new IndexRequest (original .index ());
1409+ clonedRequest .id (original .id ());
1410+ clonedRequest .routing (original .routing ());
1411+ clonedRequest .version (original .version ());
1412+ clonedRequest .versionType (original .versionType ());
1413+ clonedRequest .setPipeline (original .getPipeline ());
1414+ clonedRequest .setFinalPipeline (original .getFinalPipeline ());
1415+ clonedRequest .setIfSeqNo (original .ifSeqNo ());
1416+ clonedRequest .setIfPrimaryTerm (original .ifPrimaryTerm ());
1417+ clonedRequest .setRefreshPolicy (original .getRefreshPolicy ());
1418+ clonedRequest .waitForActiveShards (original .waitForActiveShards ());
1419+ clonedRequest .timeout (original .timeout ());
1420+ clonedRequest .opType (original .opType ());
1421+ clonedRequest .setParentTask (original .getParentTask ());
1422+ clonedRequest .setRequireDataStream (original .isRequireDataStream ());
1423+ clonedRequest .setRequireAlias (original .isRequireAlias ());
1424+ clonedRequest .setIncludeSourceOnError (original .getIncludeSourceOnError ());
1425+ BytesReference source = original .source ();
1426+ if (source != null ) {
1427+ clonedRequest .source (source , original .getContentType ());
1428+ }
1429+ return clonedRequest ;
1430+ }
1431+
13551432 private static void executePipeline (
13561433 final IngestDocument ingestDocument ,
13571434 final Pipeline pipeline ,
0 commit comments