Skip to content

Commit 784d69e

Browse files
committed
Add transport action unit tests
1 parent 5c1243b commit 784d69e

File tree

3 files changed

+200
-22
lines changed

3 files changed

+200
-22
lines changed

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import static io.opentelemetry.api.common.AttributeKey.stringKey;
4848
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
4949
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
50-
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
51-
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
50+
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
51+
import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
5252
import static org.hamcrest.Matchers.aMapWithSize;
5353
import static org.hamcrest.Matchers.anEmptyMap;
5454
import static org.hamcrest.Matchers.equalTo;
@@ -124,13 +124,13 @@ public void testIngestMetricViaMeterProvider() throws Exception {
124124
.setUnit("By")
125125
.buildWithCallback(result -> result.record(totalMemory, Attributes.empty()));
126126

127-
var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
127+
var result = meterProvider.shutdown();
128128
assertThat(result.isSuccess(), is(true));
129129

130130
refreshMetricsIndices();
131131

132132
ObjectPath search = search("metrics-generic.otel-default");
133-
assertThat(search.evaluate("hits.total.value"), equalTo(1));
133+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
134134
var source = search.evaluate("hits.hits.0._source");
135135
assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class));
136136
assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class));
@@ -147,7 +147,7 @@ public void testIngestMetricDataViaMetricExporter() throws Exception {
147147

148148
export(List.of(jvmMemoryMetricData));
149149
ObjectPath search = search("metrics-generic.otel-default");
150-
assertThat(search.evaluate("hits.total.value"), equalTo(1));
150+
assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1));
151151
var source = search.evaluate("hits.hits.0._source");
152152
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
153153
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
@@ -169,7 +169,7 @@ public void testGroupingSameGroup() throws Exception {
169169
ObjectPath path = ObjectPath.createFromResponse(
170170
client().performRequest(new Request("GET", "metrics-generic.otel-default/_search"))
171171
);
172-
assertThat(path.evaluate("hits.total.value"), equalTo(1));
172+
assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(1));
173173
assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0)));
174174
assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch"))));
175175
}
@@ -185,7 +185,7 @@ public void testGroupingDifferentGroup() throws Exception {
185185
)
186186
);
187187
ObjectPath path = search("metrics-generic.otel-default");
188-
assertThat(path.evaluate("hits.total.value"), equalTo(4));
188+
assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(4));
189189
}
190190

191191
public void testGauge() throws Exception {

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ protected void doExecute(Task task, MetricsRequest request, ActionListener<Metri
9393
MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor);
9494
context.consume(dataPointGroup -> addIndexRequest(bulkRequestBuilder, metricDocumentBuilder, dataPointGroup));
9595
if (bulkRequestBuilder.numberOfActions() == 0) {
96-
handleEmptyBulk(listener, context);
96+
// all data points were ignored
97+
handlePartialSuccess(listener, context);
9798
return;
9899
}
99100

@@ -127,11 +128,9 @@ private void addIndexRequest(
127128
try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) {
128129
var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup);
129130
bulkRequestBuilder.add(
130-
new IndexRequest(dataPointGroup.targetIndex()).opType(DocWriteRequest.OpType.CREATE)
131+
new IndexRequest(dataPointGroup.targetIndex().index()).opType(DocWriteRequest.OpType.CREATE)
131132
.setRequireDataStream(true)
132133
.source(xContentBuilder)
133-
// TODO uncomment after https://github.com/elastic/elasticsearch/pull/132566 has been merged
134-
// .tsid(DataPointGroupTsidFunnel.forDataPointGroup(dataPointGroup).buildTsid())
135134
.setDynamicTemplates(dynamicTemplates)
136135
);
137136
}
@@ -149,17 +148,13 @@ private static void handleEmptyRequest(ActionListener<MetricsResponse> listener)
149148
handleSuccess(listener);
150149
}
151150

