Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,15 @@ public class TsidBuilder {
private static final int MAX_TSID_VALUE_FIELDS = 16;
private final BufferedMurmur3Hasher murmur3Hasher = new BufferedMurmur3Hasher(0L);

private final List<Dimension> dimensions = new ArrayList<>();
private final List<Dimension> dimensions;

public TsidBuilder() {
this.dimensions = new ArrayList<>();
}

public TsidBuilder(int size) {
this.dimensions = new ArrayList<>(size);
}

public static TsidBuilder newBuilder() {
return new TsidBuilder();
Expand Down Expand Up @@ -281,6 +289,10 @@ private static int writeHash128(MurmurHash3.Hash128 hash128, byte[] buffer, int
return index;
}

public int size() {
return dimensions.size();
}

/**
* A functional interface that describes how objects of a complex type are added to a TSID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
package org.elasticsearch.action.otlp;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.internal.FailedExportException;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableDoublePointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
import io.opentelemetry.sdk.resources.Resource;

import org.elasticsearch.client.Request;
Expand All @@ -29,17 +32,29 @@
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.CUMULATIVE;
import static io.opentelemetry.sdk.metrics.data.AggregationTemporality.DELTA;
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.MONOTONIC;
import static org.elasticsearch.action.otlp.OTLPMetricsIndexingRestIT.Monotonicity.NON_MONOTONIC;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.not;

public class OTLPMetricsIndexingRestIT extends ESRestTestCase {

Expand All @@ -56,6 +71,8 @@ public class OTLPMetricsIndexingRestIT extends ESRestTestCase {
.user(USER, PASS, "superuser", false)
.setting("xpack.security.autoconfiguration.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
.build();

@Override
Expand Down Expand Up @@ -98,21 +115,136 @@ public void tearDown() throws Exception {
super.tearDown();
}

public void testIngestMetricViaMeterProvider() throws Exception {
Meter sampleMeter = meterProvider.get("io.opentelemetry.example.metrics");

sampleMeter.gaugeBuilder("jvm.memory.total")
.setDescription("Reports JVM memory usage.")
.setUnit("By")
.buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty()));

var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
assertThat(result.isSuccess(), is(true));

refreshMetricsIndices();

ObjectPath search = ObjectPath.createFromResponse(
client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search"))
);
assertThat(search.evaluate("hits.total.value"), equalTo(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we validate the response here as in the next one?

}

public void testIngestMetricDataViaMetricExporter() throws Exception {
MetricData jvmMemoryMetricData = createDoubleGauge(
TEST_RESOURCE,
Attributes.empty(),
"jvm.memory.total",
Runtime.getRuntime().totalMemory(),
"By",
Clock.getDefault().now()
long now = Clock.getDefault().now();
long totalMemory = Runtime.getRuntime().totalMemory();
MetricData jvmMemoryMetricData = createLongGauge(TEST_RESOURCE, Attributes.empty(), "jvm.memory.total", totalMemory, "By", now);

export(List.of(jvmMemoryMetricData));
ObjectPath search = ObjectPath.createFromResponse(
client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search"))
);
assertThat(search.evaluate("hits.total.value"), equalTo(1));
var source = search.evaluate("hits.hits.0._source");
assertThat(ObjectPath.evaluate(source, "@timestamp"), equalTo(timestampAsString(now)));
assertThat(ObjectPath.evaluate(source, "start_timestamp"), equalTo(timestampAsString(now)));
assertThat(ObjectPath.evaluate(source, "_metric_names_hash"), isA(String.class));
assertThat(ObjectPath.evaluate(source, "metrics.jvm\\.memory\\.total").toString(), equalTo(Long.toString(totalMemory)));
assertThat(ObjectPath.evaluate(source, "unit"), equalTo("By"));
assertThat(ObjectPath.evaluate(source, "resource.attributes.service\\.name"), equalTo("elasticsearch"));
assertThat(ObjectPath.evaluate(source, "scope.name"), equalTo("io.opentelemetry.example.metrics"));
}

FailedExportException.HttpExportException exception = assertThrows(
FailedExportException.HttpExportException.class,
() -> export(List.of(jvmMemoryMetricData))
public void testGroupingSameGroup() throws Exception {
long now = Clock.getDefault().now();
MetricData metric1 = createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now);
// uses an equal but not the same resource to test grouping across resourceMetrics
MetricData metric2 = createDoubleGauge(TEST_RESOURCE.toBuilder().build(), Attributes.empty(), "metric2", 42, "By", now);

export(List.of(metric1, metric2));

ObjectPath path = ObjectPath.createFromResponse(
client().performRequest(new Request("GET", "metrics-generic.otel-default/_search"))
);
assertThat(exception.getResponse().statusCode(), equalTo(RestStatus.NOT_IMPLEMENTED.getStatus()));
assertThat(path.evaluate("hits.total.value"), equalTo(1));
assertThat(path.evaluate("hits.hits.0._source.metrics"), equalTo(Map.of("metric1", 42.0, "metric2", 42.0)));
assertThat(path.evaluate("hits.hits.0._source.resource"), equalTo(Map.of("attributes", Map.of("service.name", "elasticsearch"))));
}

public void testGroupingDifferentGroup() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now),
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "By", now + TimeUnit.MILLISECONDS.toNanos(1)),
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "metric1", 42, "", now),
createDoubleGauge(TEST_RESOURCE, Attributes.of(stringKey("foo"), "bar"), "metric1", 42, "By", now)
)
);
ObjectPath path = ObjectPath.createFromResponse(
client().performRequest(new Request("GET", "metrics-generic.otel-default" + "/_search"))
);
assertThat(path.evaluate("hits.total.value"), equalTo(4));
}

public void testGauge() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createDoubleGauge(TEST_RESOURCE, Attributes.empty(), "double_gauge", 42.0, "By", now),
createLongGauge(TEST_RESOURCE, Attributes.empty(), "long_gauge", 42, "By", now)
)
);
Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "double_gauge.type"), equalTo("double"));
assertThat(ObjectPath.evaluate(metrics, "double_gauge.time_series_metric"), equalTo("gauge"));
assertThat(ObjectPath.evaluate(metrics, "long_gauge.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "long_gauge.time_series_metric"), equalTo("gauge"));
}

public void testCounterTemporality() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createCounter(TEST_RESOURCE, Attributes.empty(), "cumulative_counter", 42, "By", now, CUMULATIVE, MONOTONIC),
createCounter(TEST_RESOURCE, Attributes.empty(), "delta_counter", 42, "By", now, DELTA, MONOTONIC)
)
);

Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "cumulative_counter.time_series_metric"), equalTo("counter"));
assertThat(ObjectPath.evaluate(metrics, "delta_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "delta_counter.time_series_metric"), equalTo("gauge"));
}

public void testCounterMonotonicity() throws Exception {
long now = Clock.getDefault().now();
export(
List.of(
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter", 42, "By", now, CUMULATIVE, NON_MONOTONIC),
createCounter(TEST_RESOURCE, Attributes.empty(), "up_down_counter_delta", 42, "By", now, DELTA, NON_MONOTONIC)

)
);

Map<String, Object> metrics = ObjectPath.evaluate(getMapping("metrics-generic.otel-default"), "properties.metrics.properties");
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter.time_series_metric"), equalTo("gauge"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.type"), equalTo("long"));
assertThat(ObjectPath.evaluate(metrics, "up_down_counter_delta.time_series_metric"), equalTo("gauge"));
}

private static Map<String, Object> getMapping(String target) throws IOException {
Map<String, Object> mappings = ObjectPath.createFromResponse(client().performRequest(new Request("GET", target + "/_mapping")))
.evaluate("");
assertThat(mappings, aMapWithSize(1));
Map<String, Object> mapping = ObjectPath.evaluate(mappings.values().iterator().next(), "mappings");
assertThat(mapping, not(anEmptyMap()));
return mapping;
}

private static String timestampAsString(long now) {
return Instant.ofEpochMilli(TimeUnit.NANOSECONDS.toMillis(now)).toString();
}

private void export(List<MetricData> metrics) throws Exception {
Expand All @@ -124,7 +256,11 @@ private void export(List<MetricData> metrics) throws Exception {
throw new RuntimeException("Failed to export metrics", failure);
}
assertThat(result.isSuccess(), is(true));
assertOK(client().performRequest(new Request("GET", "_refresh/metrics-*")));
refreshMetricsIndices();
}

private static void refreshMetricsIndices() throws IOException {
assertOK(client().performRequest(new Request("GET", "metrics-*/_refresh")));
}

private static MetricData createDoubleGauge(
Expand All @@ -144,4 +280,62 @@ private static MetricData createDoubleGauge(
ImmutableGaugeData.create(List.of(ImmutableDoublePointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
);
}

private static MetricData createLongGauge(
Resource resource,
Attributes attributes,
String name,
long value,
String unit,
long timeEpochNanos
) {
return ImmutableMetricData.createLongGauge(
resource,
TEST_SCOPE,
name,
"Your description could be here.",
unit,
ImmutableGaugeData.create(List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value)))
);
}

private static MetricData createCounter(
Resource resource,
Attributes attributes,
String name,
long value,
String unit,
long timeEpochNanos,
AggregationTemporality temporality,
Monotonicity monotonicity
) {
return ImmutableMetricData.createLongSum(
resource,
TEST_SCOPE,
name,
"Your description could be here.",
unit,
ImmutableSumData.create(
monotonicity.isMonotonic(),
temporality,
List.of(ImmutableLongPointData.create(timeEpochNanos, timeEpochNanos, attributes, value))
)
);
}

// this is just to enhance readability of the createCounter calls (avoid boolean parameter)
enum Monotonicity {
MONOTONIC(true),
NON_MONOTONIC(false);

private final boolean monotonic;

Monotonicity(boolean monotonic) {
this.monotonic = monotonic;
}

public boolean isMonotonic() {
return monotonic;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.elasticsearch.xpack.oteldata.otlp;

import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -48,8 +50,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
ActionListener.releaseBefore(request.content(), new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r) {
RestStatus restStatus = r.getStatus();
return new RestResponse(restStatus, "application/x-protobuf", r.getResponse());
return new RestResponse(r.getStatus(), "application/x-protobuf", r.getResponse());
}
})
);
Expand All @@ -59,7 +60,13 @@ public RestResponse buildResponse(OTLPMetricsTransportAction.MetricsResponse r)
// (a request that does not carry any telemetry data)
// the server SHOULD respond with success.
// https://opentelemetry.io/docs/specs/otlp/#full-success-1
return channel -> channel.sendResponse(new RestResponse(RestStatus.OK, "application/x-protobuf", new BytesArray(new byte[0])));
return channel -> channel.sendResponse(
new RestResponse(
RestStatus.OK,
"application/x-protobuf",
new BytesArray(ExportMetricsServiceResponse.newBuilder().build().toByteArray())
)
);
}

}
Loading