Skip to content

Commit 3e00c51

Browse files
authored
fix ingesting of infinite values (#273)
* fix ingesting of infinite values
1 parent 7c3398c commit 3e00c51

File tree

2 files changed

+77
-70
lines changed

2 files changed

+77
-70
lines changed

src/main/java/com/arpnetworking/metrics/mad/parsers/PrometheusToRecordParser.java

Lines changed: 47 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ public final class PrometheusToRecordParser implements Parser<List<Record>, Http
6161
/**
6262
* public constructor.
6363
*
64-
* @param interpretUnits specifies whether or not to interpret units.
65-
* @param outputDebugInfo specifies whether or not to output debug files.
64+
* @param interpretUnits specifies whether to interpret units.
65+
* @param outputDebugInfo specifies whether to output debug files.
6666
*/
6767
public PrometheusToRecordParser(final boolean interpretUnits, final boolean outputDebugInfo) {
6868
_interpretUnits = interpretUnits;
@@ -78,37 +78,29 @@ public PrometheusToRecordParser(final boolean interpretUnits, final boolean outp
7878
* For more information see: https://prometheus.io/docs/practices/naming/
7979
*/
8080
ParseResult parseNameAndUnit(final String name) {
81-
if (!_interpretUnits) {
82-
return new ParseResult(name, Optional.empty());
83-
}
84-
final StringBuilder builder = new StringBuilder();
85-
for (int i = name.length() - 1; i >= 0; i--) {
86-
final char ch = name.charAt(i);
87-
if (ch == '_') {
88-
final String key = builder.toString();
89-
if (PROMETHEUS_AGGREGATION_KEYS.contains(key)) {
90-
builder.setLength(0); //reset builder
91-
} else {
92-
final Unit value = UNIT_MAP.get(key);
93-
if (value != null) {
94-
final String newName = name.substring(0, i).concat(name.substring(i + 1 + key.length()));
95-
return new ParseResult(newName, Optional.of(value));
96-
} else {
97-
return new ParseResult(name, Optional.empty());
98-
}
99-
}
100-
} else {
101-
builder.append(ch);
81+
Optional<String> aggregationKey = Optional.empty();
82+
final int lastUnderscore = name.lastIndexOf('_');
83+
if (lastUnderscore >= 0) {
84+
final String lastSuffix = name.substring(lastUnderscore + 1);
85+
if (PROMETHEUS_AGGREGATION_KEYS.contains(lastSuffix)) {
86+
aggregationKey = Optional.of(lastSuffix);
10287
}
10388
}
104-
final String possibleUnit = builder.toString();
105-
final Unit value = UNIT_MAP.get(possibleUnit);
106-
if (value != null) {
107-
final String newName = name.substring(Math.min(possibleUnit.length() + 1, name.length()));
108-
return new ParseResult(newName, Optional.of(value));
89+
if (!_interpretUnits) {
90+
return new ParseResult(name, aggregationKey, Optional.empty());
91+
}
92+
93+
final int unitIndexEnd = aggregationKey.isPresent() ? lastUnderscore - 1 : name.length() - 1;
94+
final int prevUnderscore = name.lastIndexOf('_', unitIndexEnd);
95+
final int unitStart;
96+
if (prevUnderscore >= 0) {
97+
unitStart = prevUnderscore + 1;
10998
} else {
110-
return new ParseResult(name, Optional.empty());
99+
unitStart = 0;
111100
}
101+
final String unitString = name.substring(unitStart, unitIndexEnd + 1);
102+
final Optional<Unit> unit = Optional.ofNullable(UNIT_MAP.get(unitString));
103+
return new ParseResult(name, aggregationKey, unit);
112104
}
113105

114106
@Override
@@ -131,17 +123,23 @@ public List<Record> parse(final HttpRequest data) throws ParsingException {
131123
dimensionsBuilder.put(label.getName(), label.getValue());
132124
}
133125
}
134-
135126
if (skipSeries) {
136127
continue;
137128
}
138129
final ParseResult result = parseNameAndUnit(nameOpt.orElse("").trim());
130+
// We don't currently support aggregate metrics from prometheus
131+
if (result.getAggregationKey().isPresent()) {
132+
continue;
133+
}
139134
final String metricName = result.getName();
140135
if (metricName.isEmpty()) {
141136
throw new ParsingException("Found a metric with an empty name", data.getBody().toArray());
142137
}
143138
final ImmutableMap<String, String> immutableDimensions = dimensionsBuilder.build();
144139
for (final Types.Sample sample : timeSeries.getSamplesList()) {
140+
if (!Double.isFinite(sample.getValue())) {
141+
continue;
142+
}
145143
final Record record = ThreadLocalBuilder.build(
146144
DefaultRecord.Builder.class,
147145
b -> b.setId(UUID.randomUUID().toString())
@@ -203,26 +201,22 @@ private Quantity createQuantity(final Types.Sample sample, final Optional<Unit>
203201
);
204202
}
205203

206-
private static String createUnitMapKey(final String name) {
207-
return new StringBuilder(name).reverse().toString();
208-
}
209-
210204
private final boolean _interpretUnits;
211205
private final AtomicInteger _outputFileNumber = new AtomicInteger(0);
212206
private final boolean _outputDebugInfo;
213207

214208
private static final ImmutableMap<String, Unit> UNIT_MAP = ImmutableMap.of(
215-
createUnitMapKey("seconds"), Unit.SECOND,
216-
createUnitMapKey("celcius"), Unit.CELCIUS,
217-
createUnitMapKey("bytes"), Unit.BYTE,
218-
createUnitMapKey("bits"), Unit.BIT
209+
"seconds", Unit.SECOND,
210+
"celcius", Unit.CELCIUS,
211+
"bytes", Unit.BYTE,
212+
"bits", Unit.BIT
219213
);
220214
private static final ImmutableSet<String> PROMETHEUS_AGGREGATION_KEYS = ImmutableSet.of(
221-
createUnitMapKey("total"),
222-
createUnitMapKey("bucket"),
223-
createUnitMapKey("sum"),
224-
createUnitMapKey("avg"),
225-
createUnitMapKey("count")
215+
"total",
216+
"bucket",
217+
"sum",
218+
"avg",
219+
"count"
226220
);
227221

228222
static final class ParseResult {
@@ -237,6 +231,7 @@ public boolean equals(final Object o) {
237231
}
238232
final ParseResult that = (ParseResult) o;
239233
return _unit.equals(that._unit)
234+
&& _aggregationKey.equals(that._aggregationKey)
240235
&& _name.equals(that._name);
241236
}
242237

@@ -245,12 +240,13 @@ public String toString() {
245240
return MoreObjects.toStringHelper(this)
246241
.add("unit", _unit)
247242
.add("name", _name)
243+
.add("aggregationKey", _aggregationKey)
248244
.toString();
249245
}
250246

251247
@Override
252248
public int hashCode() {
253-
return Objects.hash(_unit, _name);
249+
return Objects.hash(_unit, _name, _aggregationKey);
254250
}
255251

256252
public Optional<Unit> getUnit() {
@@ -261,12 +257,18 @@ public String getName() {
261257
return _name;
262258
}
263259

264-
ParseResult(final String name, final Optional<Unit> unit) {
260+
public Optional<String> getAggregationKey() {
261+
return _aggregationKey;
262+
}
263+
264+
ParseResult(final String name, final Optional<String> aggregationKey, final Optional<Unit> unit) {
265265
_unit = unit;
266266
_name = name;
267+
_aggregationKey = aggregationKey;
267268
}
268269

269270
private final String _name;
271+
private final Optional<String> _aggregationKey;
270272
private final Optional<Unit> _unit;
271273
}
272274
}

src/test/java/com/arpnetworking/metrics/mad/parsers/PrometheusToRecordParserTest.java

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,16 @@
2828
import com.google.common.collect.ImmutableMultimap;
2929
import com.google.common.io.Resources;
3030
import org.junit.Assert;
31+
import org.junit.Ignore;
3132
import org.junit.Test;
3233

3334
import java.io.IOException;
3435
import java.time.Instant;
3536
import java.time.ZoneOffset;
3637
import java.time.ZonedDateTime;
37-
import java.util.Arrays;
3838
import java.util.List;
3939
import java.util.Optional;
40-
import java.util.stream.Collectors;
40+
import javax.annotation.Nullable;
4141

4242
/**
4343
* Tests for the prometheus parser.
@@ -53,7 +53,8 @@ public void testParseEmpty() throws ParsingException, IOException {
5353
Assert.assertEquals(0, records.size());
5454
}
5555

56-
@Test
56+
@Test()
57+
@Ignore("Currently dropping aggregates")
5758
public void testParseSingleRecord() throws ParsingException, IOException {
5859
final ZonedDateTime time = ZonedDateTime.ofInstant(Instant.ofEpochMilli(1542330080682L), ZoneOffset.UTC);
5960
final List<Record> records = parseRecords("PrometheusParserTest/testSingleRecord");
@@ -86,17 +87,17 @@ public void testParseSingleRecord() throws ParsingException, IOException {
8687

8788
@Test
8889
public void testParseSingleRecordWithoutUnitInterpreter() throws ParsingException, IOException {
89-
final List<Record> records = parseRecords("PrometheusParserTest/testSingleRecord", createParserWithoutInterpreter());
90-
final Record record = records.get(0);
90+
final List<Record> records = parseRecords("PrometheusParserTest/testLivePrometheus2", createParserWithoutInterpreter());
91+
final Record record = records.get(57);
9192
final ImmutableMap<String, ? extends Metric> metrics = record.getMetrics();
9293
Assert.assertEquals(1, metrics.size());
93-
final Metric gauge = metrics.get("rpc_durations_histogram_seconds_count");
94+
final Metric gauge = metrics.get("container_spec_memory_limit_bytes");
9495
Assert.assertNotNull(gauge);
9596
Assert.assertEquals(MetricType.GAUGE, gauge.getType());
9697
Assert.assertEquals(1, gauge.getValues().size());
9798
final Quantity gaugeQuantity = gauge.getValues().get(0);
9899
Assert.assertEquals(Optional.empty(), gaugeQuantity.getUnit());
99-
Assert.assertEquals(493.0, gaugeQuantity.getValue(), 0.001);
100+
Assert.assertEquals(1.3500524544e11, gaugeQuantity.getValue(), 0.001);
100101
}
101102

102103
@Test(expected = ParsingException.class)
@@ -129,10 +130,12 @@ public void testUnitParserNoUnit() {
129130
testUnitParserNoUnitHelper("foo_bar");
130131
testUnitParserNoUnitHelper("foo_seconds_bar");
131132
testUnitParserNoUnitHelper("seconds_bar");
133+
testUnitParserNoUnitHelper("foo_seconds_total_");
134+
testUnitParserNoUnitHelper("foo_seconds_");
132135
}
133136
private void testUnitParserNoUnitHelper(final String name) {
134137
final PrometheusToRecordParser.ParseResult expectedResult
135-
= new PrometheusToRecordParser.ParseResult(name, Optional.empty());
138+
= new PrometheusToRecordParser.ParseResult(name, Optional.empty(), Optional.empty());
136139
Assert.assertEquals(expectedResult, createParser().parseNameAndUnit(name));
137140
}
138141

@@ -160,28 +163,30 @@ public void testUnitParserBits() {
160163
public void testLive1() throws ParsingException, IOException {
161164
final List<Record> records = parseRecords("PrometheusParserTest/testLivePrometheus1");
162165

163-
Assert.assertEquals(500, records.size());
166+
Assert.assertEquals(294, records.size());
164167
}
165168

166169
private static void testUnitParsing(final String prometheusUnit, final Unit expected) {
167-
assertUnitNewName(prometheusUnit, prometheusUnit, expected);
168-
assertUnitNewName("foo_" + prometheusUnit, prometheusUnit, expected);
169-
assertUnitNewName(prometheusUnit + "_total", prometheusUnit, expected);
170-
assertUnitNewName(prometheusUnit + "_bucket", prometheusUnit, expected);
171-
assertUnitNewName(prometheusUnit + "_sum", prometheusUnit, expected);
172-
assertUnitNewName(prometheusUnit + "_avg", prometheusUnit, expected);
173-
assertUnitNewName(prometheusUnit + "_count", prometheusUnit, expected);
174-
assertUnitNewName(prometheusUnit + "_total_count", prometheusUnit, expected);
175-
assertUnitNewName("foo_" + prometheusUnit + "_total_count", prometheusUnit, expected);
176-
}
177-
private static void assertUnitNewName(final String fullName, final String prometheusUnit, final Unit expectedUnit) {
170+
assertUnitNewName(prometheusUnit, expected, null);
171+
assertUnitNewName("foo_" + prometheusUnit, expected, null);
172+
assertUnitNewName("foo_" + prometheusUnit + "_total", expected, "total");
173+
assertUnitNewName("foo_" + prometheusUnit + "_bucket", expected, "bucket");
174+
assertUnitNewName("foo_" + prometheusUnit + "_sum", expected, "sum");
175+
assertUnitNewName("foo_" + prometheusUnit + "_avg", expected, "avg");
176+
assertUnitNewName("foo_" + prometheusUnit + "_count", expected, "count");
177+
assertUnitNewName(prometheusUnit + "_total_count", null, "count");
178+
assertUnitNewName("foo_" + prometheusUnit + "_total_count", null, "count");
179+
assertUnitNewName("foo_" + prometheusUnit + "_total", expected, "total");
180+
assertUnitNewName("foo_" + prometheusUnit + "_sum", expected, "sum");
181+
}
182+
private static void assertUnitNewName(
183+
final String fullName,
184+
@Nullable final Unit expectedUnit,
185+
@Nullable final String expectedAggregation) {
178186
final PrometheusToRecordParser parser = createParser();
179-
final String newName
180-
= Arrays.stream(fullName.split("_"))
181-
.filter(x -> !prometheusUnit.equals(x))
182-
.collect(Collectors.joining("_"));
183187
final PrometheusToRecordParser.ParseResult expectedResult
184-
= new PrometheusToRecordParser.ParseResult(newName, Optional.of(expectedUnit));
188+
= new PrometheusToRecordParser.ParseResult(
189+
fullName, Optional.ofNullable(expectedAggregation), Optional.ofNullable(expectedUnit));
185190
Assert.assertEquals(expectedResult, parser.parseNameAndUnit(fullName));
186191
}
187192

0 commit comments

Comments
 (0)