Skip to content

Commit ed90ec3

Browse files
committed
Introduce transport action and inject stuff into it
1 parent ee09edf commit ed90ec3

File tree

3 files changed

+124
-18
lines changed

3 files changed

+124
-18
lines changed

x-pack/plugin/metrics/src/main/java/org/elasticsearch/xpack/metrics/MetricsPlugin.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.metrics;
99

10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionResponse;
1012
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1113
import org.elasticsearch.cluster.node.DiscoveryNodes;
1214
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -26,11 +28,6 @@
2628
import java.util.function.Supplier;
2729

2830
public class MetricsPlugin extends Plugin implements ActionPlugin {
29-
@Override
30-
public Collection<?> createComponents(PluginServices services) {
31-
return super.createComponents(services);
32-
}
33-
3431
@Override
3532
public Collection<RestHandler> getRestHandlers(
3633
Settings settings,
@@ -45,4 +42,9 @@ public Collection<RestHandler> getRestHandlers(
4542
) {
4643
return List.of(new MetricsRestAction());
4744
}
45+
46+
@Override
47+
public Collection<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
48+
return List.of(new ActionHandler<>(MetricsTransportAction.TYPE, MetricsTransportAction.class));
49+
}
4850
}

x-pack/plugin/metrics/src/main/java/org/elasticsearch/xpack/metrics/MetricsRestAction.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,17 @@
77

88
package org.elasticsearch.xpack.metrics;
99

10-
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
1110
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
1211

1312
import com.google.protobuf.CodedOutputStream;
1413

15-
import org.apache.log4j.Logger;
1614
import org.elasticsearch.client.internal.node.NodeClient;
1715
import org.elasticsearch.common.bytes.BytesReference;
1816
import org.elasticsearch.rest.BaseRestHandler;
1917
import org.elasticsearch.rest.RestRequest;
2018
import org.elasticsearch.rest.RestResponse;
2119
import org.elasticsearch.rest.RestStatus;
20+
import org.elasticsearch.rest.action.RestResponseListener;
2221

2322
import java.io.IOException;
2423
import java.nio.ByteBuffer;
@@ -27,8 +26,6 @@
2726
import static org.elasticsearch.rest.RestRequest.Method.POST;
2827

2928
public class MetricsRestAction extends BaseRestHandler {
30-
private static final Logger logger = Logger.getLogger(MetricsRestAction.class);
31-
3229
@Override
3330
public String getName() {
3431
return "metrics_action";
@@ -48,17 +45,24 @@ public boolean mediaTypesValid(RestRequest request) {
4845
@Override
4946
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
5047
if (request.hasContent()) {
51-
var metricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.content().streamInput());
52-
53-
logger.info("Received " + metricsServiceRequest.getResourceMetricsCount() + " metrics");
48+
var transportRequest = new MetricsTransportAction.MetricsRequest(request.param("index"), request.content());
49+
return channel -> client.execute(MetricsTransportAction.TYPE, transportRequest, new RestResponseListener<>(channel) {
50+
@Override
51+
public RestResponse buildResponse(MetricsTransportAction.MetricsResponse r) throws Exception {
52+
return successResponse();
53+
}
54+
});
5455
}
5556

56-
return channel -> {
57-
var response = ExportMetricsServiceResponse.newBuilder().build();
58-
var responseBytes = ByteBuffer.allocate(response.getSerializedSize());
59-
response.writeTo(CodedOutputStream.newInstance(responseBytes));
57+
// according to spec empty requests are successful
58+
return channel -> { channel.sendResponse(successResponse()); };
59+
}
60+
61+
private RestResponse successResponse() throws IOException {
62+
var response = ExportMetricsServiceResponse.newBuilder().build();
63+
var responseBytes = ByteBuffer.allocate(response.getSerializedSize());
64+
response.writeTo(CodedOutputStream.newInstance(responseBytes));
6065

61-
channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", BytesReference.fromByteBuffer(responseBytes)));
62-
};
66+
return new RestResponse(RestStatus.OK, "application/x-protobuf", BytesReference.fromByteBuffer(responseBytes));
6367
}
6468
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.metrics;
9+
10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
11+
12+
import org.apache.logging.log4j.LogManager;
13+
import org.apache.logging.log4j.Logger;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.ActionRequest;
16+
import org.elasticsearch.action.ActionRequestValidationException;
17+
import org.elasticsearch.action.ActionResponse;
18+
import org.elasticsearch.action.ActionType;
19+
import org.elasticsearch.action.support.ActionFilters;
20+
import org.elasticsearch.action.support.HandledTransportAction;
21+
import org.elasticsearch.cluster.project.ProjectResolver;
22+
import org.elasticsearch.common.bytes.BytesReference;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.common.util.concurrent.EsExecutors;
26+
import org.elasticsearch.indices.IndicesService;
27+
import org.elasticsearch.injection.guice.Inject;
28+
import org.elasticsearch.tasks.Task;
29+
import org.elasticsearch.transport.TransportService;
30+
31+
import java.io.IOException;
32+
33+
public class MetricsTransportAction extends HandledTransportAction<
34+
MetricsTransportAction.MetricsRequest,
35+
MetricsTransportAction.MetricsResponse> {
36+
37+
public static final String NAME = "indices:data/write/metrics";
38+
public static final ActionType<MetricsTransportAction.MetricsResponse> TYPE = new ActionType<>(NAME);
39+
40+
private static final Logger logger = LogManager.getLogger(MetricsTransportAction.class);
41+
42+
private final ProjectResolver projectResolver;
43+
private final IndicesService indicesService;
44+
45+
@Inject
46+
public MetricsTransportAction(
47+
TransportService transportService,
48+
ActionFilters actionFilters,
49+
ProjectResolver projectResolver,
50+
IndicesService indicesService
51+
) {
52+
super(NAME, transportService, actionFilters, MetricsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
53+
this.projectResolver = projectResolver;
54+
this.indicesService = indicesService;
55+
}
56+
57+
@Override
58+
protected void doExecute(Task task, MetricsRequest request, ActionListener<MetricsResponse> listener) {
59+
try {
60+
var metricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.exportMetricsServiceRequest.streamInput());
61+
62+
logger.info("Received " + metricsServiceRequest.getResourceMetricsCount() + " metrics");
63+
64+
// resolve index somehow
65+
logger.info("Indices service " + indicesService);
66+
67+
listener.onResponse(new MetricsResponse());
68+
} catch (Exception e) {
69+
listener.onFailure(e);
70+
}
71+
}
72+
73+
public static class MetricsRequest extends ActionRequest {
74+
private String index;
75+
private BytesReference exportMetricsServiceRequest;
76+
77+
public MetricsRequest(StreamInput in) throws IOException {
78+
super(in);
79+
index = in.readString();
80+
exportMetricsServiceRequest = in.readBytesReference();
81+
}
82+
83+
public MetricsRequest(String index, BytesReference exportMetricsServiceRequest) {
84+
this.index = index;
85+
this.exportMetricsServiceRequest = exportMetricsServiceRequest;
86+
}
87+
88+
@Override
89+
public ActionRequestValidationException validate() {
90+
return null;
91+
}
92+
}
93+
94+
public static class MetricsResponse extends ActionResponse {
95+
@Override
96+
public void writeTo(StreamOutput out) throws IOException {
97+
98+
}
99+
}
100+
}

0 commit comments

Comments
 (0)