Skip to content

Commit d2582d4

Browse files
committed
OTLP: rest and transport action
Indexes documents created from grouped data points and handles the http response codes according to the spec.
1 parent 8e9c63f commit d2582d4

File tree

3 files changed

+371
-17
lines changed

3 files changed

+371
-17
lines changed

x-pack/plugin/otel-data/src/javaRestTest/java/org/elasticsearch/action/otlp/OTLPMetricsIndexingRestIT.java

Lines changed: 214 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,20 @@
88
package org.elasticsearch.action.otlp;
99

1010
import io.opentelemetry.api.common.Attributes;
11-
import io.opentelemetry.exporter.internal.FailedExportException;
11+
import io.opentelemetry.api.metrics.Meter;
1212
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
1313
import io.opentelemetry.sdk.common.Clock;
1414
import io.opentelemetry.sdk.common.CompletableResultCode;
1515
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1616
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
17+
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
1718
import io.opentelemetry.sdk.metrics.data.MetricData;
1819
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
1920
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
2021
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
22+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
2123
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
24+
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
2225
import io.opentelemetry.sdk.resources.Resource;
2326

2427
import org.elasticsearch.client.Request;
@@ -29,17 +32,29 @@
2932
import org.elasticsearch.test.cluster.ElasticsearchCluster;
3033
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
3134
import org.elasticsearch.test.rest.ESRestTestCase;
35+
import org.elasticsearch.test.rest.ObjectPath;
3236
import org.junit.Before;
3337
import org.junit.ClassRule;
3438

39+
import java.io.IOException;
3540
import java.time.Duration;
41+
import java.time.Instant;
3642
import java.util.List;
43+
import java.util.Map;
3744
import java.util.concurrent.Executors;
3845
import java.util.concurrent.TimeUnit;
3946

4047
import static io.opentelemetry.api.common.AttributeKey.stringKey;
48+
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
49+
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
50+
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
51+
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
52+
import static org.hamcrest.Matchers.aMapWithSize;
53+
import static org.hamcrest.Matchers.anEmptyMap;
4154
import static org.hamcrest.Matchers.equalTo;
4255
import static org.hamcrest.Matchers.is;
56+
import static org.hamcrest.Matchers.isA;
57+
import static org.hamcrest.Matchers.not;
4358

4459
public class OTLPMetricsIndexingRestIT extends ESRestTestCase {
4560

@@ -56,6 +71,8 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase {
5671
.user(USER, PASS, "superuser", false)
5772
.setting("xpack.security.autoconfiguration.enabled", "false")
5873
.setting("xpack.license.self_generated.type", "trial")
74+
.setting("xpack.security.enabled", "false")
75+
.setting("xpack.watcher.enabled", "false")
5976
.build();
6077

6178
@Override
@@ -98,21 +115,139 @@ public void tearDown() throws Exception {
98115
super.tearDown();
99116
}
100117

118+
public void testIngestMetricViaMeterProvider() throws Exception {
119+
Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics");
120+
long totalMemory = randomLong();
121+
122+
sampleMeter.gaugeBuilder("jvm.memory.total")
123+
.setDescription("Reports JVM memory usage.")
124+
.setUnit("By")
125+
.buildWithCallback(result -> result.record(totalMemory, Attributes.empty()));
126+
127+
var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
128+
assertThat(result.isSuccess(), is(true));
129+
130+
refreshMetricsIndices();
131+
132+
ObjectPath search = search("metrics-generic.otel-default");
133+
assertThat(search.evaluate("hits.total.value"), equalTo(1));
134+
var source = search.evaluate("hits.hits.0._source");
135+
assertThat(ObjectPath.evaluate(source, "@timestamp"), isA(String.class));
136+
assertThat(ObjectPath.evaluate(source, "start_timestamp"), isA(String.class));
137+
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
138+
assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").toString(), equalTo(Long.toString(totalMemory)));
139+
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
140+
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
141+
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
142+
}
143+
101144
public void testIngestMetricDataViaMetricExporter() throws Exception {
102-
MetricData jvmMemoryMetricData = createDoubleGauge(
103-
TEST_RESOURCE,
104-
Attributes.empty(),
105-
"jvm.memory.total",
106-
Runtime.getRuntime().totalMemory(),
107-
"By",
108-
Clock.getDefault().now()
145+
long now = Clock.getDefault().now();
146+
long totalMemory = randomLong();
147+
MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now);
148+
149+
export(List.of(jvmMemoryMetricData));
150+
ObjectPath search = search("metrics-generic.otel-default");
151+
assertThat(search.evaluate("hits.total.value"), equalTo(1));
152+
var source = search.evaluate("hits.hits.0._source");
153+
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
154+
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
155+
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
156+
assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").toString(), equalTo(Long.toString(totalMemory)));
157+
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
158+
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
159+
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
160+
}
161+
162+
public void testGroupingSameGroup() throws Exception {
163+
long now = Clock.getDefault().now();
164+
MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now);
165+
// uses an equal but not the same resource to test grouping across resourceMetrics
166+
MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now);
167+
168+
export(List.of(metric1, metric2));
169+
170+
ObjectPath path = ObjectPath.createFromResponse(
171+
client().performRequest(new Request("GET", "metrics-generic.otel-default/_search"))
172+
);
173+
assertThat(path.evaluate("hits.total.value"), equalTo(1));
174+
assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0)));
175+
assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch"))));
176+
}
177+
178+
public void testGroupingDifferentGroup() throws Exception {
179+
long now = Clock.getDefault().now();
180+
export(
181+
List.of(
182+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now),
183+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)),
184+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now),
185+
createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now)
186+
)
187+
);
188+
ObjectPath path = search("metrics-generic.otel-default");
189+
assertThat(path.evaluate("hits.total.value"), equalTo(4));
190+
}
191+
192+
public void testGauge() throws Exception {
193+
long now = Clock.getDefault().now();
194+
export(
195+
List.of(
196+
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now),
197+
createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now)
198+
)
109199
);
200+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
201+
assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double"));
202+
assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
203+
assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long"));
204+
assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
205+
}
206+
207+
public void testCounterTemporality() throws Exception {
208+
long now = Clock.getDefault().now();
209+
export(
210+
List.of(
211+
createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC),
212+
createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC)
213+
)
214+
);
215+
216+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
217+
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
218+
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
219+
assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long"));
220+
assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
221+
}
222+
223+
public void testCounterMonotonicity() throws Exception {
224+
long now = Clock.getDefault().now();
225+
export(
226+
List.of(
227+
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC),
228+
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC)
110229

111-
FailedExportException.HttpExportException exception = assertThrows(
112-
FailedExportException.HttpExportException.class,
113-
() -> export(List.of(jvmMemoryMetricData))
230+
)
114231
);
115-
assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus()));
232+
233+
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
234+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long"));
235+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
236+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
237+
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
238+
}
239+
240+
private static Map<String, Object> getMapping(String target) throws IOException {
241+
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
242+
.evaluate("");
243+
assertThat(mappings, aMapWithSize(1));
244+
Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings");
245+
assertThat(mapping, not(anEmptyMap()));
246+
return mapping;
247+
}
248+
249+
private static String timestampAsString(long now) {
250+
return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString();
116251
}
117252

