Skip to content

Commit 0c28e05

Browse files
committed
OTLP: ignore duplicate metric data points
1 parent f7dd604 commit 0c28e05

File tree

2 files changed

+35
-7
lines changed

2 files changed

+35
-7
lines changed

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ public void addDataPoint(DataPoint dataPoint) {
186186
ignoredDataPoints++;
187187
return;
188188
}
189-
getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint);
189+
DataPointGroup dataPointGroup = getOrCreateDataPointGroup(dataPoint);
190+
if (dataPointGroup.addDataPoint(ignoredDataPointMessages, dataPoint) == false) {
191+
ignoredDataPoints++;
192+
}
190193
}
191194

192195
private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
@@ -207,7 +210,6 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
207210
scopeSchemaUrl,
208211
dataPoint.getAttributes(),
209212
dataPoint.getUnit(),
210-
new ArrayList<>(),
211213
targetIndex
212214
);
213215
dataPointGroups.put(dataPointGroupHash, dataPointGroup);
@@ -233,7 +235,8 @@ public static final class DataPointGroup {
233235
private final ByteString scopeSchemaUrl;
234236
private final List<KeyValue> dataPointAttributes;
235237
private final String unit;
236-
private final List<DataPoint> dataPoints;
238+
private final Set<String> metricNames = new HashSet<>();
239+
private final List<DataPoint> dataPoints = new ArrayList<>();
237240
private final String targetIndex;
238241
private String metricNamesHash;
239242

@@ -244,7 +247,6 @@ public DataPointGroup(
244247
ByteString scopeSchemaUrl,
245248
List<KeyValue> dataPointAttributes,
246249
String unit,
247-
List<DataPoint> dataPoints,
248250
String targetIndex
249251
) {
250252
this.resource = resource;
@@ -253,7 +255,6 @@ public DataPointGroup(
253255
this.scopeSchemaUrl = scopeSchemaUrl;
254256
this.dataPointAttributes = dataPointAttributes;
255257
this.unit = unit;
256-
this.dataPoints = dataPoints;
257258
this.targetIndex = targetIndex;
258259
}
259260

@@ -276,9 +277,17 @@ public String getMetricNamesHash() {
276277
return metricNamesHash;
277278
}
278279

279-
public void addDataPoint(DataPoint dataPoint) {
280+
boolean addDataPoint(Set<String> ignoredDataPointMessages, DataPoint dataPoint) {
280281
metricNamesHash = null; // reset the hash when adding a new data point
281-
dataPoints.add(dataPoint);
282+
if (metricNames.add(dataPoint.getMetricName()) == false) {
283+
ignoredDataPointMessages.add(
284+
"Duplicate metric name '" + dataPoint.getMetricName() + "' for timestamp " + getTimestampUnixNano()
285+
);
286+
return false;
287+
} else {
288+
dataPoints.add(dataPoint);
289+
return true;
290+
}
282291
}
283292

284293
public Resource resource() {

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
2626
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
2727
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
28+
import static org.hamcrest.Matchers.containsString;
2829

2930
public class DataPointGroupingContextTests extends ESTestCase {
3031

@@ -74,6 +75,24 @@ public void testGroupingDifferentGroupUnit() throws Exception {
7475
assertEquals(2, groupCount.get());
7576
}
7677

78+
public void testGroupingDuplicateName() throws Exception {
79+
// Group data points
80+
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
81+
List.of(
82+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos, List.of()))),
83+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos, List.of())))
84+
)
85+
);
86+
context.groupDataPoints(metricsRequest);
87+
assertEquals(2, context.totalDataPoints());
88+
assertEquals(1, context.getIgnoredDataPoints());
89+
assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));
90+
91+
AtomicInteger groupCount = new AtomicInteger(0);
92+
context.consume(dataPointGroup -> groupCount.incrementAndGet());
93+
assertEquals(1, groupCount.get());
94+
}
95+
7796
public void testGroupingDifferentResource() throws Exception {
7897
ResourceMetrics resource1 = createResourceMetrics(
7998
List.of(keyValue("service.name", "test-service_1")),

0 commit comments

Comments
 (0)