Skip to content

Commit a8c3864

Browse files
bngreenvjkoskela
authored andcommitted
Prometheus Http Source (#143)
* Prometheus Http Source * Prometheus source: parse units from metric name * changed to version 1.0.0 of prometheus-remote-protocol * checkstyle * changed unit parser to return the unit when no other name part is present * PrometheusToRecordParser tests * used Optional<T> * removed type & scope of snappy dep * copyright * throw exception on null/empty metric name, parseUnit non optional input, used ofNullable, parsed unit after validating metric name, members at end of class * Removed javadoc for private method * Isolated snappy into a separate try/catch block * parseUnit javadoc * Added code to remove units from name. Added configuration to whether or not to interpret units * code cleanliness and naming. pom alpha-order. Dropped application suffix.
1 parent 48d395c commit a8c3864

File tree

13 files changed

+544
-0
lines changed

13 files changed

+544
-0
lines changed

config/pipelines/pipeline.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ sources=[
4646
type="com.arpnetworking.metrics.common.sources.ClientHttpSourceV2"
4747
name="http_v2_source"
4848
}
49+
{
50+
type="com.arpnetworking.metrics.common.sources.PrometheusHttpSource"
51+
name="prometheus_source"
52+
}
4953
{
5054
type="com.arpnetworking.metrics.common.sources.StatsdSource"
5155
name="statsd_source"

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
<logback.version>1.2.3</logback.version>
9292
<logback.steno.version>1.18.2</logback.steno.version>
9393
<log4j.over.slf4j.version>1.7.25</log4j.over.slf4j.version>
94+
<metrics.aggregator.protocol.prometheus.version>1.0.0</metrics.aggregator.protocol.prometheus.version>
9495
<metrics.aggregator.protocol.version>1.0.6</metrics.aggregator.protocol.version>
9596
<metrics.client.version>0.10.1</metrics.client.version>
9697
<metrics.client.http.version>0.9.4</metrics.client.http.version>
@@ -101,6 +102,7 @@
101102
<scala.version>2.11</scala.version>
102103
<scala.library.version>2.11.12</scala.library.version>
103104
<slf4j.version>1.7.25</slf4j.version>
105+
<snappy.version>1.1.7.2</snappy.version>
104106
<typesafe.config.version>1.3.3</typesafe.config.version>
105107
<vertx.core.version>2.1.6</vertx.core.version>
106108

@@ -670,6 +672,11 @@
670672
<version>${cglib.version}</version>
671673
<scope>runtime</scope>
672674
</dependency>
675+
<dependency>
676+
<groupId>org.xerial.snappy</groupId>
677+
<artifactId>snappy-java</artifactId>
678+
<version>${snappy.version}</version>
679+
</dependency>
673680
<!-- Aggregator Protocol -->
674681
<dependency>
675682
<groupId>com.arpnetworking.metrics</groupId>
@@ -681,6 +688,11 @@
681688
<artifactId>protocol</artifactId>
682689
<version>${client.protocol.version}</version>
683690
</dependency>
691+
<dependency>
692+
<groupId>com.arpnetworking.metrics</groupId>
693+
<artifactId>prometheus-remote-protocol</artifactId>
694+
<version>${metrics.aggregator.protocol.prometheus.version}</version>
695+
</dependency>
684696
<!-- Test - General -->
685697
<dependency>
686698
<groupId>junit</groupId>

src/main/java/com/arpnetworking/http/Routes.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV1;
4444
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV2;
4545
import com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1;
46+
import com.arpnetworking.metrics.common.sources.PrometheusHttpSource;
4647
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
4748
import com.arpnetworking.metrics.mad.actors.Status;
4849
import com.arpnetworking.metrics.proxy.actors.Connection;
@@ -220,6 +221,8 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
220221
return dispatchHttpRequest(request, ACTOR_APP_V2);
221222
} else if (Objects.equals(path, APP_V1_SOURCE_PREFIX)) {
222223
return dispatchHttpRequest(request, ACTOR_APP_V1);
224+
} else if (Objects.equals(path, PROMETHEUS_SOURCE_PREFIX)) {
225+
return dispatchHttpRequest(request, ACTOR_PROMETHEUS);
223226
}
224227
}
225228

