diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json new file mode 100644 index 0000000000000..bfe4307ecc80d --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json @@ -0,0 +1,49 @@ +{ + "indices.put_sample_configuration": { + "documentation": { + "url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-put-sample-configuration", + "description": "Configure sampling for an index or data stream" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/{index}/_sample/config", + "methods": [ + "PUT" + ], + "parts": { + "index": { + "type": "string", + "description": "The name of a data stream or index" + } + } + } + ] + }, + "params": { + "master_timeout": { + "type": "time", + "description": "Timeout for connection to master node" + }, + "timeout": { + "type": "time", + "description": "Timeout for the request" + } + }, + "body": { + "description": "The sampling configuration", + "required": true + } + } +} + diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml new file mode 100644 index 0000000000000..73c65833d0d42 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml @@ -0,0 +1,161 @@ +--- +setup: + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + +--- +teardown: + - do: + indices.delete: + index: "test-*" + ignore_unavailable: true + allow_no_indices: true + +--- +"Test Put sampling configuration with JSON body": + - do: + indices.create: + index: test-index + body: + settings: + number_of_shards: 1 + + - do: + indices.put_sample_configuration: + index: test-index + body: + rate: 0.5 + max_samples: 100 + max_size: "10mb" + time_to_live: "1h" + + - match: { acknowledged: true } + +--- +"Put sampling configuration with condition": + - do: + indices.create: + index: test-condition-index + + - do: + indices.put_sample_configuration: + index: test-condition-index + body: + rate: 1.0 + max_samples: 50 + if: "ctx?.field == 'sample_me'" + + - match: { acknowledged: true } + +--- +"Put sampling configuration with minimal parameters": + - do: + indices.create: + index: test-minimal-index + + - do: + indices.put_sample_configuration: + index: test-minimal-index + body: + rate: 0.1 + + - match: { acknowledged: true } + +--- +"Put sampling configuration overwrites existing": + - do: + indices.create: + index: test-overwrite-index + + # First configuration + - do: + indices.put_sample_configuration: + index: test-overwrite-index + body: + rate: 0.3 + max_samples: 25 + + - match: { acknowledged: true } + + # Overwrite with new configuration + - do: + indices.put_sample_configuration: + index: test-overwrite-index + body: + rate: 0.8 + max_samples: 75 + max_size: "5mb" + + - match: { acknowledged: true } + +--- +"Put sampling configuration for non-existent index": + - do: + catch: missing + indices.put_sample_configuration: + index: non-existent-index + body: + rate: 0.6 + max_samples: 150 + +--- +"Put sampling configuration with timeout parameters": + - do: + indices.create: + index: test-timeout-index + + - do: + indices.put_sample_configuration: + index: test-timeout-index + master_timeout: "30s" + timeout: "10s" + body: + rate: 0.4 + max_samples: 80 + + - match: { acknowledged: true } + +--- +"Put sampling configuration with invalid rate fails": + - do: + indices.create: + index: test-invalid-rate-index + + - do: + catch: bad_request + indices.put_sample_configuration: + index: test-invalid-rate-index + body: + rate: 1.5 # Invalid rate > 1.0 + max_samples: 100 + +--- +"Put sampling configuration with missing rate fails": + - do: + indices.create: + index: test-missing-rate-index + + - do: + catch: bad_request + indices.put_sample_configuration: + index: test-missing-rate-index + body: + max_samples: 100 # Missing required rate parameter + +--- +"Put sampling configuration with human readable values": + - do: + indices.create: + index: test-human-readable-index + + - do: + indices.put_sample_configuration: + index: test-human-readable-index + body: + rate: ".05" + max_samples: 1000 + max_size: "10mb" + time_to_live: "1d" + + - match: { acknowledged: true } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java new file mode 100644 index 0000000000000..7a856ccbd6731 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java @@ -0,0 +1,271 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class PutSampleConfigurationActionIT extends ESIntegTestCase { + + public void testPutSampleConfiguration() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Initially no sampling configuration should exist + assertSamplingConfigurationNotExists(indexName); + + // Create a sampling configuration + SamplingConfiguration config = new SamplingConfiguration(0.5d, 50, ByteSizeValue.ofMb(10), TimeValue.timeValueHours(1), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", response.isAcknowledged()); + + // Verify the configuration was stored + assertSamplingConfigurationExists(indexName, config); + } + + public void testPutSampleConfigurationOverwritesExisting() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Create initial configuration + SamplingConfiguration initialConfig = new SamplingConfiguration( + 0.3d, + 30, + ByteSizeValue.ofMb(5), + TimeValue.timeValueMinutes(30), + null + ); + PutSampleConfigurationAction.Request initialRequest = new PutSampleConfigurationAction.Request( + initialConfig, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + initialRequest.indices(indexName); + + AcknowledgedResponse initialResponse = client().execute(PutSampleConfigurationAction.INSTANCE, initialRequest).actionGet(); + assertTrue("Initial put should be acknowledged", initialResponse.isAcknowledged()); + assertSamplingConfigurationExists(indexName, initialConfig); + + // Overwrite with new configuration + SamplingConfiguration newConfig = new SamplingConfiguration(0.8d, 80, ByteSizeValue.ofMb(20), TimeValue.timeValueHours(2), null); + PutSampleConfigurationAction.Request updateRequest = new PutSampleConfigurationAction.Request( + newConfig, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + updateRequest.indices(indexName); + + AcknowledgedResponse updateResponse = client().execute(PutSampleConfigurationAction.INSTANCE, updateRequest).actionGet(); + assertTrue("Update put should be acknowledged", updateResponse.isAcknowledged()); + + // Verify the configuration was overwritten + assertSamplingConfigurationExists(indexName, newConfig); + } + + public void testPutSampleConfigurationWithCondition() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Create configuration with condition + String condition = "ctx?.field == 'sample_me'"; + SamplingConfiguration configWithCondition = new SamplingConfiguration( + 1.0d, + 100, + ByteSizeValue.ofMb(15), + TimeValue.timeValueHours(3), + condition + ); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + configWithCondition, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration with condition should be acknowledged", response.isAcknowledged()); + + // Verify the configuration with condition was stored + assertSamplingConfigurationExists(indexName, configWithCondition); + } + + public void testPutSampleConfigurationMultipleIndices() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName1 = randomIdentifier(); + String indexName2 = randomIdentifier(); + createIndex(indexName1); + createIndex(indexName2); + + // According to the transport action implementation, only the first index should be used + SamplingConfiguration config = new SamplingConfiguration(0.7d, 70, ByteSizeValue.ofMb(12), TimeValue.timeValueMinutes(90), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName1, indexName2); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", response.isAcknowledged()); + + // Only the first index should have the configuration + assertSamplingConfigurationExists(indexName1, config); + assertSamplingConfigurationNotExists(indexName2); + } + + public void testPutSampleConfigurationNonExistentIndex() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String nonExistentIndex = randomIdentifier(); + + // Don't create the index - test that we cannot set sampling config for non-existent indices + SamplingConfiguration config = new SamplingConfiguration(0.6d, 60, ByteSizeValue.ofMb(8), TimeValue.timeValueMinutes(45), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(nonExistentIndex); + + // This should now fail - sampling configs cannot be set for non-existent indices + expectThrows(Exception.class, () -> { client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); }); + + // Verify no configuration was stored + assertSamplingConfigurationNotExists(nonExistentIndex); + + // Now create the index and verify the config can be set + createIndex(nonExistentIndex); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged for existing index", response.isAcknowledged()); + assertSamplingConfigurationExists(nonExistentIndex, config); + } + + public void testPutSampleConfigurationPersistsAcrossClusterStateUpdates() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Store sampling configuration + SamplingConfiguration config = new SamplingConfiguration(0.9d, 90, ByteSizeValue.ofMb(25), TimeValue.timeValueHours(4), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", response.isAcknowledged()); + assertSamplingConfigurationExists(indexName, config); + + // Trigger cluster state updates by creating additional indices + for (int i = 0; i < 3; i++) { + createIndex("dummy-index-" + i); + } + + // Verify sampling configuration still exists after cluster state changes + assertSamplingConfigurationExists(indexName, config); + } + + public void testPutSampleConfigurationWithSamplingIntegration() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Put a sampling configuration that samples everything + SamplingConfiguration config = new SamplingConfiguration(1.0d, 100, ByteSizeValue.ofMb(50), TimeValue.timeValueDays(1), null); + PutSampleConfigurationAction.Request putRequest = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + putRequest.indices(indexName); + + AcknowledgedResponse putResponse = client().execute(PutSampleConfigurationAction.INSTANCE, putRequest).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", putResponse.isAcknowledged()); + + // Index some documents + int docsToIndex = randomIntBetween(5, 15); + for (int i = 0; i < docsToIndex; i++) { + indexDoc(indexName, "doc-" + i, "field1", "value" + i); + } + + // Verify the sampling configuration works by getting samples + GetSampleAction.Request getSampleRequest = new GetSampleAction.Request(indexName); + GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, getSampleRequest).actionGet(); + + // Since we're sampling at 100%, we should get all documents sampled + assertEquals("All documents should be sampled", docsToIndex, getSampleResponse.getSample().size()); + } + + private void assertSamplingConfigurationExists(String indexName, SamplingConfiguration expectedConfig) { + ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + ClusterState clusterState = clusterService.state(); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(ProjectId.DEFAULT); + assertThat("Project metadata should exist", projectMetadata, notNullValue()); + + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + assertThat("Sampling metadata should exist", samplingMetadata, notNullValue()); + + Map configMap = samplingMetadata.getIndexToSamplingConfigMap(); + assertTrue("Configuration should exist for index " + indexName, configMap.containsKey(indexName)); + + SamplingConfiguration actualConfig = configMap.get(indexName); + assertThat("Configuration should match expected for index " + indexName, actualConfig, equalTo(expectedConfig)); + } + + private void assertSamplingConfigurationNotExists(String indexName) { + ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + ClusterState clusterState = clusterService.state(); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(ProjectId.DEFAULT); + + if (projectMetadata == null) { + return; // No project metadata means no sampling config + } + + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + if (samplingMetadata == null) { + return; // No sampling metadata means no sampling config + } + + Map configMap = samplingMetadata.getIndexToSamplingConfigMap(); + assertThat("Configuration should not exist for index " + indexName, configMap.get(indexName), nullValue()); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 757b8911a540d..b831b37935d43 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -131,10 +131,13 @@ import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; import org.elasticsearch.action.admin.indices.sampling.GetSampleAction; import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction; import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.RestPutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction; import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.TransportPutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -824,6 +827,7 @@ public void reg if (RANDOM_SAMPLING_FEATURE_FLAG) { actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class); + actions.register(PutSampleConfigurationAction.INSTANCE, TransportPutSampleConfigurationAction.class); actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class); } @@ -1057,6 +1061,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< if (RANDOM_SAMPLING_FEATURE_FLAG) { registerHandler.accept(new RestGetSampleAction()); + registerHandler.accept(new RestPutSampleConfigurationAction()); registerHandler.accept(new RestGetSampleStatsAction()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationAction.java new file mode 100644 index 0000000000000..407357fc59ea3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationAction.java @@ -0,0 +1,174 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Action for deleting sampling configurations from indices. + *

+ * This action allows administrators to remove sampling configurations from one or more indices + * or data streams. When a sampling configuration is deleted, any associated in-memory storage + * and sampling state is cleaned up. + *

+ *

+ * The action name is "indices:admin/sampling/config/delete" and it returns an {@link AcknowledgedResponse} + * to indicate whether the configuration was successfully removed. + *

+ * Usage examples: + *
    + *
  • {@code DELETE /logs/_sample/config} - Remove sampling configuration from the "logs" index
  • + *
  • {@code DELETE /logs,metrics/_sample/config} - Remove sampling configurations from multiple indices
  • + *
+ * + * @see SamplingConfiguration + * @see PutSampleConfigurationAction + * @see GetSamplingConfigurationAction + */ +public class DeleteSamplingConfigurationAction extends ActionType { + + /** + * The action name used to identify this action in the transport layer. + */ + public static final String NAME = "indices:admin/sampling/config/delete"; + + /** + * Singleton instance of this action type. + */ + public static final DeleteSamplingConfigurationAction INSTANCE = new DeleteSamplingConfigurationAction(); + + /** + * Private constructor to enforce singleton pattern. + * Creates a new action type with the predefined NAME. + */ + private DeleteSamplingConfigurationAction() { + super(NAME); + } + + /** + * Request class for deleting sampling configurations from indices. + *

+ * This request specifies which indices or data streams should have their sampling + * configurations removed. It extends {@link AcknowledgedRequest} to support + * master node timeout and acknowledgment timeout settings. + *

+ */ + public static class Request extends AcknowledgedRequest + implements + IndicesRequest.Replaceable { + + private String[] indices = Strings.EMPTY_ARRAY; + + /** + * Constructs a new request with specified timeouts. + * + * @param masterNodeTimeout the timeout for master node operations, or null for default + * @param ackTimeout the timeout for acknowledgment, or null for default + * @param indices the names of indices or data streams from which to remove sampling configurations + */ + public Request(@Nullable TimeValue masterNodeTimeout, @Nullable TimeValue ackTimeout, String... indices) { + super(masterNodeTimeout, ackTimeout); + this.indices = indices != null ? indices : Strings.EMPTY_ARRAY; + } + + /** + * Constructs a new request by deserializing from a StreamInput. + * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Request(StreamInput in) throws IOException { + super(in); + this.indices = in.readStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + } + + @Override + public String[] indices() { + return indices; + } + + @Override + public IndicesRequest indices(String... indices) { + this.indices = indices != null ? indices : Strings.EMPTY_ARRAY; + return this; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (indices == null || indices.length == 0) { + validationException = addValidationError("at least one index name is required", validationException); + } + for (String index : indices) { + if (Strings.isNullOrEmpty(index)) { + validationException = addValidationError("index name cannot be null or empty", validationException); + break; + } + } + return validationException; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Deletes Sampling Configuration.", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(indices, request.indices) + && Objects.equals(this.masterNodeTimeout(), request.masterNodeTimeout()) + && Objects.equals(this.ackTimeout(), request.ackTimeout()); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(indices), masterNodeTimeout(), ackTimeout()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationAction.java new file mode 100644 index 0000000000000..e31ad9173d055 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationAction.java @@ -0,0 +1,251 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +/** + * Action type for retrieving sampling configuration for a specific index. + *

+ * This action allows clients to get the current sampling configuration + * that has been set on a specific index. Unlike {@link GetSamplingConfigurationsAction} + * which retrieves all sampling configurations for the entire cluster, this action + * targets a single index and returns its configuration in a structured format. + *

+ * The response format matches: + *
+ *  [
+ *      {
+ *      "index": "logs",
+ *      "configuration": {
+ *          "rate": "5%",
+ *          "if": "ctx?.network?.name == 'Guest'"
+ *      }
+ *      }
+ *  ]
+ *  
+ * + * @see SamplingConfiguration + * @see GetSamplingConfigurationsAction + * @see PutSampleConfigurationAction + */ +public class GetSamplingConfigurationAction extends ActionType { + + /** + * Singleton instance of the GetSamplingConfigurationAction. + * This provides a shared reference to the action type throughout the application. + */ + public static final GetSamplingConfigurationAction INSTANCE = new GetSamplingConfigurationAction(); + + /** + * The name identifier for this action type used in the transport layer. + * Format follows the pattern: "indices:admin/sampling/config/get" + */ + public static final String NAME = "indices:admin/sampling/config/get"; + + private GetSamplingConfigurationAction() { + super(NAME); + } + + /** + * Request object for getting the sampling configuration of a specific index. + *

+ * This request specifies which index's sampling configuration should be retrieved. + * The index name must be provided and cannot be null or empty. + *

+ */ + public static class Request extends ActionRequest implements IndicesRequest.Replaceable { + private String index; + + /** + * Constructs a new request for the specified index. + * + * @param index the name of the index to get the sampling configuration for. + * Must not be null or empty. + */ + public Request(String index) { + this.index = index; + } + + /** + * Constructs a new request by deserializing from a StreamInput. + * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Request(StreamInput in) throws IOException { + super(in); + this.index = in.readString(); + } + + /** + * Gets the index name for which to retrieve the sampling configuration. + * + * @return the index name + */ + public String getIndex() { + return index; + } + + @Override + public String[] indices() { + return new String[] { index }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public IndicesRequest indices(String... indices) { + if (indices == null || indices.length != 1) { + throw new IllegalArgumentException("GetSamplingConfigurationAction.Request requires exactly one index"); + } + this.index = indices[0]; + return this; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (Strings.isNullOrEmpty(index)) { + validationException = addValidationError("index name is required", validationException); + } + return validationException; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(index, request.index); + } + + @Override + public int hashCode() { + return Objects.hash(index); + } + } + + /** + * Response object containing the sampling configuration for a specific index. + *

+ * This response contains the index name and its associated sampling configuration. + * The response is designed to match the expected JSON format with an array containing + * a single object with "index" and "configuration" fields. + *

+ */ + public static class Response extends ActionResponse implements ToXContentObject { + private final String index; + private final SamplingConfiguration configuration; + + /** + * Constructs a new Response with the given index and configuration. + * + * @param index the index name + * @param configuration the sampling configuration for the index, or null if no configuration exists + */ + public Response(String index, SamplingConfiguration configuration) { + this.index = index; + this.configuration = configuration; + } + + /** + * Constructs a new Response by deserializing from a StreamInput. + * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Response(StreamInput in) throws IOException { + this.index = in.readString(); + this.configuration = in.readOptionalWriteable(SamplingConfiguration::new); + } + + /** + * Gets the index name. + * + * @return the index name + */ + public String getIndex() { + return index; + } + + /** + * Gets the sampling configuration for the index. + * + * @return the sampling configuration, or null if no configuration exists for this index + */ + public SamplingConfiguration getConfiguration() { + return configuration; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(index); + out.writeOptionalWriteable(configuration); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(); + builder.startObject(); + builder.field("index", index); + if (configuration != null) { + builder.field("configuration", configuration); + } else { + builder.nullField("configuration"); + } + builder.endObject(); + builder.endArray(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(index, response.index) && Objects.equals(configuration, response.configuration); + } + + @Override + public int hashCode() { + return Objects.hash(index, configuration); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsAction.java new file mode 100644 index 0000000000000..d98854d718f09 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsAction.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; + +/** + * Action type for retrieving sampling configurations. + *

+ * This action allows clients to get the current sampling configurations + * that have been set on one or more indices. The response contains a mapping + * from index names to their corresponding {@link SamplingConfiguration} objects. + *

+ * + * @see SamplingConfiguration + * @see PutSampleConfigurationAction + */ +public class GetSamplingConfigurationsAction extends ActionType { + + /** + * Singleton instance of the GetSamplingConfigurationsAction. + * This provides a shared reference to the action type throughout the application. + */ + public static final GetSamplingConfigurationsAction INSTANCE = new GetSamplingConfigurationsAction(); + + /** + * The name identifier for this action type used in the transport layer. + * Format follows the pattern: "indices:admin/sampling/config/get" + */ + public static final String NAME = "indices:admin/sampling/config/getAll"; + + private GetSamplingConfigurationsAction() { + super(NAME); + } + + /** + * Response object containing a map from index names to their sampling configurations. + *

+ * This response encapsulates a map where keys are index names and values are + * their corresponding sampling configurations. If an index has no sampling + * configuration, it will not be present in the map. + *

+ */ + public static class Response extends ActionResponse implements ToXContentObject { + private final Map indexToSamplingConfigMap; + + /** + * Constructs a new Response with the given index-to-configuration mapping. + * + * @param indexToSamplingConfigMap a map from index names to their sampling configurations. + */ + public Response(Map indexToSamplingConfigMap) { + this.indexToSamplingConfigMap = indexToSamplingConfigMap; + } + + /** + * Constructs a new Response by deserializing from a StreamInput. + *

+ * This constructor is used during deserialization when the response is received + * over the transport layer. It reads the map of index names to sampling configurations + * from the input stream. + *

+ * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Response(StreamInput in) throws IOException { + this.indexToSamplingConfigMap = in.readMap(StreamInput::readString, SamplingConfiguration::new); + } + + /** + * Returns the mapping of index names to their sampling configurations. + *

+ * The returned map contains all indices that have sampling configurations. + * Indices without sampling configurations will not be present in this map. + *

+ * + * @return the index-to-sampling-configuration map + */ + public Map getIndexToSamplingConfigMap() { + return indexToSamplingConfigMap; + } + + /** + * Serializes this response to a StreamOutput. + *

+ * This method writes the index-to-configuration map to the output stream + * so it can be transmitted over the transport layer. Each index name is + * written as a string, followed by its corresponding sampling configuration. + *

+ * + * @param out the stream output to write to + * @throws IOException if an I/O error occurs during serialization + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(indexToSamplingConfigMap, StreamOutput::writeString, (stream, config) -> config.writeTo(stream)); + } + + /** + * Determines whether this response is equal to another object. + *

+ * Two Response objects are considered equal if they contain the same + * index-to-sampling-configuration mappings. + *

+ * + * @param o the object to compare with this response + * @return true if the objects are equal, false otherwise + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response that = (Response) o; + return Objects.equals(indexToSamplingConfigMap, that.indexToSamplingConfigMap); + } + + /** + * Returns the hash code for this response. + *

+ * The hash code is computed based on the index-to-sampling-configuration map, + * ensuring that equal responses have the same hash code. + *

+ * + * @return the hash code value for this response + */ + @Override + public int hashCode() { + return Objects.hash(indexToSamplingConfigMap); + } + + /** + * Converts this response to XContent format. + *

+ * This method serializes the response into a format suitable for + * XContent builders, allowing it to be easily included in + * Elasticsearch responses. + *

+ * + * @param builder the XContent builder to use for serialization + * @param params additional parameters for serialization, if any + * @return the XContent builder with the serialized response + * @throws IOException if an I/O error occurs during serialization + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + { + builder.field("indices"); + builder.startArray(); + { + for (Map.Entry entry : indexToSamplingConfigMap.entrySet()) { + builder.startObject(); + { + builder.field("index", entry.getKey()); + builder.field("sampling_configuration", entry.getValue()); + } + builder.endObject(); + } + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java new file mode 100644 index 0000000000000..4ed7b571cba44 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java @@ -0,0 +1,215 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; + +/** + * Action for configuring sampling settings on indices. + *

+ * This action allows administrators to configure sampling parameters for one or more indices, + * including sampling rate, maximum number of samples, maximum size constraints, time-to-live + * settings, and conditional sampling criteria. + *

+ *

+ * The action name is "indices:admin/sample/config/update" and it returns an {@link AcknowledgedResponse} + * to indicate whether the configuration was successfully applied. + *

+ */ +public class PutSampleConfigurationAction extends ActionType { + /** + * The action name used to identify this action in the transport layer. + */ + public static final String NAME = "indices:admin/sample/config/update"; + + /** + * Singleton instance of this action type. + */ + public static final PutSampleConfigurationAction INSTANCE = new PutSampleConfigurationAction(); + + /** + * Constructs a new PutSampleConfigurationAction with the predefined action name. + */ + public PutSampleConfigurationAction() { + super(NAME); + } + + /** + * Request class for configuring sampling settings on indices. + *

+ * This request encapsulates all the parameters needed to configure sampling on one or more indices, + * including the sampling configuration itself and the target indices. It implements + * {@link IndicesRequest.Replaceable} to support index name resolution and expansion. + *

+ */ + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final SamplingConfiguration samplingConfiguration; + private String[] indices = Strings.EMPTY_ARRAY; + + /** + * Constructs a new request with the specified sampling configuration parameters. + * + * @param samplingConfiguration the sampling configuration to apply + * @param masterNodeTimeout the timeout for master node operations, or null for default + * @param ackTimeout the timeout for acknowledgment, or null for default + */ + public Request( + @Nullable SamplingConfiguration samplingConfiguration, + @Nullable TimeValue masterNodeTimeout, + @Nullable TimeValue ackTimeout + ) { + super(masterNodeTimeout, ackTimeout); + this.samplingConfiguration = samplingConfiguration; + } + + /** + * Constructs a new request by deserializing from a stream input. + * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Request(StreamInput in) throws IOException { + super(in); + this.indices = in.readStringArray(); + this.samplingConfiguration = new SamplingConfiguration(in); + } + + /** + * Serializes this request to a stream output. + * + * @param out the stream output to write to + * @throws IOException if an I/O error occurs during serialization + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + samplingConfiguration.writeTo(out); + } + + /** + * Returns the array of target indices for this sampling configuration request. + * + * @return an array of index names, never null but may be empty + */ + @Override + public String[] indices() { + return indices; + } + + /** + * Sets the target indices or data streams for this sampling configuration request. + * + * @param indices the names of indices or data streams to target + * @return this request instance for method chaining + */ + @Override + public Request indices(String... indices) { + this.indices = indices; + return this; + } + + /** + * Indicates whether this request should include data streams in addition to regular indices. + * + * @return true to include data streams + */ + @Override + public boolean includeDataStreams() { + return true; + } + + /** + * Returns the indices options for this request, which control how index names are resolved and expanded. + * + * @return the indices options, configured to be lenient and expand both open and closed indices + */ + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + + /** + * Creates a cancellable task for tracking the execution of this request. + * + * @param id the unique task identifier + * @param type the task type + * @param action the action name + * @param parentTaskId the parent task identifier, or null if this is a root task + * @param headers the request headers + * @return a new cancellable task instance + */ + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "Updates Sampling Configuration.", parentTaskId, headers); + } + + /** + * Returns the sampling configuration encapsulated by this request. + * + * @return the sampling configuration + */ + public SamplingConfiguration getSampleConfiguration() { + return samplingConfiguration; + } + + /** + * Compares this request with another object for equality. + *

+ * Two requests are considered equal if they have the same target indices and + * sampling configuration parameters. + *

+ * + * @param o the object to compare with + * @return true if the objects are equal, false otherwise + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request that = (Request) o; + return Arrays.equals(indices, that.indices) + && Objects.equals(samplingConfiguration, that.samplingConfiguration) + && Objects.equals(masterNodeTimeout(), that.masterNodeTimeout()) + && Objects.equals(ackTimeout(), that.ackTimeout()); + } + + /** + * Returns the hash code for this request. + *

+ * The hash code is computed based on the target indices, sampling configuration, + * master node timeout, and acknowledgment timeout. + *

+ * + * @return the hash code value + */ + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(indices), samplingConfiguration, masterNodeTimeout(), ackTimeout()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java new file mode 100644 index 0000000000000..c091c1f9fa5fd --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration.DEFAULT_MAX_SAMPLES; +import static org.elasticsearch.rest.RestRequest.Method.PUT; +import static org.elasticsearch.rest.RestUtils.getAckTimeout; +import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout; + +/** + * REST action for updating sampling configurations for indices. + *

+ * Handles PUT requests to /{index}/_sample/config endpoint and delegates + * to the PutSampleConfigurationAction transport action. + *

+ * + *

Example usage:

+ *
{@code
+ * PUT /my-index/_sample/config
+ * {
+ *   "rate": ".05",
+ *   "max_samples": 1000,
+ *   "max_size": "10mb",
+ *   "time_to_live": "1d",
+ *   "if": "ctx?.network?.name == 'Guest'"
+ * }
+ * }
+ */ +@ServerlessScope(Scope.INTERNAL) +public class RestPutSampleConfigurationAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(PUT, "/{index}/_sample/config")); + } + + @Override + public String getName() { + return "put_sample_configuration_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + String indexName = request.param("index"); + + PutSampleConfigurationAction.Request putRequest; + + if (request.hasContent()) { + // Parse the sampling configuration from request body + try (XContentParser parser = request.contentParser()) { + SamplingConfiguration samplingConfig = SamplingConfiguration.fromXContent(parser); + putRequest = new PutSampleConfigurationAction.Request( + samplingConfig, + getMasterNodeTimeout(request), + getAckTimeout(request) + ); + } + } else { + // Use URL parameters if no request body provided + if (request.hasParam("rate") == false) { + throw new IllegalArgumentException("Missing required parameter: rate"); + } + double rate = request.paramAsDouble("rate", 0); // required parameter, default won't be used + int maxSamples = request.paramAsInt("max_samples", DEFAULT_MAX_SAMPLES); + ByteSizeValue maxSize = request.paramAsSize("max_size", null); + TimeValue timeToLive = request.paramAsTime("time_to_live", null); + String condition = request.param("if", null); + + putRequest = new PutSampleConfigurationAction.Request( + new SamplingConfiguration(rate, maxSamples, maxSize, timeToLive, condition), + getMasterNodeTimeout(request), + getAckTimeout(request) + ); + } + + // Set the target index + putRequest.indices(indexName); + + return channel -> client.execute(PutSampleConfigurationAction.INSTANCE, putRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java new file mode 100644 index 0000000000000..468095d158081 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * Transport action for updating sampling configurations in cluster metadata. + *

+ * This action handles the cluster state update required to store sampling configurations + * for the specified indices. It validates the request, resolves index names, and updates + * the cluster metadata with the new sampling configuration. + *

+ */ +public class TransportPutSampleConfigurationAction extends AcknowledgedTransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutSampleConfigurationAction.class); + private final ProjectResolver projectResolver; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SamplingService samplingService; + + @Inject + public TransportPutSampleConfigurationAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver, + SamplingService samplingService + ) { + super( + PutSampleConfigurationAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PutSampleConfigurationAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.samplingService = samplingService; + } + + @Override + protected void masterOperation( + Task task, + PutSampleConfigurationAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + // throws IndexNotFoundException if any index does not exist + indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request); + + ProjectId projectId = projectResolver.getProjectId(); + samplingService.updateSampleConfiguration( + projectId, + request.indices()[0], + request.getSampleConfiguration(), + request.masterNodeTimeout(), + request.ackTimeout(), + listener + ); + state.projectState(projectId).metadata().custom("sample_config"); + } + + @Override + protected ClusterBlockException checkBlock(PutSampleConfigurationAction.Request request, ClusterState state) { + return null; + } + +} diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 800b760879769..a292b2d08243f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -9,15 +9,23 @@ package org.elasticsearch.ingest; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -26,6 +34,7 @@ import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; import org.elasticsearch.script.IngestConditionalScript; @@ -41,6 +50,7 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -58,6 +68,8 @@ public class SamplingService implements ClusterStateListener { private final ProjectResolver projectResolver; private final LongSupplier relativeMillisTimeSupplier; private final LongSupplier statsTimeSupplier = System::nanoTime; + private final MasterServiceTaskQueue updateSamplingConfigurationTaskQueue; + /* * This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We @@ -75,6 +87,11 @@ public SamplingService( this.clusterService = clusterService; this.projectResolver = projectResolver; this.relativeMillisTimeSupplier = relativeMillisTimeSupplier; + this.updateSamplingConfigurationTaskQueue = clusterService.createTaskQueue( + "update-sampling-configuration", + Priority.NORMAL, + new UpdateSamplingConfigurationExecutor(projectResolver) + ); } /** @@ -257,6 +274,21 @@ public boolean atLeastOneSampleConfigured() { } } + public void updateSampleConfiguration( + ProjectId projectId, + String index, + SamplingConfiguration samplingConfiguration, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + updateSamplingConfigurationTaskQueue.submitTask( + "Updating Sampling Configuration", + new UpdateSamplingConfigurationTask(projectId, index, samplingConfiguration, ackTimeout, listener), + masterNodeTimeout + ); + } + @Override public void clusterChanged(ClusterChangedEvent event) { // TODO: React to sampling config changes @@ -686,6 +718,102 @@ void setScript(Script script, IngestConditionalScript.Factory factory) { } } + static class UpdateSamplingConfigurationTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + private final String indexName; + private final SamplingConfiguration samplingConfiguration; + + UpdateSamplingConfigurationTask( + ProjectId projectId, + String indexName, + SamplingConfiguration samplingConfiguration, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener); + this.projectId = projectId; + this.indexName = indexName; + this.samplingConfiguration = samplingConfiguration; + } + } + + static class UpdateSamplingConfigurationExecutor extends SimpleBatchedAckListenerTaskExecutor { + private static final Logger logger = LogManager.getLogger(UpdateSamplingConfigurationExecutor.class); + private final ProjectResolver projectResolver; + + UpdateSamplingConfigurationExecutor(ProjectResolver projectResolver) { + this.projectResolver = projectResolver; + } + + @Override + public Tuple executeTask( + UpdateSamplingConfigurationTask updateSamplingConfigurationTask, + ClusterState clusterState + ) { + logger.debug( + "Updating sampling configuration for index [{}] in project [{}] with rate [{}], maxSamples [{}]", + updateSamplingConfigurationTask.indexName, + updateSamplingConfigurationTask.projectId, + updateSamplingConfigurationTask.samplingConfiguration.rate(), + updateSamplingConfigurationTask.samplingConfiguration.maxSamples() + ); + + logger.debug( + "Configuration details: maxSize [{}], timeToLive [{}]", + updateSamplingConfigurationTask.samplingConfiguration.maxSize(), + updateSamplingConfigurationTask.samplingConfiguration.timeToLive() + ); + + // Get sampling metadata + ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState); + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + + boolean isNewConfiguration = samplingMetadata == null; + int existingConfigCount = isNewConfiguration ? 0 : samplingMetadata.getIndexToSamplingConfigMap().size(); + + logger.trace( + "Current sampling metadata state for project [{}]: {} (existing configurations: {})", + updateSamplingConfigurationTask.projectId, + isNewConfiguration ? "null" : "exists", + existingConfigCount + ); + + // Update with new sampling configuration if it exists or create new sampling metadata with the configuration + Map updatedConfigMap = new HashMap<>(); + if (samplingMetadata != null) { + updatedConfigMap.putAll(samplingMetadata.getIndexToSamplingConfigMap()); + } + + boolean isUpdate = updatedConfigMap.containsKey(updateSamplingConfigurationTask.indexName); + updatedConfigMap.put(updateSamplingConfigurationTask.indexName, updateSamplingConfigurationTask.samplingConfiguration); + + logger.trace( + "{} sampling configuration for index [{}], total configurations after update: {}", + isUpdate ? "Updated" : "Added", + updateSamplingConfigurationTask.indexName, + updatedConfigMap.size() + ); + + SamplingMetadata newSamplingMetadata = new SamplingMetadata(updatedConfigMap); + + // Update cluster state + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, newSamplingMetadata); + + // Return tuple with updated cluster state and the original listener + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + + logger.info( + "Successfully {} sampling configuration for index [{}] in project [{}]", + isUpdate ? "updated" : "created", + updateSamplingConfigurationTask.indexName, + updateSamplingConfigurationTask.projectId + ); + + return new Tuple<>(updatedClusterState, updateSamplingConfigurationTask); + } + } + /* * This is meant to be used internally as the key of the map of samples */ diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationActionTests.java new file mode 100644 index 0000000000000..097390741199e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/DeleteSamplingConfigurationActionTests.java @@ -0,0 +1,163 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class DeleteSamplingConfigurationActionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return DeleteSamplingConfigurationAction.Request::new; + } + + @Override + protected DeleteSamplingConfigurationAction.Request createTestInstance() { + return createRandomRequest(); + } + + @Override + protected DeleteSamplingConfigurationAction.Request mutateInstance(DeleteSamplingConfigurationAction.Request instance) { + return switch (randomIntBetween(0, 2)) { + case 0 -> { + // Mutate indices + DeleteSamplingConfigurationAction.Request mutated = new DeleteSamplingConfigurationAction.Request( + instance.masterNodeTimeout(), + instance.ackTimeout(), + randomValueOtherThan( + instance.indices(), + () -> randomArray(1, 5, String[]::new, () -> randomAlphaOfLengthBetween(1, 10)) + ) + ); + yield mutated; + } + case 1 -> { + // Mutate master node timeout + DeleteSamplingConfigurationAction.Request mutated = new DeleteSamplingConfigurationAction.Request( + randomValueOtherThan(instance.masterNodeTimeout(), () -> TimeValue.timeValueSeconds(randomIntBetween(1, 60))), + instance.ackTimeout(), + instance.indices() + ); + yield mutated; + } + case 2 -> { + // Mutate ack timeout + DeleteSamplingConfigurationAction.Request mutated = new DeleteSamplingConfigurationAction.Request( + instance.masterNodeTimeout(), + randomValueOtherThan(instance.ackTimeout(), () -> TimeValue.timeValueSeconds(randomIntBetween(1, 60))), + instance.indices() + ); + yield mutated; + } + default -> throw new IllegalStateException("Invalid mutation case"); + }; + } + + private DeleteSamplingConfigurationAction.Request createRandomRequest() { + String[] indices = randomArray(1, 5, String[]::new, () -> randomAlphaOfLengthBetween(1, 10)); + TimeValue masterNodeTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 60)); + TimeValue ackTimeout = TimeValue.timeValueSeconds(randomIntBetween(1, 60)); + + return new DeleteSamplingConfigurationAction.Request(masterNodeTimeout, ackTimeout, indices); + } + + public void testActionName() { + assertThat(DeleteSamplingConfigurationAction.NAME, equalTo("indices:admin/sampling/config/delete")); + assertThat(DeleteSamplingConfigurationAction.INSTANCE.name(), equalTo(DeleteSamplingConfigurationAction.NAME)); + } + + public void testActionInstance() { + assertThat(DeleteSamplingConfigurationAction.INSTANCE, notNullValue()); + assertThat(DeleteSamplingConfigurationAction.INSTANCE, sameInstance(DeleteSamplingConfigurationAction.INSTANCE)); + } + + public void testRequestValidation() { + // Valid request + DeleteSamplingConfigurationAction.Request validRequest = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10), + "test-index" + ); + assertThat(validRequest.validate(), nullValue()); + + // Invalid request with no indices + DeleteSamplingConfigurationAction.Request noIndicesRequest = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10) + ); + ActionRequestValidationException validation = noIndicesRequest.validate(); + assertThat(validation, notNullValue()); + assertThat(validation.getMessage(), containsString("at least one index name is required")); + + // Invalid request with empty indices array + DeleteSamplingConfigurationAction.Request emptyIndicesRequest = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10), + new String[0] + ); + ActionRequestValidationException emptyValidation = emptyIndicesRequest.validate(); + assertThat(emptyValidation, notNullValue()); + assertThat(emptyValidation.getMessage(), containsString("at least one index name is required")); + + // Invalid request with null index in array + DeleteSamplingConfigurationAction.Request nullIndexRequest = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10), + "valid", + null, + "also-valid" + ); + ActionRequestValidationException nullValidation = nullIndexRequest.validate(); + assertThat(nullValidation, notNullValue()); + assertThat(nullValidation.getMessage(), containsString("index name cannot be null or empty")); + + // Invalid request with empty index in array + DeleteSamplingConfigurationAction.Request emptyIndexRequest = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10), + "valid", + "", + "also-valid" + ); + ActionRequestValidationException emptyIndexValidation = emptyIndexRequest.validate(); + assertThat(emptyIndexValidation, notNullValue()); + assertThat(emptyIndexValidation.getMessage(), containsString("index name cannot be null or empty")); + } + + public void testRequestIndicesOptions() { + DeleteSamplingConfigurationAction.Request request = createRandomRequest(); + assertThat(request.indicesOptions(), equalTo(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED)); + assertThat(request.includeDataStreams(), is(true)); + } + + public void testMultipleIndicesSupport() { + String[] indices = { "logs", "metrics", "traces", "data-stream-1" }; + DeleteSamplingConfigurationAction.Request request = new DeleteSamplingConfigurationAction.Request( + TimeValue.timeValueSeconds(10), + TimeValue.timeValueSeconds(10), + indices + ); + + assertThat(request.indices(), equalTo(indices)); + assertThat(request.validate(), nullValue()); + assertThat(request.includeDataStreams(), is(true)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionRequestTests.java new file mode 100644 index 0000000000000..8d17051fe0dad --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionRequestTests.java @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class GetSamplingConfigurationActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return GetSamplingConfigurationAction.Request::new; + } + + @Override + protected GetSamplingConfigurationAction.Request createTestInstance() { + return new GetSamplingConfigurationAction.Request(randomAlphaOfLengthBetween(1, 20)); + } + + @Override + protected GetSamplingConfigurationAction.Request mutateInstance(GetSamplingConfigurationAction.Request instance) { + return new GetSamplingConfigurationAction.Request( + randomValueOtherThan(instance.getIndex(), () -> randomAlphaOfLengthBetween(1, 20)) + ); + } + + public void testRequestValidation() { + // Valid request + GetSamplingConfigurationAction.Request validRequest = new GetSamplingConfigurationAction.Request("test-index"); + assertThat(validRequest.validate(), nullValue()); + + // Invalid request with null index + GetSamplingConfigurationAction.Request invalidRequest = new GetSamplingConfigurationAction.Request((String) null); + ActionRequestValidationException validation = invalidRequest.validate(); + assertThat(validation, notNullValue()); + assertThat(validation.getMessage(), containsString("index name is required")); + + // Invalid request with empty index + GetSamplingConfigurationAction.Request emptyIndexRequest = new GetSamplingConfigurationAction.Request(""); + ActionRequestValidationException emptyValidation = emptyIndexRequest.validate(); + assertThat(emptyValidation, notNullValue()); + assertThat(emptyValidation.getMessage(), containsString("index name is required")); + } + + public void testRequestIndices() { + String indexName = "test-index"; + GetSamplingConfigurationAction.Request request = new GetSamplingConfigurationAction.Request(indexName); + + assertThat(request.indices(), equalTo(new String[] { indexName })); + assertThat(request.indicesOptions(), equalTo(IndicesOptions.strictSingleIndexNoExpandForbidClosed())); + assertThat(request.includeDataStreams(), equalTo(true)); + } + + public void testRequestIndicesReplacement() { + GetSamplingConfigurationAction.Request request = new GetSamplingConfigurationAction.Request("original-index"); + + // Valid replacement with single index + request.indices("new-index"); + assertThat(request.getIndex(), equalTo("new-index")); + assertThat(request.indices(), equalTo(new String[] { "new-index" })); + + // Invalid replacement with multiple indices should throw exception + expectThrows(IllegalArgumentException.class, () -> request.indices("index1", "index2")); + + // Invalid replacement with no indices should throw exception + expectThrows(IllegalArgumentException.class, () -> request.indices(new String[0])); + + // Invalid replacement with null should throw exception + expectThrows(IllegalArgumentException.class, () -> request.indices((String[]) null)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionResponseTests.java new file mode 100644 index 0000000000000..d50a53e6f0141 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationActionResponseTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class GetSamplingConfigurationActionResponseTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return GetSamplingConfigurationAction.Response::new; + } + + @Override + protected GetSamplingConfigurationAction.Response createTestInstance() { + return createRandomResponse(); + } + + @Override + protected GetSamplingConfigurationAction.Response mutateInstance(GetSamplingConfigurationAction.Response instance) { + return switch (randomIntBetween(0, 1)) { + case 0 -> new GetSamplingConfigurationAction.Response( + randomValueOtherThan(instance.getIndex(), () -> randomAlphaOfLengthBetween(1, 20)), + instance.getConfiguration() + ); + case 1 -> new GetSamplingConfigurationAction.Response( + instance.getIndex(), + randomValueOtherThan(instance.getConfiguration(), this::createRandomSamplingConfiguration) + ); + default -> throw new IllegalStateException("Invalid mutation case"); + }; + } + + private GetSamplingConfigurationAction.Response createRandomResponse() { + String indexName = randomAlphaOfLengthBetween(1, 20); + SamplingConfiguration config = randomBoolean() ? null : createRandomSamplingConfiguration(); + return new GetSamplingConfigurationAction.Response(indexName, config); + } + + private SamplingConfiguration createRandomSamplingConfiguration() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, SamplingConfiguration.MAX_SAMPLES_LIMIT), + randomBoolean() ? null : ByteSizeValue.ofGb(randomLongBetween(1, SamplingConfiguration.MAX_SIZE_LIMIT_GIGABYTES)), + randomBoolean() ? null : TimeValue.timeValueDays(randomLongBetween(1, SamplingConfiguration.MAX_TIME_TO_LIVE_DAYS)), + randomBoolean() ? null : randomAlphaOfLength(10) + ); + } + + public void testActionName() { + assertThat(GetSamplingConfigurationAction.NAME, equalTo("indices:admin/sampling/config/get")); + assertThat(GetSamplingConfigurationAction.INSTANCE.name(), equalTo(GetSamplingConfigurationAction.NAME)); + } + + public void testActionInstance() { + assertThat(GetSamplingConfigurationAction.INSTANCE, notNullValue()); + assertThat(GetSamplingConfigurationAction.INSTANCE, sameInstance(GetSamplingConfigurationAction.INSTANCE)); + } + + public void testResponseWithNullConfiguration() throws IOException { + String indexName = "test-index"; + GetSamplingConfigurationAction.Response response = new GetSamplingConfigurationAction.Response(indexName, null); + + // Test serialization/deserialization + GetSamplingConfigurationAction.Response deserialized = copyInstance(response); + assertThat(deserialized, equalTo(response)); + assertThat(deserialized.getIndex(), equalTo(indexName)); + assertThat(deserialized.getConfiguration(), nullValue()); + } + + public void testResponseWithConfiguration() throws IOException { + String indexName = "test-index"; + SamplingConfiguration config = createRandomSamplingConfiguration(); + GetSamplingConfigurationAction.Response response = new GetSamplingConfigurationAction.Response(indexName, config); + + // Test serialization/deserialization + GetSamplingConfigurationAction.Response deserialized = copyInstance(response); + assertThat(deserialized, equalTo(response)); + assertThat(deserialized.getIndex(), equalTo(indexName)); + assertThat(deserialized.getConfiguration(), equalTo(config)); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsActionTests.java new file mode 100644 index 0000000000000..54bc65408807c --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSamplingConfigurationsActionTests.java @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class GetSamplingConfigurationsActionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return GetSamplingConfigurationsAction.Response::new; + } + + @Override + protected GetSamplingConfigurationsAction.Response createTestInstance() { + return createRandomResponse(); + } + + @Override + protected GetSamplingConfigurationsAction.Response mutateInstance(GetSamplingConfigurationsAction.Response instance) { + Map originalMap = instance.getIndexToSamplingConfigMap(); + + // Create a mutated map by either adding, removing, or changing entries + Map mutatedMap = new HashMap<>(originalMap); + + if (mutatedMap.isEmpty() || randomBoolean()) { + // Add a new entry + mutatedMap.put(randomAlphaOfLength(10), createRandomSamplingConfiguration()); + } else if (randomBoolean()) { + // Remove an entry + String keyToRemove = randomFrom(mutatedMap.keySet()); + mutatedMap.remove(keyToRemove); + } else { + // Change an existing entry + String keyToChange = randomFrom(mutatedMap.keySet()); + mutatedMap.put(keyToChange, createRandomSamplingConfiguration()); + } + + return new GetSamplingConfigurationsAction.Response(mutatedMap); + } + + private GetSamplingConfigurationsAction.Response createRandomResponse() { + Map indexToConfigMap = new HashMap<>(); + + int numEntries = randomIntBetween(0, 5); + for (int i = 0; i < numEntries; i++) { + String indexName = randomAlphaOfLengthBetween(1, 20); + SamplingConfiguration config = createRandomSamplingConfiguration(); + indexToConfigMap.put(indexName, config); + } + + return new GetSamplingConfigurationsAction.Response(indexToConfigMap); + } + + private SamplingConfiguration createRandomSamplingConfiguration() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, SamplingConfiguration.MAX_SAMPLES_LIMIT), + randomBoolean() ? null : ByteSizeValue.ofGb(randomLongBetween(1, SamplingConfiguration.MAX_SIZE_LIMIT_GIGABYTES)), + randomBoolean() ? null : TimeValue.timeValueDays(randomLongBetween(1, SamplingConfiguration.MAX_TIME_TO_LIVE_DAYS)), + randomBoolean() ? null : randomAlphaOfLength(10) + ); + } + + public void testActionName() { + assertThat(GetSamplingConfigurationsAction.NAME, equalTo("indices:admin/sampling/config/getAll")); + assertThat(GetSamplingConfigurationsAction.INSTANCE.name(), equalTo(GetSamplingConfigurationsAction.NAME)); + } + + public void testActionInstance() { + assertThat(GetSamplingConfigurationsAction.INSTANCE, notNullValue()); + assertThat(GetSamplingConfigurationsAction.INSTANCE, sameInstance(GetSamplingConfigurationsAction.INSTANCE)); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java new file mode 100644 index 0000000000000..39e4264435434 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java @@ -0,0 +1,130 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class PutSampleConfigurationActionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return PutSampleConfigurationAction.Request::new; + } + + @Override + protected PutSampleConfigurationAction.Request createTestInstance() { + return createRandomRequest(); + } + + @Override + protected PutSampleConfigurationAction.Request mutateInstance(PutSampleConfigurationAction.Request instance) { + return switch (randomIntBetween(0, 1)) { + case 0 -> { + PutSampleConfigurationAction.Request mutated = new PutSampleConfigurationAction.Request( + randomValueOtherThan(instance.getSampleConfiguration(), () -> createRandomSampleConfig()), + instance.masterNodeTimeout(), + instance.ackTimeout() + ); + mutated.indices(instance.indices()); + yield mutated; + } + case 1 -> { + PutSampleConfigurationAction.Request mutated = new PutSampleConfigurationAction.Request( + instance.getSampleConfiguration(), + instance.masterNodeTimeout(), + instance.ackTimeout() + ); + mutated.indices( + randomValueOtherThan( + instance.indices(), + () -> randomArray(1, 5, String[]::new, () -> randomAlphaOfLengthBetween(1, 10)) + ) + ); + yield mutated; + } + default -> throw new IllegalStateException("Invalid mutation case"); + }; + } + + private PutSampleConfigurationAction.Request createRandomRequest() { + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + createRandomSampleConfig(), + TimeValue.timeValueSeconds(randomIntBetween(1, 60)), + TimeValue.timeValueSeconds(randomIntBetween(1, 60)) + ); + + // Randomly set some indices + if (randomBoolean()) { + request.indices(randomArray(0, 3, String[]::new, () -> randomAlphaOfLengthBetween(1, 10))); + } + + return request; + } + + public void testActionName() { + assertThat(PutSampleConfigurationAction.NAME, equalTo("indices:admin/sample/config/update")); + assertThat(PutSampleConfigurationAction.INSTANCE.name(), equalTo(PutSampleConfigurationAction.NAME)); + } + + public void testActionInstance() { + assertThat(PutSampleConfigurationAction.INSTANCE, notNullValue()); + assertThat(PutSampleConfigurationAction.INSTANCE, sameInstance(PutSampleConfigurationAction.INSTANCE)); + } + + public void testRequestIndicesOptions() { + PutSampleConfigurationAction.Request request = createRandomRequest(); + assertThat(request.indicesOptions(), equalTo(IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS)); + assertThat(request.includeDataStreams(), is(true)); + } + + public void testRequestTaskCreation() { + PutSampleConfigurationAction.Request request = createRandomRequest(); + + long id = randomLong(); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(10); + TaskId parentTaskId = randomBoolean() ? null : new TaskId(randomAlphaOfLength(5), randomLong()); + Map headers = Map.of("header1", "value1"); + + Task task = request.createTask(id, type, action, parentTaskId, headers); + + assertThat(task, notNullValue()); + assertThat(task.getId(), equalTo(id)); + assertThat(task.getType(), equalTo(type)); + assertThat(task.getAction(), equalTo(action)); + assertThat(task.getParentTaskId(), equalTo(parentTaskId)); + assertThat(task.headers(), equalTo(headers)); + } + + private static SamplingConfiguration createRandomSampleConfig() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, 1000), + randomBoolean() ? null : ByteSizeValue.ofGb(randomIntBetween(1, 5)), + randomBoolean() ? null : new TimeValue(randomIntBetween(1, 30), TimeUnit.DAYS), + randomBoolean() ? randomAlphaOfLength(10) : null + ); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java new file mode 100644 index 0000000000000..8b43f1e4d9b5f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java @@ -0,0 +1,226 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for TransportPutSampleConfigurationAction. + * Tests the logic in isolation using mocks for dependencies. + */ +public class TransportPutSampleConfigurationActionTests extends ESTestCase { + + private ProjectResolver projectResolver; + private IndexNameExpressionResolver indexNameExpressionResolver; + private SamplingService samplingService; + private TransportPutSampleConfigurationAction action; + + @Override + public void setUp() throws Exception { + super.setUp(); + + TransportService transportService = mock(TransportService.class); + ClusterService clusterService = mock(ClusterService.class); + ThreadPool threadPool = mock(ThreadPool.class); + ActionFilters actionFilters = mock(ActionFilters.class); + projectResolver = mock(ProjectResolver.class); + indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + samplingService = mock(SamplingService.class); + + // Mock thread context + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + action = new TransportPutSampleConfigurationAction( + transportService, + clusterService, + threadPool, + actionFilters, + projectResolver, + indexNameExpressionResolver, + samplingService + ); + } + + /** + * Tests successful masterOperation execution and verifies sampling metadata updates. + */ + public void testMasterOperationSuccess() throws Exception { + // Setup test data + ProjectId projectId = randomProjectIdOrDefault(); + String indexName = randomIdentifier(); + SamplingConfiguration config = createRandomSamplingConfiguration(); + TimeValue masterNodeTimeout = randomTimeValue(100, 200); + TimeValue ackTimeout = randomTimeValue(100, 200); + + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request(config, masterNodeTimeout, ackTimeout); + request.indices(indexName); + + // Create initial cluster state with project metadata + ProjectMetadata initialProjectMetadata = ProjectMetadata.builder(projectId).build(); + ClusterState initialClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(initialProjectMetadata) + .build(); + + Task task = mock(Task.class); + + // Setup mocks + when(projectResolver.getProjectId()).thenReturn(projectId); + + // Create updated cluster state with sampling metadata + Map indexToSampleConfigMap = new HashMap<>(); + indexToSampleConfigMap.put(indexName, config); + SamplingMetadata samplingMetadata = new SamplingMetadata(indexToSampleConfigMap); + + ProjectMetadata updatedProjectMetadata = ProjectMetadata.builder(projectId) + .putCustom(SamplingMetadata.TYPE, samplingMetadata) + .build(); + ClusterState updatedClusterState = ClusterState.builder(initialClusterState).putProjectMetadata(updatedProjectMetadata).build(); + + // Mock samplingService to simulate the actual cluster state update + AtomicReference capturedClusterState = new AtomicReference<>(); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(5); + + // Simulate the sampling service updating cluster state and calling listener + capturedClusterState.set(updatedClusterState); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(samplingService) + .updateSampleConfiguration( + eq(projectId), + eq(indexName), + eq(config), + eq(request.masterNodeTimeout()), + eq(request.ackTimeout()), + any() + ); + + // Test execution with CountDownLatch to handle async operation + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + + ActionListener testListener = new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + responseRef.set(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }; + + action.masterOperation(task, request, initialClusterState, testListener); + + // Wait for async operation to complete + assertTrue("Operation should complete within timeout", latch.await(5, TimeUnit.SECONDS)); + + // Verify results + assertThat(responseRef.get(), not(nullValue())); + assertThat(responseRef.get().isAcknowledged(), equalTo(true)); + assertThat(exceptionRef.get(), nullValue()); + + // Verify interactions + verify(projectResolver).getProjectId(); + verify(samplingService).updateSampleConfiguration( + eq(projectId), + eq(indexName), + eq(config), + eq(request.masterNodeTimeout()), + eq(request.ackTimeout()), + any() + ); + + // Verify cluster state change + ClusterState resultClusterState = capturedClusterState.get(); + assertThat(resultClusterState, not(nullValue())); + + ProjectMetadata resultProjectMetadata = resultClusterState.getMetadata().getProject(projectId); + assertThat(resultProjectMetadata, not(nullValue())); + + SamplingMetadata resultSamplingMetadata = resultProjectMetadata.custom(SamplingMetadata.TYPE); + assertThat(resultSamplingMetadata, not(nullValue())); + assertThat(resultSamplingMetadata.getIndexToSamplingConfigMap(), hasKey(indexName)); + assertThat(resultSamplingMetadata.getIndexToSamplingConfigMap().get(indexName), equalTo(config)); + } + + /** + * Tests checkBlock method (should return null for no blocks). + */ + public void testCheckBlock() { + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + createRandomSamplingConfiguration(), + randomTimeValue(100, 200), + randomTimeValue(100, 200) + ); + request.indices(new String[] { randomIdentifier() }); + ClusterState clusterState = ClusterState.EMPTY_STATE; + + // checkBlock should return null (no blocks) + assertThat(action.checkBlock(request, clusterState), nullValue()); + } + + /** + * Tests action name and configuration. + */ + public void testActionConfiguration() { + // Verify action is properly configured + assertThat(action.actionName, equalTo(PutSampleConfigurationAction.NAME)); + } + + private SamplingConfiguration createRandomSamplingConfiguration() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, SamplingConfiguration.MAX_SAMPLES_LIMIT), + randomBoolean() ? null : ByteSizeValue.ofGb(randomLongBetween(1, SamplingConfiguration.MAX_SIZE_LIMIT_GIGABYTES)), + randomBoolean() ? null : TimeValue.timeValueDays(randomLongBetween(1, SamplingConfiguration.MAX_TIME_TO_LIVE_DAYS)), + randomBoolean() ? null : randomAlphaOfLength(10) + ); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index a66f85e953e62..6b1b7ff490102 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -659,6 +659,7 @@ public class Constants { "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate", "indices:admin/sample", + "indices:admin/sample/config/update", "indices:admin/sample/stats" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); }