118253
private void export(List<MetricData> metrics) throws Exception {
@@ -124,7 +259,15 @@ private void export(List<MetricData> metrics) throws Exception {
124259
throw new RuntimeException("Failed to export metrics", failure);
125260
}
126261
assertThat(result.isSuccess(), is(true));
127-
assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*")));
262+
refreshMetricsIndices();
263+
}
264+
265+
private ObjectPath search(String target) throws IOException {
266+
return ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_search")));
267+
}
268+
269+
private static void refreshMetricsIndices() throws IOException {
270+
assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh")));
128271
}
129272

130273
private static MetricData createDoubleGauge(
@@ -144,4 +287,62 @@ private static MetricData createDoubleGauge(
144287
ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
145288
);
146289
}
290+
291+
private static MetricData createLongGauge(
292+
Resource resource,
293+
Attributes attributes,
294+
String name,
295+
long value,
296+
String unit,
297+
long timeEpochNanos
298+
) {
299+
return ImmutableMetricData.createLongGauge(
300+
resource,
301+
TEST_SCOPE,
302+
name,
303+
"Your description could be here.",
304+
unit,
305+
ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
306+
);
307+
}
308+
309+
private static MetricData createCounter(
310+
Resource resource,
311+
Attributes attributes,
312+
String name,
313+
long value,
314+
String unit,
315+
long timeEpochNanos,
316+
AggregationTemporality temporality,
317+
Monotonicity monotonicity
318+
) {
319+
return ImmutableMetricData.createLongSum(
320+
resource,
321+
TEST_SCOPE,
322+
name,
323+
"Your description could be here.",
324+
unit,
325+
ImmutableSumData.create(
326+
monotonicity.isMonotonic(),
327+
temporality,
328+
List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))
329+
)
330+
);
331+
}
332+
333+
// this is just to enhance readability of the createCounter calls (avoid boolean parameter)
334+
enum Monotonicity {
335+
MONOTONIC(true),
336+
NON_MONOTONIC(false);
337+
338+
private final boolean monotonic;
339+
340+
Monotonicity(boolean monotonic) {
341+
this.monotonic = monotonic;
342+
}
343+
344+
public boolean isMonotonic() {
345+
return monotonic;
346+
}
347+
}
147348
}

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/OTLPMetricsRestAction.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77

88
package org.elasticsearch.xpack.oteldata.otlp;
99

10+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
11+
1012
import org.elasticsearch.action.ActionListener;
1113
import org.elasticsearch.client.internal.node.NodeClient;
1214
import org.elasticsearch.common.bytes.BytesArray;
@@ -48,8 +50,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
4850
ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) {
4951
@Override
5052
public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) {
51-
RestStatus restStatus = r.getStatus();
52-
return new RestResponse(restStatus, "application/x-protobuf", r.getResponse());
53+
return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse());
5354
}
5455
})
5556
);
@@ -59,7 +60,13 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r)
5960
// (a request that does not carry any telemetry data)
6061
// the server SHOULD respond with success.
6162
// https://opentelemetry.io/docs/specs/otlp/#full-success-1
62-
return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0])));
63+
return channel -> channel.sendResponse(
64+
new RestResponse(
65+
RestStatus.OK,
66+
"application/x-protobuf",
67+
new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray())
68+
)
69+
);
6370
}
6471

6572
}

0 commit comments

Comments
 (0)