From 774dabddfb92d6a14f71e8114a304fddbc97e343 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 28 Oct 2025 10:16:04 -0500 Subject: [PATCH 1/4] Fixing data stream support in random sampling --- .../30_with_data_streams.yml | 275 +++++++++++ .../30_with_data_streams.yml | 435 ++++++++++++++++++ .../sampling/GetSampleStatsAction.java | 5 + ...nsportDeleteSampleConfigurationAction.java | 3 +- .../sampling/TransportGetSampleAction.java | 7 +- .../TransportGetSampleStatsAction.java | 7 +- ...TransportPutSampleConfigurationAction.java | 3 +- .../elasticsearch/ingest/SamplingService.java | 20 + 8 files changed, 749 insertions(+), 6 deletions(-) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml new file mode 100644 index 0000000000000..8109b2919b6e9 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml @@ -0,0 +1,275 @@ +--- +"Test get sample for basic sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + 
indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample: + index: sample_test + - match: { sample: [] } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample: + index: sample_test + - length: { sample: 2 } + - match: { sample.0.index: "sample_test" } + - match: { sample.0.source.animal: "dog" } + - match: { sample.0.source.foo: "bar" } + - match: { sample.1.source.animal: "cat" } + - match: { sample.1.source.foo: "baz" } + +--- +"Test get sample for conditional sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + if: "ctx?.animal == 'dog'" + + - do: + indices.get_sample: + index: sample_test + - match: { sample: [] } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample: + index: sample_test + - length: { sample: 1 } + - match: { sample.0.index: "sample_test" } + - match: { sample.0.source.animal: "dog" } + - match: { sample.0.source.foo: "bar" } + +--- +"Test that deleting 
sample config deletes sample": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample: + index: sample_test + - match: { sample: [] } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + + - do: + indices.get_sample: + index: sample_test + - length: { sample: 1 } + + - do: + indices.delete_sample_configuration: + index: sample_test + + - do: + indices.get_sample: + index: sample_test + catch: missing + +--- +"Test that deleting index deletes sample": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample: + index: sample_test + - match: { 
sample: [] } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample: + index: sample_test + - length: { sample: 2 } + + - do: + indices.delete_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.get_sample_configuration: + index: sample_test + catch: missing + + - do: + indices.get_sample: + index: sample_test + catch: missing + diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml new file mode 100644 index 0000000000000..d9b41a9edaf9b --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml @@ -0,0 +1,435 @@ +--- +"Test get sample stats for basic sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 0 } + - match: { samples_rejected_for_max_samples_exceeded: 0 } + - match: { samples_rejected_for_condition: 0 } + - match: { 
samples_rejected_for_rate: 0 } + - match: { samples_rejected_for_exception: 0 } + - match: { samples_rejected_for_size: 0 } + - match: { samples_accepted: 0 } + - match: { time_sampling_millis: 0 } + - match: { time_sampling: "0s" } + - match: { time_evaluating_condition_millis: 0 } + - match: { time_evaluating_condition: "0s" } + - match: { time_compiling_condition_millis: 0 } + - match: { time_compiling_condition: "0s" } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 2 } + - match: { samples_rejected_for_max_samples_exceeded: 0 } + - match: { samples_rejected_for_condition: 0 } + - match: { samples_rejected_for_rate: 0 } + - match: { samples_rejected_for_exception: 0 } + - match: { samples_rejected_for_size: 0 } + - match: { samples_accepted: 2 } + - match: { time_evaluating_condition_millis: 0 } + - match: { time_evaluating_condition: "0s" } + - match: { time_compiling_condition_millis: 0 } + - match: { time_compiling_condition: "0s" } + - match: { last_exception: null } + +--- +"Test get sample stats for conditional sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + 
rate: 1.0 + max_samples: 100 + if: "ctx?.animal == 'dog'" + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 2 } + - match: { samples_rejected_for_max_samples_exceeded: 0 } + - match: { samples_rejected_for_condition: 1 } + - match: { samples_rejected_for_rate: 0 } + - match: { samples_rejected_for_exception: 0 } + - match: { samples_rejected_for_size: 0 } + - match: { samples_accepted: 1 } + - match: { last_exception: null } + +--- +"Test that deleting sample config deletes sample stats": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 0 } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 1 } + + - do: + indices.delete_sample_configuration: + index: sample_test + + - do: + indices.get_sample_stats: + index: sample_test + catch: missing + +--- +"Test that deleting index deletes 
sample stats": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 0 } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 2 } + + - do: + indices.delete_data_stream: + name: sample_test + + - do: + indices.get_sample: + index: sample_test + catch: missing + + - do: + indices.get_sample_stats: + index: sample_test + catch: missing + +--- +"Test get sample stats for exceeds size": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { 
rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 1 + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 0 } + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 2 } + - match: { samples_rejected_for_max_samples_exceeded: 1 } + - match: { samples_rejected_for_condition: 0 } + - match: { samples_rejected_for_rate: 0 } + - match: { samples_rejected_for_exception: 0 } + - match: { samples_rejected_for_size: 0 } + - match: { samples_accepted: 1 } + - match: { time_evaluating_condition_millis: 0 } + - match: { time_evaluating_condition: "0s" } + - match: { time_compiling_condition_millis: 0 } + - match: { time_compiling_condition: "0s" } + +--- +"Test get sample stats for conditional with exception": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [sample_test] + template: + settings: + index.number_of_shards: 1 + index.number_of_replicas: 0 + mappings: + dynamic: strict + properties: + animal: + type: text + foo: + type: text + data_stream: {} + + - do: + indices.create_data_stream: + name: sample_test + - is_true: acknowledged + + - do: + indices.rollover: + alias: sample_test + wait_for_active_shards: 1 + - match: { rolled_over: true } + + - do: + indices.put_sample_configuration: + index: sample_test + body: + rate: 1.0 + max_samples: 100 + if: "ctx?.animal > 0" + + - do: + bulk: + refresh: true + body: + - '{"index": {"_index": "sample_test"}}' + - '{"animal": "dog", "foo": "bar"}' + - '{"index": {"_index": "sample_test"}}' + - 
'{"animal": "cat", "foo": "baz"}' + + - do: + indices.get_sample_stats: + index: sample_test + human: true + - match: { potential_samples: 2 } + - match: { samples_rejected_for_max_samples_exceeded: 0 } + - match: { samples_rejected_for_condition: 0 } + - match: { samples_rejected_for_rate: 0 } + - match: { samples_rejected_for_exception: 2 } + - match: { samples_rejected_for_size: 0 } + - match: { samples_accepted: 0 } + - match: { last_exception.type: "script_exception" } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java index a8e6e2b0140a2..00e14e12b4e3f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -60,6 +60,11 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, return new CancellableTask(id, type, action, "get sample stats", parentTaskId, headers); } + @Override + public boolean includeDataStreams() { + return true; + } + @Override public ActionRequestValidationException validate() { if (this.indexName.contains("*")) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationAction.java index d23c3df2ca441..2292e9a5a284d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationAction.java @@ -76,9 +76,8 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { - // throws IndexNotFoundException if any index does not exist 
or more than one index is resolved try { - indexNameExpressionResolver.concreteIndexNames(state, request); + SamplingService.throwIndexNotFoundExceptionIfNotDataStreamOrIndex(indexNameExpressionResolver, projectResolver, state, request); } catch (IndexNotFoundException e) { listener.onFailure(e); return; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java index 87842ef096e33..a3979bdeb4cd8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java @@ -75,7 +75,12 @@ protected Void createActionContext(Task task, Request request) { @Override protected Response newResponse(Request request, List nodeResponses, List failures) { - indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request); + SamplingService.throwIndexNotFoundExceptionIfNotDataStreamOrIndex( + indexNameExpressionResolver, + projectResolver, + clusterService.state(), + request + ); SamplingMetadata samplingMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(SamplingMetadata.TYPE); final int maxSamples; if (samplingMetadata == null) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java index 03cc218e1e6f4..2feb024759094 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java @@ -75,7 +75,12 @@ protected Void createActionContext(Task task, GetSampleStatsAction.Request reque @Override protected Response 
newResponse(Request request, List nodeResponses, List failures) { - indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request); + SamplingService.throwIndexNotFoundExceptionIfNotDataStreamOrIndex( + indexNameExpressionResolver, + projectResolver, + clusterService.state(), + request + ); SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( projectResolver.getProjectMetadata(clusterService.state()), request.indices()[0] diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java index db08bab3a5fc1..56304fbd1a0fd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java @@ -75,9 +75,8 @@ protected void masterOperation( ClusterState state, ActionListener listener ) throws Exception { - // throws IndexNotFoundException if any index does not exist or more than one index is resolved try { - indexNameExpressionResolver.concreteIndexNames(state, request); + SamplingService.throwIndexNotFoundExceptionIfNotDataStreamOrIndex(indexNameExpressionResolver, projectResolver, state, request); } catch (IndexNotFoundException e) { listener.onFailure(e); return; diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index c11c5bb1b30d0..473e97b6763b0 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -14,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import 
org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; @@ -25,9 +26,11 @@ import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; @@ -327,6 +330,23 @@ public SampleStats getLocalSampleStats(ProjectId projectId, String index) { return sampleInfo == null ? 
new SampleStats() : sampleInfo.stats; } + /** + * Throws an IndexNotFoundException if the sole name in the request is neither an existing data stream nor a single existing index + */ + public static void throwIndexNotFoundExceptionIfNotDataStreamOrIndex( + IndexNameExpressionResolver indexNameExpressionResolver, + ProjectResolver projectResolver, + ClusterState state, + IndicesRequest request + ) { + assert request.indices().length == 1 : "Expected IndicesRequest to have a single index but found " + request.indices().length; + assert request.includeDataStreams() : "Expected IndicesRequest to include data streams but it did not"; + boolean isDataStream = projectResolver.getProjectMetadata(state).dataStreams().containsKey(request.indices()[0]); + if (isDataStream == false) { + indexNameExpressionResolver.concreteIndexNames(state, request); + } + } + public boolean atLeastOneSampleConfigured(ProjectMetadata projectMetadata) { if (RANDOM_SAMPLING_FEATURE_FLAG) { SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); From fb889be1ea9d51b1f176e19e1693f58dd9b31e7e Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 28 Oct 2025 15:06:38 -0500 Subject: [PATCH 2/4] Delete sample config when a data stream is deleted --- .../30_with_data_streams.yml | 10 +++ .../30_with_data_streams.yml | 11 +++ .../ingest/SamplingServiceIT.java | 76 +++++++++++++++++++ .../elasticsearch/ingest/SamplingService.java | 17 +++++ 4 files changed, 114 insertions(+) diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml index 8109b2919b6e9..09bf2ec7cde89 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml @@ -273,3 +273,13 @@ index: 
sample_test catch: missing +--- +teardown: + + - do: + indices.delete_data_stream: + name: sample_test* + + - do: + indices.get_all_sample_configuration: {} + - length: { $body: 0 } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml index d9b41a9edaf9b..6403c7c7e9606 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml @@ -433,3 +433,14 @@ - match: { samples_rejected_for_size: 0 } - match: { samples_accepted: 0 } - match: { last_exception.type: "script_exception" } + +--- +teardown: + + - do: + indices.delete_data_stream: + name: sample_test* + + - do: + indices.get_all_sample_configuration: {} + - length: { $body: 0 } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java index ef6dff74ed152..6b4417cb4de06 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/SamplingServiceIT.java @@ -17,15 +17,25 @@ import org.elasticsearch.action.admin.indices.sampling.GetSampleAction; import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.datastreams.CreateDataStreamAction; +import 
org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.datastreams.DataStreamsPlugin; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.junit.After; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -36,6 +46,12 @@ import static org.hamcrest.Matchers.equalTo; public class SamplingServiceIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class); + } + public void testTTL() throws Exception { assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG); assertAcked( @@ -102,6 +118,66 @@ public void testDeleteIndex() throws Exception { }); } + public void testDeleteDataStream() throws Exception { + assumeTrue("Requires sampling feature flag", RANDOM_SAMPLING_FEATURE_FLAG); + String indexName = randomIdentifier(); + var template = ComposableIndexTemplate.builder() + .indexPatterns(List.of(indexName)) + + .template(new Template(Settings.EMPTY, CompressedXContent.fromJSON(""" + { + "_doc":{ + "dynamic":true, + "properties":{ + "foo":{ + "type":"text" + }, + "bar":{ + "type":"text" + } + } + } + } + """), null)) + .dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false)) + .build(); + var request = new TransportPutComposableIndexTemplateAction.Request("logs-template"); + request.indexTemplate(template); + safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)); + client().execute( + CreateDataStreamAction.INSTANCE, + new 
CreateDataStreamAction.Request(TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS, indexName) + ).actionGet(); + ensureYellow(indexName); + PutSampleConfigurationAction.Request putSampleConfigRequest = new PutSampleConfigurationAction.Request( + new SamplingConfiguration(1.0d, 10, null, null, null), + TimeValue.THIRTY_SECONDS, + TimeValue.THIRTY_SECONDS + ).indices(indexName); + client().execute(PutSampleConfigurationAction.INSTANCE, putSampleConfigRequest).actionGet(); + for (int i = 0; i < 5; i++) { + BulkRequest bulkRequest = new BulkRequest(); + for (int j = 0; j < 20; j++) { + IndexRequest indexRequest = new IndexRequest(indexName); + indexRequest.create(true); + indexRequest.source(Map.of("@timestamp", 12345, "foo", randomBoolean() ? 3L : randomLong(), "bar", randomBoolean())); + bulkRequest.add(indexRequest); + } + BulkResponse bulkResponse = client().execute(TransportBulkAction.TYPE, bulkRequest).actionGet(); + assertThat(bulkResponse.hasFailures(), equalTo(false)); + } + GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, new GetSampleAction.Request(indexName)) + .actionGet(); + assertThat(getSampleResponse.getSample().size(), equalTo(10)); + client().execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(TimeValue.THIRTY_SECONDS, indexName)) + .actionGet(); + assertBusy(() -> { + for (SamplingService samplingService : internalCluster().getInstances(SamplingService.class)) { + assertThat(samplingService.getLocalSample(ProjectId.DEFAULT, indexName), equalTo(List.of())); + } + }); + } + @After public void cleanup() { Map clearedSettings = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 473e97b6763b0..c83661ecd7229 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -25,6 +25,7 
@@ import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; @@ -525,6 +526,22 @@ private void maybeDeleteSamplingConfigurations(ClusterChangedEvent event, Projec } } } + if (currentProject.dataStreams() != previousProject.dataStreams()) { + for (DataStream dataStream : previousProject.dataStreams().values()) { + DataStream current = currentProject.dataStreams().get(dataStream.getName()); + if (current == null) { + String dataStreamName = dataStream.getName(); + SamplingConfiguration samplingConfiguration = getSamplingConfiguration( + event.state().projectState(projectId).metadata(), + dataStreamName + ); + if (samplingConfiguration != null) { + logger.debug("Deleting sample configuration for {} because the data stream has been deleted", dataStreamName); + deleteSampleConfiguration(projectId, dataStreamName); + } + } + } + } } private void maybeScheduleJob() { From bbf85df3f0c074bd1ba0b545f87dc1ce4783b5db Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 28 Oct 2025 16:03:13 -0500 Subject: [PATCH 3/4] fixing tests --- .../TransportDeleteSampleConfigurationActionTests.java | 3 +++ .../sampling/TransportPutSampleConfigurationActionTests.java | 3 +++ 2 files changed, 6 insertions(+) diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationActionTests.java index 11d7b0e7a0fdf..f18d59b2e241f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationActionTests.java +++ 
b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportDeleteSampleConfigurationActionTests.java @@ -69,6 +69,9 @@ public void setUp() throws Exception { ThreadPool threadPool = mock(ThreadPool.class); ActionFilters actionFilters = mock(ActionFilters.class); projectResolver = mock(ProjectResolver.class); + when(projectResolver.getProjectMetadata((ClusterState) any())).thenReturn( + ClusterState.EMPTY_STATE.projectState(ProjectId.DEFAULT).metadata() + ); indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); samplingService = mock(SamplingService.class); diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java index db22b6e1f8303..b71ce2ff3f1d5 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java @@ -65,6 +65,9 @@ public void setUp() throws Exception { ThreadPool threadPool = mock(ThreadPool.class); ActionFilters actionFilters = mock(ActionFilters.class); projectResolver = mock(ProjectResolver.class); + when(projectResolver.getProjectMetadata((ClusterState) any())).thenReturn( + ClusterState.EMPTY_STATE.projectState(ProjectId.DEFAULT).metadata() + ); indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); samplingService = mock(SamplingService.class); From d0d2d5892307bc08ebd9daa89c58dab81e053dcb Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 28 Oct 2025 16:23:30 -0500 Subject: [PATCH 4/4] fixing tests --- .../test/indices.get_sample/30_with_data_streams.yml | 3 +++ .../test/indices.get_sample_stats/30_with_data_streams.yml | 3 +++ 2 files changed, 6 insertions(+) diff --git 
a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml index 09bf2ec7cde89..fcc740c286fa7 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample/30_with_data_streams.yml @@ -275,6 +275,9 @@ --- teardown: + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples - do: indices.delete_data_stream: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml index 6403c7c7e9606..d2acf4f61ac7a 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/30_with_data_streams.yml @@ -436,6 +436,9 @@ --- teardown: + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples - do: indices.delete_data_stream: