Skip to content

Commit 3392789

Browse files
authored
OTLP: ignore duplicate metric data points (elastic#133899)
1 parent 953b9ef commit 3392789

File tree

3 files changed

+75
-30
lines changed

3 files changed

+75
-30
lines changed

x-pack/plugin/otel-data/src/main/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContext.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,10 @@ public void addDataPoint(DataPoint dataPoint) {
186186
ignoredDataPoints++;
187187
return;
188188
}
189-
getOrCreateDataPointGroup(dataPoint).addDataPoint(dataPoint);
189+
DataPointGroup dataPointGroup = getOrCreateDataPointGroup(dataPoint);
190+
if (dataPointGroup.addDataPoint(ignoredDataPointMessages, dataPoint) == false) {
191+
ignoredDataPoints++;
192+
}
190193
}
191194

192195
private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
@@ -207,7 +210,6 @@ private DataPointGroup getOrCreateDataPointGroup(DataPoint dataPoint) {
207210
scopeSchemaUrl,
208211
dataPoint.getAttributes(),
209212
dataPoint.getUnit(),
210-
new ArrayList<>(),
211213
targetIndex
212214
);
213215
dataPointGroups.put(dataPointGroupHash, dataPointGroup);
@@ -233,7 +235,8 @@ public static final class DataPointGroup {
233235
private final ByteString scopeSchemaUrl;
234236
private final List<KeyValue> dataPointAttributes;
235237
private final String unit;
236-
private final List<DataPoint> dataPoints;
238+
private final Set<String> metricNames = new HashSet<>();
239+
private final List<DataPoint> dataPoints = new ArrayList<>();
237240
private final String targetIndex;
238241
private String metricNamesHash;
239242

@@ -244,7 +247,6 @@ public DataPointGroup(
244247
ByteString scopeSchemaUrl,
245248
List<KeyValue> dataPointAttributes,
246249
String unit,
247-
List<DataPoint> dataPoints,
248250
String targetIndex
249251
) {
250252
this.resource = resource;
@@ -253,7 +255,6 @@ public DataPointGroup(
253255
this.scopeSchemaUrl = scopeSchemaUrl;
254256
this.dataPointAttributes = dataPointAttributes;
255257
this.unit = unit;
256-
this.dataPoints = dataPoints;
257258
this.targetIndex = targetIndex;
258259
}
259260

@@ -276,9 +277,17 @@ public String getMetricNamesHash(BufferedMurmur3Hasher hasher) {
276277
return metricNamesHash;
277278
}
278279

279-
public void addDataPoint(DataPoint dataPoint) {
280+
public boolean addDataPoint(Set<String> ignoredDataPointMessages, DataPoint dataPoint) {
280281
metricNamesHash = null; // reset the hash when adding a new data point
281-
dataPoints.add(dataPoint);
282+
if (metricNames.add(dataPoint.getMetricName()) == false) {
283+
ignoredDataPointMessages.add(
284+
"Duplicate metric name '" + dataPoint.getMetricName() + "' for timestamp " + getTimestampUnixNano()
285+
);
286+
return false;
287+
} else {
288+
dataPoints.add(dataPoint);
289+
return true;
290+
}
282291
}
283292

284293
public Resource resource() {

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/datapoint/DataPointGroupingContextTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createScopeMetrics;
2626
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createSumMetric;
2727
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.keyValue;
28+
import static org.hamcrest.Matchers.containsString;
2829

2930
public class DataPointGroupingContextTests extends ESTestCase {
3031

@@ -74,6 +75,39 @@ public void testGroupingDifferentGroupUnit() throws Exception {
7475
assertEquals(2, groupCount.get());
7576
}
7677

78+
public void testGroupingDuplicateNameSameTimeSeries() throws Exception {
79+
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
80+
List.of(
81+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createDoubleDataPoint(nowUnixNanos))),
82+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos)))
83+
)
84+
);
85+
context.groupDataPoints(metricsRequest);
86+
assertEquals(2, context.totalDataPoints());
87+
assertEquals(1, context.getIgnoredDataPoints());
88+
assertThat(context.getIgnoredDataPointsMessage(), containsString("Duplicate metric name 'system.cpu.usage' for timestamp"));
89+
90+
AtomicInteger groupCount = new AtomicInteger(0);
91+
context.consume(dataPointGroup -> groupCount.incrementAndGet());
92+
assertEquals(1, groupCount.get());
93+
}
94+
95+
public void testGroupingDuplicateNameDifferentTimeSeries() throws Exception {
96+
ExportMetricsServiceRequest metricsRequest = createMetricsRequest(
97+
List.of(
98+
createGaugeMetric("system.cpu.usage", "", List.of(createDoubleDataPoint(nowUnixNanos))),
99+
createGaugeMetric("system.cpu.usage", "{percent}", List.of(createLongDataPoint(nowUnixNanos)))
100+
)
101+
);
102+
context.groupDataPoints(metricsRequest);
103+
assertEquals(2, context.totalDataPoints());
104+
assertEquals(0, context.getIgnoredDataPoints());
105+
106+
AtomicInteger groupCount = new AtomicInteger(0);
107+
context.consume(dataPointGroup -> groupCount.incrementAndGet());
108+
assertEquals(2, groupCount.get());
109+
}
110+
77111
public void testGroupingDifferentResource() throws Exception {
78112
ResourceMetrics resource1 = createResourceMetrics(
79113
List.of(keyValue("service.name", "test-service_1")),

x-pack/plugin/otel-data/src/test/java/org/elasticsearch/xpack/oteldata/otlp/docbuilder/MetricDocumentBuilderTests.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import io.opentelemetry.proto.common.v1.InstrumentationScope;
1111
import io.opentelemetry.proto.common.v1.KeyValue;
1212
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
13-
import io.opentelemetry.proto.metrics.v1.NumberDataPoint;
1413
import io.opentelemetry.proto.resource.v1.Resource;
1514

1615
import com.google.protobuf.ByteString;
@@ -30,6 +29,7 @@
3029
import java.util.ArrayList;
3130
import java.util.HashMap;
3231
import java.util.List;
32+
import java.util.Set;
3333
import java.util.concurrent.TimeUnit;
3434

3535
import static org.elasticsearch.xpack.oteldata.otlp.OtlpUtils.createDoubleDataPoint;
@@ -66,11 +66,24 @@ public void testBuildMetricDocument() throws IOException {
6666

6767
List<KeyValue> dataPointAttributes = List.of(keyValue("operation", "test"), (keyValue("environment", "production")));
6868

69-
List<DataPoint> dataPoints = List.of(
69+
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
70+
resource,
71+
resourceSchemaUrl,
72+
scope,
73+
scopeSchemaUrl,
74+
dataPointAttributes,
75+
"{test}",
76+
"metrics-generic.otel-default"
77+
);
78+
dataPointGroup.addDataPoint(
79+
Set.of(),
7080
new DataPoint.Number(
7181
createDoubleDataPoint(timestamp, startTimestamp, dataPointAttributes),
7282
createGaugeMetric("system.cpu.usage", "", List.of())
73-
),
83+
)
84+
);
85+
dataPointGroup.addDataPoint(
86+
Set.of(),
7487
new DataPoint.Number(
7588
createLongDataPoint(timestamp, startTimestamp, dataPointAttributes),
7689
createSumMetric(
@@ -82,16 +95,6 @@ public void testBuildMetricDocument() throws IOException {
8295
)
8396
)
8497
);
85-
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
86-
resource,
87-
resourceSchemaUrl,
88-
scope,
89-
scopeSchemaUrl,
90-
dataPointAttributes,
91-
"{test}",
92-
dataPoints,
93-
"metrics-generic.otel-default"
94-
);
9598

9699
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
97100
HashMap<String, String> dynamicTemplates = documentBuilder.buildMetricDocument(builder, dataPointGroup);
@@ -129,20 +132,19 @@ public void testAttributeTypes() throws IOException {
129132
Resource resource = Resource.newBuilder().addAllAttributes(resourceAttributes).build();
130133
InstrumentationScope scope = InstrumentationScope.newBuilder().build();
131134

132-
List<DataPoint> dataPoints = List.of(
133-
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
134-
);
135-
136135
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
137136
resource,
138137
null,
139138
scope,
140139
null,
141140
List.of(),
142141
"",
143-
dataPoints,
144142
"metrics-generic.otel-default"
145143
);
144+
dataPointGroup.addDataPoint(
145+
Set.of(),
146+
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
147+
);
146148

147149
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
148150
documentBuilder.buildMetricDocument(builder, dataPointGroup);
@@ -162,21 +164,21 @@ public void testEmptyFields() throws IOException {
162164
Resource resource = Resource.newBuilder().build();
163165
InstrumentationScope scope = InstrumentationScope.newBuilder().build();
164166

165-
NumberDataPoint dataPoint = createDoubleDataPoint(timestamp);
166-
var metric = createGaugeMetric("test.metric", "", List.of(dataPoint));
167-
List<DataPoint> dataPoints = List.of(new DataPoint.Number(dataPoint, metric));
168-
169167
DataPointGroupingContext.DataPointGroup dataPointGroup = new DataPointGroupingContext.DataPointGroup(
170168
resource,
171169
null,
172170
scope,
173171
null,
174172
List.of(),
175173
"",
176-
dataPoints,
177174
"metrics-generic.otel-default"
178175
);
179176

177+
dataPointGroup.addDataPoint(
178+
Set.of(),
179+
new DataPoint.Number(createDoubleDataPoint(timestamp), createGaugeMetric("test.metric", "", List.of()))
180+
);
181+
180182
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
181183
documentBuilder.buildMetricDocument(builder, dataPointGroup);
182184

0 commit comments

Comments
 (0)