Skip to content

Commit 5771f4c

Browse files
committed
Adding an API to get random sampling stats
1 parent fa1b376 commit 5771f4c

File tree

8 files changed

+577
-11
lines changed

8 files changed

+577
-11
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@
129129
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
130130
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
131131
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
132+
import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction;
133+
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction;
134+
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction;
132135
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
133136
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
134137
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@@ -427,6 +430,7 @@
427430
import java.util.stream.Stream;
428431

429432
import static java.util.Collections.unmodifiableMap;
433+
import static org.elasticsearch.ingest.SamplingService.RANDOM_SAMPLING_FEATURE_FLAG;
430434

431435
/**
432436
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
@@ -815,6 +819,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
815819
actions.register(GetSynonymRuleAction.INSTANCE, TransportGetSynonymRuleAction.class);
816820
actions.register(DeleteSynonymRuleAction.INSTANCE, TransportDeleteSynonymRuleAction.class);
817821

822+
if (RANDOM_SAMPLING_FEATURE_FLAG) {
823+
actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class);
824+
}
825+
818826
return unmodifiableMap(actions.getRegistry());
819827
}
820828

@@ -1042,6 +1050,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
10421050
registerHandler.accept(new RestPutSynonymRuleAction());
10431051
registerHandler.accept(new RestGetSynonymRuleAction());
10441052
registerHandler.accept(new RestDeleteSynonymRuleAction());
1053+
1054+
if (RANDOM_SAMPLING_FEATURE_FLAG) {
1055+
registerHandler.accept(new RestGetSampleStatsAction());
1056+
}
10451057
}
10461058

10471059
@Override
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.indices.sampling;
11+
12+
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.action.ActionType;
14+
import org.elasticsearch.action.FailedNodeException;
15+
import org.elasticsearch.action.IndicesRequest;
16+
import org.elasticsearch.action.support.IndicesOptions;
17+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
18+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
19+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
20+
import org.elasticsearch.cluster.ClusterName;
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
25+
import org.elasticsearch.ingest.SamplingService;
26+
import org.elasticsearch.tasks.CancellableTask;
27+
import org.elasticsearch.tasks.Task;
28+
import org.elasticsearch.tasks.TaskId;
29+
import org.elasticsearch.transport.AbstractTransportRequest;
30+
import org.elasticsearch.xcontent.ToXContentObject;
31+
import org.elasticsearch.xcontent.XContentBuilder;
32+
33+
import java.io.IOException;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Objects;
37+
38+
public class GetSampleStatsAction extends ActionType<GetSampleStatsAction.Response> {
39+
40+
public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction();
41+
public static final String NAME = "indices:admin/sample/stats";
42+
43+
private GetSampleStatsAction() {
44+
super(NAME);
45+
}
46+
47+
public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable {
48+
private String indexName;
49+
50+
public Request(String indexName) {
51+
super((String[]) null);
52+
this.indexName = indexName;
53+
}
54+
55+
@Override
56+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
57+
return new CancellableTask(id, type, action, "get sample stats", parentTaskId, headers);
58+
}
59+
60+
@Override
61+
public ActionRequestValidationException validate() {
62+
if (this.indices().length != 1) {
63+
return new ActionRequestValidationException();
64+
}
65+
return null;
66+
}
67+
68+
@Override
69+
public IndicesRequest indices(String... indices) {
70+
assert indices.length == 1 : "GetSampleStatsAction only supports a single index name";
71+
this.indexName = indices[0];
72+
return this;
73+
}
74+
75+
@Override
76+
public String[] indices() {
77+
return new String[] { indexName };
78+
}
79+
80+
@Override
81+
public IndicesOptions indicesOptions() {
82+
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
83+
}
84+
}
85+
86+
public static class NodeRequest extends AbstractTransportRequest implements IndicesRequest {
87+
private final String indexName;
88+
89+
public NodeRequest(String indexName) {
90+
this.indexName = indexName;
91+
}
92+
93+
public NodeRequest(StreamInput in) throws IOException {
94+
super(in);
95+
this.indexName = in.readString();
96+
}
97+
98+
@Override
99+
public void writeTo(StreamOutput out) throws IOException {
100+
super.writeTo(out);
101+
out.writeString(indexName);
102+
}
103+
104+
public String getIndexName() {
105+
return indexName;
106+
}
107+
108+
@Override
109+
public String[] indices() {
110+
return new String[] { indexName };
111+
}
112+
113+
@Override
114+
public IndicesOptions indicesOptions() {
115+
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
116+
}
117+
118+
@Override
119+
public boolean equals(Object o) {
120+
if (this == o) return true;
121+
if (o == null || getClass() != o.getClass()) return false;
122+
NodeRequest other = (NodeRequest) o;
123+
return Objects.equals(indexName, other.indexName);
124+
}
125+
126+
@Override
127+
public int hashCode() {
128+
return Objects.hash(indexName);
129+
}
130+
}
131+
132+
public static class Response extends BaseNodesResponse<GetSampleStatsAction.NodeResponse> implements Writeable, ToXContentObject {
133+
final int maxSize;
134+
135+
public Response(StreamInput in) throws IOException {
136+
super(in);
137+
maxSize = in.readInt();
138+
}
139+
140+
public Response(
141+
ClusterName clusterName,
142+
List<GetSampleStatsAction.NodeResponse> nodes,
143+
List<FailedNodeException> failures,
144+
int maxSize
145+
) {
146+
super(clusterName, nodes, failures);
147+
this.maxSize = maxSize;
148+
}
149+
150+
@Override
151+
public void writeTo(StreamOutput out) throws IOException {
152+
super.writeTo(out);
153+
out.writeInt(maxSize);
154+
}
155+
156+
public SamplingService.SampleStats getSampleStats() {
157+
SamplingService.SampleStats rawStats = getRawSampleStats();
158+
if (rawStats.getSamples() > maxSize) {
159+
SamplingService.SampleStats filteredStats = new SamplingService.SampleStats().combine(rawStats);
160+
filteredStats.adjustForMaxSize(maxSize);
161+
return filteredStats;
162+
} else {
163+
return rawStats;
164+
}
165+
}
166+
167+
private SamplingService.SampleStats getRawSampleStats() {
168+
return getNodes().stream()
169+
.map(NodeResponse::getSampleStats)
170+
.filter(Objects::nonNull)
171+
.reduce(SamplingService.SampleStats::combine)
172+
.orElse(new SamplingService.SampleStats());
173+
}
174+
175+
@Override
176+
protected List<GetSampleStatsAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
177+
return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new);
178+
}
179+
180+
@Override
181+
protected void writeNodesTo(StreamOutput out, List<GetSampleStatsAction.NodeResponse> nodes) throws IOException {
182+
out.writeCollection(nodes);
183+
}
184+
185+
@Override
186+
public boolean equals(Object o) {
187+
if (this == o) return true;
188+
if (o == null || getClass() != o.getClass()) return false;
189+
Response other = (Response) o;
190+
return Objects.equals(getNodes(), other.getNodes()) && maxSize == other.maxSize;
191+
}
192+
193+
@Override
194+
public int hashCode() {
195+
return Objects.hash(getNodes(), maxSize);
196+
}
197+
198+
@Override
199+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
200+
return getSampleStats().toXContent(builder, params);
201+
}
202+
}
203+
204+
public static class NodeResponse extends BaseNodeResponse {
205+
private final SamplingService.SampleStats sampleStats;
206+
207+
protected NodeResponse(StreamInput in) throws IOException {
208+
super(in);
209+
sampleStats = new SamplingService.SampleStats(in);
210+
}
211+
212+
protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) {
213+
super(node);
214+
this.sampleStats = sampleStats;
215+
}
216+
217+
public SamplingService.SampleStats getSampleStats() {
218+
return sampleStats;
219+
}
220+
221+
@Override
222+
public void writeTo(StreamOutput out) throws IOException {
223+
super.writeTo(out);
224+
sampleStats.writeTo(out);
225+
}
226+
}
227+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.indices.sampling;
11+
12+
import org.elasticsearch.action.ActionRequestValidationException;
13+
import org.elasticsearch.client.internal.node.NodeClient;
14+
import org.elasticsearch.rest.BaseRestHandler;
15+
import org.elasticsearch.rest.RestRequest;
16+
import org.elasticsearch.rest.action.RestToXContentListener;
17+
18+
import java.io.IOException;
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.stream.Collectors;
22+
23+
import static org.elasticsearch.rest.RestRequest.Method.GET;
24+
25+
public class RestGetSampleStatsAction extends BaseRestHandler {
26+
27+
@Override
28+
public String getName() {
29+
return "get_sample_stats";
30+
}
31+
32+
@Override
33+
public List<Route> routes() {
34+
return List.of(new Route(GET, "/{index}/_sample/stats"));
35+
}
36+
37+
@Override
38+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
39+
String[] indexNames = request.param("index").split(",");
40+
if (indexNames.length > 1) {
41+
throw new ActionRequestValidationException().addValidationError(
42+
"Can only get samples for a single index at a time, but found "
43+
+ Arrays.stream(indexNames).collect(Collectors.joining(", ", "[", "]"))
44+
);
45+
}
46+
GetSampleStatsAction.Request getSampleStatsRequest = new GetSampleStatsAction.Request(indexNames[0]);
47+
return channel -> client.execute(GetSampleStatsAction.INSTANCE, getSampleStatsRequest, new RestToXContentListener<>(channel));
48+
}
49+
}

0 commit comments

Comments
 (0)