Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"indices.get_sample_stats": {
"documentation": {
"url": "https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-get-sample",
"description": "Get stats about a random sample of ingested data"
},
"stability": "experimental",
"visibility": "public",
"headers": {
"accept": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/{index}/_sample/stats",
"methods": [
"GET"
],
"parts": {
"index": {
"type": "string",
"description": "The name of a data stream or index"
}
}
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"Test get sample stats for index with no sample config":
- requires:
cluster_features: [ "random_sampling" ]
reason: requires feature 'random_sampling' to get random samples

- do:
indices.get_sample_stats:
index: non_existent
catch: missing

- do:
indices.create:
index: no_config

- do:
indices.get_sample_stats:
index: no_config
catch: missing
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.ingest.SamplingService;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;

public class GetSampleActionIT extends ESIntegTestCase {

Expand All @@ -34,7 +36,8 @@ public void testGetSample() throws Exception {
createIndex(indexName);
// the index exists but there is no sampling configuration for it, so getting its sample will throw an exception:
assertGetSampleThrowsResourceNotFoundException(indexName);
addSamplingConfig(indexName);
final int maxSamples = 30;
addSamplingConfig(indexName, maxSamples);
// There is now a sampling configuration, but no data has been ingested:
assertEmptySample(indexName);
int docsToIndex = randomIntBetween(1, 20);
Expand All @@ -49,6 +52,29 @@ public void testGetSample() throws Exception {
for (int i = 0; i < docsToIndex; i++) {
assertRawDocument(sample.get(i), indexName);
}

GetSampleStatsAction.Request statsRequest = new GetSampleStatsAction.Request(indexName);
GetSampleStatsAction.Response statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet();
SamplingService.SampleStats stats = statsResponse.getSampleStats();
assertThat(stats.getSamples(), equalTo((long) docsToIndex));
assertThat(stats.getPotentialSamples(), equalTo((long) docsToIndex));
assertThat(stats.getTimeSampling(), greaterThan(TimeValue.ZERO));
assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo(0L));
assertThat(stats.getSamplesRejectedForRate(), equalTo(0L));
assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));
assertThat(stats.getSamplesRejectedForCondition(), equalTo(0L));

final int samplesOverMax = randomIntBetween(1, 5);
for (int i = docsToIndex; i < maxSamples + samplesOverMax; i++) {
indexDoc(indexName, randomIdentifier(), randomAlphanumericOfLength(10), randomAlphanumericOfLength(10));
}
statsRequest = new GetSampleStatsAction.Request(indexName);
statsResponse = client().execute(GetSampleStatsAction.INSTANCE, statsRequest).actionGet();
stats = statsResponse.getSampleStats();
assertThat(stats.getSamples(), equalTo((long) maxSamples));
assertThat(stats.getPotentialSamples(), equalTo((long) maxSamples + samplesOverMax));
assertThat(stats.getSamplesRejectedForMaxSamplesExceeded(), equalTo((long) samplesOverMax));

}

private void assertRawDocument(SamplingService.RawDocument rawDocument, String indexName) {
Expand All @@ -68,7 +94,7 @@ private void assertGetSampleThrowsResourceNotFoundException(String indexName) {
}

@SuppressWarnings("deprecation")
private void addSamplingConfig(String indexName) throws Exception {
private void addSamplingConfig(String indexName, int maxSamples) throws Exception {
/*
* Note: The following code writes a sampling config directly to the cluster state. It can be replaced with a call to the action
* that does this once that action exists.
Expand All @@ -81,7 +107,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
currentState.metadata().getProject(ProjectId.DEFAULT)
);
SamplingMetadata samplingMetadata = new SamplingMetadata(
Map.of(indexName, new SamplingConfiguration(1.0d, 100, null, null, null))
Map.of(indexName, new SamplingConfiguration(1.0d, maxSamples, null, null, null))
);
projectMetadataBuilder.putCustom(SamplingMetadata.TYPE, samplingMetadata);
ClusterState newState = new ClusterState.Builder(currentState).putProjectMetadata(projectMetadataBuilder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,11 @@
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.TransportRolloverAction;
import org.elasticsearch.action.admin.indices.sampling.GetSampleAction;
import org.elasticsearch.action.admin.indices.sampling.GetSampleStatsAction;
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleAction;
import org.elasticsearch.action.admin.indices.sampling.RestGetSampleStatsAction;
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleAction;
import org.elasticsearch.action.admin.indices.sampling.TransportGetSampleStatsAction;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
Expand Down Expand Up @@ -821,6 +824,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

if (RANDOM_SAMPLING_FEATURE_FLAG) {
actions.register(GetSampleAction.INSTANCE, TransportGetSampleAction.class);
actions.register(GetSampleStatsAction.INSTANCE, TransportGetSampleStatsAction.class);
}

return unmodifiableMap(actions.getRegistry());
Expand Down Expand Up @@ -1053,6 +1057,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster, Predicate<

if (RANDOM_SAMPLING_FEATURE_FLAG) {
registerHandler.accept(new RestGetSampleAction());
registerHandler.accept(new RestGetSampleStatsAction());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.admin.indices.sampling;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.ingest.SamplingService;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.AbstractTransportRequest;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetSampleStatsAction extends ActionType<GetSampleStatsAction.Response> {

public static final GetSampleStatsAction INSTANCE = new GetSampleStatsAction();
public static final String NAME = "indices:admin/sample/stats";

private GetSampleStatsAction() {
super(NAME);
}

public static class Request extends BaseNodesRequest implements IndicesRequest.Replaceable {
private String indexName;

public Request(String indexName) {
super((String[]) null);
this.indexName = indexName;
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "get sample stats", parentTaskId, headers);
}

@Override
public ActionRequestValidationException validate() {
if (this.indexName.contains("*")) {
return (ActionRequestValidationException) new ActionRequestValidationException().addValidationError(
"Wildcards are not supported, but found [" + indexName + "]"
);
}
return null;
}

@Override
public IndicesRequest indices(String... indices) {
assert indices.length == 1 : "GetSampleStatsAction only supports a single index name";
this.indexName = indices[0];
return this;
}

@Override
public String[] indices() {
return new String[] { indexName };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
}
}

public static class NodeRequest extends AbstractTransportRequest implements IndicesRequest {
private final String indexName;

public NodeRequest(String indexName) {
this.indexName = indexName;
}

public NodeRequest(StreamInput in) throws IOException {
super(in);
this.indexName = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(indexName);
}

public String getIndexName() {
return indexName;
}

@Override
public String[] indices() {
return new String[] { indexName };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.STRICT_SINGLE_INDEX_NO_EXPAND_FORBID_CLOSED_ALLOW_SELECTORS;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
NodeRequest other = (NodeRequest) o;
return Objects.equals(indexName, other.indexName);
}

@Override
public int hashCode() {
return Objects.hash(indexName);
}
}

public static class Response extends BaseNodesResponse<GetSampleStatsAction.NodeResponse> implements Writeable, ToXContentObject {
final int maxSize;

public Response(StreamInput in) throws IOException {
super(in);
maxSize = in.readInt();
}

public Response(
ClusterName clusterName,
List<GetSampleStatsAction.NodeResponse> nodes,
List<FailedNodeException> failures,
int maxSize
) {
super(clusterName, nodes, failures);
this.maxSize = maxSize;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeInt(maxSize);
}

public SamplingService.SampleStats getSampleStats() {
SamplingService.SampleStats rawStats = getRawSampleStats();
return rawStats.adjustForMaxSize(maxSize);
}

private SamplingService.SampleStats getRawSampleStats() {
return getNodes().stream()
.map(NodeResponse::getSampleStats)
.filter(Objects::nonNull)
.reduce(SamplingService.SampleStats::combine)
.orElse(new SamplingService.SampleStats());
}

@Override
protected List<GetSampleStatsAction.NodeResponse> readNodesFrom(StreamInput in) throws IOException {
return in.readCollectionAsList(GetSampleStatsAction.NodeResponse::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<GetSampleStatsAction.NodeResponse> nodes) throws IOException {
out.writeCollection(nodes);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response other = (Response) o;
return Objects.equals(getNodes(), other.getNodes()) && maxSize == other.maxSize;
}

@Override
public int hashCode() {
return Objects.hash(getNodes(), maxSize);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return getSampleStats().toXContent(builder, params);
}
}

public static class NodeResponse extends BaseNodeResponse {
private final SamplingService.SampleStats sampleStats;

protected NodeResponse(StreamInput in) throws IOException {
super(in);
sampleStats = new SamplingService.SampleStats(in);
}

protected NodeResponse(DiscoveryNode node, SamplingService.SampleStats sampleStats) {
super(node);
this.sampleStats = sampleStats;
}

public SamplingService.SampleStats getSampleStats() {
return sampleStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sampleStats.writeTo(out);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
GetSampleStatsAction.NodeResponse other = (GetSampleStatsAction.NodeResponse) o;
return getNode().equals(other.getNode()) && sampleStats.equals(other.sampleStats);
}

@Override
public int hashCode() {
return Objects.hash(getNode(), sampleStats);
}
}
}
Loading