Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ public void addDataPoint(DataPoint dataPoint) {
ignoredDataPoints++;
return;
}
getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint);
DataPointGroup dataPointGroup = getOrCreateDataPointGroup(dataPoint);
if (dataPointGroup.addDataPoint(ignoredDataPointMessages, dataPoint) == false) {
ignoredDataPoints++;
}
}

private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
Expand All @@ -207,7 +210,6 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
scopeSchemaUrl,
dataPoint.getAttributes(),
dataPoint.getUnit(),
new ArrayList<>(),
targetIndex
);
dataPointGroups.put(dataPointGroupHash, dataPointGroup);
Expand All @@ -233,7 +235,8 @@ public static final class DataPointGroup {
private final ByteString scopeSchemaUrl;
private final List<KeyValue> dataPointAttributes;
private final String unit;
private final List<DataPoint> dataPoints;
private final Set<String> metricNames = new HashSet<>();
private final List<DataPoint> dataPoints = new ArrayList<>();
private final String targetIndex;
private String metricNamesHash;

Expand All @@ -244,7 +247,6 @@ public DataPointGroup(
ByteString scopeSchemaUrl,
List<KeyValue> dataPointAttributes,
String unit,
List<DataPoint> dataPoints,
String targetIndex
) {
this.resource = resource;
Expand All @@ -253,7 +255,6 @@ public DataPointGroup(
this.scopeSchemaUrl = scopeSchemaUrl;
this.dataPointAttributes = dataPointAttributes;
this.unit = unit;
this.dataPoints = dataPoints;
this.targetIndex = targetIndex;
}

Expand All @@ -276,9 +277,17 @@ public String getMetricNamesHash() {
return metricNamesHash;
}

public void addDataPoint(DataPoint dataPoint) {
boolean addDataPoint(Set<String> ignoredDataPointMessages, DataPoint dataPoint) {
metricNamesHash = null; // reset the hash when adding a new data point
dataPoints.add(dataPoint);
if (metricNames.add(dataPoint.getMetricName()) == false) {
ignoredDataPointMessages.add(
"Duplicate metric name '" + dataPoint.getMetricName() + "' for timestamp " + getTimestampUnixNano()
);
return false;
} else {
dataPoints.add(dataPoint);
return true;
}
}

public Resource resource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
import static org.hamcrest.Matchers.containsString;

public class DataPointGroupingContextTests extends ESTestCase {

Expand Down Expand Up @@ -74,6 +75,24 @@ public void testGroupingDifferentGroupUnit() throws Exception {
assertEquals(2, groupCount.get());
}

public void testGroupingDuplicateName() throws Exception {
// Group data points
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
List.of(
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos, List.of())))
)
);
context.groupDataPoints(metricsRequest);
assertEquals(2, context.totalDataPoints());
assertEquals(1, context.getIgnoredDataPoints());
assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));

AtomicInteger groupCount = new AtomicInteger(0);
context.consume(dataPointGroup -> groupCount.incrementAndGet());
assertEquals(1, groupCount.get());
}

public void testGroupingDifferentResource() throws Exception {
ResourceMetrics resource1 = createResourceMetrics(
List.of(keyValue("service.name", "test-service_1")),
Expand Down