@@ -347,9 +350,11 @@ private String createMetricName(final HttpRequest request, final String actionPa
347350
private static final String COLLECTD_V1_SOURCE_PREFIX = "/metrics/v1/collectd";
348351
private static final String APP_V1_SOURCE_PREFIX = "/metrics/v1/application";
349352
private static final String APP_V2_SOURCE_PREFIX = "/metrics/v2/application";
353+
private static final String PROMETHEUS_SOURCE_PREFIX = "/metrics/prometheus";
350354
private static final String ACTOR_COLLECTD_V1 = "/user/" + CollectdHttpSourceV1.ACTOR_NAME;
351355
private static final String ACTOR_APP_V1 = "/user/" + ClientHttpSourceV1.ACTOR_NAME;
352356
private static final String ACTOR_APP_V2 = "/user/" + ClientHttpSourceV2.ACTOR_NAME;
357+
private static final String ACTOR_PROMETHEUS = "/user/" + PrometheusHttpSource.ACTOR_NAME;
353358
private static final String REST_SERVICE_METRIC_ROOT = "rest_service/";
354359
private static final String BODY_SIZE_METRIC = "body_size";
355360
private static final String REQUEST_METRIC = "request";
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2018 Bruno Green.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import com.arpnetworking.metrics.mad.parsers.PrometheusToRecordParser;
19+
20+
/**
21+
* Processes Prometheus messages, extracts data and emits metrics.
22+
*
23+
* @author Bruno Green (bruno dot green at gmail dot com)
24+
*/
25+
public final class PrometheusHttpSource extends HttpSource{
26+
private PrometheusHttpSource(final HttpSource.Builder<?, ? extends HttpSource> builder) {
27+
super(builder);
28+
}
29+
30+
/**
31+
* Name of the actor created to receive the HTTP Posts.
32+
*/
33+
public static final String ACTOR_NAME = "prometheus";
34+
35+
/**
36+
* PrometheusHttpSource {@link BaseSource.Builder} implementation.
37+
*/
38+
public static final class Builder extends HttpSource.Builder<Builder, PrometheusHttpSource> {
39+
/**
40+
* Public constructor.
41+
*/
42+
public Builder() {
43+
super(PrometheusHttpSource::new);
44+
setActorName(ACTOR_NAME);
45+
setParser(new PrometheusToRecordParser(_interpretUnits));
46+
}
47+
48+
/**
49+
* Whether to interpret units in the metric name. Optional. Defaults to false. Cannot be null.
50+
*
51+
* @param value the value
52+
* @return this {@link Builder}
53+
*/
54+
public Builder setInterpretUnits(final Boolean value) {
55+
_interpretUnits = value;
56+
return this;
57+
}
58+
59+
@Override
60+
protected Builder self() {
61+
return this;
62+
}
63+
64+
private Boolean _interpretUnits = false;
65+
}
66+
67+
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2018 Bruno Green.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.mad.parsers;
17+
18+
import com.arpnetworking.commons.builder.ThreadLocalBuilder;
19+
import com.arpnetworking.metrics.common.parsers.Parser;
20+
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
21+
import com.arpnetworking.metrics.mad.model.DefaultMetric;
22+
import com.arpnetworking.metrics.mad.model.DefaultRecord;
23+
import com.arpnetworking.metrics.mad.model.HttpRequest;
24+
import com.arpnetworking.metrics.mad.model.Metric;
25+
import com.arpnetworking.metrics.mad.model.Record;
26+
import com.arpnetworking.metrics.prometheus.Remote;
27+
import com.arpnetworking.metrics.prometheus.Types;
28+
import com.arpnetworking.metrics.prometheus.Types.TimeSeries;
29+
import com.arpnetworking.tsdcore.model.MetricType;
30+
import com.arpnetworking.tsdcore.model.Quantity;
31+
import com.arpnetworking.tsdcore.model.Unit;
32+
import com.google.common.base.MoreObjects;
33+
import com.google.common.collect.ImmutableList;
34+
import com.google.common.collect.ImmutableMap;
35+
import com.google.common.collect.ImmutableSet;
36+
import com.google.common.collect.Lists;
37+
import com.google.protobuf.InvalidProtocolBufferException;
38+
import net.sf.oval.exception.ConstraintsViolatedException;
39+
import org.xerial.snappy.Snappy;
40+
41+
import java.io.IOException;
42+
import java.time.Instant;
43+
import java.time.ZoneOffset;
44+
import java.time.ZonedDateTime;
45+
import java.util.List;
46+
import java.util.Objects;
47+
import java.util.Optional;
48+
import java.util.UUID;
49+
50+
/**
51+
* Parses the Prometheus protobuf binary protocol into records.
52+
*
53+
* @author Bruno Green (bruno dot green at gmail dot com)
54+
*/
55+
public final class PrometheusToRecordParser implements Parser<List<Record>, HttpRequest> {
56+
57+
/**
58+
* public constructor.
59+
*
60+
* @param interpretUnits specifies whether or not to interpret units.
61+
*/
62+
public PrometheusToRecordParser(final boolean interpretUnits) {
63+
_interpretUnits = interpretUnits;
64+
}
65+
66+
/*
67+
* Parses a unit and the new name from the name of a metric.
68+
* Prometheus will, by default, add unit names to the end of a metric name.
69+
* We want to parse that name and apply that unit to the metric.
70+
* An unit suffix might be added to the name of the metric, we currently have a set of
71+
* whitelisted suffixes that is most likely not exhaustive.
72+
* For more information see: https://prometheus.io/docs/practices/naming/
73+
*/
74+
ParseResult parseNameAndUnit(final String name) {
75+
if (!_interpretUnits) {
76+
return new ParseResult(name, Optional.empty());
77+
}
78+
final StringBuilder builder = new StringBuilder();
79+
for (int i = name.length() - 1; i >= 0; i--) {
80+
final char ch = name.charAt(i);
81+
if (ch == '_') {
82+
final String key = builder.toString();
83+
if (PROMETHEUS_AGGREGATION_KEYS.contains(key)) {
84+
builder.setLength(0); //reset builder
85+
} else {
86+
final Unit value = UNIT_MAP.get(key);
87+
if (value != null) {
88+
final String newName = name.substring(0, i).concat(name.substring(i + 1 + key.length()));
89+
return new ParseResult(newName, Optional.of(value));
90+
} else {
91+
return new ParseResult(name, Optional.empty());
92+
}
93+
}
94+
} else {
95+
builder.append(ch);
96+
}
97+
}
98+
final String possibleUnit = builder.toString();
99+
final Unit value = UNIT_MAP.get(possibleUnit);
100+
if (value != null) {
101+
final String newName = name.substring(Math.min(possibleUnit.length() + 1, name.length()));
102+
return new ParseResult(newName, Optional.of(value));
103+
} else {
104+
return new ParseResult(name, Optional.empty());
105+
}
106+
}
107+
108+
@Override
109+
public List<Record> parse(final HttpRequest data) throws ParsingException {
110+
final List<Record> records = Lists.newArrayList();
111+
final byte[] uncompressed;
112+
try {
113+
uncompressed = Snappy.uncompress(data.getBody().toArray());
114+
} catch (final IOException e) {
115+
throw new ParsingException("Failed to decompress snappy stream", data.getBody().toArray(), e);
116+
}
117+
try {
118+
final Remote.WriteRequest writeRequest = Remote.WriteRequest.parseFrom(uncompressed);
119+
for (final TimeSeries timeSeries : writeRequest.getTimeseriesList()) {
120+
Optional<String> nameOpt = Optional.empty();
121+
final ImmutableMap.Builder<String, String> dimensionsBuilder = ImmutableMap.builder();
122+
for (final Types.Label label : timeSeries.getLabelsList()) {
123+
if ("__name__".equals(label.getName())) {
124+
final String value = label.getValue();
125+
nameOpt = Optional.ofNullable(value);
126+
} else {
127+
dimensionsBuilder.put(label.getName(), label.getValue());
128+
}
129+
}
130+
final ParseResult result = parseNameAndUnit(nameOpt.orElse("").trim());
131+
final String metricName = result.getName();
132+
if (metricName.isEmpty()) {
133+
throw new ParsingException("Found a metric with an empty name", data.getBody().toArray());
134+
}
135+
final ImmutableMap<String, String> immutableDimensions = dimensionsBuilder.build();
136+
for (final Types.Sample sample : timeSeries.getSamplesList()) {
137+
final Record record = ThreadLocalBuilder.build(
138+
DefaultRecord.Builder.class,
139+
b -> b.setId(UUID.randomUUID().toString())
140+
.setTime(
141+
ZonedDateTime.ofInstant(
142+
Instant.ofEpochMilli(sample.getTimestamp()),
143+
ZoneOffset.UTC))
144+
.setMetrics(
145+
createMetric(
146+
metricName,
147+
sample,
148+
result.getUnit()))
149+
.setDimensions(immutableDimensions)
150+
);
151+
records.add(record);
152+
}
153+
}
154+
} catch (final InvalidProtocolBufferException e) {
155+
throw new ParsingException("Could not create Request message from data", data.getBody().toArray(), e);
156+
} catch (final ConstraintsViolatedException | IllegalArgumentException e) {
157+
throw new ParsingException("Could not build record", data.getBody().toArray(), e);
158+
}
159+
return records;
160+
}
161+
162+
private ImmutableMap<String, ? extends Metric> createMetric(final String name, final Types.Sample sample, final Optional<Unit> unit) {
163+
final Metric metric = ThreadLocalBuilder.build(
164+
DefaultMetric.Builder.class,
165+
p -> p
166+
.setType(MetricType.GAUGE)
167+
.setValues(ImmutableList.of(createQuantity(sample, unit)))
168+
.build()
169+
);
170+
return ImmutableMap.of(name, metric);
171+
}
172+
173+
private Quantity createQuantity(final Types.Sample sample, final Optional<Unit> unit) {
174+
return ThreadLocalBuilder.build(
175+
Quantity.Builder.class,
176+
p -> p
177+
.setValue(sample.getValue())
178+
.setUnit(unit.orElse(null))
179+
);
180+
}
181+
182+
private static String createUnitMapKey(final String name) {
183+
return new StringBuilder(name).reverse().toString();
184+
}
185+
186+
private final boolean _interpretUnits;
187+
188+
private static final ImmutableMap<String, Unit> UNIT_MAP = ImmutableMap.of(
189+
createUnitMapKey("seconds"), Unit.SECOND,
190+
createUnitMapKey("celcius"), Unit.CELCIUS,
191+
createUnitMapKey("bytes"), Unit.BYTE,
192+
createUnitMapKey("bits"), Unit.BIT
193+
);
194+
private static final ImmutableSet<String> PROMETHEUS_AGGREGATION_KEYS = ImmutableSet.of(
195+
createUnitMapKey("total"),
196+
createUnitMapKey("bucket"),
197+
createUnitMapKey("sum"),
198+
createUnitMapKey("avg"),
199+
createUnitMapKey("count")
200+
);
201+
202+
static final class ParseResult {
203+
204+
@Override
205+
public boolean equals(final Object o) {
206+
if (this == o) {
207+
return true;
208+
}
209+
if (o == null || getClass() != o.getClass()) {
210+
return false;
211+
}
212+
final ParseResult that = (ParseResult) o;
213+
return _unit.equals(that._unit)
214+
&& _name.equals(that._name);
215+
}
216+
217+
@Override
218+
public String toString() {
219+
return MoreObjects.toStringHelper(this)
220+
.add("unit", _unit)
221+
.add("name", _name)
222+
.toString();
223+
}
224+
225+
@Override
226+
public int hashCode() {
227+
return Objects.hash(_unit, _name);
228+
}
229+
230+
public Optional<Unit> getUnit() {
231+
return _unit;
232+
}
233+
234+
public String getName() {
235+
return _name;
236+
}
237+
238+
ParseResult(final String name, final Optional<Unit> unit) {
239+
_unit = unit;
240+
_name = name;
241+
}
242+
243+
private final String _name;
244+
private final Optional<Unit> _unit;
245+
}
246+
}

0 commit comments

Comments
 (0)