Skip to content

Commit f474779

Browse files
authored
Random sampling get stats (#136040)
1 parent d3d013e commit f474779

File tree

14 files changed

+805
-52
lines changed

14 files changed

+805
-52
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"indices.get_sample_stats": {
3+
"documentation": {
4+
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-get-sample",
5+
"description": "Get stats about a random sample of ingested data"
6+
},
7+
"stability": "experimental",
8+
"visibility": "public",
9+
"headers": {
10+
"accept": [
11+
"application/json"
12+
]
13+
},
14+
"url": {
15+
"paths": [
16+
{
17+
"path": "/{index}/_sample/stats",
18+
"methods": [
19+
"GET"
20+
],
21+
"parts": {
22+
"index": {
23+
"type": "string",
24+
"description": "The name of a data stream or index"
25+
}
26+
}
27+
}
28+
]
29+
}
30+
}
31+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
"Test get sample stats for index with no sample config":
3+
- requires:
4+
cluster_features: [ "random_sampling" ]
5+
reason: requires feature 'random_sampling' to get random samples
6+
7+
- do:
8+
indices.get_sample_stats:
9+
index: non_existent
10+
catch: missing
11+
12+
- do:
13+
indices.create:
14+
index: no_config
15+
16+
- do:
17+
indices.get_sample_stats:
18+
index: no_config
19+
catch: missing

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/sampling/GetSampleActionIT.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@
1616
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1717
import org.elasticsearch.cluster.service.ClusterService;
1818
import org.elasticsearch.common.Priority;
19+
import org.elasticsearch.core.TimeValue;
1920
import org.elasticsearch.ingest.SamplingService;
2021
import org.elasticsearch.test.ESIntegTestCase;
2122

2223
import java.util.List;
2324
import java.util.Map;
2425

2526
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.greaterThan;
2628

2729
public class GetSampleActionIT extends ESIntegTestCase {
2830

@@ -34,7 +36,8 @@ public void testGetSample() throws Exception {
3436
createIndex(indexName);
3537
// the index exists but there is no sampling configuration for it, so getting its sample will throw an exception:
3638
assertGetSampleThrowsResourceNotFoundException(indexName);
37-
addSamplingConfig(indexName);
39+
final int maxSamples = 30;
40+
addSamplingConfig(indexName, maxSamples);
3841
// There is now a sampling configuration, but no data has been ingested:
3942
assertEmptySample(indexName);
4043
int docsToIndex = randomIntBetween(1, 20);
@@ -49,6 +52,29 @@ public void testGetSample() throws Exception {
4952
for (int i = 0; i < docsToIndex; i++) {
5053
assertRawDocument(sample.get(i), indexName);
5154
}
55+
56+
GetSampleStatsAction.Request statsRequest = new GetSampleStatsAction.Request(indexName);
57+
GetSampleStatsAction.Response statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet();
58+
SamplingService.SampleStats stats = statsResponse.getSampleStats();
59+
assertThat(stats.getSamples(), equalTo((long) docsToIndex));
60+
assertThat(stats.getPotentialSamples(), equalTo((long) docsToIndex));
61+
assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO));
62+
assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L));
63+
assertThat(stats.getSamplesRejectedForRate(), equalTo(0L));
64+
assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));
65+
assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));
66+
67+
final int samplesOverMax = randomIntBetween(1, 5);
68+
for (int i = docsToIndex; i < maxSamples + samplesOverMax; i++) {
69+
indexDoc(indexName, randomIdentifier(), randomAlphanumericOfLength(10), randomAlphanumericOfLength(10));
70+
}
71+
statsRequest = new GetSampleStatsAction.Request(indexName);
72+
statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet();
73+
stats = statsResponse.getSampleStats();
74+
assertThat(stats.getSamples(), equalTo((long) maxSamples));
75+
assertThat(stats.getPotentialSamples(), equalTo((long) maxSamples + samplesOverMax));
76+
assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo((long) samplesOverMax));
77+
5278
}
5379

