From 9ad06a87012802d8cc08e0b5178e3751018a45f6 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 29 Oct 2025 13:20:21 -0500 Subject: [PATCH 1/3] Prototype of random sampling of child streams --- .../elasticsearch/ingest/IngestService.java | 27 +++++-------------- .../elasticsearch/ingest/SamplingService.java | 6 +++-- .../ingest/IngestServiceTests.java | 2 +- 3 files changed, 12 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index ddd60dc522b3f..685f249aa06d6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -1040,14 +1040,7 @@ public void onFailure(Exception e) { } ); - executePipelines( - pipelines, - indexRequest, - ingestDocument, - adaptedResolveFailureStore, - documentListener, - originalDocumentMetadata - ); + executePipelines(pipelines, indexRequest, ingestDocument, adaptedResolveFailureStore, documentListener); assert actionRequest.index() != null; i++; @@ -1166,8 +1159,7 @@ private void executePipelines( final IndexRequest indexRequest, final IngestDocument ingestDocument, final Function resolveFailureStore, - final ActionListener listener, - final Metadata originalDocumentMetadata + final ActionListener listener ) { assert pipelines.hasNext(); PipelineSlot slot = pipelines.next(); @@ -1353,14 +1345,14 @@ private void executePipelines( } if (newPipelines.hasNext()) { - executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener, originalDocumentMetadata); + executePipelines(newPipelines, indexRequest, ingestDocument, resolveFailureStore, listener); } else { /* * At this point, all pipelines have been executed, and we are about to overwrite ingestDocument with the results. * This is our chance to sample with both the original document and all changes. */ haveAttemptedSampling.set(true); - attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata); + attemptToSampleData(project, indexRequest, ingestDocument); updateIndexRequestSource(indexRequest, ingestDocument); cacheRawTimestamp(indexRequest, ingestDocument); listener.onResponse(IngestPipelinesExecutionResult.SUCCESSFUL_RESULT); // document succeeded! @@ -1369,7 +1361,7 @@ private void executePipelines( } catch (Exception e) { if (haveAttemptedSampling.get() == false) { // It is possible that an exception happened after we sampled. We do not want to sample the same document twice. - attemptToSampleData(project, indexRequest, ingestDocument, originalDocumentMetadata); + attemptToSampleData(project, indexRequest, ingestDocument); } logger.debug( () -> format("failed to execute pipeline [%s] for document [%s/%s]", pipelineId, indexRequest.index(), indexRequest.id()), @@ -1379,18 +1371,13 @@ private void executePipelines( } } - private void attemptToSampleData( - ProjectMetadata projectMetadata, - IndexRequest indexRequest, - IngestDocument ingestDocument, - Metadata originalDocumentMetadata - ) { + private void attemptToSampleData(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) { if (samplingService != null && samplingService.atLeastOneSampleConfigured(projectMetadata)) { /* * We need both the original document and the fully updated document for sampling, so we make a copy of the original * before overwriting it here. We can discard it after sampling. */ - samplingService.maybeSample(projectMetadata, originalDocumentMetadata.getIndex(), indexRequest, ingestDocument); + samplingService.maybeSample(projectMetadata, indexRequest, ingestDocument); } } diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 8ef0c51c41b7e..3cb4b2b310095 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -200,8 +200,10 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque * @param indexRequest The raw request to potentially sample * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration */ - public void maybeSample(ProjectMetadata projectMetadata, String indexName, IndexRequest indexRequest, IngestDocument ingestDocument) { - maybeSample(projectMetadata, indexName, indexRequest, () -> ingestDocument); + public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) { + for (String index : ingestDocument.getIndexHistory()) { + maybeSample(projectMetadata, index, indexRequest, () -> ingestDocument); + } } private void maybeSample( diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 46a8baf541009..bae950119685f 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -3403,7 +3403,7 @@ public void testSampling() { ); verify(listener, times(1)).onResponse(null); // In the case where there is a pipeline, or there is a pipeline failure, there will be an IngestDocument so this verion is called: - verify(samplingService, times(2)).maybeSample(any(), any(), any(), any()); + verify(samplingService, times(2)).maybeSample(any(), any(), any()); // When there is no pipeline, we have no IngestDocument, and the maybeSample that does not require an IngestDocument is called: verify(samplingService, times(1)).maybeSample(any(), any()); } From 7d236024dc4ad3f775d956355f8391cbd84d87b8 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 30 Oct 2025 13:58:50 -0500 Subject: [PATCH 2/3] Adding a yaml rest test --- .../test/ingest/100_sampling_with_reroute.yml | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/100_sampling_with_reroute.yml diff --git a/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/100_sampling_with_reroute.yml b/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/100_sampling_with_reroute.yml new file mode 100644 index 0000000000000..f9c4300c511d2 --- /dev/null +++ b/qa/smoke-test-ingest-with-all-dependencies/src/yamlRestTest/resources/rest-api-spec/test/ingest/100_sampling_with_reroute.yml @@ -0,0 +1,186 @@ +--- +"Test get sample with multiple reroutes": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + ingest.put_pipeline: + id: pipeline1 + body: > + { + "processors" : [ + { + "set" : { + "field": "message", + "value": "set by pipeline1" + } + }, + { + "reroute" : { + "destination": "foo.bar" + } + } + ] + } + - match: { acknowledged: true } + + - do: + ingest.put_pipeline: + id: pipeline2 + body: > + { + "processors" : [ + { + "set" : { + "field": "message", + "value": "set by pipeline2" + } + }, + { + "reroute" : { + "destination": "foo.bar.baz" + } + } + ] + } + - match: { acknowledged: true } + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [foo] + template: + settings: + default_pipeline: pipeline1 + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + message: + type: text + data_stream: {} + - match: { acknowledged: true } + + - do: + indices.put_index_template: + name: my-template2 + body: + index_patterns: [foo.bar] + template: + settings: + default_pipeline: pipeline2 + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + message: + type: text + data_stream: {} + - match: { acknowledged: true } + + - do: + indices.put_index_template: + name: my-template3 + body: + index_patterns: [foo.bar.baz] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + message: + type: text + data_stream: {} + - match: { acknowledged: true } + + - do: + indices.create_data_stream: + name: foo + - is_true: acknowledged + + - do: + indices.create_data_stream: + name: foo.bar + - is_true: acknowledged + + - do: + indices.create_data_stream: + name: foo.bar.baz + - is_true: acknowledged + + - do: + indices.rollover: + alias: foo.bar.baz + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: foo + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.put_sample_configuration: + index: foo.bar + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.put_sample_configuration: + index: foo.bar.baz + body: + rate: 1.0 + max_samples: 100 + + - do: + bulk: + refresh: true + body: + - '{ "create":{"_index": "foo" } }' + - '{"@timestamp": 123456, "message": "This is the original message"}' + - '{ "create":{"_index": "foo" } }' + - '{"@timestamp": 123456, "message": "This is the original message"}' + - match: { errors: false } + + - do: + indices.get_sample: + index: foo + - length: { sample: 2 } + - match: { sample.0.index: "foo" } + - match: { sample.0.source.message: "This is the original message" } + - match: { sample.1.source.message: "This is the original message" } + + - do: + indices.get_sample: + index: foo.bar + - length: { sample: 2 } + - match: { sample.0.index: "foo.bar" } + - match: { sample.0.source.message: "This is the original message" } + - match: { sample.1.source.message: "This is the original message" } + + - do: + indices.get_sample: + index: foo.bar.baz + - length: { sample: 2 } + - match: { sample.0.index: "foo.bar.baz" } + - match: { sample.0.source.message: "This is the original message" } + - match: { sample.1.source.message: "This is the original message" } + +--- +teardown: + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.delete_data_stream: + name: foo* From 45735f4457288e782b195960dac58b661dfab48b Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 30 Oct 2025 14:03:46 -0500 Subject: [PATCH 3/3] adding a comment --- .../main/java/org/elasticsearch/ingest/SamplingService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 3cb4b2b310095..25e02af531e29 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -195,12 +195,14 @@ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexReque } /** - * Potentially samples the given indexRequest, depending on the existing sampling configuration. + * Potentially samples the given indexRequest, depending on the existing sampling configuration. The request will be sampled against + * the sampling configurations of all indices it has been rerouted to (if it has been rerouted). * @param projectMetadata Used to get the sampling configuration * @param indexRequest The raw request to potentially sample * @param ingestDocument The IngestDocument used for evaluating any conditionals that are part of the sample configuration */ public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) { + // The index history gives us the initially-requested index, as well as any indices it has been rerouted through for (String index : ingestDocument.getIndexHistory()) { maybeSample(projectMetadata, index, indexRequest, () -> ingestDocument); }