diff --git a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java index 9bf86a19fe9b3..6a64ab0cac183 100644 --- a/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java +++ b/x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsIndexingRestIT.java @@ -8,17 +8,20 @@ package org.elasticsearch.xpack.oteldata.otlp; import io.opentelemetry.api.common.Attributes; -import io.opentelemetry.exporter.internal.FailedExportException; +import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; import io.opentelemetry.sdk.resources.Resource; import org.elasticsearch.client.Request; @@ -29,17 +32,29 @@ import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.ObjectPath; import org.junit.Before; import org.junit.ClassRule; +import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE; +import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA; +import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC; +import static org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.anEmptyMap; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; public class OTLPMetricsIndexingRestIT extends ESRestTestCase { @@ -56,6 +71,8 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase { .user(USER, PASS, "superuser", false) .setting("xpack.security.autoconfiguration.enabled", "false") .setting("xpack.license.self_generated.type", "trial") + .setting("xpack.ml.enabled", "false") + .setting("xpack.watcher.enabled", "false") .build(); @Override @@ -98,21 +115,138 @@ public void tearDown() throws Exception { super.tearDown(); } + public void testIngestMetricViaMeterProvider() throws Exception { + Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics"); + long totalMemory = 42; + + sampleMeter.gaugeBuilder("jvm.memory.total") + .setDescription("Reports JVM memory usage.") + .setUnit("By") + .buildWithCallback(result -> result.record(totalMemory, Attributes.empty())); + + var result = meterProvider.shutdown(); + assertThat(result.isSuccess(), is(true)); + + refreshMetricsIndices(); + + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class)); + assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class)); + assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); + assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory)); + assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); + assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); + } + public void testIngestMetricDataViaMetricExporter() throws Exception { - MetricData jvmMemoryMetricData = createDoubleGauge( - TEST_RESOURCE, - Attributes.empty(), - "jvm.memory.total", - Runtime.getRuntime().totalMemory(), - "By", - Clock.getDefault().now() + long now = Clock.getDefault().now(); + long totalMemory = 42; + MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now); + + export(List.of(jvmMemoryMetricData)); + ObjectPath search = search("metrics-generic.otel-default"); + assertThat(search.toString(), search.evaluate("hits.total.value"), equalTo(1)); + var source = search.evaluate("hits.hits.0._source"); + assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now))); + assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now))); + assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class)); + assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").longValue(), equalTo(totalMemory)); + assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By")); + assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch")); + assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics")); + } + + public void testGroupingSameGroup() throws Exception { + long now = Clock.getDefault().now(); + MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now); + // uses an equal but not the same resource to test grouping across resourceMetrics + MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now); + + export(List.of(metric1, metric2)); + + ObjectPath path = ObjectPath.createFromResponse( + client().performRequest(new Request("GET", "metrics-generic.otel-default/_search")) + ); + assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(1)); + assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0))); + assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch")))); + } + + public void testGroupingDifferentGroup() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now), + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)), + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now), + createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now) + ) + ); + ObjectPath path = search("metrics-generic.otel-default"); + assertThat(path.toString(), path.evaluate("hits.total.value"), equalTo(4)); + } + + public void testGauge() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now), + createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now) + ) ); + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double")); + assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge")); + assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge")); + } + + public void testCounterTemporality() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC), + createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC) + ) + ); + + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter")); + assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge")); + } + + public void testCounterMonotonicity() throws Exception { + long now = Clock.getDefault().now(); + export( + List.of( + createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC), + createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC) - FailedExportException.HttpExportException exception = assertThrows( - FailedExportException.HttpExportException.class, - () -> export(List.of(jvmMemoryMetricData)) + ) ); - assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus())); + + Map metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties"); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long")); + assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge")); + } + + private static Map getMapping(String target) throws IOException { + Map mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping"))) + .evaluate(""); + assertThat(mappings, aMapWithSize(1)); + Map mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings"); + assertThat(mapping, not(anEmptyMap())); + return mapping; + } + + private static String timestampAsString(long now) { + return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString(); } private void export(List metrics) throws Exception { @@ -124,7 +258,15 @@ private void export(List metrics) throws Exception { throw new RuntimeException("Failed to export metrics", failure); } assertThat(result.isSuccess(), is(true)); - assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*"))); + refreshMetricsIndices(); + } + + private ObjectPath search(String target) throws IOException { + return ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_search"))); + } + + private static void refreshMetricsIndices() throws IOException { + assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh"))); } private static MetricData createDoubleGauge( @@ -144,4 +286,62 @@ private static MetricData createDoubleGauge( ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) ); } + + private static MetricData createLongGauge( + Resource resource, + Attributes attributes, + String name, + long value, + String unit, + long timeEpochNanos + ) { + return ImmutableMetricData.createLongGauge( + resource, + TEST_SCOPE, + name, + "Your description could be here.", + unit, + ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))) + ); + } + + private static MetricData createCounter( + Resource resource, + Attributes attributes, + String name, + long value, + String unit, + long timeEpochNanos, + AggregationTemporality temporality, + Monotonicity monotonicity + ) { + return ImmutableMetricData.createLongSum( + resource, + TEST_SCOPE, + name, + "Your description could be here.", + unit, + ImmutableSumData.create( + monotonicity.isMonotonic(), + temporality, + List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)) + ) + ); + } + + // this is just to enhance readability of the createCounter calls (avoid boolean parameter) + enum Monotonicity { + MONOTONIC(true), + NON_MONOTONIC(false); + + private final boolean monotonic; + + Monotonicity(boolean monotonic) { + this.monotonic = monotonic; + } + + public boolean isMonotonic() { + return monotonic; + } + } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java index 23f46363d3060..ff16e40fbcb36 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.oteldata.otlp; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; @@ -48,8 +50,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) { @Override public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) { - RestStatus restStatus = r.getStatus(); - return new RestResponse(restStatus, "application/x-protobuf", r.getResponse()); + return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse()); } }) ); @@ -59,7 +60,13 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) // (a request that does not carry any telemetry data) // the server SHOULD respond with success. // https://opentelemetry.io/docs/specs/otlp/#full-success-1 - return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0]))); + return channel -> channel.sendResponse( + new RestResponse( + RestStatus.OK, + "application/x-protobuf", + new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray()) + ) + ); } } diff --git a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java index 722b727fff40f..43c48942f9f14 100644 --- a/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java +++ b/x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportAction.java @@ -7,19 +7,32 @@ package org.elasticsearch.xpack.oteldata.otlp; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; + +import com.google.protobuf.MessageLite; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.CompositeIndicesRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.injection.guice.Inject; @@ -27,6 +40,11 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xpack.oteldata.otlp.datapoint.DataPointGroupingContext; +import org.elasticsearch.xpack.oteldata.otlp.docbuilder.MetricDocumentBuilder; +import org.elasticsearch.xpack.oteldata.otlp.proto.BufferedByteStringAccessor; import java.io.IOException; @@ -62,7 +80,126 @@ public OTLPMetricsTransportAction( @Override protected void doExecute(Task task, MetricsRequest request, ActionListener listener) { - listener.onResponse(new MetricsResponse(RestStatus.NOT_IMPLEMENTED, new BytesArray(new byte[0]))); + BufferedByteStringAccessor byteStringAccessor = new BufferedByteStringAccessor(); + DataPointGroupingContext context = new DataPointGroupingContext(byteStringAccessor); + try { + var metricsServiceRequest = ExportMetricsServiceRequest.parseFrom(request.exportMetricsServiceRequest.streamInput()); + context.groupDataPoints(metricsServiceRequest); + if (context.totalDataPoints() == 0) { + handleEmptyRequest(listener); + return; + } + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + MetricDocumentBuilder metricDocumentBuilder = new MetricDocumentBuilder(byteStringAccessor); + context.consume(dataPointGroup -> addIndexRequest(bulkRequestBuilder, metricDocumentBuilder, dataPointGroup)); + if (bulkRequestBuilder.numberOfActions() == 0) { + // all data points were ignored + handlePartialSuccess(listener, context); + return; + } + + bulkRequestBuilder.execute(new ActionListener<>() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures() || context.getIgnoredDataPoints() > 0) { + handlePartialSuccess(bulkItemResponses, context, listener); + } else { + handleSuccess(listener); + } + } + + @Override + public void onFailure(Exception e) { + handleFailure(listener, e, context); + } + }); + + } catch (Exception e) { + logger.error("failed to execute otlp metrics request", e); + handleFailure(listener, e, context); + } + } + + private void addIndexRequest( + BulkRequestBuilder bulkRequestBuilder, + MetricDocumentBuilder metricDocumentBuilder, + DataPointGroupingContext.DataPointGroup dataPointGroup + ) throws IOException { + try (XContentBuilder xContentBuilder = XContentFactory.cborBuilder(new BytesStreamOutput())) { + var dynamicTemplates = metricDocumentBuilder.buildMetricDocument(xContentBuilder, dataPointGroup); + bulkRequestBuilder.add( + new IndexRequest(dataPointGroup.targetIndex().index()).opType(DocWriteRequest.OpType.CREATE) + .setRequireDataStream(true) + .source(xContentBuilder) + .setDynamicTemplates(dynamicTemplates) + ); + } + } + + private static void handleSuccess(ActionListener listener) { + listener.onResponse(new MetricsResponse(RestStatus.OK, ExportMetricsServiceResponse.newBuilder().build())); + } + + private static void handleEmptyRequest(ActionListener listener) { + // If the server receives an empty request + // (a request that does not carry any telemetry data) + // the server SHOULD respond with success. + // https://opentelemetry.io/docs/specs/otlp/#full-success-1 + handleSuccess(listener); + } + + private static void handlePartialSuccess(ActionListener listener, DataPointGroupingContext context) { + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest), + // the server MUST respond with HTTP 200 OK. + // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 + MessageLite response = responseWithRejectedDataPoints(context.getIgnoredDataPoints(), context.getIgnoredDataPointsMessage()); + listener.onResponse(new MetricsResponse(RestStatus.OK, response)); + } + + private static void handlePartialSuccess( + BulkResponse bulkItemResponses, + DataPointGroupingContext context, + ActionListener listener + ) { + // If the request is only partially accepted + // (i.e. when the server accepts only parts of the data and rejects the rest), + // the server MUST respond with HTTP 200 OK. + // https://opentelemetry.io/docs/specs/otlp/#partial-success-1 + RestStatus status = RestStatus.OK; + int failures = 0; + for (BulkItemResponse bulkItemResponse : bulkItemResponses.getItems()) { + failures += bulkItemResponse.isFailed() ? 1 : 0; + if (bulkItemResponse.isFailed() && bulkItemResponse.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) { + // If the server receives more requests than the client is allowed or the server is overloaded, + // the server SHOULD respond with HTTP 429 Too Many Requests or HTTP 503 Service Unavailable + // and MAY include “Retry-After” header with a recommended time interval in seconds to wait before retrying. + // https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling + status = RestStatus.TOO_MANY_REQUESTS; + } + } + MessageLite response = responseWithRejectedDataPoints( + failures + context.getIgnoredDataPoints(), + bulkItemResponses.buildFailureMessage() + context.getIgnoredDataPointsMessage() + ); + listener.onResponse(new MetricsResponse(status, response)); + } + + private static void handleFailure(ActionListener listener, Exception e, DataPointGroupingContext context) { + // https://opentelemetry.io/docs/specs/otlp/#failures-1 + // If the processing of the request fails, + // the server MUST respond with appropriate HTTP 4xx or HTTP 5xx status code. + listener.onResponse( + new MetricsResponse(ExceptionsHelper.status(e), responseWithRejectedDataPoints(context.totalDataPoints(), e.getMessage())) + ); + } + + private static ExportMetricsPartialSuccess responseWithRejectedDataPoints(int rejectedDataPoints, String message) { + return ExportMetricsServiceResponse.newBuilder() + .getPartialSuccessBuilder() + .setRejectedDataPoints(rejectedDataPoints) + .setErrorMessage(message) + .build(); } public static class MetricsRequest extends ActionRequest implements CompositeIndicesRequest { @@ -87,6 +224,10 @@ public static class MetricsResponse extends ActionResponse { private final BytesReference response; private final RestStatus status; + public MetricsResponse(RestStatus status, MessageLite response) { + this(status, new BytesArray(response.toByteArray())); + } + public MetricsResponse(RestStatus status, BytesReference response) { this.response = response; this.status = status; diff --git a/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java new file mode 100644 index 0000000000000..648df30555d27 --- /dev/null +++ b/x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsTransportActionTests.java @@ -0,0 +1,183 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.oteldata.otlp; + +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsPartialSuccess; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; +import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; +import io.opentelemetry.proto.metrics.v1.Metric; + +import com.google.protobuf.InvalidProtocolBufferException; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.oteldata.otlp.OTLPMetricsTransportAction.MetricsResponse; +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Consumer; + +import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OTLPMetricsTransportActionTests extends ESTestCase { + + private OTLPMetricsTransportAction action; + private Client client; + + @Override + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + when(client.prepareBulk()).thenAnswer(invocation -> new BulkRequestBuilder(client)); + + action = new OTLPMetricsTransportAction(mock(TransportService.class), mock(ActionFilters.class), mock(ThreadPool.class), client); + } + + public void testSuccess() throws Exception { + MetricsResponse response = executeRequest(createMetricsRequest(createMetric())); + + assertThat(response.getStatus(), equalTo(RestStatus.OK)); + ExportMetricsServiceResponse metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()); + assertThat(metricsServiceResponse.hasPartialSuccess(), equalTo(false)); + } + + public void testSuccessEmptyRequest() throws Exception { + MetricsResponse response = executeRequest(createMetricsRequest()); + + assertThat(response.getStatus(), equalTo(RestStatus.OK)); + ExportMetricsServiceResponse metricsServiceResponse = ExportMetricsServiceResponse.parseFrom(response.getResponse().array()); + assertThat(metricsServiceResponse.hasPartialSuccess(), equalTo(false)); + } + + public void test429() throws Exception { + BulkItemResponse[] bulkItemResponses = new BulkItemResponse[] { + failureResponse(RestStatus.TOO_MANY_REQUESTS, "too many requests"), + successResponse() }; + MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), new BulkResponse(bulkItemResponses, 0)); + + assertThat(response.getStatus(), equalTo(RestStatus.TOO_MANY_REQUESTS)); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + assertThat( + metricsServiceResponse.getRejectedDataPoints(), + equalTo(Arrays.stream(bulkItemResponses).filter(BulkItemResponse::isFailed).count()) + ); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("too many requests")); + } + + public void testPartialSuccess() throws Exception { + MetricsResponse response = executeRequest( + createMetricsRequest(createMetric()), + new BulkResponse(new BulkItemResponse[] { failureResponse(RestStatus.BAD_REQUEST, "bad request") }, 0) + ); + + assertThat(response.getStatus(), equalTo(RestStatus.OK)); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); + assertThat(metricsServiceResponse.getErrorMessage(), containsString("bad request")); + } + + public void testBulkError() throws Exception { + assertExceptionStatus(new IllegalArgumentException("bazinga"), RestStatus.BAD_REQUEST); + assertExceptionStatus(new IllegalStateException("bazinga"), RestStatus.INTERNAL_SERVER_ERROR); + } + + private void assertExceptionStatus(Exception exception, RestStatus restStatus) throws InvalidProtocolBufferException { + if (randomBoolean()) { + doThrow(exception).when(client).execute(any(), any(), any()); + } + MetricsResponse response = executeRequest(createMetricsRequest(createMetric()), exception); + + assertThat(response.getStatus(), equalTo(restStatus)); + ExportMetricsPartialSuccess metricsServiceResponse = ExportMetricsPartialSuccess.parseFrom(response.getResponse().array()); + assertThat(metricsServiceResponse.getRejectedDataPoints(), equalTo(1L)); + assertThat(metricsServiceResponse.getErrorMessage(), equalTo(exception.getMessage())); + } + + private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request) { + return executeRequest(request, listener -> listener.onResponse(new BulkResponse(new BulkItemResponse[] {}, 0))); + } + + private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request, BulkResponse bulkResponse) { + return executeRequest(request, listener -> listener.onResponse(bulkResponse)); + } + + private MetricsResponse executeRequest(OTLPMetricsTransportAction.MetricsRequest request, Exception bulkFailure) { + return executeRequest(request, listener -> listener.onFailure(bulkFailure)); + } + + private MetricsResponse executeRequest( + OTLPMetricsTransportAction.MetricsRequest request, + Consumer> bulkResponseConsumer + ) { + ArgumentCaptor> bulkResponseListener = ArgumentCaptor.captor(); + doNothing().when(client).execute(any(), any(), bulkResponseListener.capture()); + + ActionListener metricsResponseListener = mock(); + action.doExecute(null, request, metricsResponseListener); + if (bulkResponseListener.getAllValues().isEmpty() == false) { + bulkResponseConsumer.accept(bulkResponseListener.getValue()); + } + + ArgumentCaptor response = ArgumentCaptor.forClass(MetricsResponse.class); + verify(metricsResponseListener).onResponse(response.capture()); + return response.getValue(); + } + + private static OTLPMetricsTransportAction.MetricsRequest createMetricsRequest(Metric... metrics) { + return new OTLPMetricsTransportAction.MetricsRequest( + new BytesArray( + ExportMetricsServiceRequest.newBuilder() + .addResourceMetrics( + OtlpUtils.createResourceMetrics( + List.of(keyValue("service.name", "test-service")), + List.of(OtlpUtils.createScopeMetrics("test", "1.0.0", List.of(metrics))) + ) + ) + .build() + .toByteArray() + ) + ); + } + + private static Metric createMetric() { + return OtlpUtils.createGaugeMetric("test.metric", "", List.of(OtlpUtils.createDoubleDataPoint(0))); + } + + private static BulkItemResponse successResponse() { + return BulkItemResponse.success(-1, DocWriteRequest.OpType.CREATE, mock(DocWriteResponse.class)); + } + + private static BulkItemResponse failureResponse(RestStatus restStatus, String failureMessage) { + return BulkItemResponse.failure( + -1, + DocWriteRequest.OpType.CREATE, + new BulkItemResponse.Failure("index", "id", new RuntimeException(failureMessage), restStatus) + ); + } + +}