diff --git a/docs/changelog/136426.yaml b/docs/changelog/136426.yaml new file mode 100644 index 0000000000000..a66a81af6b028 --- /dev/null +++ b/docs/changelog/136426.yaml @@ -0,0 +1,6 @@ +summary: "Docs: clarify ESQL time span alias rules (month m/mm vs minute min/mi/n), note case-insensitive matching and optional whitespace, and add unambiguous examples." +area: ES|QL +type: docs +issues: + - 135552 +pr: 136426 diff --git a/docs/reference/query-languages/esql/esql-time-spans.md b/docs/reference/query-languages/esql/esql-time-spans.md index e9da009e7deff..3c0e71e0c5805 100644 --- a/docs/reference/query-languages/esql/esql-time-spans.md +++ b/docs/reference/query-languages/esql/esql-time-spans.md @@ -110,17 +110,30 @@ POST /_query ``` +### Duration units and matching rules + +Duration literals can be written with or without a space between the number and the unit +(for example, `1m` is equivalent to `1 m`). Matching is **case-insensitive**. + ## Supported temporal units [esql-time-spans-table] | Temporal Units | Valid Abbreviations | | --- | --- | | year | y, yr, years | | quarter | q, quarters | -| month | mo, months | +| month | m, mm, month, months | | week | w, weeks | | day | d, days | | hour | h, hours | -| minute | min, minutes | +| minute | min, mi, n, minute, minutes | | second | s, sec, seconds | | millisecond | ms, milliseconds | +**Examples** + +```esql +FROM t | WHERE date_col > NOW() - 1m // month (same as: 1 m) +FROM t | WHERE ts > NOW() - 1mi // minute +FROM t | WHERE ts > NOW() - 1n // minute +FROM t | WHERE ts > NOW() - 500ms // millisecond +``` diff --git a/muted-tests.yml b/muted-tests.yml index 15891408a11ad..0aa4b990039c6 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -411,9 +411,6 @@ tests: - class: org.elasticsearch.xpack.search.CrossClusterAsyncSearchIT method: testCancelViaExpirationOnRemoteResultsWithMinimizeRoundtrips issue: https://github.com/elastic/elasticsearch/issues/127302 -- class: 
org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryLocal - issue: https://github.com/elastic/elasticsearch/issues/133481 - class: org.elasticsearch.xpack.esql.inference.rerank.RerankOperatorTests method: testSimpleCircuitBreaking issue: https://github.com/elastic/elasticsearch/issues/133619 @@ -567,9 +564,6 @@ tests: - class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT method: testTopNPushedToLuceneOnSortedIndex issue: https://github.com/elastic/elasticsearch/issues/135939 -- class: org.elasticsearch.xpack.esql.action.CrossClusterAsyncQueryStopIT - method: testStopQueryInlineStats - issue: https://github.com/elastic/elasticsearch/issues/135032 - class: org.elasticsearch.xpack.ml.integration.RegressionIT method: testAliasFields issue: https://github.com/elastic/elasticsearch/issues/135996 @@ -663,6 +657,15 @@ tests: - class: org.elasticsearch.TransportVersionTests method: testVersionComparison issue: https://github.com/elastic/elasticsearch/issues/136410 +- class: org.elasticsearch.xpack.restart.FullClusterRestartIT + method: testRollupAfterRestart {cluster=UPGRADED} + issue: https://github.com/elastic/elasticsearch/issues/136437 +- class: org.elasticsearch.cluster.coordination.NodeJoiningIT + method: testNodeTriesToJoinClusterAndThenSameMasterIsElected + issue: https://github.com/elastic/elasticsearch/issues/136332 +- class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT + method: test {p0=search.vectors/200_dense_vector_docvalue_fields/Enable docvalue_fields parameter for dense_vector fields} + issue: https://github.com/elastic/elasticsearch/issues/136443 # Examples: # diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json new file mode 100644 index 0000000000000..bfe4307ecc80d --- /dev/null +++ 
b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.put_sample_configuration.json @@ -0,0 +1,49 @@ +{ + "indices.put_sample_configuration": { + "documentation": { + "url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-put-sample-configuration", + "description": "Configure sampling for an index or data stream" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ], + "content_type": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/{index}/_sample/config", + "methods": [ + "PUT" + ], + "parts": { + "index": { + "type": "string", + "description": "The name of a data stream or index" + } + } + } + ] + }, + "params": { + "master_timeout": { + "type": "time", + "description": "Timeout for connection to master node" + }, + "timeout": { + "type": "time", + "description": "Timeout for the request" + } + }, + "body": { + "description": "The sampling configuration", + "required": true + } + } +} + diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml new file mode 100644 index 0000000000000..b8ea5106637cb --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.put_sample_configuration/10_basic.yml @@ -0,0 +1,211 @@ +--- +setup: + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + +--- +teardown: + - do: + indices.delete: + index: "*test*" + ignore_unavailable: true + allow_no_indices: true + +--- +"Test Put sampling configuration with JSON body": + - do: + indices.create: + index: test-index + body: + settings: + number_of_shards: 1 + + - do: + indices.put_sample_configuration: + index: test-index + body: + rate: 0.5 + max_samples: 100 + max_size: "10mb" + time_to_live: "1h" + + - 
match: { acknowledged: true } + +--- +"Put sampling configuration with condition": + - do: + indices.create: + index: test-condition-index + + - do: + indices.put_sample_configuration: + index: test-condition-index + body: + rate: 1.0 + max_samples: 50 + if: "ctx?.field == 'sample_me'" + + - match: { acknowledged: true } + +--- +"Put sampling configuration with minimal parameters": + - do: + indices.create: + index: test-minimal-index + + - do: + indices.put_sample_configuration: + index: test-minimal-index + body: + rate: 0.1 + + - match: { acknowledged: true } + +--- +"Put sampling configuration overwrites existing": + - do: + indices.create: + index: test-overwrite-index + + # First configuration + - do: + indices.put_sample_configuration: + index: test-overwrite-index + body: + rate: 0.3 + max_samples: 25 + + - match: { acknowledged: true } + + # Overwrite with new configuration + - do: + indices.put_sample_configuration: + index: test-overwrite-index + body: + rate: 0.8 + max_samples: 75 + max_size: "5mb" + + - match: { acknowledged: true } + +--- +"Put sampling configuration for non-existent index": + - do: + catch: missing + indices.put_sample_configuration: + index: non-existent-index + body: + rate: 0.6 + max_samples: 150 + +--- +"Put sampling configuration with timeout parameters": + - do: + indices.create: + index: test-timeout-index + + - do: + indices.put_sample_configuration: + index: test-timeout-index + master_timeout: "30s" + timeout: "10s" + body: + rate: 0.4 + max_samples: 80 + + - match: { acknowledged: true } + +--- +"Put sampling configuration with invalid rate fails": + - do: + indices.create: + index: test-invalid-rate-index + + - do: + catch: bad_request + indices.put_sample_configuration: + index: test-invalid-rate-index + body: + rate: 1.5 # Invalid rate > 1.0 + max_samples: 100 + +--- +"Put sampling configuration with missing rate fails": + - do: + indices.create: + index: test-missing-rate-index + + - do: + catch: bad_request + 
indices.put_sample_configuration: + index: test-missing-rate-index + body: + max_samples: 100 # Missing required rate parameter + +--- +"Put sampling configuration with human readable values": + - do: + indices.create: + index: test-human-readable-index + + - do: + indices.put_sample_configuration: + index: test-human-readable-index + body: + rate: ".05" + max_samples: 1000 + max_size: "10mb" + time_to_live: "1d" + + - match: { acknowledged: true } + +--- +"Put sampling configuration rejects multiple indices": + - do: + indices.create: + index: test-multi-index-1 + body: + settings: + number_of_shards: 1 + + - do: + indices.create: + index: test-multi-index-2 + body: + settings: + number_of_shards: 1 + + - do: + catch: bad_request + indices.put_sample_configuration: + index: "test-multi-index-1,test-multi-index-2" + body: + rate: 0.5 + max_samples: 100 + + - match: { error.type: "action_request_validation_exception" } + +--- +"Put sampling configuration rejects wildcard matching multiple indices": + - do: + indices.create: + index: wildcard-test-1 + body: + settings: + number_of_shards: 1 + + - do: + indices.create: + index: wildcard-test-2 + body: + settings: + number_of_shards: 1 + + - do: + catch: missing + indices.put_sample_configuration: + index: "wildcard-test-*" + body: + rate: 0.3 + max_samples: 50 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java new file mode 100644 index 0000000000000..c8344eb1d2745 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionIT.java @@ -0,0 +1,246 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class PutSampleConfigurationActionIT extends ESIntegTestCase { + + public void testPutSampleConfiguration() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Initially no sampling configuration should exist + assertSamplingConfigurationNotExists(indexName); + + // Create a sampling configuration + SamplingConfiguration config = new SamplingConfiguration(0.5d, 50, ByteSizeValue.ofMb(10), TimeValue.timeValueHours(1), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put 
sampling configuration should be acknowledged", response.isAcknowledged()); + + // Verify the configuration was stored + assertSamplingConfigurationExists(indexName, config); + } + + public void testPutSampleConfigurationOverwritesExisting() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Create initial configuration + SamplingConfiguration initialConfig = new SamplingConfiguration( + 0.3d, + 30, + ByteSizeValue.ofMb(5), + TimeValue.timeValueMinutes(30), + null + ); + PutSampleConfigurationAction.Request initialRequest = new PutSampleConfigurationAction.Request( + initialConfig, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + initialRequest.indices(indexName); + + AcknowledgedResponse initialResponse = client().execute(PutSampleConfigurationAction.INSTANCE, initialRequest).actionGet(); + assertTrue("Initial put should be acknowledged", initialResponse.isAcknowledged()); + assertSamplingConfigurationExists(indexName, initialConfig); + + // Overwrite with new configuration + SamplingConfiguration newConfig = new SamplingConfiguration(0.8d, 80, ByteSizeValue.ofMb(20), TimeValue.timeValueHours(2), null); + PutSampleConfigurationAction.Request updateRequest = new PutSampleConfigurationAction.Request( + newConfig, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + updateRequest.indices(indexName); + + AcknowledgedResponse updateResponse = client().execute(PutSampleConfigurationAction.INSTANCE, updateRequest).actionGet(); + assertTrue("Update put should be acknowledged", updateResponse.isAcknowledged()); + + // Verify the configuration was overwritten + assertSamplingConfigurationExists(indexName, newConfig); + } + + public void testPutSampleConfigurationWithCondition() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", 
SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Create configuration with condition + String condition = "ctx?.field == 'sample_me'"; + SamplingConfiguration configWithCondition = new SamplingConfiguration( + 1.0d, + 100, + ByteSizeValue.ofMb(15), + TimeValue.timeValueHours(3), + condition + ); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + configWithCondition, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration with condition should be acknowledged", response.isAcknowledged()); + + // Verify the configuration with condition was stored + assertSamplingConfigurationExists(indexName, configWithCondition); + } + + public void testPutSampleConfigurationNonExistentIndex() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String nonExistentIndex = randomIdentifier(); + + // Don't create the index - test that we cannot set sampling config for non-existent indices + SamplingConfiguration config = new SamplingConfiguration(0.6d, 60, ByteSizeValue.ofMb(8), TimeValue.timeValueMinutes(45), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(nonExistentIndex); + + // This should now fail - sampling configs cannot be set for non-existent indices + expectThrows(Exception.class, () -> { client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); }); + + // Verify no configuration was stored + assertSamplingConfigurationNotExists(nonExistentIndex); + + // Now create the index and verify the config can be set + 
createIndex(nonExistentIndex); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged for existing index", response.isAcknowledged()); + assertSamplingConfigurationExists(nonExistentIndex, config); + } + + public void testPutSampleConfigurationPersistsAcrossClusterStateUpdates() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Store sampling configuration + SamplingConfiguration config = new SamplingConfiguration(0.9d, 90, ByteSizeValue.ofMb(25), TimeValue.timeValueHours(4), null); + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + request.indices(indexName); + + AcknowledgedResponse response = client().execute(PutSampleConfigurationAction.INSTANCE, request).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", response.isAcknowledged()); + assertSamplingConfigurationExists(indexName, config); + + // Trigger cluster state updates by creating additional indices + for (int i = 0; i < 3; i++) { + createIndex("dummy-index-" + i); + } + + // Verify sampling configuration still exists after cluster state changes + assertSamplingConfigurationExists(indexName, config); + } + + public void testPutSampleConfigurationWithSamplingIntegration() throws Exception { + assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG); + + String indexName = randomIdentifier(); + createIndex(indexName); + + // Put a sampling configuration that samples everything + SamplingConfiguration config = new SamplingConfiguration(1.0d, 100, ByteSizeValue.ofMb(50), TimeValue.timeValueDays(1), null); + 
PutSampleConfigurationAction.Request putRequest = new PutSampleConfigurationAction.Request( + config, + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(10) + ); + putRequest.indices(indexName); + + AcknowledgedResponse putResponse = client().execute(PutSampleConfigurationAction.INSTANCE, putRequest).actionGet(); + assertTrue("Put sampling configuration should be acknowledged", putResponse.isAcknowledged()); + + // Index some documents + int docsToIndex = randomIntBetween(5, 15); + for (int i = 0; i < docsToIndex; i++) { + indexDoc(indexName, "doc-" + i, "field1", "value" + i); + } + + // Verify the sampling configuration works by getting samples + GetSampleAction.Request getSampleRequest = new GetSampleAction.Request(indexName); + GetSampleAction.Response getSampleResponse = client().execute(GetSampleAction.INSTANCE, getSampleRequest).actionGet(); + + // Since we're sampling at 100%, we should get all documents sampled + assertEquals("All documents should be sampled", docsToIndex, getSampleResponse.getSample().size()); + } + + private void assertSamplingConfigurationExists(String indexName, SamplingConfiguration expectedConfig) { + ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + ClusterState clusterState = clusterService.state(); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(ProjectId.DEFAULT); + assertThat("Project metadata should exist", projectMetadata, notNullValue()); + + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + assertThat("Sampling metadata should exist", samplingMetadata, notNullValue()); + + Map configMap = samplingMetadata.getIndexToSamplingConfigMap(); + assertTrue("Configuration should exist for index " + indexName, configMap.containsKey(indexName)); + + SamplingConfiguration actualConfig = configMap.get(indexName); + assertThat("Configuration should match expected for index " + indexName, actualConfig, 
equalTo(expectedConfig)); + } + + private void assertSamplingConfigurationNotExists(String indexName) { + ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + ClusterState clusterState = clusterService.state(); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(ProjectId.DEFAULT); + + if (projectMetadata == null) { + return; // No project metadata means no sampling config + } + + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + if (samplingMetadata == null) { + return; // No sampling metadata means no sampling config + } + + Map configMap = samplingMetadata.getIndexToSamplingConfigMap(); + assertThat("Configuration should not exist for index " + indexName, configMap.get(indexName), nullValue()); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 757b8911a540d..2a0ea06bcfadc 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -131,10 +131,13 @@ import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; import org.elasticsearch.action.admin.indices.sampling.GetSampleAction; import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.PutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction; import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.RestPutSampleConfigurationAction; import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction; import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.TransportPutSampleConfigurationAction; import 
org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -824,6 +827,7 @@ public void reg if (RANDOM_SAMPLING_FEATURE_FLAG) { actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class); + actions.register(PutSampleConfigurationAction.INSTANCE, TransportPutSampleConfigurationAction.class); actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class); } @@ -1057,6 +1061,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< if (RANDOM_SAMPLING_FEATURE_FLAG) { registerHandler.accept(new RestGetSampleAction()); + registerHandler.accept(new RestPutSampleConfigurationAction()); registerHandler.accept(new RestGetSampleStatsAction()); } } @@ -1098,7 +1103,7 @@ public ReservedClusterStateService getReservedClusterStateService() { return reservedClusterStateService; } - public List getNamedWriteables() { + public static List getNamedWriteables() { return List.of( new NamedWriteableRegistry.Entry( ExtendedSearchUsageMetric.class, diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java new file mode 100644 index 0000000000000..4fa2f64e0e158 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationAction.java @@ -0,0 +1,194 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.core.TimeValue; + +import java.io.IOException; +import java.util.Objects; + +/** + * Action for configuring sampling settings on indices. + *

+ * This action allows administrators to configure sampling parameters for one or more indices, + * including sampling rate, maximum number of samples, maximum size constraints, time-to-live + * settings, and conditional sampling criteria. + *

+ *

+ * The action name is "indices:admin/sample/config/update" and it returns an {@link AcknowledgedResponse} + * to indicate whether the configuration was successfully applied. + *

+ */ +public class PutSampleConfigurationAction extends ActionType { + /** + * The action name used to identify this action in the transport layer. + */ + public static final String NAME = "indices:admin/sample/config/update"; + + /** + * Singleton instance of this action type. + */ + public static final PutSampleConfigurationAction INSTANCE = new PutSampleConfigurationAction(); + + /** + * Constructs a new PutSampleConfigurationAction with the predefined action name. + */ + public PutSampleConfigurationAction() { + super(NAME); + } + + /** + * Request class for configuring sampling settings on indices. + *

+ * This request encapsulates all the parameters needed to configure sampling on one or more indices, + * including the sampling configuration itself and the target indices. It implements + * {@link Replaceable} to support index name resolution and expansion. + *

+ */ + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final SamplingConfiguration samplingConfiguration; + private String index; + + /** + * Constructs a new request with the specified sampling configuration parameters. + * + * @param samplingConfiguration the sampling configuration to apply + * @param masterNodeTimeout the timeout for master node operations, or null for default + * @param ackTimeout the timeout for acknowledgment, or null for default + */ + public Request(SamplingConfiguration samplingConfiguration, @Nullable TimeValue masterNodeTimeout, @Nullable TimeValue ackTimeout) { + super(masterNodeTimeout, ackTimeout); + Objects.requireNonNull(samplingConfiguration, "samplingConfiguration must not be null"); + this.samplingConfiguration = samplingConfiguration; + } + + /** + * Constructs a new request by deserializing from a stream input. + * + * @param in the stream input to read from + * @throws IOException if an I/O error occurs during deserialization + */ + public Request(StreamInput in) throws IOException { + super(in); + this.index = in.readString(); + this.samplingConfiguration = new SamplingConfiguration(in); + } + + /** + * Serializes this request to a stream output. + * + * @param out the stream output to write to + * @throws IOException if an I/O error occurs during serialization + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + samplingConfiguration.writeTo(out); + } + + /** + * Returns the array of target indices for this sampling configuration request. + * + * @return an array of index names, never null but may be empty + */ + @Override + public String[] indices() { + return index == null ? null : new String[] { index }; + } + + /** + * Sets the target indices or data streams for this sampling configuration request. 
+ * + * @param indices the names of indices or data streams to target + * @return this request instance for method chaining + */ + @Override + public Request indices(String... indices) { + if (indices == null || indices.length != 1) { + throw new IllegalArgumentException("[indices] must contain only one index"); + } + this.index = indices[0]; + return this; + } + + /** + * Indicates whether this request should include data streams in addition to regular indices. + * + * @return true to include data streams + */ + @Override + public boolean includeDataStreams() { + return true; + } + + /** + * Returns the indices options for this request, which control how index names are resolved and expanded. + * + * @return the indices options, configured to be strict about single index, no expansion, forbid closed indices, and allow selectors + */ + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + + /** + * Returns the sampling configuration encapsulated by this request. + * + * @return the sampling configuration + */ + public SamplingConfiguration getSampleConfiguration() { + return samplingConfiguration; + } + + /** + * Compares this request with another object for equality. + *

+ * Two requests are considered equal if they have the same target indices and + * sampling configuration parameters. + *

+ * + * @param o the object to compare with + * @return true if the objects are equal, false otherwise + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request that = (Request) o; + return Objects.equals(index, that.index) + && Objects.equals(samplingConfiguration, that.samplingConfiguration) + && Objects.equals(masterNodeTimeout(), that.masterNodeTimeout()) + && Objects.equals(ackTimeout(), that.ackTimeout()); + } + + /** + * Returns the hash code for this request. + *

+ * The hash code is computed based on the target indices, sampling configuration, + * master node timeout, and acknowledgment timeout. + *

+ * + * @return the hash code value + */ + @Override + public int hashCode() { + return Objects.hash(index, samplingConfiguration, masterNodeTimeout(), ackTimeout()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java new file mode 100644 index 0000000000000..be1e1a1c85727 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestPutSampleConfigurationAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.elasticsearch.rest.RestRequest.Method.PUT; +import static org.elasticsearch.rest.RestUtils.getAckTimeout; +import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout; + +/** + * REST action for updating sampling configurations for indices. + *

+ * Handles PUT requests to /{index}/_sample/config endpoint and delegates + * to the PutSampleConfigurationAction transport action. + *

+ * + *

Example usage:

+ *
{@code
+ * PUT /my-index/_sample/config
+ * {
+ *   "rate": ".05",
+ *   "max_samples": 1000,
+ *   "max_size": "10mb",
+ *   "time_to_live": "1d",
+ *   "if": "ctx?.network?.name == 'Guest'"
+ * }
+ * }
+ */ +@ServerlessScope(Scope.INTERNAL) +public class RestPutSampleConfigurationAction extends BaseRestHandler { + + @Override + public List routes() { + return List.of(new Route(PUT, "/{index}/_sample/config")); + } + + @Override + public String getName() { + return "put_sample_configuration_action"; + } + + @Override + public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + String[] indexNames = request.param("index").split(","); + if (indexNames.length > 1) { + throw new ActionRequestValidationException().addValidationError( + "Can only set sampling configuration for a single index at a time, but found " + + Arrays.stream(indexNames).collect(Collectors.joining(", ", "[", "]")) + ); + } + PutSampleConfigurationAction.Request putRequest; + + XContentParser parser = request.contentParser(); + SamplingConfiguration samplingConfig = SamplingConfiguration.fromXContent(parser); + putRequest = new PutSampleConfigurationAction.Request(samplingConfig, getMasterNodeTimeout(request), getAckTimeout(request)); + + // Set the target index + putRequest.indices(indexNames); + + // TODO: Make this cancellable e.g. RestGetSampleStatsAction + return channel -> client.execute(PutSampleConfigurationAction.INSTANCE, putRequest, new RestToXContentListener<>(channel)); + } + +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java new file mode 100644 index 0000000000000..db08bab3a5fc1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationAction.java @@ -0,0 +1,102 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.logging.LogManager; +import org.elasticsearch.logging.Logger; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +/** + * Transport action for updating sampling configurations in cluster metadata. + *

+ * This action handles the cluster state update required to store sampling configurations + * for the specified indices. It validates the request, resolves index names, and updates + * the cluster metadata with the new sampling configuration. + *

+ */ +public class TransportPutSampleConfigurationAction extends AcknowledgedTransportMasterNodeAction { + private static final Logger logger = LogManager.getLogger(TransportPutSampleConfigurationAction.class); + private final ProjectResolver projectResolver; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SamplingService samplingService; + + @Inject + public TransportPutSampleConfigurationAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver, + SamplingService samplingService + ) { + super( + PutSampleConfigurationAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + PutSampleConfigurationAction.Request::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.samplingService = samplingService; + } + + @Override + protected void masterOperation( + Task task, + PutSampleConfigurationAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + // throws IndexNotFoundException if any index does not exist or more than one index is resolved + try { + indexNameExpressionResolver.concreteIndexNames(state, request); + } catch (IndexNotFoundException e) { + listener.onFailure(e); + return; + } + + ProjectId projectId = projectResolver.getProjectId(); + samplingService.updateSampleConfiguration( + projectId, + request.indices()[0], + request.getSampleConfiguration(), + request.masterNodeTimeout(), + request.ackTimeout(), + listener + ); + } + + @Override + protected ClusterBlockException checkBlock(PutSampleConfigurationAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); + } + +} diff 
--git a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java index c3f4055c8d061..3c53d063aac6c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster; +import org.elasticsearch.action.ActionModule; import org.elasticsearch.action.admin.indices.rollover.MetadataRolloverService; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.cluster.action.index.MappingUpdatedAction; @@ -321,6 +322,9 @@ public static List getNamedWriteables() { // Streams registerProjectCustom(entries, StreamsMetadata.TYPE, StreamsMetadata::new, StreamsMetadata::readDiffFrom); + // Actions + entries.addAll(ActionModule.getNamedWriteables()); + return entries; } diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index d1cf0a1ce0477..9a1ed81175570 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -9,20 +9,31 @@ package org.elasticsearch.ingest; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.AckedBatchedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateAckListener; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; +import 
org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.service.MasterServiceTaskQueue; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; @@ -42,9 +53,13 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; @@ -59,6 +74,16 @@ public class SamplingService implements ClusterStateListener { private final ProjectResolver projectResolver; private final LongSupplier relativeMillisTimeSupplier; private final LongSupplier statsTimeSupplier = System::nanoTime; + private final MasterServiceTaskQueue updateSamplingConfigurationTaskQueue; + + private static final Setting MAX_CONFIGURATIONS_SETTING = Setting.intSetting( + "sampling.max_configurations", + 100, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + /* * This Map contains the samples that exist on this node. They are not persisted to disk. 
They are stored as SoftReferences so that * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We @@ -76,6 +101,11 @@ public SamplingService( this.clusterService = clusterService; this.projectResolver = projectResolver; this.relativeMillisTimeSupplier = relativeMillisTimeSupplier; + this.updateSamplingConfigurationTaskQueue = clusterService.createTaskQueue( + "update-sampling-configuration", + Priority.NORMAL, + new UpdateSamplingConfigurationExecutor() + ); } /** @@ -262,9 +292,98 @@ public boolean atLeastOneSampleConfigured() { } } + public void updateSampleConfiguration( + ProjectId projectId, + String index, + SamplingConfiguration samplingConfiguration, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + ActionListener listener + ) { + // Early validation: check if adding a new configuration would exceed the limit + ClusterState clusterState = clusterService.state(); + boolean maxConfigLimitBreached = checkMaxConfigLimitBreached(projectId, index, clusterState); + if (maxConfigLimitBreached) { + Integer maxConfigurations = MAX_CONFIGURATIONS_SETTING.get(clusterState.getMetadata().settings()); + listener.onFailure( + new IllegalStateException( + "Cannot add sampling configuration for index [" + + index + + "]. Maximum number of sampling configurations (" + + maxConfigurations + + ") already reached." + ) + ); + return; + } + + updateSamplingConfigurationTaskQueue.submitTask( + "Updating Sampling Configuration", + new UpdateSamplingConfigurationTask(projectId, index, samplingConfiguration, ackTimeout, listener), + masterNodeTimeout + ); + } + @Override public void clusterChanged(ClusterChangedEvent event) { - // TODO: React to sampling config changes + if (RANDOM_SAMPLING_FEATURE_FLAG == false) { + return; + } + if (samples.isEmpty()) { + return; + } + // We want to remove any samples if their sampling configuration has been deleted or modified. 
+ if (event.metadataChanged()) { + /* + * First, we collect the union of all project ids in the current state and the previous one. We include the project ids from the + * previous state in case an entire project has been deleted -- in that case we would want to delete all of its samples. + */ + Set allProjectIds = Sets.union( + event.state().metadata().projects().keySet(), + event.previousState().metadata().projects().keySet() + ); + for (ProjectId projectId : allProjectIds) { + if (event.customMetadataChanged(projectId, SamplingMetadata.TYPE)) { + Map oldSampleConfigsMap = Optional.ofNullable( + event.previousState().metadata().projects().get(projectId) + ) + .map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE)) + .map(SamplingMetadata::getIndexToSamplingConfigMap) + .orElse(Map.of()); + Map newSampleConfigsMap = Optional.ofNullable( + event.state().metadata().projects().get(projectId) + ) + .map(p -> (SamplingMetadata) p.custom(SamplingMetadata.TYPE)) + .map(SamplingMetadata::getIndexToSamplingConfigMap) + .orElse(Map.of()); + Set indicesWithRemovedConfigs = new HashSet<>(oldSampleConfigsMap.keySet()); + indicesWithRemovedConfigs.removeAll(newSampleConfigsMap.keySet()); + /* + * These index names no longer have sampling configurations associated with them. So we remove their samples. We are OK + * with the fact that we have a race condition here -- it is possible that in maybeSample() the configuration still + * exists but before the sample is read from samples it is deleted by this method and gets recreated. In the worst case + * we'll have a small amount of memory being used until the sampling configuration is recreated or the TTL checker + * reclaims it. The advantage is that we can avoid locking here, which could slow down ingestion. 
+ */ + for (String indexName : indicesWithRemovedConfigs) { + logger.debug("Removing sample info for {} because its configuration has been removed", indexName); + samples.remove(new ProjectIndex(projectId, indexName)); + } + /* + * Now we check if any of the sampling configurations have changed. If they have, we remove the existing sample. Same as + * above, we have a race condition here that we can live with. + */ + for (Map.Entry entry : newSampleConfigsMap.entrySet()) { + String indexName = entry.getKey(); + if (entry.getValue().equals(oldSampleConfigsMap.get(indexName)) == false) { + logger.debug("Removing sample info for {} because its configuration has changed", indexName); + samples.remove(new ProjectIndex(projectId, indexName)); + } + } + } + } + // TODO: If an index has been deleted, we want to remove its sampling configuration + } } private boolean evaluateCondition( @@ -294,6 +413,29 @@ private static Script getScript(String conditional) throws IOException { } } + // Returns true if adding a new sampling configuration for the given index would exceed the configured maximum + // (updates to existing configurations are exempt); the caller is responsible for surfacing the resulting error. + private static boolean checkMaxConfigLimitBreached(ProjectId projectId, String index, ClusterState currentState) { + Metadata currentMetadata = currentState.metadata(); + ProjectMetadata projectMetadata = currentMetadata.getProject(projectId); + + if (projectMetadata != null) { + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + Map existingConfigs = samplingMetadata != null + ? 
samplingMetadata.getIndexToSamplingConfigMap() + : Map.of(); + + boolean isUpdate = existingConfigs.containsKey(index); + Integer maxConfigurations = MAX_CONFIGURATIONS_SETTING.get(currentMetadata.settings()); + + // Only check limit for new configurations, not updates + if (isUpdate == false && existingConfigs.size() >= maxConfigurations) { + return true; + } + } + return false; + } + /* * This represents a raw document as the user sent it to us in an IndexRequest. It only holds onto the information needed for the * sampling API, rather than holding all of the fields a user might send in an IndexRequest. @@ -759,6 +901,110 @@ void setScript(Script script, IngestConditionalScript.Factory factory) { } } + static class UpdateSamplingConfigurationTask extends AckedBatchedClusterStateUpdateTask { + private final ProjectId projectId; + private final String indexName; + private final SamplingConfiguration samplingConfiguration; + + UpdateSamplingConfigurationTask( + ProjectId projectId, + String indexName, + SamplingConfiguration samplingConfiguration, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener); + this.projectId = projectId; + this.indexName = indexName; + this.samplingConfiguration = samplingConfiguration; + } + } + + static class UpdateSamplingConfigurationExecutor extends SimpleBatchedAckListenerTaskExecutor { + private static final Logger logger = LogManager.getLogger(UpdateSamplingConfigurationExecutor.class); + + UpdateSamplingConfigurationExecutor() {} + + @Override + public Tuple executeTask( + UpdateSamplingConfigurationTask updateSamplingConfigurationTask, + ClusterState clusterState + ) { + logger.debug( + "Updating sampling configuration for index [{}] with rate [{}]," + + " maxSamples [{}], maxSize [{}], timeToLive [{}], condition[{}]", + updateSamplingConfigurationTask.indexName, + updateSamplingConfigurationTask.samplingConfiguration.rate(), + updateSamplingConfigurationTask.samplingConfiguration.maxSamples(), + 
updateSamplingConfigurationTask.samplingConfiguration.maxSize(), + updateSamplingConfigurationTask.samplingConfiguration.timeToLive(), + updateSamplingConfigurationTask.samplingConfiguration.condition() + ); + + // Get sampling metadata + Metadata metadata = clusterState.getMetadata(); + ProjectMetadata projectMetadata = metadata.getProject(updateSamplingConfigurationTask.projectId); + + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + + boolean isNewConfiguration = samplingMetadata == null; // for logging + int existingConfigCount = isNewConfiguration ? 0 : samplingMetadata.getIndexToSamplingConfigMap().size(); + logger.trace( + "Current sampling metadata state: {} (number of existing configurations: {})", + isNewConfiguration ? "null" : "exists", + existingConfigCount + ); + + // Update with new sampling configuration if it exists or create new sampling metadata with the configuration + Map updatedConfigMap = new HashMap<>(); + if (samplingMetadata != null) { + updatedConfigMap.putAll(samplingMetadata.getIndexToSamplingConfigMap()); + } + boolean isUpdate = updatedConfigMap.containsKey(updateSamplingConfigurationTask.indexName); + + Integer maxConfigurations = MAX_CONFIGURATIONS_SETTING.get(metadata.settings()); + // check if adding a new configuration would exceed the limit + boolean maxConfigLimitBreached = checkMaxConfigLimitBreached( + updateSamplingConfigurationTask.projectId, + updateSamplingConfigurationTask.indexName, + clusterState + ); + if (maxConfigLimitBreached) { + throw new IllegalStateException( + "Cannot add sampling configuration for index [" + + updateSamplingConfigurationTask.indexName + + "]. Maximum number of sampling configurations (" + + maxConfigurations + + ") already reached." 
+ ); + } + updatedConfigMap.put(updateSamplingConfigurationTask.indexName, updateSamplingConfigurationTask.samplingConfiguration); + + logger.trace( + "{} sampling configuration for index [{}], total configurations after update: {}", + isUpdate ? "Updated" : "Added", + updateSamplingConfigurationTask.indexName, + updatedConfigMap.size() + ); + + SamplingMetadata newSamplingMetadata = new SamplingMetadata(updatedConfigMap); + + // Update cluster state + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, newSamplingMetadata); + + // Return tuple with updated cluster state and the original listener + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + + logger.debug( + "Successfully {} sampling configuration for index [{}]", + isUpdate ? "updated" : "created", + updateSamplingConfigurationTask.indexName + ); + return new Tuple<>(updatedClusterState, updateSamplingConfigurationTask); + } + } + /* * This is meant to be used internally as the key of the map of samples */ diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java new file mode 100644 index 0000000000000..ffe446a9cc2b0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/PutSampleConfigurationActionTests.java @@ -0,0 +1,123 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class PutSampleConfigurationActionTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return PutSampleConfigurationAction.Request::new; + } + + @Override + protected PutSampleConfigurationAction.Request createTestInstance() { + return createRandomRequest(); + } + + @Override + protected PutSampleConfigurationAction.Request mutateInstance(PutSampleConfigurationAction.Request instance) { + return switch (randomIntBetween(0, 1)) { + case 0 -> { + PutSampleConfigurationAction.Request mutated = new PutSampleConfigurationAction.Request( + randomValueOtherThan(instance.getSampleConfiguration(), () -> createRandomSampleConfig()), + instance.masterNodeTimeout(), + instance.ackTimeout() + ); + mutated.indices(instance.indices()); + yield mutated; + } + case 1 -> { + PutSampleConfigurationAction.Request mutated = new PutSampleConfigurationAction.Request( + instance.getSampleConfiguration(), + 
instance.masterNodeTimeout(), + instance.ackTimeout() + ); + mutated.indices(randomValueOtherThan(instance.indices(), () -> new String[] { randomAlphaOfLengthBetween(1, 10) })); + yield mutated; + } + default -> throw new IllegalStateException("Invalid mutation case"); + }; + } + + private PutSampleConfigurationAction.Request createRandomRequest() { + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request( + createRandomSampleConfig(), + TimeValue.timeValueSeconds(randomIntBetween(1, 60)), + TimeValue.timeValueSeconds(randomIntBetween(1, 60)) + ); + + // Randomly set some indices + request.indices(randomAlphaOfLengthBetween(1, 10)); + + return request; + } + + public void testActionName() { + assertThat(PutSampleConfigurationAction.NAME, equalTo("indices:admin/sample/config/update")); + assertThat(PutSampleConfigurationAction.INSTANCE.name(), equalTo(PutSampleConfigurationAction.NAME)); + } + + public void testActionInstance() { + assertThat(PutSampleConfigurationAction.INSTANCE, notNullValue()); + assertThat(PutSampleConfigurationAction.INSTANCE, sameInstance(PutSampleConfigurationAction.INSTANCE)); + } + + public void testRequestIndicesOptions() { + PutSampleConfigurationAction.Request request = createRandomRequest(); + assertThat(request.indicesOptions(), equalTo(IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS)); + assertThat(request.includeDataStreams(), is(true)); + } + + public void testRequestTaskCreation() { + PutSampleConfigurationAction.Request request = createRandomRequest(); + + long id = randomLong(); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(10); + TaskId parentTaskId = randomBoolean() ? 
null : new TaskId(randomAlphaOfLength(5), randomLong()); + Map headers = Map.of("header1", "value1"); + + Task task = request.createTask(id, type, action, parentTaskId, headers); + + assertThat(task, notNullValue()); + assertThat(task.getId(), equalTo(id)); + assertThat(task.getType(), equalTo(type)); + assertThat(task.getAction(), equalTo(action)); + assertThat(task.getParentTaskId(), equalTo(parentTaskId)); + assertThat(task.headers(), equalTo(headers)); + } + + private static SamplingConfiguration createRandomSampleConfig() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, 1000), + randomBoolean() ? null : ByteSizeValue.ofGb(randomIntBetween(1, 5)), + randomBoolean() ? null : new TimeValue(randomIntBetween(1, 30), TimeUnit.DAYS), + randomBoolean() ? randomAlphaOfLength(10) : null + ); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java new file mode 100644 index 0000000000000..db22b6e1f8303 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/TransportPutSampleConfigurationActionTests.java @@ -0,0 +1,210 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for TransportPutSampleConfigurationAction. + * Tests the logic in isolation using mocks for dependencies. 
+ */ +public class TransportPutSampleConfigurationActionTests extends ESTestCase { + + private ProjectResolver projectResolver; + private IndexNameExpressionResolver indexNameExpressionResolver; + private SamplingService samplingService; + private TransportPutSampleConfigurationAction action; + + @Override + public void setUp() throws Exception { + super.setUp(); + + TransportService transportService = mock(TransportService.class); + ClusterService clusterService = mock(ClusterService.class); + ThreadPool threadPool = mock(ThreadPool.class); + ActionFilters actionFilters = mock(ActionFilters.class); + projectResolver = mock(ProjectResolver.class); + indexNameExpressionResolver = mock(IndexNameExpressionResolver.class); + samplingService = mock(SamplingService.class); + + // Mock thread context + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + when(threadPool.getThreadContext()).thenReturn(threadContext); + + action = new TransportPutSampleConfigurationAction( + transportService, + clusterService, + threadPool, + actionFilters, + projectResolver, + indexNameExpressionResolver, + samplingService + ); + } + + /** + * Tests successful masterOperation execution and verifies sampling metadata updates. 
+ */ + public void testMasterOperationSuccess() throws Exception { + // Setup test data + ProjectId projectId = randomProjectIdOrDefault(); + String indexName = randomIdentifier(); + SamplingConfiguration config = createRandomSamplingConfiguration(); + TimeValue masterNodeTimeout = randomTimeValue(100, 200); + TimeValue ackTimeout = randomTimeValue(100, 200); + + PutSampleConfigurationAction.Request request = new PutSampleConfigurationAction.Request(config, masterNodeTimeout, ackTimeout); + request.indices(indexName); + + // Create initial cluster state with project metadata + ProjectMetadata initialProjectMetadata = ProjectMetadata.builder(projectId).build(); + ClusterState initialClusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(initialProjectMetadata) + .build(); + + Task task = mock(Task.class); + + // Setup mocks + when(projectResolver.getProjectId()).thenReturn(projectId); + + // Create updated cluster state with sampling metadata + Map indexToSampleConfigMap = new HashMap<>(); + indexToSampleConfigMap.put(indexName, config); + SamplingMetadata samplingMetadata = new SamplingMetadata(indexToSampleConfigMap); + + ProjectMetadata updatedProjectMetadata = ProjectMetadata.builder(projectId) + .putCustom(SamplingMetadata.TYPE, samplingMetadata) + .build(); + ClusterState updatedClusterState = ClusterState.builder(initialClusterState).putProjectMetadata(updatedProjectMetadata).build(); + + // Mock samplingService to simulate the actual cluster state update + AtomicReference capturedClusterState = new AtomicReference<>(); + doAnswer(invocation -> { + ActionListener listener = invocation.getArgument(5); + + // Simulate the sampling service updating cluster state and calling listener + capturedClusterState.set(updatedClusterState); + listener.onResponse(AcknowledgedResponse.TRUE); + return null; + }).when(samplingService) + .updateSampleConfiguration( + eq(projectId), + eq(indexName), + eq(config), + eq(request.masterNodeTimeout()), + 
eq(request.ackTimeout()), + any() + ); + + // Test execution with CountDownLatch to handle async operation + CountDownLatch latch = new CountDownLatch(1); + AtomicReference responseRef = new AtomicReference<>(); + AtomicReference exceptionRef = new AtomicReference<>(); + + ActionListener testListener = new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + responseRef.set(response); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }; + + action.masterOperation(task, request, initialClusterState, testListener); + + // Wait for async operation to complete + assertTrue("Operation should complete within timeout", latch.await(5, TimeUnit.SECONDS)); + + // Verify results + assertThat(responseRef.get(), not(nullValue())); + assertThat(responseRef.get().isAcknowledged(), equalTo(true)); + assertThat(exceptionRef.get(), nullValue()); + + // Verify interactions + verify(projectResolver).getProjectId(); + verify(samplingService).updateSampleConfiguration( + eq(projectId), + eq(indexName), + eq(config), + eq(request.masterNodeTimeout()), + eq(request.ackTimeout()), + any() + ); + + // Verify cluster state change + ClusterState resultClusterState = capturedClusterState.get(); + assertThat(resultClusterState, not(nullValue())); + + ProjectMetadata resultProjectMetadata = resultClusterState.getMetadata().getProject(projectId); + assertThat(resultProjectMetadata, not(nullValue())); + + SamplingMetadata resultSamplingMetadata = resultProjectMetadata.custom(SamplingMetadata.TYPE); + assertThat(resultSamplingMetadata, not(nullValue())); + assertThat(resultSamplingMetadata.getIndexToSamplingConfigMap(), hasKey(indexName)); + assertThat(resultSamplingMetadata.getIndexToSamplingConfigMap().get(indexName), equalTo(config)); + } + + /** + * Tests action name and configuration. 
+ */ + public void testActionConfiguration() { + // Verify action is properly configured + assertThat(action.actionName, equalTo(PutSampleConfigurationAction.NAME)); + } + + private SamplingConfiguration createRandomSamplingConfiguration() { + return new SamplingConfiguration( + randomDoubleBetween(0.0, 1.0, true), + randomBoolean() ? null : randomIntBetween(1, SamplingConfiguration.MAX_SAMPLES_LIMIT), + randomBoolean() ? null : ByteSizeValue.ofGb(randomLongBetween(1, SamplingConfiguration.MAX_SIZE_LIMIT_GIGABYTES)), + randomBoolean() ? null : TimeValue.timeValueDays(randomLongBetween(1, SamplingConfiguration.MAX_TIME_TO_LIVE_DAYS)), + randomBoolean() ? null : randomAlphaOfLength(10) + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java index d15874dcf7942..ad4309740f608 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; @@ -256,6 +258,85 @@ public void testMaybeSampleMaxSize() { assertThat(samplingService.getLocalSampleStats(projectId, indexName).getSamplesRejectedForSize(), equalTo((long) maxSamples - 2)); } + public void testClusterChanged() { + String indexName = randomIdentifier(); + SamplingService samplingService = getTestSamplingService(); + Map inputRawDocSource = randomMap(1, 100, () -> Tuple.tuple(randomAlphaOfLength(10), 
randomAlphaOfLength(10))); + final IndexRequest indexRequest = new IndexRequest(indexName).id("_id").source(inputRawDocSource); + + // Test that the sample is removed if the new state does not have the project that the sample was configured in: + ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.fromId(randomIdentifier())) + .putCustom( + SamplingMetadata.TYPE, + new SamplingMetadata( + Map.of( + indexName, + new SamplingConfiguration( + 1.0, + randomIntBetween(1, 1000), + ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)), + TimeValue.timeValueDays(randomIntBetween(1, 10)), + null + ) + ) + ) + ) + .build(); + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + ClusterState oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + ClusterState newState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(ProjectMetadata.builder(ProjectId.fromId(randomIdentifier()))) + .build(); + ClusterChangedEvent event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is removed if the sampling metadata is removed from the project: + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(ProjectMetadata.builder(projectMetadata.id())).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + 
assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is removed if the sampling configuration is changed + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + projectMetadata = ProjectMetadata.builder(projectMetadata.id()) + .putCustom( + SamplingMetadata.TYPE, + new SamplingMetadata( + Map.of( + indexName, + new SamplingConfiguration( + 1.0, + 1001, + ByteSizeValue.ofBytes(randomLongBetween(100, 1000000)), + TimeValue.timeValueDays(randomIntBetween(1, 10)), + null + ) + ) + ) + ) + .build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(0L)); + + // Test that the sample is _not_ removed if the sampling configuration does not change: + samplingService.maybeSample(projectMetadata, indexRequest); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + oldState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + newState = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(projectMetadata).build(); + event = new ClusterChangedEvent("test", newState, oldState); + samplingService.clusterChanged(event); + assertThat(samplingService.getLocalSampleStats(projectMetadata.id(), indexName).getSamples(), equalTo(1L)); + } + private SamplingService getTestSamplingService() { final ScriptService scriptService = new ScriptService( Settings.EMPTY, diff --git 
a/server/src/test/java/org/elasticsearch/ingest/UpdateSamplingConfigurationExecutorTests.java b/server/src/test/java/org/elasticsearch/ingest/UpdateSamplingConfigurationExecutorTests.java new file mode 100644 index 0000000000000..68401dd54eb96 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/ingest/UpdateSamplingConfigurationExecutorTests.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.ingest; + +import org.elasticsearch.action.admin.indices.sampling.SamplingConfiguration; +import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.test.ESTestCase; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.HashMap; +import java.util.Map; +import java.util.function.LongSupplier; + +import static org.hamcrest.Matchers.containsString; +import static org.mockito.Mockito.when; + +public class UpdateSamplingConfigurationExecutorTests extends ESTestCase { + + @Mock + private ProjectResolver projectResolver; + 
+ @Mock + private ClusterService clusterService; + + @Mock + private ScriptService scriptService; + + @Mock + private LongSupplier timeSupplier; + + @Mock + private ClusterState clusterState; + + @Mock + private Metadata metadata; + + private SamplingService.UpdateSamplingConfigurationExecutor executor; + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + + // Create a real SamplingService with mocked dependencies + executor = new SamplingService.UpdateSamplingConfigurationExecutor(); + + // Set up settings with MAX_CONFIGURATIONS_SETTING = 100 (the default value) + Settings settings = Settings.builder().put("sampling.max_configurations", 100).build(); + + // Mock the complete chain for checkMaxConfigLimitBreached() method: + // clusterService.state() -> ClusterState + when(clusterService.state()).thenReturn(clusterState); + // currentState.metadata() -> Metadata + when(clusterState.metadata()).thenReturn(metadata); + // currentMetadata.settings() -> Settings (for MAX_CONFIGURATIONS_SETTING) + when(metadata.settings()).thenReturn(settings); + + // Also mock for the executor's executeTask method + when(clusterState.getMetadata()).thenReturn(metadata); + } + + public void testExecuteTaskWithUpperBoundLimit() { + // Create a project metadata with 100 existing sampling configurations (the maximum) + Map existingConfigs = new HashMap<>(); + SamplingConfiguration defaultConfig = new SamplingConfiguration( + 0.5d, + 50, + ByteSizeValue.ofMb(10), + TimeValue.timeValueHours(1), + null + ); + + // Add 100 configurations to reach the limit + for (int i = 0; i < 100; i++) { + existingConfigs.put("existing-index-" + i, defaultConfig); + } + + SamplingMetadata existingSamplingMetadata = new SamplingMetadata(existingConfigs); + ProjectMetadata projectMetadata = ProjectMetadata.builder(ProjectId.DEFAULT) + .putCustom(SamplingMetadata.TYPE, existingSamplingMetadata) + .build(); + + // Mock both methods that get project 
metadata: + // 1. For the executor's executeTask method: clusterState.metadata().getProject() + when(metadata.getProject(ProjectId.DEFAULT)).thenReturn(projectMetadata); + // 2. For the checkMaxConfigLimitBreached method: currentMetadata.getProject() + // (this uses the same metadata mock, so the above mock covers both) + + // Try to add the 101st configuration - this should fail + String newIndexName = "new-index-101"; + SamplingConfiguration newConfig = new SamplingConfiguration(0.8d, 80, ByteSizeValue.ofMb(20), TimeValue.timeValueHours(2), null); + + SamplingService.UpdateSamplingConfigurationTask task = new SamplingService.UpdateSamplingConfigurationTask( + ProjectId.DEFAULT, + newIndexName, + newConfig, + TimeValue.timeValueSeconds(30), + null // ActionListener not needed for this test + ); + + // Execute the task and expect an IllegalStateException + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> executor.executeTask(task, clusterState)); + + // Verify the exception message contains expected information + assertThat(exception.getMessage(), containsString("Cannot add sampling configuration for index [" + newIndexName + "]")); + assertThat(exception.getMessage(), containsString("Maximum number of sampling configurations (100) already reached")); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityExtension.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityExtension.java index 4c4db9a035e93..9bfd207856339 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityExtension.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/SecurityExtension.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.telemetry.TelemetryProvider; import 
org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.security.authc.AuthenticationFailureHandler; @@ -64,6 +65,9 @@ interface SecurityComponents { /** Provides the ability to access project-scoped data from the global scope **/ ProjectResolver projectResolver(); + + /** Provides the ability to access the APM tracer and meter registry **/ + TelemetryProvider telemetryProvider(); } /** diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java index 972a9a4653671..e9885d155187a 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryStopIT.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.action; import org.elasticsearch.Build; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.client.internal.Client; import org.elasticsearch.core.Tuple; @@ -16,6 +17,8 @@ import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.async.AsyncStopRequest; +import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest; +import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; @@ -28,7 +31,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.esql.action.AbstractEsqlIntegTestCase.randomIncludeCCSMetadata; -import static 
org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.deleteAsyncId; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.getAsyncResponse; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQuery; import static org.elasticsearch.xpack.esql.action.EsqlAsyncTestUtils.startAsyncQueryWithPragmas; @@ -43,6 +45,11 @@ public class CrossClusterAsyncQueryStopIT extends AbstractCrossClusterTestCase { private static final Logger LOGGER = LogManager.getLogger(CrossClusterAsyncQueryStopIT.class); + @Override + protected boolean reuseClusters() { + return false; + } + public void testStopQuery() throws Exception { assumeTrue("Pragma does not work in release builds", Build.current().isSnapshot()); Map testClusterInfo = setupClusters(3); @@ -125,7 +132,7 @@ public void testStopQuery() throws Exception { } finally { // Ensure proper cleanup if the test fails CountingPauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + deleteAsyncId(client(), asyncExecutionId); } } @@ -222,7 +229,7 @@ public void testStopQueryLocal() throws Exception { } } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client, asyncExecutionId)); + deleteAsyncId(client, asyncExecutionId); } } @@ -265,7 +272,7 @@ public void testStopQueryLocalNoRemotes() throws Exception { } } finally { SimplePauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + deleteAsyncId(client(), asyncExecutionId); } } @@ -318,7 +325,7 @@ public void testStopQueryInlineStats() throws Exception { // The sum could be null, if the stats did not manage to compute anything before being stopped // Or it could be 45L if it managed to add 0-9 if (v != null) { - assertThat((long) v, equalTo(45L)); + assertThat((long) v, lessThanOrEqualTo(45L)); } v = row.next(); if (v != null) { @@ -351,7 +358,16 @@ public void testStopQueryInlineStats() throws Exception { } finally { 
// Ensure proper cleanup if the test fails CountingPauseFieldPlugin.allowEmitting.countDown(); - assertAcked(deleteAsyncId(client(), asyncExecutionId)); + deleteAsyncId(client(), asyncExecutionId); + } + } + + public void deleteAsyncId(Client client, String id) { + try { + DeleteAsyncResultRequest request = new DeleteAsyncResultRequest(id); + assertAcked(client.execute(TransportDeleteAsyncResultAction.TYPE, request).actionGet(30, TimeUnit.SECONDS)); + } catch (ElasticsearchTimeoutException e) { + LOGGER.warn("timeout waiting for DELETE response: {}: {}", id, e); } } diff --git a/x-pack/plugin/inference/qa/multi-node/build.gradle b/x-pack/plugin/inference/qa/multi-node/build.gradle new file mode 100644 index 0000000000000..0fcb153c82dbc --- /dev/null +++ b/x-pack/plugin/inference/qa/multi-node/build.gradle @@ -0,0 +1,12 @@ +import org.elasticsearch.gradle.util.GradleUtils + +apply plugin: 'elasticsearch.internal-yaml-rest-test' + +dependencies { + clusterPlugins project(':x-pack:plugin:inference:qa:test-service-plugin') +} + +tasks.named('yamlRestTest') { + usesDefaultDistribution("to be triaged") + maxParallelForks = 1 +} diff --git a/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/java/org/elasticsearch/xpack/inference/InferenceRestMultiNodeIT.java b/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/java/org/elasticsearch/xpack/inference/InferenceRestMultiNodeIT.java new file mode 100644 index 0000000000000..bca31acabc386 --- /dev/null +++ b/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/java/org/elasticsearch/xpack/inference/InferenceRestMultiNodeIT.java @@ -0,0 +1,71 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.inference; + +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.elasticsearch.client.Request; +import org.elasticsearch.common.Strings; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate; +import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase; +import org.junit.After; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class InferenceRestMultiNodeIT extends ESClientYamlSuiteTestCase { + + public InferenceRestMultiNodeIT(final ClientYamlTestCandidate testCandidate) { + super(testCandidate); + } + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .systemProperty("tests.seed", System.getProperty("tests.seed")) + .setting("xpack.security.enabled", "false") + .setting("xpack.security.http.ssl.enabled", "false") + .setting("xpack.license.self_generated.type", "trial") + .plugin("inference-service-test") + .nodes(3) + .distribution(DistributionType.DEFAULT) + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @ParametersFactory + public static Iterable parameters() throws Exception { + return ESClientYamlSuiteTestCase.createParameters(); + } + + @After + public void cleanup() throws Exception { + for (var model : getAllModels()) { + var inferenceId = model.get("inference_id"); + try { + var endpoint = Strings.format("_inference/%s?force", inferenceId); + adminClient().performRequest(new Request("DELETE", endpoint)); + } catch (Exception ex) { + logger.warn(() -> "failed to delete inference endpoint " + inferenceId, ex); + } + } + } + + @SuppressWarnings("unchecked") + static List> getAllModels() throws IOException { + var request = new Request("GET", "_inference/_all"); + 
var response = client().performRequest(request); + return (List>) entityAsMap(response).get("endpoints"); + } +} diff --git a/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/resources/rest-api-spec/test/inference/10_retriever_extended_telemetry.yml b/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/resources/rest-api-spec/test/inference/10_retriever_extended_telemetry.yml new file mode 100644 index 0000000000000..8722fb102d4aa --- /dev/null +++ b/x-pack/plugin/inference/qa/multi-node/src/yamlRestTest/resources/rest-api-spec/test/inference/10_retriever_extended_telemetry.yml @@ -0,0 +1,120 @@ +setup: + - requires: + cluster_features: "search.usage.extended_data" + reason: extended search usage data introduced in 9.2.0 + + - do: + inference.put: + task_type: rerank + inference_id: my-rerank-model + body: > + { + "service": "test_reranking_service", + "service_settings": { + "model_id": "my_model", + "api_key": "abc64" + }, + "task_settings": { + } + } + + - do: + inference.put: + task_type: sparse_embedding + inference_id: sparse-inference-id + body: > + { + "service": "test_service", + "service_settings": { + "model": "my_model", + "api_key": "abc64" + }, + "task_settings": { + } + } + + - do: + indices.create: + index: test-index + body: + mappings: + properties: + text: + type: text + copy_to: semantic_text_field + topic: + type: keyword + subtopic: + type: keyword + inference_text_field: + type: text + semantic_text_field: + type: semantic_text + inference_id: sparse-inference-id + chunking_settings: + strategy: word + max_chunk_size: 10 + overlap: 1 + + - do: + index: + index: test-index + id: doc_2 + body: + text: "The phases of the Moon come from the position of the Moon relative to the Earth and Sun." + topic: [ "science" ] + subtopic: [ "astronomy" ] + inference_text_field: "0" + refresh: true + + - do: + index: + index: test-index + id: doc_3 + body: + text: "Sun Moon Lake is a lake in Nantou County, Taiwan. It is the largest lake in Taiwan." 
+ topic: [ "geography" ] + inference_text_field: "1" + refresh: true + + - do: + index: + index: test-index + id: doc_1 + body: + text: "As seen from Earth, a solar eclipse happens when the Moon is directly between the Earth and the Sun." + topic: [ "science" ] + subtopic: [ "technology" ] + inference_text_field: "-1" + refresh: true + +--- +"Reranking based on rescore_chunks is reflected in extended search usage stats": + + - do: + search: + index: test-index + body: + track_total_hits: true + fields: [ "text", "semantic_text_field", "topic" ] + retriever: + text_similarity_reranker: + retriever: + standard: + query: + match: + topic: + query: "science" + rank_window_size: 10 + inference_id: my-rerank-model + inference_text: "how often does the moon hide the sun?" + field: semantic_text_field + chunk_rescorer: {} + size: 10 + + - do: + cluster.stats: {} + + - not_exists: _nodes.failures + - exists: indices.search.extended.retrievers.text_similarity_reranker.chunk_rescorer + diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java index cf794d0e124e1..e15759669785f 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferenceFeatures.java @@ -52,6 +52,7 @@ public class InferenceFeatures implements FeatureSpecification { private static final NodeFeature SEMANTIC_TEXT_FIELDS_CHUNKS_FORMAT = new NodeFeature("semantic_text.fields_chunks_format"); public static final NodeFeature INFERENCE_ENDPOINT_CACHE = new NodeFeature("inference.endpoint.cache"); + public static final NodeFeature SEARCH_USAGE_EXTENDED_DATA = new NodeFeature("search.usage.extended_data"); @Override public Set getFeatures() { @@ -96,7 +97,8 @@ public Set getTestFeatures() { 
SemanticQueryBuilder.SEMANTIC_QUERY_FILTER_FIELD_CAPS_FIX, InterceptedInferenceQueryBuilder.NEW_SEMANTIC_QUERY_INTERCEPTORS, TEXT_SIMILARITY_RERANKER_SNIPPETS, - ModelStats.SEMANTIC_TEXT_USAGE + ModelStats.SEMANTIC_TEXT_USAGE, + SEARCH_USAGE_EXTENDED_DATA ) ); testFeatures.addAll(getFeatures()); diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index a66f85e953e62..6b1b7ff490102 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -659,6 +659,7 @@ public class Constants { "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate", "indices:admin/sample", + "indices:admin/sample/config/update", "indices:admin/sample/stats" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); } diff --git a/x-pack/plugin/security/src/main/java/module-info.java b/x-pack/plugin/security/src/main/java/module-info.java index 0497761e71b2b..1fd6f007c86d3 100644 --- a/x-pack/plugin/security/src/main/java/module-info.java +++ b/x-pack/plugin/security/src/main/java/module-info.java @@ -80,6 +80,7 @@ exports org.elasticsearch.xpack.security.transport.extension to org.elasticsearch.internal.security; exports org.elasticsearch.xpack.security.transport to org.elasticsearch.internal.security; exports org.elasticsearch.xpack.security.audit to org.elasticsearch.internal.security; + exports org.elasticsearch.xpack.security.metric to org.elasticsearch.internal.security; provides org.elasticsearch.index.SlowLogFieldProvider with org.elasticsearch.xpack.security.slowlog.SecuritySlowLogFieldProvider; diff --git 
a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index b449144cb1d98..90e8d34b68f7a 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -865,7 +865,8 @@ Collection createComponents( clusterService, resourceWatcherService, userRoleMapper, - projectResolver + projectResolver, + telemetryProvider ); Map realmFactories = new HashMap<>( InternalRealms.getFactories( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/InstrumentedSecurityActionListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/InstrumentedSecurityActionListener.java index 101f49258dd59..eca776ecf3c6f 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/InstrumentedSecurityActionListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/InstrumentedSecurityActionListener.java @@ -42,4 +42,28 @@ public static ActionListener> wrapForAuthc( }), () -> metrics.recordTime(context, startTimeNano)); } + /** + * A simpler variant that re-uses the Authentication Result as the context. This can be handy in situations where the attributes that + * are of interest are available only after the authentication is completed and not before. 
+ * As a natural consequence, there will be no context available at the point of recording start time and in cases of exceptional failure. + */ + public static ActionListener> wrapForAuthc( + final SecurityMetrics> metrics, + final ActionListener> listener + ) { + assert metrics.type().group() == SecurityMetricGroup.AUTHC; + final long startTimeNano = metrics.relativeTimeInNanos(); + return ActionListener.runBefore(ActionListener.wrap(result -> { + if (result.isAuthenticated()) { + metrics.recordSuccess(result); + } else { + metrics.recordFailure(result, result.getMessage()); + } + listener.onResponse(result); + }, e -> { + metrics.recordFailure(null, e.getMessage()); + listener.onFailure(e); + }), () -> metrics.recordTime(null, startTimeNano)); + } + } diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/SecurityMetricType.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/SecurityMetricType.java index 02ac292aee781..74a9164b104d4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/SecurityMetricType.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/metric/SecurityMetricType.java @@ -19,6 +19,25 @@ public enum SecurityMetricType { new SecurityMetricInfo("es.security.authc.api_key.time", "Time it took (in nanoseconds) to execute API key authentication.", "ns") ), + CLOUD_AUTHC_API_KEY( + SecurityMetricGroup.AUTHC, + new SecurityMetricInfo( + "es.security.authc.cloud_api_key.success.total", + "Number of successful cloud API key authentications.", + "count" + ), + new SecurityMetricInfo( + "es.security.authc.cloud_api_key.failures.total", + "Number of failed cloud API key authentications.", + "count" + ), + new SecurityMetricInfo( + "es.security.authc.cloud_api_key.time", + "Time it took (in nanoseconds) to execute cloud API key authentication.", + "ns" + ) + ), + + AUTHC_SERVICE_ACCOUNT( + SecurityMetricGroup.AUTHC, + new 
SecurityMetricInfo( diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/ExtensionComponents.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/ExtensionComponents.java index 1e14fe197e724..56c5ab993f8a4 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/ExtensionComponents.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/ExtensionComponents.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.core.security.SecurityExtension; @@ -27,6 +28,7 @@ public final class ExtensionComponents implements SecurityExtension.SecurityComp private final ResourceWatcherService resourceWatcherService; private final UserRoleMapper roleMapper; private final ProjectResolver projectResolver; + private final TelemetryProvider telemetryProvider; public ExtensionComponents( Environment environment, @@ -34,7 +36,8 @@ public ExtensionComponents( ClusterService clusterService, ResourceWatcherService resourceWatcherService, UserRoleMapper roleMapper, - ProjectResolver projectResolver + ProjectResolver projectResolver, + TelemetryProvider telemetryProvider ) { this.environment = environment; this.client = client; @@ -42,6 +45,7 @@ public ExtensionComponents( this.resourceWatcherService = resourceWatcherService; this.roleMapper = roleMapper; this.projectResolver = projectResolver; + this.telemetryProvider = telemetryProvider; } @Override @@ -83,4 +87,9 @@ public UserRoleMapper roleMapper() { public ProjectResolver projectResolver() { return projectResolver; } + + @Override + public TelemetryProvider telemetryProvider() { + return 
telemetryProvider; + } }