Skip to content

Commit 663dc9b

Browse files
authored
Get action for random sampling (#135640)
1 parent c1cce66 commit 663dc9b

File tree

13 files changed

+779
-7
lines changed

13 files changed

+779
-7
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"indices.get_sample": {
3+
"documentation": {
4+
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-get-sample",
5+
"description": "Get random sample of ingested data"
6+
},
7+
"stability": "experimental",
8+
"visibility": "public",
9+
"headers": {
10+
"accept": [
11+
"application/json"
12+
]
13+
},
14+
"url": {
15+
"paths": [
16+
{
17+
"path": "/{index}/_sample",
18+
"methods": [
19+
"GET"
20+
],
21+
"parts": {
22+
"index": {
23+
"type": "string",
24+
"description": "The name of a data stream or index"
25+
}
26+
}
27+
}
28+
]
29+
}
30+
}
31+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
"Test get sample for index with no sample config":
3+
- requires:
4+
cluster_features: [ "random_sampling" ]
5+
reason: requires feature 'random_sampling' to get random samples
6+
7+
- do:
8+
indices.get_sample:
9+
index: non_existent
10+
catch: missing
11+
12+
- do:
13+
indices.create:
14+
index: no_config
15+
16+
- do:
17+
indices.get_sample:
18+
index: no_config
19+
catch: missing
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.admin.indices.sampling;
11+
12+
import org.elasticsearch.ResourceNotFoundException;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
15+
import org.elasticsearch.cluster.metadata.ProjectId;
16+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
17+
import org.elasticsearch.cluster.service.ClusterService;
18+
import org.elasticsearch.common.Priority;
19+
import org.elasticsearch.ingest.SamplingService;
20+
import org.elasticsearch.test.ESIntegTestCase;
21+
22+
import java.util.List;
23+
import java.util.Map;
24+
25+
import static org.hamcrest.Matchers.equalTo;
26+
27+
public class GetSampleActionIT extends ESIntegTestCase {
28+
29+
public void testGetSample() throws Exception {
30+
assumeTrue("Requires the sampling feature flag to be enabled", SamplingService.RANDOM_SAMPLING_FEATURE_FLAG);
31+
String indexName = randomIdentifier();
32+
// the index doesn't exist, so getting its sample will throw an exception:
33+
assertGetSampleThrowsResourceNotFoundException(indexName);
34+
createIndex(indexName);
35+
// the index exists but there is no sampling configuration for it, so getting its sample will throw an exception:
36+
assertGetSampleThrowsResourceNotFoundException(indexName);
37+
addSamplingConfig(indexName);
38+
// There is now a sampling configuration, but no data has been ingested:
39+
assertEmptySample(indexName);
40+
int docsToIndex = randomIntBetween(1, 20);
41+
for (int i = 0; i < docsToIndex; i++) {
42+
indexDoc(indexName, randomIdentifier(), randomAlphanumericOfLength(10), randomAlphanumericOfLength(10));
43+
}
44+
GetSampleAction.Request request = new GetSampleAction.Request(indexName);
45+
GetSampleAction.Response response = client().execute(GetSampleAction.INSTANCE, request).actionGet();
46+
List<SamplingService.RawDocument> sample = response.getSample();
47+
// The sampling config created by addSamplingConfig samples at 100%, so we expect everything to be sampled:
48+
assertThat(sample.size(), equalTo(docsToIndex));
49+
for (int i = 0; i < docsToIndex; i++) {
50+
assertRawDocument(sample.get(i), indexName);
51+
}
52+
}
53+
54+
private void assertRawDocument(SamplingService.RawDocument rawDocument, String indexName) {
55+
assertThat(rawDocument.indexName(), equalTo(indexName));
56+
}
57+
58+
private void assertEmptySample(String indexName) {
59+
GetSampleAction.Request request = new GetSampleAction.Request(indexName);
60+
GetSampleAction.Response response = client().execute(GetSampleAction.INSTANCE, request).actionGet();
61+
List<SamplingService.RawDocument> sample = response.getSample();
62+
assertThat(sample, equalTo(List.of()));
63+
}
64+
65+
private void assertGetSampleThrowsResourceNotFoundException(String indexName) {
66+
GetSampleAction.Request request = new GetSampleAction.Request(indexName);
67+
assertThrows(ResourceNotFoundException.class, () -> client().execute(GetSampleAction.INSTANCE, request).actionGet());
68+
}
69+
70+
@SuppressWarnings("deprecation")
71+
private void addSamplingConfig(String indexName) throws Exception {
72+
/*
73+
* Note: The following code writes a sampling config directly to the cluster state. It can be replaced with a call to the action
74+
* that does this once that action exists.
75+
*/
76+
final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
77+
clusterService.submitUnbatchedStateUpdateTask("blocking-task", new ClusterStateUpdateTask(Priority.IMMEDIATE) {
78+
@Override
79+
public ClusterState execute(ClusterState currentState) throws Exception {
80+
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(
81+
currentState.metadata().getProject(ProjectId.DEFAULT)
82+
);
83+
SamplingMetadata samplingMetadata = new SamplingMetadata(
84+
Map.of(indexName, new SamplingConfiguration(1.0d, 100, null, null, null))
85+
);
86+
projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata);
87+
ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build();
88+
return newState;
89+
}
90+
91+
@Override
92+
public void onFailure(Exception e) {
93+
assert false : e.getMessage();
94+
}
95+
});
96+
awaitClusterState(state -> {
97+
SamplingMetadata samplingMetadata = clusterService.state()
98+
.metadata()
99+
.getProject(ProjectId.DEFAULT)
100+
.custom(SamplingMetadata.TYPE);
101+
return samplingMetadata != null && samplingMetadata.getIndexToSamplingConfigMap().get(indexName) != null;
102+
});
103+
}
104+
}

server/src/main/java/org/elasticsearch/action/ActionModule.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@
129129
import org.elasticsearch.action.admin.indices.rollover.LazyRolloverAction;
130130
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
131131
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
132+
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
133+
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction;
134+
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction;
132135
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
133136
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
134137
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
@@ -427,6 +430,7 @@
427430
import java.util.stream.Stream;
428431

429432
import static java.util.Collections.unmodifiableMap;
433+
import static org.elasticsearch.ingest.SamplingService.RANDOM_SAMPLING_FEATURE_FLAG;
430434

431435
/**
432436
* Builds and binds the generic action map, all {@link TransportAction}s, and {@link ActionFilters}.
@@ -815,6 +819,10 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
815819
actions.register(GetSynonymRuleAction.INSTANCE, TransportGetSynonymRuleAction.class);
816820
actions.register(DeleteSynonymRuleAction.INSTANCE, TransportDeleteSynonymRuleAction.class);
817821

822+
if (RANDOM_SAMPLING_FEATURE_FLAG) {
823+
actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class);
824+
}
825+
818826
return unmodifiableMap(actions.getRegistry());
819827
}
820828

@@ -1042,6 +1050,10 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<
10421050
registerHandler.accept(new RestPutSynonymRuleAction());
10431051
registerHandler.accept(new RestGetSynonymRuleAction());
10441052
registerHandler.accept(new RestDeleteSynonymRuleAction());
1053+
1054+
if (RANDOM_SAMPLING_FEATURE_FLAG) {
1055+
registerHandler.accept(new RestGetSampleAction());
1056+
}
10451057
}
10461058

10471059
@Override

0 commit comments

Comments
 (0)