From 5771f4c17a6710c954aa9e50ac6620cd180b63ea Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 12:07:02 -0500 Subject: [PATCH 01/11] Adding an API to get random sampling stats --- .../elasticsearch/action/ActionModule.java | 12 + .../sampling/GetSampleStatsAction.java | 227 ++++++++++++++++++ .../sampling/RestGetSampleStatsAction.java | 49 ++++ .../TransportGetSampleStatsAction.java | 86 +++++++ .../elasticsearch/ingest/SamplingService.java | 87 ++++++- .../GetSampleStatsActionNodeRequestTests.java | 37 +++ .../GetSampleStatsActionResponseTests.java | 87 +++++++ .../xpack/security/operator/Constants.java | 3 +- 8 files changed, 577 insertions(+), 11 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index 5d4acd0a3f691..f014db3c13d98 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -129,6 +129,9 @@ import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction; +import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction; +import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction; import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction; import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; @@ -427,6 +430,7 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; +import static org.elasticsearch.ingest.SamplingService.RANDOM_SAMPLING_FEATURE_FLAG; /** * Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}. @@ -815,6 +819,10 @@ public void reg actions.register(GetSynonymRuleAction.INSTANCE, TransportGetSynonymRuleAction.class); actions.register(DeleteSynonymRuleAction.INSTANCE, TransportDeleteSynonymRuleAction.class); + if (RANDOM_SAMPLING_FEATURE_FLAG) { + actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class); + } + return unmodifiableMap(actions.getRegistry()); } @@ -1042,6 +1050,10 @@ public void initRestHandlers(Supplier nodesInCluster, Predicate< registerHandler.accept(new RestPutSynonymRuleAction()); registerHandler.accept(new RestGetSynonymRuleAction()); registerHandler.accept(new RestDeleteSynonymRuleAction()); + + if (RANDOM_SAMPLING_FEATURE_FLAG) { + registerHandler.accept(new RestGetSampleStatsAction()); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java new file mode 100644 index 0000000000000..d769e3a8403b7 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -0,0 +1,227 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.transport.AbstractTransportRequest; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class GetSampleStatsAction extends ActionType { + + public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction(); + public static final String NAME = "indices:admin/sample/stats"; + + private GetSampleStatsAction() { + super(NAME); + } + + public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable { + private String indexName; + + public Request(String indexName) { + super((String[]) null); + this.indexName = indexName; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "get sample stats", parentTaskId, headers); + } + + @Override + public ActionRequestValidationException validate() { + if (this.indices().length != 1) { + return new ActionRequestValidationException(); + } + return null; + } + + @Override + public IndicesRequest indices(String... indices) { + assert indices.length == 1 : "GetSampleStatsAction only supports a single index name"; + this.indexName = indices[0]; + return this; + } + + @Override + public String[] indices() { + return new String[] { indexName }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + } + + public static class NodeRequest extends AbstractTransportRequest implements IndicesRequest { + private final String indexName; + + public NodeRequest(String indexName) { + this.indexName = indexName; + } + + public NodeRequest(StreamInput in) throws IOException { + super(in); + this.indexName = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(indexName); + } + + public String getIndexName() { + return indexName; + } + + @Override + public String[] indices() { + return new String[] { indexName }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeRequest other = (NodeRequest) o; + return Objects.equals(indexName, other.indexName); + } + + @Override + public int hashCode() { + return Objects.hash(indexName); + } + } + + public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { + final int maxSize; + + public Response(StreamInput in) throws IOException { + super(in); + maxSize = in.readInt(); + } + + public Response( + ClusterName clusterName, + List nodes, + List failures, + int maxSize + ) { + super(clusterName, nodes, failures); + this.maxSize = maxSize; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeInt(maxSize); + } + + public SamplingService.SampleStats getSampleStats() { + SamplingService.SampleStats rawStats = getRawSampleStats(); + if (rawStats.getSamples() > maxSize) { + SamplingService.SampleStats filteredStats = new SamplingService.SampleStats().combine(rawStats); + filteredStats.adjustForMaxSize(maxSize); + return filteredStats; + } else { + return rawStats; + } + } + + private SamplingService.SampleStats getRawSampleStats() { + return getNodes().stream() + .map(NodeResponse::getSampleStats) + .filter(Objects::nonNull) + .reduce(SamplingService.SampleStats::combine) + .orElse(new SamplingService.SampleStats()); + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response other = (Response) o; + return Objects.equals(getNodes(), other.getNodes()) && maxSize == other.maxSize; + } + + @Override + public int hashCode() { + return Objects.hash(getNodes(), maxSize); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return getSampleStats().toXContent(builder, params); + } + } + + public static class NodeResponse extends BaseNodeResponse { + private final SamplingService.SampleStats sampleStats; + + protected NodeResponse(StreamInput in) throws IOException { + super(in); + sampleStats = new SamplingService.SampleStats(in); + } + + protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) { + super(node); + this.sampleStats = sampleStats; + } + + public SamplingService.SampleStats getSampleStats() { + return sampleStats; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + sampleStats.writeTo(out); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java new file mode 100644 index 0000000000000..068079ac9738b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetSampleStatsAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_sample_stats"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/{index}/_sample/stats")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String[] indexNames = request.param("index").split(","); + if (indexNames.length > 1) { + throw new ActionRequestValidationException().addValidationError( + "Can only get samples for a single index at a time, but found " + + Arrays.stream(indexNames).collect(Collectors.joining(", ", "[", "]")) + ); + } + GetSampleStatsAction.Request getSampleStatsRequest = new GetSampleStatsAction.Request(indexNames[0]); + return channel -> client.execute(GetSampleStatsAction.INSTANCE, getSampleStatsRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java new file mode 100644 index 0000000000000..6426ef13637a4 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java @@ -0,0 +1,86 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeRequest; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeResponse; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Request; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Response; + +public class TransportGetSampleStatsAction extends TransportNodesAction { + private final SamplingService samplingService; + private final ProjectResolver projectResolver; + + @Inject + public TransportGetSampleStatsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + SamplingService samplingService, + ProjectResolver projectResolver + ) { + super( + GetSampleStatsAction.NAME, + clusterService, + transportService, + actionFilters, + NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.samplingService = samplingService; + this.projectResolver = projectResolver; + } + + @Override + protected Response newResponse(Request request, List nodeResponses, List failures) { + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + int maxSamples = samplingConfiguration == null ? 0 : samplingConfiguration.maxSamples(); + return new Response(clusterService.getClusterName(), nodeResponses, failures, maxSamples); + } + + @Override + protected NodeRequest newNodeRequest(Request request) { + return new NodeRequest(request.indices()[0]); + } + + @Override + protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new NodeResponse(in); + } + + @Override + protected NodeResponse nodeOperation(NodeRequest request, Task task) { + SamplingService.SampleStats sampleStats = samplingService.getLocalSampleStats( + projectResolver.getProjectId(), + request.getIndexName() + ); + return new NodeResponse(transportService.getLocalNode(), sampleStats); + } +} diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index ec9f19a44bfae..506c2ccbb552d 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -13,11 +13,14 @@ import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -47,6 +50,7 @@ import java.util.Objects; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongSupplier; @@ -61,6 +65,7 @@ public class SamplingService implements ClusterStateListener { private final LongSupplier relativeMillisTimeSupplier; private final LongSupplier statsTimeSupplier = System::nanoTime; private final Random random; + private final AtomicBoolean firstCall = new AtomicBoolean(true); /* * This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We @@ -132,17 +137,12 @@ private void maybeSample( return; } long startTime = statsTimeSupplier.getAsLong(); - SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); - if (samplingMetadata == null) { - return; - } - SamplingConfiguration samplingConfig = samplingMetadata.getIndexToSamplingConfigMap().get(indexName); - ProjectId projectId = projectMetadata.id(); + SamplingConfiguration samplingConfig = getSamplingConfiguration(projectMetadata, indexName); if (samplingConfig == null) { return; } SoftReference sampleInfoReference = samples.compute( - new ProjectIndex(projectId, indexName), + new ProjectIndex(projectMetadata.id(), indexName), (k, v) -> v == null || v.get() == null ? new SoftReference<>( new SampleInfo(samplingConfig.maxSamples(), samplingConfig.timeToLive(), relativeMillisTimeSupplier.getAsLong()) @@ -211,6 +211,14 @@ && evaluateCondition(ingestDocumentSupplier, sampleInfo.script, sampleInfo.facto } } + public SamplingConfiguration getSamplingConfiguration(ProjectMetadata projectMetadata, String indexName) { + SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); + if (samplingMetadata == null) { + return null; + } + return samplingMetadata.getIndexToSamplingConfigMap().get(indexName); + } + /** * Gets the sample for the given projectId and index on this node only. The sample is not persistent. * @param projectId The project that this sample is for @@ -241,6 +249,28 @@ public SampleStats getLocalSampleStats(ProjectId projectId, String index) { public boolean atLeastOneSampleConfigured() { if (RANDOM_SAMPLING_FEATURE_FLAG) { + if (firstCall.get()) { + clusterService.submitUnbatchedStateUpdateTask("blocking-task", new ClusterStateUpdateTask(Priority.IMMEDIATE) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder( + currentState.metadata().getProject(ProjectId.DEFAULT) + ); + SamplingMetadata samplingMetadata = new SamplingMetadata( + Map.of("test", new SamplingConfiguration(1.0d, 50, null, null, null)) + ); + projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata); + ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build(); + return newState; + } + + @Override + public void onFailure(Exception e) { + assert false : e.getMessage(); + } + }); + firstCall.set(false); + } SamplingMetadata samplingMetadata = clusterService.state() .projectState(projectResolver.getProjectId()) .metadata() @@ -347,6 +377,30 @@ public static final class SampleStats implements Writeable, ToXContent { public SampleStats() {} + public SampleStats( + long samples, + long potentialSamples, + long samplesRejectedForMaxSamplesExceeded, + long samplesRejectedForCondition, + long samplesRejectedForRate, + long samplesRejectedForException, + TimeValue timeSampling, + TimeValue timeEvaluatingCondition, + TimeValue timeCompilingCondition, + Exception lastException + ) { + this.samples.add(samples); + this.potentialSamples.add(potentialSamples); + this.samplesRejectedForMaxSamplesExceeded.add(samplesRejectedForMaxSamplesExceeded); + this.samplesRejectedForCondition.add(samplesRejectedForCondition); + this.samplesRejectedForRate.add(samplesRejectedForRate); + this.samplesRejectedForException.add(samplesRejectedForException); + this.timeSampling.add(timeSampling.nanos()); + this.timeEvaluatingCondition.add(timeEvaluatingCondition.nanos()); + this.timeCompilingCondition.add(timeCompilingCondition.nanos()); + this.lastException = lastException; + } + public SampleStats(StreamInput in) throws IOException { potentialSamples.add(in.readLong()); samplesRejectedForMaxSamplesExceeded.add(in.readLong()); @@ -457,9 +511,17 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue()); builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue()); builder.field("samples_accepted", samples.longValue()); - builder.field("time_sampling", (timeSampling.longValue() / 1000000)); - builder.field("time_evaluating_condition", (timeEvaluatingCondition.longValue() / 1000000)); - builder.field("time_compiling_condition", (timeCompilingCondition.longValue() / 1000000)); + builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSampling.longValue())); + builder.humanReadableField( + "time_evaluating_condition_millis", + "time_evaluating_condition", + TimeValue.timeValueNanos(timeEvaluatingCondition.longValue()) + ); + builder.humanReadableField( + "time_compiling_condition_millis", + "time_compiling_condition", + TimeValue.timeValueNanos(timeCompilingCondition.longValue()) + ); builder.endObject(); return builder; } @@ -558,6 +620,11 @@ private int hashException(Exception e) { return Objects.hash(e.getClass(), e.getMessage()); } } + + public void adjustForMaxSize(int maxSize) { + samples.add(maxSize - samples.longValue()); + samplesRejectedForMaxSamplesExceeded.add(samples.longValue() - maxSize); + } } /* diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java new file mode 100644 index 0000000000000..b42952aedff90 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeRequestTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.NodeRequest; + +public class GetSampleStatsActionNodeRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return NodeRequest::new; + } + + @Override + protected NodeRequest createTestInstance() { + return new NodeRequest(randomIdentifier()); + } + + @Override + protected NodeRequest mutateInstance(NodeRequest instance) throws IOException { + String index = instance.indices()[0]; + index = randomValueOtherThan(index, ESTestCase::randomIdentifier); + return new NodeRequest(index); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java new file mode 100644 index 0000000000000..ddaf19195d563 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionResponseTests.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction.Response; + +public class GetSampleStatsActionResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + ClusterName clusterName = new ClusterName(randomAlphaOfLength(10)); + List nodes = randomList( + 5, + () -> new GetSampleStatsAction.NodeResponse(randomNode(), randomStats()) + ); + return new Response(clusterName, nodes, List.of(), randomIntBetween(1, 10)); + } + + @Override + protected Response mutateInstance(Response instance) throws IOException { + ClusterName clusterName = instance.getClusterName(); + List nodes = new ArrayList<>(instance.getNodes()); + List failures = new ArrayList<>(instance.failures()); + int maxSize = instance.maxSize; + switch (between(0, 1)) { + case 0 -> maxSize = randomIntBetween(101, 200); + case 1 -> { + DiscoveryNode node = randomNode(); + nodes.add(new GetSampleStatsAction.NodeResponse(node, randomStats())); + } + default -> throw new AssertionError("Illegal randomisation branch"); + } + return new Response(clusterName, nodes, failures, maxSize); + } + + private static SamplingService.SampleStats randomStats() { + return new SamplingService.SampleStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomBoolean() ? null : new ElasticsearchException("fail") + ); + } + + private static DiscoveryNode randomNode() { + return new DiscoveryNode( + randomIdentifier(), + randomIdentifier(), + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionInformation.CURRENT + ); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index d992f1b028a3c..750722795d187 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -657,6 +657,7 @@ public class Constants { "indices:admin/index/create_from_source", "indices:admin/index/copy_lifecycle_index_metadata", "internal:admin/repository/verify", - "internal:admin/repository/verify/coordinate" + "internal:admin/repository/verify/coordinate", + "indices:admin/sample/stats" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); } From 8da2d81840a92e8ce700d72cb8e208104356933b Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 13:16:07 -0500 Subject: [PATCH 02/11] removing junk code --- .../elasticsearch/ingest/SamplingService.java | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 63dbf742f40e4..39cc67c1a51f6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -13,14 +13,11 @@ import org.elasticsearch.action.admin.indices.sampling.SamplingMetadata; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -48,7 +45,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.function.LongSupplier; @@ -62,7 +58,6 @@ public class SamplingService implements ClusterStateListener { private final ProjectResolver projectResolver; private final LongSupplier relativeMillisTimeSupplier; private final LongSupplier statsTimeSupplier = System::nanoTime; - private final AtomicBoolean firstCall = new AtomicBoolean(true); /* * This Map contains the samples that exist on this node. They are not persisted to disk. They are stored as SoftReferences so that * sampling does not contribute to a node running out of memory. The idea is that access to samples is desirable, but not critical. We @@ -245,28 +240,6 @@ public SampleStats getLocalSampleStats(ProjectId projectId, String index) { public boolean atLeastOneSampleConfigured() { if (RANDOM_SAMPLING_FEATURE_FLAG) { - if (firstCall.get()) { - clusterService.submitUnbatchedStateUpdateTask("blocking-task", new ClusterStateUpdateTask(Priority.IMMEDIATE) { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder( - currentState.metadata().getProject(ProjectId.DEFAULT) - ); - SamplingMetadata samplingMetadata = new SamplingMetadata( - Map.of("test", new SamplingConfiguration(1.0d, 50, null, null, null)) - ); - projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata); - ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build(); - return newState; - } - - @Override - public void onFailure(Exception e) { - assert false : e.getMessage(); - } - }); - firstCall.set(false); - } SamplingMetadata samplingMetadata = clusterService.state() .projectState(projectResolver.getProjectId()) .metadata() From 4a76c55719b751b3c9d1e9ecaafe6855d3ca0d72 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 13:49:06 -0500 Subject: [PATCH 03/11] adding integration test --- .../indices/sampling/GetSampleActionIT.java | 32 +++++++++++++++++-- .../elasticsearch/ingest/SamplingService.java | 5 +-- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java index 904720ee70acb..6eca12f07d303 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.test.ESIntegTestCase; @@ -23,6 +24,7 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; public class GetSampleActionIT extends ESIntegTestCase { @@ -34,7 +36,8 @@ public void testGetSample() throws Exception { createIndex(indexName); // the index exists but there is no sampling configuration for it, so getting its sample will throw an exception: assertGetSampleThrowsResourceNotFoundException(indexName); - addSamplingConfig(indexName); + final int maxSamples = 30; + addSamplingConfig(indexName, maxSamples); // There is now a sampling configuration, but no data has been ingested: assertEmptySample(indexName); int docsToIndex = randomIntBetween(1, 20); @@ -49,6 +52,29 @@ public void testGetSample() throws Exception { for (int i = 0; i < docsToIndex; i++) { assertRawDocument(sample.get(i), indexName); } + + GetSampleStatsAction.Request statsRequest = new GetSampleStatsAction.Request(indexName); + GetSampleStatsAction.Response statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet(); + SamplingService.SampleStats stats = statsResponse.getSampleStats(); + assertThat(stats.getSamples(), equalTo((long) docsToIndex)); + assertThat(stats.getPotentialSamples(), equalTo((long) docsToIndex)); + assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO)); + assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L)); + assertThat(stats.getSamplesRejectedForRate(), equalTo(0L)); + assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L)); + assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L)); + + final int samplesOverMax = randomIntBetween(1, 5); + for (int i = docsToIndex; i < maxSamples + samplesOverMax; i++) { + indexDoc(indexName, randomIdentifier(), randomAlphanumericOfLength(10), randomAlphanumericOfLength(10)); + } + statsRequest = new GetSampleStatsAction.Request(indexName); + statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet(); + stats = statsResponse.getSampleStats(); + assertThat(stats.getSamples(), equalTo((long) maxSamples)); + assertThat(stats.getPotentialSamples(), equalTo((long) maxSamples + samplesOverMax)); + assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo((long) samplesOverMax)); + } private void assertRawDocument(SamplingService.RawDocument rawDocument, String indexName) { @@ -68,7 +94,7 @@ private void assertGetSampleThrowsResourceNotFoundException(String indexName) { } @SuppressWarnings("deprecation") - private void addSamplingConfig(String indexName) throws Exception { + private void addSamplingConfig(String indexName, int maxSamples) throws Exception { /* * Note: The following code writes a sampling config directly to the cluster state. It can be replaced with a call to the action * that does this once that action exists. @@ -81,7 +107,7 @@ public ClusterState execute(ClusterState currentState) throws Exception { currentState.metadata().getProject(ProjectId.DEFAULT) ); SamplingMetadata samplingMetadata = new SamplingMetadata( - Map.of(indexName, new SamplingConfiguration(1.0d, 100, null, null, null)) + Map.of(indexName, new SamplingConfiguration(1.0d, maxSamples, null, null, null)) ); projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata); ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build(); diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 39cc67c1a51f6..9b2192a7d5056 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -591,8 +591,9 @@ private int hashException(Exception e) { } public void adjustForMaxSize(int maxSize) { - samples.add(maxSize - samples.longValue()); - samplesRejectedForMaxSamplesExceeded.add(samples.longValue() - maxSize); + long actualSamples = samples.longValue(); + samples.add(maxSize - actualSamples); + samplesRejectedForMaxSamplesExceeded.add(actualSamples - maxSize); } } From b8994cfcc845c68ec746f406ede05f40f32e8399 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 14:00:16 -0500 Subject: [PATCH 04/11] fixing unit tests --- .../indices/sampling/GetSampleStatsAction.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java index d769e3a8403b7..603e7f8023b10 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -223,5 +223,18 @@ public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); sampleStats.writeTo(out); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetSampleStatsAction.NodeResponse other = (GetSampleStatsAction.NodeResponse) o; + return getNode().equals(other.getNode()) && sampleStats.equals(other.sampleStats); + } + + @Override + public int hashCode() { + return Objects.hash(getNode(), sampleStats); + } } } From 20d70c8af0daebeb3de19b1f0cb8e32bd2f9ad61 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 14:57:20 -0500 Subject: [PATCH 05/11] adding yaml rest test --- .../api/indices.get_sample_stats.json | 31 +++++++++++++++++++ .../indices.get_sample_stats/10_basic.yml | 19 ++++++++++++ .../TransportGetSampleStatsAction.java | 11 +++++++ 3 files changed, 61 insertions(+) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json new file mode 100644 index 0000000000000..43aa69977e8da --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/indices.get_sample_stats.json @@ -0,0 +1,31 @@ +{ + "indices.get_sample_stats": { + "documentation": { + "url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-get-sample", + "description": "Get stats about a random sample of ingested data" + }, + "stability": "experimental", + "visibility": "public", + "headers": { + "accept": [ + "application/json" + ] + }, + "url": { + "paths": [ + { + "path": "/{index}/_sample/stats", + "methods": [ + "GET" + ], + "parts": { + "index": { + "type": "string", + "description": "The name of a data stream or index" + } + } + } + ] + } + } +} diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml new file mode 100644 index 0000000000000..1d9297dc7f4d8 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/indices.get_sample_stats/10_basic.yml @@ -0,0 +1,19 @@ +--- +"Test get sample stats for index with no sample config": + - requires: + cluster_features: [ "random_sampling" ] + reason: requires feature 'random_sampling' to get random samples + + - do: + indices.get_sample_stats: + index: non_existent + catch: missing + + - do: + indices.create: + index: no_config + + - do: + indices.get_sample_stats: + index: no_config + catch: missing diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java index 6426ef13637a4..0346ef8a063d5 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java @@ -9,6 +9,7 @@ package org.elasticsearch.action.admin.indices.sampling; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -55,6 +56,16 @@ public TransportGetSampleStatsAction( this.projectResolver = projectResolver; } + @Override + protected Void createActionContext(Task task, GetSampleStatsAction.Request request) { + String indexName = request.indices()[0]; + SamplingMetadata samplingMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(SamplingMetadata.TYPE); + if (samplingMetadata == null || samplingMetadata.getIndexToSamplingConfigMap().get(indexName) == null) { + throw new ResourceNotFoundException("No sampling configuration found for [" + indexName + "]"); + } + return null; + } + @Override protected Response newResponse(Request request, List nodeResponses, List failures) { SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( From 24f35bcbd60cae71382aaa89869732c7d38a7f3e Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 15:08:32 -0500 Subject: [PATCH 06/11] adding a unit test --- ...GetSampleStatsActionNodeResponseTests.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java new file mode 100644 index 0000000000000..620a15bd6e4d0 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsActionNodeResponseTests.java @@ -0,0 +1,73 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.indices.sampling; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.ingest.SamplingService; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class GetSampleStatsActionNodeResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return GetSampleStatsAction.NodeResponse::new; + } + + @Override + protected GetSampleStatsAction.NodeResponse createTestInstance() { + return new GetSampleStatsAction.NodeResponse(randomNode(), randomStats()); + } + + @Override + protected GetSampleStatsAction.NodeResponse mutateInstance(GetSampleStatsAction.NodeResponse instance) throws IOException { + DiscoveryNode node = instance.getNode(); + SamplingService.SampleStats stats = instance.getSampleStats(); + if (randomBoolean()) { + node = randomValueOtherThan(node, GetSampleStatsActionNodeResponseTests::randomNode); + } else { + stats = randomStats(); + } + return new GetSampleStatsAction.NodeResponse(node, stats); + } + + private static SamplingService.SampleStats randomStats() { + return new SamplingService.SampleStats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomPositiveTimeValue(), + randomBoolean() ? null : new ElasticsearchException("fail") + ); + } + + private static DiscoveryNode randomNode() { + return new DiscoveryNode( + randomIdentifier(), + randomIdentifier(), + buildNewFakeTransportAddress(), + emptyMap(), + emptySet(), + VersionInformation.CURRENT + ); + } + +} From aa121a714680e42b06196c898a760681017adb60 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 15:35:40 -0500 Subject: [PATCH 07/11] cleanup --- .../sampling/GetSampleStatsAction.java | 8 +- .../elasticsearch/ingest/SamplingService.java | 102 +++++++++++------- .../SamplingServiceSampleStatsTests.java | 12 +-- 3 files changed, 71 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java index 603e7f8023b10..821e48017492f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -155,13 +155,7 @@ public void writeTo(StreamOutput out) throws IOException { public SamplingService.SampleStats getSampleStats() { SamplingService.SampleStats rawStats = getRawSampleStats(); - if (rawStats.getSamples() > maxSize) { - SamplingService.SampleStats filteredStats = new SamplingService.SampleStats().combine(rawStats); - filteredStats.adjustForMaxSize(maxSize); - return filteredStats; - } else { - return rawStats; - } + return rawStats.adjustForMaxSize(maxSize); } private SamplingService.SampleStats getRawSampleStats() { diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 9b2192a7d5056..6f9203b4466e4 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -173,7 +173,7 @@ private void maybeSample( sampleInfo.compilationFailed = true; throw e; } finally { - stats.timeCompilingCondition.add((statsTimeSupplier.getAsLong() - compileScriptStartTime)); + stats.timeCompilingConditionInNanos.add((statsTimeSupplier.getAsLong() - compileScriptStartTime)); } } } @@ -198,7 +198,7 @@ && evaluateCondition(ingestDocumentSupplier, sampleInfo.script, sampleInfo.facto stats.lastException = e; logger.debug("Error performing sampling for " + indexName, e); } finally { - stats.timeSampling.add((statsTimeSupplier.getAsLong() - startTime)); + stats.timeSamplingInNanos.add((statsTimeSupplier.getAsLong() - startTime)); } } @@ -264,7 +264,7 @@ private boolean evaluateCondition( long conditionStartTime = statsTimeSupplier.getAsLong(); boolean passedCondition = factory.newInstance(script.getParams(), ingestDocumentSupplier.get().getUnmodifiableSourceAndMetadata()) .execute(); - stats.timeEvaluatingCondition.add((statsTimeSupplier.getAsLong() - conditionStartTime)); + stats.timeEvaluatingConditionInNanos.add((statsTimeSupplier.getAsLong() - conditionStartTime)); return passedCondition; } @@ -339,13 +339,20 @@ public static final class SampleStats implements Writeable, ToXContent { final LongAdder samplesRejectedForCondition = new LongAdder(); final LongAdder samplesRejectedForRate = new LongAdder(); final LongAdder samplesRejectedForException = new LongAdder(); - final LongAdder timeSampling = new LongAdder(); - final LongAdder timeEvaluatingCondition = new LongAdder(); - final LongAdder timeCompilingCondition = new LongAdder(); + final LongAdder timeSamplingInNanos = new LongAdder(); + final LongAdder timeEvaluatingConditionInNanos = new LongAdder(); + final LongAdder timeCompilingConditionInNanos = new LongAdder(); Exception lastException = null; public SampleStats() {} + public SampleStats(SampleStats other) { + addAllFields(other, this); + } + + /* + * This constructor is only meant for constructing arbitrary SampleStats for testing + */ public SampleStats( long samples, long potentialSamples, @@ -364,9 +371,9 @@ public SampleStats( this.samplesRejectedForCondition.add(samplesRejectedForCondition); this.samplesRejectedForRate.add(samplesRejectedForRate); this.samplesRejectedForException.add(samplesRejectedForException); - this.timeSampling.add(timeSampling.nanos()); - this.timeEvaluatingCondition.add(timeEvaluatingCondition.nanos()); - this.timeCompilingCondition.add(timeCompilingCondition.nanos()); + this.timeSamplingInNanos.add(timeSampling.nanos()); + this.timeEvaluatingConditionInNanos.add(timeEvaluatingCondition.nanos()); + this.timeCompilingConditionInNanos.add(timeCompilingCondition.nanos()); this.lastException = lastException; } @@ -377,9 +384,9 @@ public SampleStats(StreamInput in) throws IOException { samplesRejectedForRate.add(in.readLong()); samplesRejectedForException.add(in.readLong()); samples.add(in.readLong()); - timeSampling.add(in.readLong()); - timeEvaluatingCondition.add(in.readLong()); - timeCompilingCondition.add(in.readLong()); + timeSamplingInNanos.add(in.readLong()); + timeEvaluatingConditionInNanos.add(in.readLong()); + timeCompilingConditionInNanos.add(in.readLong()); if (in.readBoolean()) { lastException = in.readException(); } else { @@ -412,15 +419,15 @@ public long getSamplesRejectedForException() { } public TimeValue getTimeSampling() { - return TimeValue.timeValueNanos(timeSampling.longValue()); + return TimeValue.timeValueNanos(timeSamplingInNanos.longValue()); } public TimeValue getTimeEvaluatingCondition() { - return TimeValue.timeValueNanos(timeEvaluatingCondition.longValue()); + return TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()); } public TimeValue getTimeCompilingCondition() { - return TimeValue.timeValueNanos(timeCompilingCondition.longValue()); + return TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()); } public Exception getLastException() { @@ -442,16 +449,15 @@ public String toString() { + ", samples_accepted: " + samples + ", time_sampling: " - + (timeSampling.longValue() / 1000000) + + TimeValue.timeValueNanos(timeSamplingInNanos.longValue()) + ", time_evaluating_condition: " - + (timeEvaluatingCondition.longValue() / 1000000) + + TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()) + ", time_compiling_condition: " - + (timeCompilingCondition.longValue() / 1000000); + + TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()); } public SampleStats combine(SampleStats other) { - SampleStats result = new SampleStats(); - addAllFields(this, result); + SampleStats result = new SampleStats(this); addAllFields(other, result); return result; } @@ -463,9 +469,9 @@ private static void addAllFields(SampleStats source, SampleStats dest) { dest.samplesRejectedForRate.add(source.samplesRejectedForRate.longValue()); dest.samplesRejectedForException.add(source.samplesRejectedForException.longValue()); dest.samples.add(source.samples.longValue()); - dest.timeSampling.add(source.timeSampling.longValue()); - dest.timeEvaluatingCondition.add(source.timeEvaluatingCondition.longValue()); - dest.timeCompilingCondition.add(source.timeCompilingCondition.longValue()); + dest.timeSamplingInNanos.add(source.timeSamplingInNanos.longValue()); + dest.timeEvaluatingConditionInNanos.add(source.timeEvaluatingConditionInNanos.longValue()); + dest.timeCompilingConditionInNanos.add(source.timeCompilingConditionInNanos.longValue()); if (dest.lastException == null) { dest.lastException = source.lastException; } @@ -480,16 +486,16 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par builder.field("samples_rejected_for_rate", samplesRejectedForRate.longValue()); builder.field("samples_rejected_for_exception", samplesRejectedForException.longValue()); builder.field("samples_accepted", samples.longValue()); - builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSampling.longValue())); + builder.humanReadableField("time_sampling_millis", "time_sampling", TimeValue.timeValueNanos(timeSamplingInNanos.longValue())); builder.humanReadableField( "time_evaluating_condition_millis", "time_evaluating_condition", - TimeValue.timeValueNanos(timeEvaluatingCondition.longValue()) + TimeValue.timeValueNanos(timeEvaluatingConditionInNanos.longValue()) ); builder.humanReadableField( "time_compiling_condition_millis", "time_compiling_condition", - TimeValue.timeValueNanos(timeCompilingCondition.longValue()) + TimeValue.timeValueNanos(timeCompilingConditionInNanos.longValue()) ); builder.endObject(); return builder; @@ -503,9 +509,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(samplesRejectedForRate.longValue()); out.writeLong(samplesRejectedForException.longValue()); out.writeLong(samples.longValue()); - out.writeLong(timeSampling.longValue()); - out.writeLong(timeEvaluatingCondition.longValue()); - out.writeLong(timeCompilingCondition.longValue()); + out.writeLong(timeSamplingInNanos.longValue()); + out.writeLong(timeEvaluatingConditionInNanos.longValue()); + out.writeLong(timeCompilingConditionInNanos.longValue()); if (lastException == null) { out.writeBoolean(false); } else { @@ -545,18 +551,23 @@ public boolean equals(Object o) { if (samplesRejectedForException.longValue() != that.samplesRejectedForException.longValue()) { return false; } - if (timeSampling.longValue() != that.timeSampling.longValue()) { + if (timeSamplingInNanos.longValue() != that.timeSamplingInNanos.longValue()) { return false; } - if (timeEvaluatingCondition.longValue() != that.timeEvaluatingCondition.longValue()) { + if (timeEvaluatingConditionInNanos.longValue() != that.timeEvaluatingConditionInNanos.longValue()) { return false; } - if (timeCompilingCondition.longValue() != that.timeCompilingCondition.longValue()) { + if (timeCompilingConditionInNanos.longValue() != that.timeCompilingConditionInNanos.longValue()) { return false; } return exceptionsAreEqual(lastException, that.lastException); } + /* + * This is used because most Exceptions do not have an equals or hashCode, and cause trouble when testing for equality in + * serialization unit tests. This method returns true if the exceptions are the same class and have the same message. This is good + * enough for serialization unit tests. + */ private boolean exceptionsAreEqual(Exception e1, Exception e2) { if (e1 == null && e2 == null) { return true; @@ -567,6 +578,10 @@ private boolean exceptionsAreEqual(Exception e1, Exception e2) { return e1.getClass().equals(e2.getClass()) && e1.getMessage().equals(e2.getMessage()); } + /* + * equals and hashCode are implemented for the sake of testing serialization. Since this class is mutable, these ought to never be + * used outside of testing. + */ @Override public int hashCode() { return Objects.hash( @@ -576,9 +591,9 @@ public int hashCode() { samplesRejectedForCondition.longValue(), samplesRejectedForRate.longValue(), samplesRejectedForException.longValue(), - timeSampling.longValue(), - timeEvaluatingCondition.longValue(), - timeCompilingCondition.longValue() + timeSamplingInNanos.longValue(), + timeEvaluatingConditionInNanos.longValue(), + timeCompilingConditionInNanos.longValue() ) + hashException(lastException); } @@ -590,10 +605,21 @@ private int hashException(Exception e) { } } - public void adjustForMaxSize(int maxSize) { + /* + * If the sample stats report more raw documents than the maximum size allowed for this sample, then this method creates a new + * cloned copy of the stats, but with the reported samples lowered to maxSize, and the reported rejected documents increased by the + * same amount. This avoids the confusing situation of the stats reporting more samples than the user has configured. + */ + public SampleStats adjustForMaxSize(int maxSize) { long actualSamples = samples.longValue(); - samples.add(maxSize - actualSamples); - samplesRejectedForMaxSamplesExceeded.add(actualSamples - maxSize); + if (actualSamples > maxSize) { + SampleStats adjusted = new SampleStats().combine(this); + adjusted.samples.add(maxSize - actualSamples); + adjusted.samplesRejectedForMaxSamplesExceeded.add(actualSamples - maxSize); + return adjusted; + } else { + return this; + } } } diff --git a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java index df8973d27ce3a..6fb3d345a3caa 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SamplingServiceSampleStatsTests.java @@ -36,9 +36,9 @@ protected SampleStats createTestInstance() { stats.samplesRejectedForCondition.add(randomReasonableLong()); stats.samplesRejectedForRate.add(randomReasonableLong()); stats.samplesRejectedForException.add(randomReasonableLong()); - stats.timeSampling.add(randomReasonableLong()); - stats.timeEvaluatingCondition.add(randomReasonableLong()); - stats.timeCompilingCondition.add(randomReasonableLong()); + stats.timeSamplingInNanos.add(randomReasonableLong()); + stats.timeEvaluatingConditionInNanos.add(randomReasonableLong()); + stats.timeCompilingConditionInNanos.add(randomReasonableLong()); stats.lastException = randomBoolean() ? null : new ElasticsearchException(randomAlphanumericOfLength(10)); return stats; } @@ -65,9 +65,9 @@ protected SampleStats mutateInstance(SampleStats instance) throws IOException { case 3 -> mutated.samplesRejectedForCondition.add(1); case 4 -> mutated.samplesRejectedForRate.add(1); case 5 -> mutated.samplesRejectedForException.add(1); - case 6 -> mutated.timeSampling.add(1); - case 7 -> mutated.timeEvaluatingCondition.add(1); - case 8 -> mutated.timeCompilingCondition.add(1); + case 6 -> mutated.timeSamplingInNanos.add(1); + case 7 -> mutated.timeEvaluatingConditionInNanos.add(1); + case 8 -> mutated.timeCompilingConditionInNanos.add(1); case 9 -> mutated.lastException = mutated.lastException == null ? new ElasticsearchException(randomAlphanumericOfLength(10)) : null; From 863e510aa85028d24a5d0940c8a05fd29dad1ac6 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 15:40:25 -0500 Subject: [PATCH 08/11] copilot-suggested changes --- .../admin/indices/sampling/GetSampleStatsAction.java | 6 ++++-- .../java/org/elasticsearch/ingest/SamplingService.java | 7 +++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java index 821e48017492f..3c2030d7723c2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -59,8 +59,10 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId, @Override public ActionRequestValidationException validate() { - if (this.indices().length != 1) { - return new ActionRequestValidationException(); + if (this.indexName.contains("*")) { + return (ActionRequestValidationException) new ActionRequestValidationException().addValidationError( + "Wildcards are not supported, but found [" + indexName + "]" + ); } return null; } diff --git a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java index 6f9203b4466e4..e6d34753e037f 100644 --- a/server/src/main/java/org/elasticsearch/ingest/SamplingService.java +++ b/server/src/main/java/org/elasticsearch/ingest/SamplingService.java @@ -202,6 +202,13 @@ && evaluateCondition(ingestDocumentSupplier, sampleInfo.script, sampleInfo.facto } } + /** + * Retrieves the sampling configuration for the specified index from the given project metadata. + * + * @param projectMetadata The project metadata containing sampling information. + * @param indexName The name of the index or data stream for which to retrieve the sampling configuration. + * @return The {@link SamplingConfiguration} for the specified index, or {@code null} if none exists. + */ public SamplingConfiguration getSamplingConfiguration(ProjectMetadata projectMetadata, String indexName) { SamplingMetadata samplingMetadata = projectMetadata.custom(SamplingMetadata.TYPE); if (samplingMetadata == null) { From 1d3db4c5c5375213619ec3ac6269eb7c8c6903c9 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 17:38:50 -0500 Subject: [PATCH 09/11] making RestGetSampleStatsAction cancellable --- .../indices/sampling/GetSampleStatsAction.java | 13 ++++++++----- .../indices/sampling/RestGetSampleStatsAction.java | 9 +++++++-- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java index 3c2030d7723c2..a8e6e2b0140a2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/GetSampleStatsAction.java @@ -22,19 +22,22 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.ingest.SamplingService; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.AbstractTransportRequest; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; +import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk; + public class GetSampleStatsAction extends ActionType { public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction(); @@ -131,7 +134,7 @@ public int hashCode() { } } - public static class Response extends BaseNodesResponse implements Writeable, ToXContentObject { + public static class Response extends BaseNodesResponse implements Writeable, ChunkedToXContent { final int maxSize; public Response(StreamInput in) throws IOException { @@ -192,8 +195,8 @@ public int hashCode() { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - return getSampleStats().toXContent(builder, params); + public Iterator toXContentChunked(ToXContent.Params params) { + return chunk(getSampleStats()); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java index 068079ac9738b..7ac8e9b01291e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java @@ -13,7 +13,8 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; import java.io.IOException; import java.util.Arrays; @@ -44,6 +45,10 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ); } GetSampleStatsAction.Request getSampleStatsRequest = new GetSampleStatsAction.Request(indexNames[0]); - return channel -> client.execute(GetSampleStatsAction.INSTANCE, getSampleStatsRequest, new RestToXContentListener<>(channel)); + return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute( + GetSampleStatsAction.INSTANCE, + getSampleStatsRequest, + new RestRefCountedChunkedToXContentListener<>(channel) + ); } } From 032d2cb14eab31b1872062c6d7c892d20a59c911 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 17:50:07 -0500 Subject: [PATCH 10/11] using SamplingService.getSamplingConfiguration() --- .../admin/indices/sampling/TransportGetSampleAction.java | 7 +++++-- .../indices/sampling/TransportGetSampleStatsAction.java | 7 +++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java index 2d25aa32ce0cc..87842ef096e33 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleAction.java @@ -63,8 +63,11 @@ public TransportGetSampleAction( @Override protected Void createActionContext(Task task, Request request) { String indexName = request.indices()[0]; - SamplingMetadata samplingMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(SamplingMetadata.TYPE); - if (samplingMetadata == null || samplingMetadata.getIndexToSamplingConfigMap().get(indexName) == null) { + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + if (samplingConfiguration == null) { throw new ResourceNotFoundException("No sampling configuration found for [" + indexName + "]"); } return null; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java index 0346ef8a063d5..4bd89dd2beab2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/TransportGetSampleStatsAction.java @@ -59,8 +59,11 @@ public TransportGetSampleStatsAction( @Override protected Void createActionContext(Task task, GetSampleStatsAction.Request request) { String indexName = request.indices()[0]; - SamplingMetadata samplingMetadata = projectResolver.getProjectMetadata(clusterService.state()).custom(SamplingMetadata.TYPE); - if (samplingMetadata == null || samplingMetadata.getIndexToSamplingConfigMap().get(indexName) == null) { + SamplingConfiguration samplingConfiguration = samplingService.getSamplingConfiguration( + projectResolver.getProjectMetadata(clusterService.state()), + request.indices()[0] + ); + if (samplingConfiguration == null) { throw new ResourceNotFoundException("No sampling configuration found for [" + indexName + "]"); } return null; From 04a99c4c37ca73952d6b27a05c0140dd6d39154e Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Mon, 6 Oct 2025 17:51:28 -0500 Subject: [PATCH 11/11] marking the rest action as serverless internal --- .../admin/indices/sampling/RestGetSampleStatsAction.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java index 7ac8e9b01291e..5a3c76f3781aa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/sampling/RestGetSampleStatsAction.java @@ -13,6 +13,8 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.Scope; +import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener; @@ -23,6 +25,7 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; +@ServerlessScope(Scope.INTERNAL) public class RestGetSampleStatsAction extends BaseRestHandler { @Override