Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ public void addDataPoint(DataPoint dataPoint) {
ignoredDataPoints++;
return;
}
getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint);
DataPointGroup dataPointGroup = getOrCreateDataPointGroup(dataPoint);
if (dataPointGroup.addDataPoint(ignoredDataPointMessages, dataPoint) == false) {
ignoredDataPoints++;
}
}

private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
Expand All @@ -207,7 +210,6 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
scopeSchemaUrl,
dataPoint.getAttributes(),
dataPoint.getUnit(),
new ArrayList<>(),
targetIndex
);
dataPointGroups.put(dataPointGroupHash, dataPointGroup);
Expand All @@ -233,7 +235,8 @@ public static final class DataPointGroup {
private final ByteString scopeSchemaUrl;
private final List<KeyValue> dataPointAttributes;
private final String unit;
private final List<DataPoint> dataPoints;
private final Set<String> metricNames = new HashSet<>();
private final List<DataPoint> dataPoints = new ArrayList<>();
private final String targetIndex;
private String metricNamesHash;

Expand All @@ -244,7 +247,6 @@ public DataPointGroup(
ByteString scopeSchemaUrl,
List<KeyValue> dataPointAttributes,
String unit,
List<DataPoint> dataPoints,
String targetIndex
) {
this.resource = resource;
Expand All @@ -253,7 +255,6 @@ public DataPointGroup(
this.scopeSchemaUrl = scopeSchemaUrl;
this.dataPointAttributes = dataPointAttributes;
this.unit = unit;
this.dataPoints = dataPoints;
this.targetIndex = targetIndex;
}

Expand All @@ -276,9 +277,17 @@ public String getMetricNamesHash(BufferedMurmur3Hasher hasher) {
return metricNamesHash;
}

public void addDataPoint(DataPoint dataPoint) {
public boolean addDataPoint(Set<String> ignoredDataPointMessages, DataPoint dataPoint) {
metricNamesHash = null; // reset the hash when adding a new data point
dataPoints.add(dataPoint);
if (metricNames.add(dataPoint.getMetricName()) == false) {
ignoredDataPointMessages.add(
"Duplicate metric name '" + dataPoint.getMetricName() + "' for timestamp " + getTimestampUnixNano()
);
return false;
} else {
dataPoints.add(dataPoint);
return true;
}
}

public Resource resource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.hamcrest.Matchers.containsString;

public class DataPointGroupingContextTests extends ESTestCase {

Expand Down Expand Up @@ -74,6 +75,39 @@ public void testGroupingDifferentGroupUnit() throws Exception {
assertEquals(2, groupCount.get());
}

public void testGroupingDuplicateNameSameTimeSeries() throws Exception {
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos))),
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos)))
)
);
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(1, context.getIgnoredDataPoints());
assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
assertEquals(1, groupCount.get());
}

public void testGroupingDuplicateNameDifferentTimeSeries() throws Exception {
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos)))
)
);
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(0, context.getIgnoredDataPoints());

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
assertEquals(2, groupCount.get());
}

public void testGroupingDifferentResource() throws Exception {
ResourceMetrics resource1 = createResourceMetrics(
List.of(keyValue("service.name", "test-service_1")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.opentelemetry.proto.common.v1.InstrumentationScope;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
import io.opentelemetry.proto.resource.v1.Resource;

import com.google.protobuf.ByteString;
Expand All @@ -30,6 +29,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint;
Expand Down Expand Up @@ -66,11 +66,24 @@ public void testBuildMetricDocument() throws IOException {

List<KeyValue> dataPointAttributes = List.of(keyValue("operation", "test"), (keyValue("environment", "production")));

List<DataPoint> dataPoints = List.of(
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
resource,
resourceSchemaUrl,
scope,
scopeSchemaUrl,
dataPointAttributes,
"{test}",
"metrics-generic.otel-default"
);
dataPointGroup.addDataPoint(
Set.of(),
new DataPoint.Number(
createDoubleDataPoint(timestamp, startTimestamp, dataPointAttributes),
createGaugeMetric("system.cpu.usage", "", List.of())
),
)
);
dataPointGroup.addDataPoint(
Set.of(),
new DataPoint.Number(
createLongDataPoint(timestamp, startTimestamp, dataPointAttributes),
createSumMetric(
Expand All @@ -82,16 +95,6 @@ public void testBuildMetricDocument() throws IOException {
)
)
);
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
resource,
resourceSchemaUrl,
scope,
scopeSchemaUrl,
dataPointAttributes,
"{test}",
dataPoints,
"metrics-generic.otel-default"
);

XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
HashMap<String, String> dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup);
Expand Down Expand Up @@ -129,20 +132,19 @@ public void testAttributeTypes() throws IOException {
Resource resource = Resource.newBuilder().addAllAttributes(resourceAttributes).build();
InstrumentationScope scope = InstrumentationScope.newBuilder().build();

List<DataPoint> dataPoints = List.of(
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
);

DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
resource,
null,
scope,
null,
List.of(),
"",
dataPoints,
"metrics-generic.otel-default"
);
dataPointGroup.addDataPoint(
Set.of(),
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
);

XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
documentBuilder.buildMetricDocument(builder, dataPointGroup);
Expand All @@ -162,21 +164,21 @@ public void testEmptyFields() throws IOException {
Resource resource = Resource.newBuilder().build();
InstrumentationScope scope = InstrumentationScope.newBuilder().build();

NumberDataPoint dataPoint = createDoubleDataPoint(timestamp);
var metric = createGaugeMetric("test.metric", "", List.of(dataPoint));
List<DataPoint> dataPoints = List.of(new DataPoint.Number(dataPoint, metric));

DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
resource,
null,
scope,
null,
List.of(),
"",
dataPoints,
"metrics-generic.otel-default"
);

dataPointGroup.addDataPoint(
Set.of(),
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
);

XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
documentBuilder.buildMetricDocument(builder, dataPointGroup);

Expand Down