Skip to content

Commit d6e40e3

Browse files
committed
adding files
1 parent 7e4f30d commit d6e40e3

File tree

4 files changed

+346
-0
lines changed

4 files changed

+346
-0
lines changed
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.ingest;
11+
12+
import org.elasticsearch.action.index.IndexRequest;
13+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
14+
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
16+
import org.elasticsearch.common.xcontent.XContentHelper;
17+
import org.elasticsearch.plugins.internal.XContentParserDecorator;
18+
import org.elasticsearch.sample.TransportPutSampleConfigAction;
19+
import org.elasticsearch.script.DynamicMap;
20+
import org.elasticsearch.script.IngestConditionalScript;
21+
import org.elasticsearch.script.Script;
22+
import org.elasticsearch.script.ScriptService;
23+
import org.elasticsearch.xcontent.XContentBuilder;
24+
import org.elasticsearch.xcontent.XContentParser;
25+
import org.elasticsearch.xcontent.XContentType;
26+
import org.elasticsearch.xcontent.json.JsonXContent;
27+
28+
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.List;
32+
import java.util.Map;
33+
34+
import static org.elasticsearch.ingest.ConditionalProcessor.FUNCTIONS;
35+
36+
public class SamplingService {
37+
private final ScriptService scriptService;
38+
private final Map<String, List<IndexRequest>> samples = new HashMap<>();
39+
40+
public SamplingService(ScriptService scriptService) {
41+
this.scriptService = scriptService;
42+
}
43+
44+
public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest) throws IOException {
45+
maybeSample(
46+
projectMetadata,
47+
indexRequest,
48+
new IngestDocument(
49+
indexRequest.index(),
50+
indexRequest.id(),
51+
indexRequest.version(),
52+
indexRequest.routing(),
53+
indexRequest.versionType(),
54+
indexRequest.sourceAsMap(XContentParserDecorator.NOOP)
55+
)
56+
);
57+
}
58+
59+
public void maybeSample(ProjectMetadata projectMetadata, IndexRequest indexRequest, IngestDocument ingestDocument) throws IOException {
60+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata samplingConfig = projectMetadata.custom(
61+
TransportPutSampleConfigAction.SamplingConfigCustomMetadata.NAME
62+
);
63+
if (samplingConfig != null) {
64+
String samplingIndex = samplingConfig.indexName;
65+
if (samplingIndex.equals(indexRequest.index())) {
66+
List<IndexRequest> samplesForIndex = samples.computeIfAbsent(samplingIndex, k -> new ArrayList<>());
67+
if (samplesForIndex.size() < samplingConfig.maxSamples) {
68+
String condition = samplingConfig.condition;
69+
if (evaluateCondition(ingestDocument, condition)) {
70+
if (Math.random() < samplingConfig.rate) {
71+
samplesForIndex.add(indexRequest);
72+
System.out.println("Sampling " + indexRequest);
73+
}
74+
}
75+
}
76+
}
77+
}
78+
}
79+
80+
public List<IndexRequest> getSamples(String index) {
81+
return samples.get(index);
82+
}
83+
84+
private boolean evaluateCondition(IngestDocument ingestDocument, String condition) throws IOException {
85+
if (condition == null) {
86+
return true;
87+
}
88+
IngestConditionalScript.Factory factory = null;// precompiledConditionalScriptFactory;
89+
Script script = getScript(condition);
90+
if (factory == null) {
91+
factory = scriptService.compile(script, IngestConditionalScript.CONTEXT);
92+
}
93+
return factory.newInstance(
94+
script.getParams(),
95+
new ConditionalProcessor.UnmodifiableIngestData(new DynamicMap(ingestDocument.getSourceAndMetadata(), FUNCTIONS))
96+
).execute();
97+
}
98+
99+
private static Script getScript(String conditional) throws IOException {
100+
if (conditional != null) {
101+
try (
102+
XContentBuilder builder = XContentBuilder.builder(JsonXContent.jsonXContent).map(Map.of("source", conditional));
103+
XContentParser parser = XContentHelper.createParserNotCompressed(
104+
LoggingDeprecationHandler.XCONTENT_PARSER_CONFIG,
105+
BytesReference.bytes(builder),
106+
XContentType.JSON
107+
)
108+
) {
109+
return Script.parse(parser);
110+
}
111+
}
112+
return null;
113+
}
114+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.sample;
11+
12+
import org.elasticsearch.action.ActionRequest;
13+
import org.elasticsearch.action.ActionRequestValidationException;
14+
import org.elasticsearch.action.ActionResponse;
15+
import org.elasticsearch.action.ActionType;
16+
import org.elasticsearch.action.IndicesRequest;
17+
import org.elasticsearch.action.index.IndexRequest;
18+
import org.elasticsearch.action.support.IndicesOptions;
19+
import org.elasticsearch.common.collect.Iterators;
20+
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
22+
import org.elasticsearch.common.xcontent.ChunkedToXContent;
23+
import org.elasticsearch.tasks.CancellableTask;
24+
import org.elasticsearch.tasks.Task;
25+
import org.elasticsearch.tasks.TaskId;
26+
import org.elasticsearch.xcontent.ToXContent;
27+
28+
import java.io.IOException;
29+
import java.util.Iterator;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Objects;
33+
34+
import static org.elasticsearch.common.collect.Iterators.single;
35+
import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk;
36+
37+
public class GetSampleAction extends ActionType<GetSampleAction.Response> {
38+
39+
public static final GetSampleAction INSTANCE = new GetSampleAction();
40+
public static final String NAME = "indices:admin/sample";
41+
42+
private GetSampleAction() {
43+
super(NAME);
44+
}
45+
46+
public static class Response extends ActionResponse implements ChunkedToXContent {
47+
48+
private final List<IndexRequest> samples;
49+
50+
public Response(final List<IndexRequest> samples) {
51+
this.samples = samples;
52+
}
53+
54+
public List<IndexRequest> getSamples() {
55+
return samples;
56+
}
57+
58+
@Override
59+
public void writeTo(StreamOutput out) throws IOException {
60+
out.writeCollection(samples);
61+
}
62+
63+
@Override
64+
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
65+
return Iterators.concat(
66+
chunk((builder, p) -> builder.startObject().startArray("samples")),
67+
Iterators.flatMap(samples.iterator(), sample -> single((builder, params1) -> {
68+
Map<String, Object> source = sample.sourceAsMap();
69+
builder.value(source);
70+
return builder;
71+
})),
72+
chunk((builder, p) -> builder.endArray().endObject())
73+
);
74+
}
75+
76+
@Override
77+
public boolean equals(Object o) {
78+
if (this == o) {
79+
return true;
80+
}
81+
if (o == null || getClass() != o.getClass()) {
82+
return false;
83+
}
84+
GetSampleAction.Response response = (GetSampleAction.Response) o;
85+
return samples.equals(response.samples);
86+
}
87+
88+
@Override
89+
public int hashCode() {
90+
return Objects.hash(samples);
91+
}
92+
93+
@Override
94+
public String toString() {
95+
return "Response{samples=" + samples + '}';
96+
}
97+
}
98+
99+
public static class Request extends ActionRequest implements IndicesRequest.Replaceable {
100+
private String[] names;
101+
102+
public Request(String[] names) {
103+
super();
104+
this.names = names;
105+
}
106+
107+
public Request(StreamInput in) throws IOException {
108+
super(in);
109+
this.names = in.readStringArray();
110+
}
111+
112+
@Override
113+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
114+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
115+
}
116+
117+
@Override
118+
public ActionRequestValidationException validate() {
119+
if (this.indices().length != 1) {
120+
return new ActionRequestValidationException();
121+
}
122+
return null;
123+
}
124+
125+
@Override
126+
public IndicesRequest indices(String... indices) {
127+
this.names = indices;
128+
return this;
129+
}
130+
131+
@Override
132+
public String[] indices() {
133+
return names;
134+
}
135+
136+
@Override
137+
public IndicesOptions indicesOptions() {
138+
return IndicesOptions.DEFAULT;
139+
}
140+
}
141+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.sample;
11+
12+
import org.elasticsearch.client.internal.node.NodeClient;
13+
import org.elasticsearch.rest.BaseRestHandler;
14+
import org.elasticsearch.rest.RestRequest;
15+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
16+
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
17+
18+
import java.io.IOException;
19+
import java.util.List;
20+
21+
import static org.elasticsearch.rest.RestRequest.Method.GET;
22+
23+
public class RestGetSampleAction extends BaseRestHandler {
24+
@Override
25+
public String getName() {
26+
return "get_sample";
27+
}
28+
29+
@Override
30+
public List<Route> routes() {
31+
return List.of(new Route(GET, "/_sample/{name}"));
32+
}
33+
34+
@Override
35+
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
36+
GetSampleAction.Request getSampleRequest = new GetSampleAction.Request(new String[] { request.param("name") });
37+
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
38+
GetSampleAction.INSTANCE,
39+
getSampleRequest,
40+
new RestRefCountedChunkedToXContentListener<>(channel)
41+
);
42+
}
43+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.sample;
11+
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.support.ActionFilters;
15+
import org.elasticsearch.action.support.HandledTransportAction;
16+
import org.elasticsearch.cluster.service.ClusterService;
17+
import org.elasticsearch.common.util.concurrent.EsExecutors;
18+
import org.elasticsearch.ingest.SamplingService;
19+
import org.elasticsearch.injection.guice.Inject;
20+
import org.elasticsearch.tasks.Task;
21+
import org.elasticsearch.threadpool.ThreadPool;
22+
import org.elasticsearch.transport.TransportService;
23+
24+
import java.util.List;
25+
26+
public class TransportGetSampleAction extends HandledTransportAction<GetSampleAction.Request, GetSampleAction.Response> {
27+
private final SamplingService samplingService;
28+
29+
@Inject
30+
public TransportGetSampleAction(
31+
TransportService transportService,
32+
ClusterService clusterService,
33+
ThreadPool threadPool,
34+
ActionFilters actionFilters,
35+
SamplingService samplingService
36+
) {
37+
super(GetSampleAction.NAME, transportService, actionFilters, GetSampleAction.Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
38+
this.samplingService = samplingService;
39+
}
40+
41+
@Override
42+
protected void doExecute(Task task, GetSampleAction.Request request, ActionListener<GetSampleAction.Response> listener) {
43+
String index = request.indices()[0];
44+
List<IndexRequest> samples = samplingService.getSamples(index);
45+
GetSampleAction.Response response = new GetSampleAction.Response(samples);
46+
listener.onResponse(response);
47+
}
48+
}

0 commit comments

Comments
 (0)