152-
private static void handleEmptyBulk(ActionListener<MetricsResponse> listener, DataPointGroupingContext context) {
153-
// If the processing of the request fails because the request contains data that cannot be decoded
154-
// or is otherwise invalid and such failure is permanent,
155-
// then the server MUST respond with HTTP 400 Bad Request.
156-
// https://opentelemetry.io/docs/specs/otlp/#bad-data
157-
listener.onResponse(
158-
new MetricsResponse(
159-
RestStatus.BAD_REQUEST,
160-
responseWithRejectedDataPoints(context.totalDataPoints(), context.getIgnoredDataPointsMessage())
161-
)
162-
);
151+
private static void handlePartialSuccess(ActionListener<MetricsResponse> listener, DataPointGroupingContext context) {
152+
// If the request is only partially accepted
153+
// (i.e. when the server accepts only parts of the data and rejects the rest),
154+
// the server MUST respond with HTTP 200 OK.
155+
// https://opentelemetry.io/docs/specs/otlp/#partial-success-1
156+
MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage());
157+
listener.onResponse(new MetricsResponse(RestStatus.OK, response));
163158
}
164159

165160
private static void handlePartialSuccess(
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.oteldata.otlp;
9+
10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess;
11+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
12+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
13+
import io.opentelemetry.proto.metrics.v1.Metric;
14+
15+
import com.google.protobuf.InvalidProtocolBufferException;
16+
17+
import org.elasticsearch.action.ActionListener;
18+
import org.elasticsearch.action.DocWriteRequest;
19+
import org.elasticsearch.action.DocWriteResponse;
20+
import org.elasticsearch.action.bulk.BulkItemResponse;
21+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
22+
import org.elasticsearch.action.bulk.BulkResponse;
23+
import org.elasticsearch.action.support.ActionFilters;
24+
import org.elasticsearch.client.internal.Client;
25+
import org.elasticsearch.common.bytes.BytesArray;
26+
import org.elasticsearch.rest.RestStatus;
27+
import org.elasticsearch.test.ESTestCase;
28+
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.TransportService;
30+
import org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsTransportAction.MetricsResponse;
31+
import org.mockito.ArgumentCaptor;
32+
33+
import java.util.Arrays;
34+
import java.util.List;
35+
import java.util.function.Consumer;
36+
37+
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
38+
import static org.hamcrest.Matchers.containsString;
39+
import static org.hamcrest.Matchers.equalTo;
40+
import static org.mockito.ArgumentMatchers.any;
41+
import static org.mockito.Mockito.doNothing;
42+
import static org.mockito.Mockito.doThrow;
43+
import static org.mockito.Mockito.mock;
44+
import static org.mockito.Mockito.verify;
45+
import static org.mockito.Mockito.when;
46+
47+
public class OTLPMetricsTransportActionTests extends ESTestCase {
48+
49+
private OTLPMetricsTransportAction action;
50+
private Client client;
51+
52+
@Override
53+
public void setUp() throws Exception {
54+
super.setUp();
55+
client = mock(Client.class);
56+
when(client.prepareBulk()).thenAnswer(invocation -> new BulkRequestBuilder(client));
57+
58+
action = new OTLPMetricsTransportAction(mock(TransportService.class), mock(ActionFilters.class), mock(ThreadPool.class), client);
59+
}
60+
61+
public void testSuccess() throws Exception {
62+
MetricsResponse response = executeRequest(createMetricsRequest(createMetric()));
63+
64+
assertThat(response.getStatus(), equalTo(RestStatus.OK));
65+
ExportMetricsServiceResponse metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array());
66+
assertThat(metricsServiceResponse.hasPartialSuccess(), equalTo(false));
67+
}
68+
69+
public void testSuccessEmptyRequest() throws Exception {
70+
MetricsResponse response = executeRequest(createMetricsRequest());
71+
72+
assertThat(response.getStatus(), equalTo(RestStatus.OK));
73+
ExportMetricsServiceResponse metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array());
74+
assertThat(metricsServiceResponse.hasPartialSuccess(), equalTo(false));
75+
}
76+
77+
public void test429() throws Exception {
78+
BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] {
79+
failureResponse(RestStatus.TOO_MANY_REQUESTS, "too many requests"),
80+
successResponse() };
81+
MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), new BulkResponse(bulkItemResponses, 0));
82+
83+
assertThat(response.getStatus(), equalTo(RestStatus.TOO_MANY_REQUESTS));
84+
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
85+
assertThat(
86+
metricsServiceResponse.getRejectedDataPoints(),
87+
equalTo(Arrays.stream(bulkItemResponses).filter(BulkItemResponse::isFailed).count())
88+
);
89+
assertThat(metricsServiceResponse.getErrorMessage(), containsString("too many requests"));
90+
}
91+
92+
public void testPartialSuccess() throws Exception {
93+
MetricsResponse response = executeRequest(
94+
createMetricsRequest(createMetric()),
95+
new BulkResponse(new BulkItemResponse[] { failureResponse(RestStatus.BAD_REQUEST, "bad request") }, 0)
96+
);
97+
98+
assertThat(response.getStatus(), equalTo(RestStatus.OK));
99+
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
100+
assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L));
101+
assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request"));
102+
}
103+
104+
public void testBulkError() throws Exception {
105+
assertExceptionStatus(new IllegalArgumentException("bazinga"), RestStatus.BAD_REQUEST);
106+
assertExceptionStatus(new IllegalStateException("bazinga"), RestStatus.INTERNAL_SERVER_ERROR);
107+
}
108+
109+
private void assertExceptionStatus(Exception exception, RestStatus restStatus) throws InvalidProtocolBufferException {
110+
if (randomBoolean()) {
111+
doThrow(exception).when(client).execute(any(), any(), any());
112+
}
113+
MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), exception);
114+
115+
assertThat(response.getStatus(), equalTo(restStatus));
116+
ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array());
117+
assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L));
118+
assertThat(metricsServiceResponse.getErrorMessage(), equalTo(exception.getMessage()));
119+
}
120+
121+
private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request) {
122+
return executeRequest(request, listener -> listener.onResponse(new BulkResponse(new BulkItemResponse[] {}, 0)));
123+
}
124+
125+
private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request, BulkResponse bulkResponse) {
126+
return executeRequest(request, listener -> listener.onResponse(bulkResponse));
127+
}
128+
129+
private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request, Exception bulkFailure) {
130+
return executeRequest(request, listener -> listener.onFailure(bulkFailure));
131+
}
132+
133+
private MetricsResponse executeRequest(
134+
OTLPMetricsTransportAction.MetricsRequest request,
135+
Consumer<ActionListener<BulkResponse>> bulkResponseConsumer
136+
) {
137+
ArgumentCaptor<ActionListener<BulkResponse>> bulkResponseListener = ArgumentCaptor.captor();
138+
doNothing().when(client).execute(any(), any(), bulkResponseListener.capture());
139+
140+
ActionListener<MetricsResponse> metricsResponseListener = mock();
141+
action.doExecute(null, request, metricsResponseListener);
142+
if (bulkResponseListener.getAllValues().isEmpty() == false) {
143+
bulkResponseConsumer.accept(bulkResponseListener.getValue());
144+
}
145+
146+
ArgumentCaptor<MetricsResponse> response = ArgumentCaptor.forClass(MetricsResponse.class);
147+
verify(metricsResponseListener).onResponse(response.capture());
148+
return response.getValue();
149+
}
150+
151+
private static OTLPMetricsTransportAction.MetricsRequest createMetricsRequest(Metric... metrics) {
152+
return new OTLPMetricsTransportAction.MetricsRequest(
153+
new BytesArray(
154+
ExportMetricsServiceRequest.newBuilder()
155+
.addResourceMetrics(
156+
OtlpUtils.createResourceMetrics(
157+
List.of(keyValue("service.name", "test-service")),
158+
List.of(OtlpUtils.createScopeMetrics("test", "1.0.0", List.of(metrics)))
159+
)
160+
)
161+
.build()
162+
.toByteArray()
163+
)
164+
);
165+
}
166+
167+
private static Metric createMetric() {
168+
return OtlpUtils.createGaugeMetric("test.metric", "", List.of(OtlpUtils.createDoubleDataPoint(0)));
169+
}
170+
171+
private static BulkItemResponse successResponse() {
172+
return BulkItemResponse.success(-1, DocWriteRequest.OpType.CREATE, mock(DocWriteResponse.class));
173+
}
174+
175+
private static BulkItemResponse failureResponse(RestStatus restStatus, String failureMessage) {
176+
return BulkItemResponse.failure(
177+
-1,
178+
DocWriteRequest.OpType.CREATE,
179+
new BulkItemResponse.Failure("index", "id", new RuntimeException(failureMessage), restStatus)
180+
);
181+
}
182+
183+
}

0 commit comments

Comments
 (0)