5480
private void assertRawDocument(SamplingService.RawDocument rawDocument, String indexName) {
@@ -68,7 +94,7 @@ private void assertGetSampleThrowsResourceNotFoundException(String indexName) {
6894
}
6995

7096
@SuppressWarnings("deprecation")
71-
private void addSamplingConfig(String indexName) throws Exception {
97+
private void addSamplingConfig(String indexName, int maxSamples) throws Exception {
7298
/*
7399
* Note: The following code writes a sampling config directly to the cluster state. It can be replaced with a call to the action
74100
* that does this once that action exists.
@@ -81,7 +107,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
81107
currentState.metadata().getProject(ProjectId.DEFAULT)
82108
);
83109
SamplingMetadata samplingMetadata = new SamplingMetadata(
84-
Map.of(indexName, new SamplingConfiguration(1.0d, 100, null, null, null))
110+
Map.of(indexName, new SamplingConfiguration(1.0d, maxSamples, null, null, null))
85111
);
86112
projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata);
87113
ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build();

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,11 @@
130130
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
131131
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
132132
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
133+
import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction;
133134
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction;
135+
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction;
134136
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction;
137+
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction;
135138
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
136139
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
137140
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@@ -821,6 +824,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
821824

822825
if (RANDOM_SAMPLING_FEATURE_FLAG) {
823826
actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class);
827+
actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class);
824828
}
825829

826830
return unmodifiableMap(actions.getRegistry());
@@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
10531057

10541058
if (RANDOM_SAMPLING_FEATURE_FLAG) {
10551059
registerHandler.accept(new RestGetSampleAction());
1060+
registerHandler.accept(new RestGetSampleStatsAction());
10561061
}
10571062
}
10581063

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.indices.sampling;
11+
12+
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.FailedNodeException;
15+
import org.elasticsearch.action.IndicesRequest;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
18+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
19+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
20+
import org.elasticsearch.cluster.ClusterName;
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
25+
import org.elasticsearch.common.xcontent.ChunkedToXContent;
26+
import org.elasticsearch.ingest.SamplingService;
27+
import org.elasticsearch.tasks.CancellableTask;
28+
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.tasks.TaskId;
30+
import org.elasticsearch.transport.AbstractTransportRequest;
31+
import org.elasticsearch.xcontent.ToXContent;
32+
33+
import java.io.IOException;
34+
import java.util.Iterator;
35+
import java.util.List;
36+
import java.util.Map;
37+
import java.util.Objects;
38+
39+
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
40+
41+
public class GetSampleStatsAction extends ActionType<GetSampleStatsAction.Response> {
42+
43+
public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction();
44+
public static final String NAME = "indices:admin/sample/stats";
45+
46+
/**
 * Not instantiable from outside this class; the single instance is published through {@link #INSTANCE}
 * and is registered under the action name {@link #NAME}.
 */
private GetSampleStatsAction() {
    super(NAME);
}
49+
50+
public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable {
51+
private String indexName;
52+
53+
public Request(String indexName) {
54+
super((String[]) null);
55+
this.indexName = indexName;
56+
}
57+
58+
@Override
59+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
60+
return new CancellableTask(id, type, action, "get sample stats", parentTaskId, headers);
61+
}
62+
63+
@Override
64+
public ActionRequestValidationException validate() {
65+
if (this.indexName.contains("*")) {
66+
return (ActionRequestValidationException) new ActionRequestValidationException().addValidationError(
67+
"Wildcards are not supported, but found [" + indexName + "]"
68+
);
69+
}
70+
return null;
71+
}
72+
73+
@Override
74+
public IndicesRequest indices(String... indices) {
75+
assert indices.length == 1 : "GetSampleStatsAction only supports a single index name";
76+
this.indexName = indices[0];
77+
return this;
78+
}
79+
80+
@Override
81+
public String[] indices() {
82+
return new String[] { indexName };
83+
}
84+
85+
@Override
86+
public IndicesOptions indicesOptions() {
87+
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
88+
}
89+
}
90+
91+
public static class NodeRequest extends AbstractTransportRequest implements IndicesRequest {
92+
private final String indexName;
93+
94+
public NodeRequest(String indexName) {
95+
this.indexName = indexName;
96+
}
97+
98+
public NodeRequest(StreamInput in) throws IOException {
99+
super(in);
100+
this.indexName = in.readString();
101+
}
102+
103+
@Override
104+
public void writeTo(StreamOutput out) throws IOException {
105+
super.writeTo(out);
106+
out.writeString(indexName);
107+
}
108+
109+
public String getIndexName() {
110+
return indexName;
111+
}
112+
113+
@Override
114+
public String[] indices() {
115+
return new String[] { indexName };
116+
}
117+
118+
@Override
119+
public IndicesOptions indicesOptions() {
120+
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
121+
}
122+
123+
@Override
124+
public boolean equals(Object o) {
125+
if (this == o) return true;
126+
if (o == null || getClass() != o.getClass()) return false;
127+
NodeRequest other = (NodeRequest) o;
128+
return Objects.equals(indexName, other.indexName);
129+
}
130+
131+
@Override
132+
public int hashCode() {
133+
return Objects.hash(indexName);
134+
}
135+
}
136+
137+
public static class Response extends BaseNodesResponse<GetSampleStatsAction.NodeResponse> implements Writeable, ChunkedToXContent {
138+
final int maxSize;
139+
140+
public Response(StreamInput in) throws IOException {
141+
super(in);
142+
maxSize = in.readInt();
143+
}
144+
145+
public Response(
146+
ClusterName clusterName,
147+
List<GetSampleStatsAction.NodeResponse> nodes,
148+
List<FailedNodeException> failures,
149+
int maxSize
150+
) {
151+
super(clusterName, nodes, failures);
152+
this.maxSize = maxSize;
153+
}
154+
155+
@Override
156+
public void writeTo(StreamOutput out) throws IOException {
157+
super.writeTo(out);
158+
out.writeInt(maxSize);
159+
}
160+
161+
public SamplingService.SampleStats getSampleStats() {
162+
SamplingService.SampleStats rawStats = getRawSampleStats();
163+
return rawStats.adjustForMaxSize(maxSize);
164+
}
165+
166+
private SamplingService.SampleStats getRawSampleStats() {
167+
return getNodes().stream()
168+
.map(NodeResponse::getSampleStats)
169+
.filter(Objects::nonNull)
170+
.reduce(SamplingService.SampleStats::combine)
171+
.orElse(new SamplingService.SampleStats());
172+
}
173+
174+
@Override
175+
protected List<GetSampleStatsAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
176+
return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new);
177+
}
178+
179+
@Override
180+
protected void writeNodesTo(StreamOutput out, List<GetSampleStatsAction.NodeResponse> nodes) throws IOException {
181+
out.writeCollection(nodes);
182+
}
183+
184+
@Override
185+
public boolean equals(Object o) {
186+
if (this == o) return true;
187+
if (o == null || getClass() != o.getClass()) return false;
188+
Response other = (Response) o;
189+
return Objects.equals(getNodes(), other.getNodes()) && maxSize == other.maxSize;
190+
}
191+
192+
@Override
193+
public int hashCode() {
194+
return Objects.hash(getNodes(), maxSize);
195+
}
196+
197+
@Override
198+
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
199+
return chunk(getSampleStats());
200+
}
201+
}
202+
203+
public static class NodeResponse extends BaseNodeResponse {
204+
private final SamplingService.SampleStats sampleStats;
205+
206+
protected NodeResponse(StreamInput in) throws IOException {
207+
super(in);
208+
sampleStats = new SamplingService.SampleStats(in);
209+
}
210+
211+
protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) {
212+
super(node);
213+
this.sampleStats = sampleStats;
214+
}
215+
216+
public SamplingService.SampleStats getSampleStats() {
217+
return sampleStats;
218+
}
219+
220+
@Override
221+
public void writeTo(StreamOutput out) throws IOException {
222+
super.writeTo(out);
223+
sampleStats.writeTo(out);
224+
}
225+
226+
@Override
227+
public boolean equals(Object o) {
228+
if (this == o) return true;
229+
if (o == null || getClass() != o.getClass()) return false;
230+
GetSampleStatsAction.NodeResponse other = (GetSampleStatsAction.NodeResponse) o;
231+
return getNode().equals(other.getNode()) && sampleStats.equals(other.sampleStats);
232+
}
233+
234+
@Override
235+
public int hashCode() {
236+
return Objects.hash(getNode(), sampleStats);
237+
}
238+
}
239+
}

0 commit comments

Comments
 (0)