diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json new file mode 100644 index 0000000000000..43aa69977e8da --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json @@ -0,0 +1,31 @@ +{ + "indices.get_sample_stats": { + "documentation": { + "url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-get-sample", + "description": "Get stats about a random sample of ingested data" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/{index}/_sample/stats", + "methods": [ + "GET" + ], + "parts": { + "index": { + "type": "string", + "description": "The name of a data stream or index" + } + } + } + ] + } + } +} diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml new file mode 100644 index 0000000000000..1d9297dc7f4d8 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml @@ -0,0 +1,19 @@ +--- +"Test get sample stats for index with no sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.get_sample_stats: + index: non_existent + catch: missing + + - do: + indices.create: + index: no_config + + - do: + indices.get_sample_stats: + index: no_config + catch: missing diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java index 904720ee70acb..6eca12f07d303 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.test.ESIntegTestCase; @@ -23,6 +24,7 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class GetSampleActionIT extends ESIntegTestCase { @@ -34,7 +36,8 @@ public void testGetSample() throws Exception { createIndex(indexName); // the index exists but there is no sampling configuration for it, so getting its sample will throw an exception: assertGetSampleThrowsResourceNotFoundException(indexName); - addSamplingConfig(indexName); + final int maxSamples = 30; + addSamplingConfig(indexName, maxSamples); // There is now a sampling configuration, but no data has been ingested: assertEmptySample(indexName); int docsToIndex = randomIntBetween(1, 20); @@ -49,6 +52,29 @@ public void testGetSample() throws Exception { for (int i = 0; i < docsToIndex; i++) { assertRawDocument(sample.get(i), indexName); } + + GetSampleStatsAction.Request statsRequest = new GetSampleStatsAction.Request(indexName); + GetSampleStatsAction.Response statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet(); + SamplingService.SampleStats stats = statsResponse.getSampleStats(); + assertThat(stats.getSamples(), equalTo((long) docsToIndex)); + assertThat(stats.getPotentialSamples(), equalTo((long) docsToIndex)); + assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO)); + assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L)); + 
assertThat(stats.getSamplesRejectedForRate(), equalTo(0L)); + assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L)); + assertThat(stats.getSamplesRejectedForException(), equalTo(0L)); + // Index past maxSamples: the sample must stay capped at maxSamples and every extra document must be counted as rejected + final int samplesOverMax = randomIntBetween(1, 5); + for (int i = docsToIndex; i < maxSamples + samplesOverMax; i++) { + indexDoc(indexName, randomIdentifier(), randomAlphanumericOfLength(10), randomAlphanumericOfLength(10)); + } + statsRequest = new GetSampleStatsAction.Request(indexName); + statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet(); + stats = statsResponse.getSampleStats(); + assertThat(stats.getSamples(), equalTo((long) maxSamples)); + assertThat(stats.getPotentialSamples(), equalTo((long) maxSamples + samplesOverMax)); + assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo((long) samplesOverMax)); + } private void assertRawDocument(SamplingService.RawDocument rawDocument, String indexName) { @@ -68,7 +94,7 @@ private void assertGetSampleThrowsResourceNotFoundException(String indexName) { } @SuppressWarnings("deprecation") - private void addSamplingConfig(String indexName) throws Exception { + private void addSamplingConfig(String indexName, int maxSamples) throws Exception { /* * Note: The following code writes a sampling config directly to the cluster state. It can be replaced with a call to the action * that does this once that action exists. 
@@ -81,7 +107,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { currentState.metadata().getProject(ProjectId.DEFAULT) ); SamplingMetadata samplingMetadata = new SamplingMetadata( - Map.of(indexName, new SamplingConfiguration(1.0d, 100, null, null, null)) + Map.of(indexName, new SamplingConfiguration(1.0d, maxSamples, null, null, null)) ); projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata); ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build(); diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index d2252af74ecaa..757b8911a540d 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -130,8 +130,11 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; import org.elasticsearch.action.admin.indices.sampling.GetSampleAction; +import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction; import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction; +import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction; import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction; +import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -821,6 +824,7 @@ public void reg if (RANDOM_SAMPLING_FEATURE_FLAG) { actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class); + actions.register(GetSampleStatsAction.INSTANCE, 
TransportGetSampleStatsAction.class); } return unmodifiableMap(actions.getRegistry()); @@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< if (RANDOM_SAMPLING_FEATURE_FLAG) { registerHandler.accept(new RestGetSampleAction()); + registerHandler.accept(new RestGetSampleStatsAction()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java new file mode 100644 index 0000000000000..a8e6e2b0140a2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -0,0 +1,239 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk; + +public class GetSampleStatsAction extends ActionType { + + public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction(); + public static final String NAME = "indices:admin/sample/stats"; + + private GetSampleStatsAction() { + super(NAME); + } + + public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable { + private String indexName; + + public Request(String indexName) { + super((String[]) null); + this.indexName = indexName; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, 
type, action, "get sample stats", parentTaskId, headers); + } + + @Override + public ActionRequestValidationException validate() { + if (indexName == null || indexName.contains("*")) { // a null name must fail validation rather than throw an NPE + return (ActionRequestValidationException) new ActionRequestValidationException().addValidationError( + indexName == null ? "An index name is required" : "Wildcards are not supported, but found [" + indexName + "]" + ); + } + return null; + } + + @Override + public IndicesRequest indices(String... indices) { + assert indices.length == 1 : "GetSampleStatsAction only supports a single index name"; + this.indexName = indices[0]; + return this; + } + + @Override + public String[] indices() { + return new String[] { indexName }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + } + + public static class NodeRequest extends AbstractTransportRequest implements IndicesRequest { + private final String indexName; + + public NodeRequest(String indexName) { + this.indexName = indexName; + } + + public NodeRequest(StreamInput in) throws IOException { + super(in); + this.indexName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(indexName); + } + + public String getIndexName() { + return indexName; + } + + @Override + public String[] indices() { + return new String[] { indexName }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeRequest other = (NodeRequest) o; + return Objects.equals(indexName, other.indexName); + } + + @Override + public int hashCode() { + return Objects.hash(indexName); + } + } + + public static class Response extends BaseNodesResponse implements Writeable, ChunkedToXContent { + final int maxSize; + + public 
Response(StreamInput in) throws IOException { + super(in); + maxSize = in.readInt(); + } + + public Response( + ClusterName clusterName, + List nodes, + List failures, + int maxSize + ) { + super(clusterName, nodes, failures); + this.maxSize = maxSize; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeInt(maxSize); + } + + public SamplingService.SampleStats getSampleStats() { + SamplingService.SampleStats rawStats = getRawSampleStats(); + return rawStats.adjustForMaxSize(maxSize); + } + + private SamplingService.SampleStats getRawSampleStats() { + return getNodes().stream() + .map(NodeResponse::getSampleStats) + .filter(Objects::nonNull) + .reduce(SamplingService.SampleStats::combine) + .orElse(new SamplingService.SampleStats()); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response other = (Response) o; + return Objects.equals(getNodes(), other.getNodes()) && maxSize == other.maxSize; + } + + @Override + public int hashCode() { + return Objects.hash(getNodes(), maxSize); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return chunk(getSampleStats()); + } + } + + public static class NodeResponse extends BaseNodeResponse { + private final SamplingService.SampleStats sampleStats; + + protected NodeResponse(StreamInput in) throws IOException { + super(in); + sampleStats = new SamplingService.SampleStats(in); + } + + protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) { + super(node); + this.sampleStats = sampleStats; + } + + public 
SamplingService.SampleStats getSampleStats() { + return sampleStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sampleStats.writeTo(out); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetSampleStatsAction.NodeResponse other = (GetSampleStatsAction.NodeResponse) o; + return getNode().equals(other.getNode()) && sampleStats.equals(other.sampleStats); + } + + @Override + public int hashCode() { + return Objects.hash(getNode(), sampleStats); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java new file mode 100644 index 0000000000000..5a3c76f3781aa --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +@ServerlessScope(Scope.INTERNAL) +public class RestGetSampleStatsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_sample_stats"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/{index}/_sample/stats")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] indexNames = request.param("index").split(","); + if (indexNames.length != 1) { // split() drops trailing empty strings, so an input such as "," yields an empty array + throw new ActionRequestValidationException().addValidationError( + "Can only get sample stats for a single index at a time, but found " + + Arrays.stream(indexNames).collect(Collectors.joining(", ", "[", "]")) + ); + } + GetSampleStatsAction.Request getSampleStatsRequest = new GetSampleStatsAction.Request(indexNames[0]); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetSampleStatsAction.INSTANCE, + getSampleStatsRequest, + new RestRefCountedChunkedToXContentListener<>(channel) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java index 2d25aa32ce0cc..87842ef096e33 100644 --- 
a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java @@ -63,8 +63,11 @@ public TransportGetSampleAction( @Override protected Void createActionContext(Task task, Request request) { String indexName = request.indices()[0]; - SamplingMetadata samplingMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(SamplingMetadata.TYPE); - if (samplingMetadata == null || samplingMetadata.getIndexToSamplingConfigMap().get(indexName) == null) { + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + if (samplingConfiguration == null) { throw new ResourceNotFoundException("No sampling configuration found for [" + indexName + "]"); } return null; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java new file mode 100644 index 0000000000000..4bd89dd2beab2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeRequest; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeResponse; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Request; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Response; + +public class TransportGetSampleStatsAction extends TransportNodesAction { + private final SamplingService samplingService; + private final ProjectResolver projectResolver; + + @Inject + public TransportGetSampleStatsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + SamplingService samplingService, + ProjectResolver projectResolver + ) { + super( + GetSampleStatsAction.NAME, + clusterService, + transportService, + actionFilters, + NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.samplingService = samplingService; + this.projectResolver = projectResolver; + } + + @Override + protected Void createActionContext(Task task, GetSampleStatsAction.Request request) { + 
String indexName = request.indices()[0]; + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + if (samplingConfiguration == null) { + throw new ResourceNotFoundException("No sampling configuration found for [" + indexName + "]"); + } + return null; + } + + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + int maxSamples = samplingConfiguration == null ? 0 : samplingConfiguration.maxSamples(); + return new Response(clusterService.getClusterName(), nodeResponses, failures, maxSamples); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request.indices()[0]); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + SamplingService.SampleStats sampleStats = samplingService.getLocalSampleStats( + projectResolver.getProjectId(), + request.getIndexName() + ); + return new NodeResponse(transportService.getLocalNode(), sampleStats); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 4dc60cb4ce7a2..e6d34753e037f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -58,7 +58,6 @@ public class SamplingService implements ClusterStateListener { private final ProjectResolver projectResolver; private final LongSupplier relativeMillisTimeSupplier; private final LongSupplier statsTimeSupplier = 
System::nanoTime; - /* * This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We @@ -129,17 +128,12 @@ private void maybeSample( return; } long startTime = statsTimeSupplier.getAsLong(); - SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); - if (samplingMetadata == null) { - return; - } - SamplingConfiguration samplingConfig = samplingMetadata.getIndexToSamplingConfigMap().get(indexName); - ProjectId projectId = projectMetadata.id(); + SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName); if (samplingConfig == null) { return; } SoftReference sampleInfoReference = samples.compute( - new ProjectIndex(projectId, indexName), + new ProjectIndex(projectMetadata.id(), indexName), (k, v) -> v == null || v.get() == null ? new SoftReference<>( new SampleInfo(samplingConfig.maxSamples(), samplingConfig.timeToLive(), relativeMillisTimeSupplier.getAsLong()) @@ -179,7 +173,7 @@ private void maybeSample( sampleInfo.compilationFailed = true; throw e; } finally { - stats.timeCompilingCondition.add((statsTimeSupplier.getAsLong() - compileScriptStartTime)); + stats.timeCompilingConditionInNanos.add((statsTimeSupplier.getAsLong() - compileScriptStartTime)); } } } @@ -204,10 +198,25 @@ && evaluateCondition(ingestDocumentSupplier, sampleInfo.script, sampleInfo.facto stats.lastException = e; logger.debug("Error performing sampling for " + indexName, e); } finally { - stats.timeSampling.add((statsTimeSupplier.getAsLong() - startTime)); + stats.timeSamplingInNanos.add((statsTimeSupplier.getAsLong() - startTime)); } } + /** + * Retrieves the sampling configuration for the specified index from the given project metadata. + * + * @param projectMetadata The project metadata containing sampling information. 
+ * @param indexName The name of the index or data stream for which to retrieve the sampling configuration. + * @return The {@link SamplingConfiguration} for the specified index, or {@code null} if none exists. + */ + public SamplingConfiguration getSamplingConfiguration(ProjectMetadata projectMetadata, String indexName) { + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + if (samplingMetadata == null) { + return null; + } + return samplingMetadata.getIndexToSamplingConfigMap().get(indexName); + } + /** * Gets the sample for the given projectId and index on this node only. The sample is not persistent. * @param projectId The project that this sample is for @@ -262,7 +271,7 @@ private boolean evaluateCondition( long conditionStartTime = statsTimeSupplier.getAsLong(); boolean passedCondition = factory.newInstance(script.getParams(), ingestDocumentSupplier.get().getUnmodifiableSourceAndMetadata()) .execute(); - stats.timeEvaluatingCondition.add((statsTimeSupplier.getAsLong() - conditionStartTime)); + stats.timeEvaluatingConditionInNanos.add((statsTimeSupplier.getAsLong() - conditionStartTime)); return passedCondition; } @@ -337,13 +346,44 @@ public static final class SampleStats implements Writeable, ToXContent { final LongAdder samplesRejectedForCondition = new LongAdder(); final LongAdder samplesRejectedForRate = new LongAdder(); final LongAdder samplesRejectedForException = new LongAdder(); - final LongAdder timeSampling = new LongAdder(); - final LongAdder timeEvaluatingCondition = new LongAdder(); - final LongAdder timeCompilingCondition = new LongAdder(); + final LongAdder timeSamplingInNanos = new LongAdder(); + final LongAdder timeEvaluatingConditionInNanos = new LongAdder(); + final LongAdder timeCompilingConditionInNanos = new LongAdder(); Exception lastException = null; public SampleStats() {} + public SampleStats(SampleStats other) { + addAllFields(other, this); + } + + /* + * This constructor is only meant for 
constructing arbitrary SampleStats for testing + */ + public SampleStats( + long samples, + long potentialSamples, + long samplesRejectedForMaxSamplesExceeded, + long samplesRejectedForCondition, + long samplesRejectedForRate, + long samplesRejectedForException, + TimeValue timeSampling, + TimeValue timeEvaluatingCondition, + TimeValue timeCompilingCondition, + Exception lastException + ) { + this.samples.add(samples); + this.potentialSamples.add(potentialSamples); + this.samplesRejectedForMaxSamplesExceeded.add(samplesRejectedForMaxSamplesExceeded); + this.samplesRejectedForCondition.add(samplesRejectedForCondition); + this.samplesRejectedForRate.add(samplesRejectedForRate); + this.samplesRejectedForException.add(samplesRejectedForException); + this.timeSamplingInNanos.add(timeSampling.nanos()); + this.timeEvaluatingConditionInNanos.add(timeEvaluatingCondition.nanos()); + this.timeCompilingConditionInNanos.add(timeCompilingCondition.nanos()); + this.lastException = lastException; + } + public SampleStats(StreamInput in) throws IOException { potentialSamples.add(in.readLong()); samplesRejectedForMaxSamplesExceeded.add(in.readLong()); @@ -351,9 +391,9 @@ public SampleStats(StreamInput in) throws IOException { samplesRejectedForRate.add(in.readLong()); samplesRejectedForException.add(in.readLong()); samples.add(in.readLong()); - timeSampling.add(in.readLong()); - timeEvaluatingCondition.add(in.readLong()); - timeCompilingCondition.add(in.readLong()); + timeSamplingInNanos.add(in.readLong()); + timeEvaluatingConditionInNanos.add(in.readLong()); + timeCompilingConditionInNanos.add(in.readLong()); if (in.readBoolean()) { lastException = in.readException(); } else { @@ -386,15 +426,15 @@ public long getSamplesRejectedForException() { } public TimeValue getTimeSampling() { - return TimeValue.timeValueNanos(timeSampling.longValue()); + return TimeValue.timeValueNanos(timeSamplingInNanos.longValue()); } public TimeValue getTimeEvaluatingCondition() { - return 
TimeValue.timeValueNanos(timeEvaluatingCondition.longValue()); + return TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()); } public TimeValue getTimeCompilingCondition() { - return TimeValue.timeValueNanos(timeCompilingCondition.longValue()); + return TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()); } public Exception getLastException() { @@ -416,16 +456,15 @@ public String toString() { + ", samples_accepted: " + samples + ", time_sampling: " - + (timeSampling.longValue() / 1000000) + + TimeValue.timeValueNanos(timeSamplingInNanos.longValue()) + ", time_evaluating_condition: " - + (timeEvaluatingCondition.longValue() / 1000000) + + TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()) + ", time_compiling_condition: " - + (timeCompilingCondition.longValue() / 1000000); + + TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()); } public SampleStats combine(SampleStats other) { - SampleStats result = new SampleStats(); - addAllFields(this, result); + SampleStats result = new SampleStats(this); addAllFields(other, result); return result; } @@ -437,9 +476,9 @@ private static void addAllFields(SampleStats source, SampleStats dest) { dest.samplesRejectedForRate.add(source.samplesRejectedForRate.longValue()); dest.samplesRejectedForException.add(source.samplesRejectedForException.longValue()); dest.samples.add(source.samples.longValue()); - dest.timeSampling.add(source.timeSampling.longValue()); - dest.timeEvaluatingCondition.add(source.timeEvaluatingCondition.longValue()); - dest.timeCompilingCondition.add(source.timeCompilingCondition.longValue()); + dest.timeSamplingInNanos.add(source.timeSamplingInNanos.longValue()); + dest.timeEvaluatingConditionInNanos.add(source.timeEvaluatingConditionInNanos.longValue()); + dest.timeCompilingConditionInNanos.add(source.timeCompilingConditionInNanos.longValue()); if (dest.lastException == null) { dest.lastException = source.lastException; } @@ -454,9 +493,17 @@ 
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue()); builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue()); builder.field("samples_accepted", samples.longValue()); - builder.field("time_sampling", (timeSampling.longValue() / 1000000)); - builder.field("time_evaluating_condition", (timeEvaluatingCondition.longValue() / 1000000)); - builder.field("time_compiling_condition", (timeCompilingCondition.longValue() / 1000000)); + builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSamplingInNanos.longValue())); + builder.humanReadableField( + "time_evaluating_condition_millis", + "time_evaluating_condition", + TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()) + ); + builder.humanReadableField( + "time_compiling_condition_millis", + "time_compiling_condition", + TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()) + ); builder.endObject(); return builder; } @@ -469,9 +516,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(samplesRejectedForRate.longValue()); out.writeLong(samplesRejectedForException.longValue()); out.writeLong(samples.longValue()); - out.writeLong(timeSampling.longValue()); - out.writeLong(timeEvaluatingCondition.longValue()); - out.writeLong(timeCompilingCondition.longValue()); + out.writeLong(timeSamplingInNanos.longValue()); + out.writeLong(timeEvaluatingConditionInNanos.longValue()); + out.writeLong(timeCompilingConditionInNanos.longValue()); if (lastException == null) { out.writeBoolean(false); } else { @@ -511,18 +558,23 @@ public boolean equals(Object o) { if (samplesRejectedForException.longValue() != that.samplesRejectedForException.longValue()) { return false; } - if (timeSampling.longValue() != that.timeSampling.longValue()) { + if (timeSamplingInNanos.longValue() != 
that.timeSamplingInNanos.longValue()) { return false; } - if (timeEvaluatingCondition.longValue() != that.timeEvaluatingCondition.longValue()) { + if (timeEvaluatingConditionInNanos.longValue() != that.timeEvaluatingConditionInNanos.longValue()) { return false; } - if (timeCompilingCondition.longValue() != that.timeCompilingCondition.longValue()) { + if (timeCompilingConditionInNanos.longValue() != that.timeCompilingConditionInNanos.longValue()) { return false; } return exceptionsAreEqual(lastException, that.lastException); } + /* + * This is used because most Exceptions do not have an equals or hashCode, and cause trouble when testing for equality in + * serialization unit tests. This method returns true if the exceptions are the same class and have the same message. This is good + * enough for serialization unit tests. + */ private boolean exceptionsAreEqual(Exception e1, Exception e2) { if (e1 == null && e2 == null) { return true; @@ -533,6 +585,10 @@ private boolean exceptionsAreEqual(Exception e1, Exception e2) { return e1.getClass().equals(e2.getClass()) && e1.getMessage().equals(e2.getMessage()); } + /* + * equals and hashCode are implemented for the sake of testing serialization. Since this class is mutable, these ought to never be + * used outside of testing. 
+ */ @Override public int hashCode() { return Objects.hash( @@ -542,9 +598,9 @@ public int hashCode() { samplesRejectedForCondition.longValue(), samplesRejectedForRate.longValue(), samplesRejectedForException.longValue(), - timeSampling.longValue(), - timeEvaluatingCondition.longValue(), - timeCompilingCondition.longValue() + timeSamplingInNanos.longValue(), + timeEvaluatingConditionInNanos.longValue(), + timeCompilingConditionInNanos.longValue() ) + hashException(lastException); } @@ -555,6 +611,23 @@ private int hashException(Exception e) { return Objects.hash(e.getClass(), e.getMessage()); } } + + /* + * If the sample stats report more raw documents than the maximum size allowed for this sample, then this method creates a new + * cloned copy of the stats, but with the reported samples lowered to maxSize, and the reported rejected documents increased by the + * same amount. This avoids the confusing situation of the stats reporting more samples than the user has configured. + */ + public SampleStats adjustForMaxSize(int maxSize) { + long actualSamples = samples.longValue(); + if (actualSamples > maxSize) { + SampleStats adjusted = new SampleStats().combine(this); + adjusted.samples.add(maxSize - actualSamples); + adjusted.samplesRejectedForMaxSamplesExceeded.add(actualSamples - maxSize); + return adjusted; + } else { + return this; + } + } } /* diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java new file mode 100644 index 0000000000000..b42952aedff90 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeRequest; + +public class GetSampleStatsActionNodeRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return NodeRequest::new; + } + + @Override + protected NodeRequest createTestInstance() { + return new NodeRequest(randomIdentifier()); + } + + @Override + protected NodeRequest mutateInstance(NodeRequest instance) throws IOException { + String index = instance.indices()[0]; + index = randomValueOtherThan(index, ESTestCase::randomIdentifier); + return new NodeRequest(index); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java new file mode 100644 index 0000000000000..620a15bd6e4d0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class GetSampleStatsActionNodeResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return GetSampleStatsAction.NodeResponse::new; + } + + @Override + protected GetSampleStatsAction.NodeResponse createTestInstance() { + return new GetSampleStatsAction.NodeResponse(randomNode(), randomStats()); + } + + @Override + protected GetSampleStatsAction.NodeResponse mutateInstance(GetSampleStatsAction.NodeResponse instance) throws IOException { + DiscoveryNode node = instance.getNode(); + SamplingService.SampleStats stats = instance.getSampleStats(); + if (randomBoolean()) { + node = randomValueOtherThan(node, GetSampleStatsActionNodeResponseTests::randomNode); + } else { + stats = randomStats(); + } + return new GetSampleStatsAction.NodeResponse(node, stats); + } + + private static SamplingService.SampleStats randomStats() { + return new SamplingService.SampleStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + 
randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomBoolean() ? null : new ElasticsearchException("fail") + ); + } + + private static DiscoveryNode randomNode() { + return new DiscoveryNode( + randomIdentifier(), + randomIdentifier(), + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionInformation.CURRENT + ); + } + +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java new file mode 100644 index 0000000000000..ddaf19195d563 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Response; + +public class GetSampleStatsActionResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + ClusterName clusterName = new ClusterName(randomAlphaOfLength(10)); + List nodes = randomList( + 5, + () -> new GetSampleStatsAction.NodeResponse(randomNode(), randomStats()) + ); + return new Response(clusterName, nodes, List.of(), randomIntBetween(1, 10)); + } + + @Override + protected Response mutateInstance(Response instance) throws IOException { + ClusterName clusterName = instance.getClusterName(); + List nodes = new ArrayList<>(instance.getNodes()); + List failures = new ArrayList<>(instance.failures()); + int maxSize = instance.maxSize; + switch (between(0, 1)) { + case 0 -> maxSize = randomIntBetween(101, 200); + case 1 -> { + DiscoveryNode node = randomNode(); + nodes.add(new GetSampleStatsAction.NodeResponse(node, randomStats())); + } + default -> throw new AssertionError("Illegal randomisation branch"); + } + return new Response(clusterName, nodes, failures, maxSize); + } + + private static SamplingService.SampleStats randomStats() { + return new 
SamplingService.SampleStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomBoolean() ? null : new ElasticsearchException("fail") + ); + } + + private static DiscoveryNode randomNode() { + return new DiscoveryNode( + randomIdentifier(), + randomIdentifier(), + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionInformation.CURRENT + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java index df8973d27ce3a..6fb3d345a3caa 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java @@ -36,9 +36,9 @@ protected SampleStats createTestInstance() { stats.samplesRejectedForCondition.add(randomReasonableLong()); stats.samplesRejectedForRate.add(randomReasonableLong()); stats.samplesRejectedForException.add(randomReasonableLong()); - stats.timeSampling.add(randomReasonableLong()); - stats.timeEvaluatingCondition.add(randomReasonableLong()); - stats.timeCompilingCondition.add(randomReasonableLong()); + stats.timeSamplingInNanos.add(randomReasonableLong()); + stats.timeEvaluatingConditionInNanos.add(randomReasonableLong()); + stats.timeCompilingConditionInNanos.add(randomReasonableLong()); stats.lastException = randomBoolean() ? 
null : new ElasticsearchException(randomAlphanumericOfLength(10)); return stats; } @@ -65,9 +65,9 @@ protected SampleStats mutateInstance(SampleStats instance) throws IOException { case 3 -> mutated.samplesRejectedForCondition.add(1); case 4 -> mutated.samplesRejectedForRate.add(1); case 5 -> mutated.samplesRejectedForException.add(1); - case 6 -> mutated.timeSampling.add(1); - case 7 -> mutated.timeEvaluatingCondition.add(1); - case 8 -> mutated.timeCompilingCondition.add(1); + case 6 -> mutated.timeSamplingInNanos.add(1); + case 7 -> mutated.timeEvaluatingConditionInNanos.add(1); + case 8 -> mutated.timeCompilingConditionInNanos.add(1); case 9 -> mutated.lastException = mutated.lastException == null ? new ElasticsearchException(randomAlphanumericOfLength(10)) : null; diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 992d388efe707..a66f85e953e62 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -658,6 +658,7 @@ public class Constants { "indices:admin/index/copy_lifecycle_index_metadata", "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate", - "indices:admin/sample" + "indices:admin/sample", + "indices:admin/sample/stats" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); }