Skip to content

Commit a4efb11

Browse files
authored
Tracing sink (#254)
* Add a tracing sink
1 parent 58f8880 commit a4efb11

File tree

4 files changed

+178
-8
lines changed

4 files changed

+178
-8
lines changed

src/main/java/com/arpnetworking/metrics/common/sources/PrometheusHttpSource.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public static final class Builder extends HttpSource.Builder<Builder, Prometheus
4242
public Builder() {
4343
super(PrometheusHttpSource::new);
4444
setActorName(ACTOR_NAME);
45-
setParser(new PrometheusToRecordParser(_interpretUnits));
45+
setParser(new PrometheusToRecordParser(_interpretUnits, _outputDebugFiles));
4646
}
4747

4848
/**
@@ -56,12 +56,24 @@ public Builder setInterpretUnits(final Boolean value) {
5656
return this;
5757
}
5858

59+
/**
60+
* Whether to output debug files with the raw prometheus data. Cannot be null.
61+
*
62+
* @param value the value
63+
* @return this {@link Builder}
64+
*/
65+
public Builder setOutputDebugFiles(final Boolean value) {
66+
_outputDebugFiles = value;
67+
return this;
68+
}
69+
5970
@Override
6071
protected Builder self() {
6172
return this;
6273
}
6374

6475
private Boolean _interpretUnits = false;
76+
private Boolean _outputDebugFiles = false;
6577
}
6678

6779
}

src/main/java/com/arpnetworking/metrics/mad/parsers/PrometheusToRecordParser.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,11 @@ public final class PrometheusToRecordParser implements Parser<List<Record>, Http
6262
* public constructor.
6363
*
6464
* @param interpretUnits specifies whether or not to interpret units.
65+
* @param outputDebugInfo specifies whether or not to output debug files.
6566
*/
66-
public PrometheusToRecordParser(final boolean interpretUnits) {
67+
public PrometheusToRecordParser(final boolean interpretUnits, final boolean outputDebugInfo) {
6768
_interpretUnits = interpretUnits;
69+
_outputDebugInfo = outputDebugInfo;
6870
}
6971

7072
/*
@@ -115,9 +117,11 @@ public List<Record> parse(final HttpRequest data) throws ParsingException {
115117
final byte[] uncompressed;
116118
try {
117119
final byte[] input = data.getBody().toArray();
118-
int outputFile = _outputFileNumber.incrementAndGet();
119-
if (outputFile < 10) {
120-
Files.write(Paths.get("prometheus_debug_" + outputFile), input);
120+
if (_outputDebugInfo) {
121+
final int outputFile = _outputFileNumber.incrementAndGet();
122+
if (outputFile < 10) {
123+
Files.write(Paths.get("prometheus_debug_" + outputFile), input);
124+
}
121125
}
122126
uncompressed = Snappy.uncompress(input);
123127
} catch (final IOException e) {
@@ -194,6 +198,7 @@ private static String createUnitMapKey(final String name) {
194198

195199
private final boolean _interpretUnits;
196200
private final AtomicInteger _outputFileNumber = new AtomicInteger(0);
201+
private final boolean _outputDebugInfo;
197202

198203
private static final ImmutableMap<String, Unit> UNIT_MAP = ImmutableMap.of(
199204
createUnitMapKey("seconds"), Unit.SECOND,
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2022 Brandon Arp
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.tsdcore.sinks;
17+
18+
import com.arpnetworking.logback.annotations.LogValue;
19+
import com.arpnetworking.metrics.Metrics;
20+
import com.arpnetworking.metrics.MetricsFactory;
21+
import com.arpnetworking.metrics.common.sources.HttpSource;
22+
import com.arpnetworking.steno.LogValueMapFactory;
23+
import com.arpnetworking.steno.Logger;
24+
import com.arpnetworking.steno.LoggerFactory;
25+
import com.arpnetworking.tsdcore.model.PeriodicData;
26+
import com.fasterxml.jackson.annotation.JacksonInject;
27+
import com.google.common.collect.ImmutableSet;
28+
import com.google.common.collect.Maps;
29+
import net.sf.oval.constraint.NotNull;
30+
31+
import java.util.AbstractMap;
32+
import java.util.Map;
33+
import java.util.regex.Pattern;
34+
import java.util.stream.Collectors;
35+
36+
/**
37+
* A publisher that looks for certain metrics and reports on their occurrences.
38+
*
39+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot io)
40+
*/
41+
public final class TracingSink extends BaseSink {
42+
43+
@Override
44+
public void recordAggregateData(final PeriodicData periodicData) {
45+
LOGGER.debug()
46+
.setMessage("Writing aggregated data")
47+
.addData("sink", getName())
48+
.addData("dataSize", periodicData.getData().size())
49+
.log();
50+
final ImmutableSet<String> metricNames = periodicData.getData().keySet();
51+
for (Map.Entry<String, Pattern> entry : _patterns.entrySet()) {
52+
int hits = 0;
53+
final Pattern p = entry.getValue();
54+
for (final String metric : metricNames) {
55+
if (p.matcher(metric).matches()) {
56+
hits++;
57+
}
58+
}
59+
60+
if (hits > 0) {
61+
try (Metrics metrics = _metricsFactory.create()) {
62+
metrics.addAnnotation("sink", getName());
63+
metrics.addAnnotation("pattern", entry.getKey());
64+
metrics.incrementCounter("tracingSink/hit", hits);
65+
}
66+
}
67+
}
68+
69+
}
70+
71+
@Override
72+
public void close() {
73+
LOGGER.info()
74+
.setMessage("Closing sink")
75+
.addData("sink", getName())
76+
.log();
77+
}
78+
79+
@LogValue
80+
@Override
81+
public Object toLogValue() {
82+
return LogValueMapFactory.builder(this)
83+
.put("super", super.toLogValue())
84+
.put("patterns", _patterns)
85+
.build();
86+
}
87+
88+
private TracingSink(final Builder builder) {
89+
super(builder);
90+
_patterns = builder._patterns
91+
.entrySet()
92+
.stream()
93+
.map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), Pattern.compile(entry.getValue())))
94+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
95+
_metricsFactory = builder._metricsFactory;
96+
}
97+
98+
private final Map<String, Pattern> _patterns;
99+
private final MetricsFactory _metricsFactory;
100+
101+
private static final Logger LOGGER = LoggerFactory.getLogger(TracingSink.class);
102+
103+
/**
104+
* Implementation of builder pattern for {@link TracingSink}.
105+
*
106+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot io)
107+
*/
108+
public static final class Builder extends BaseSink.Builder<Builder, TracingSink> {
109+
110+
/**
111+
* Public constructor.
112+
*/
113+
public Builder() {
114+
super(TracingSink::new);
115+
}
116+
117+
/**
118+
* The patterns to look for and report on.
119+
*
120+
* @param value The aggregated data sinks to wrap.
121+
* @return This instance of {@link Builder}.
122+
*/
123+
public Builder setPatterns(final Map<String, String> value) {
124+
_patterns = Maps.newHashMap(value);
125+
return this;
126+
}
127+
128+
/**
129+
* Sets the periodic metrics instance.
130+
*
131+
* @param value The periodic metrics.
132+
* @return This instance of {@link HttpSource.Builder}
133+
*/
134+
public Builder setMetricsFactory(final MetricsFactory value) {
135+
_metricsFactory = value;
136+
return this;
137+
}
138+
139+
140+
141+
@Override
142+
protected Builder self() {
143+
return this;
144+
}
145+
146+
@NotNull
147+
private Map<String, String> _patterns;
148+
149+
@NotNull
150+
@JacksonInject
151+
private MetricsFactory _metricsFactory;
152+
}
153+
}

src/test/java/com/arpnetworking/metrics/mad/parsers/PrometheusToRecordParserTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void testUnitParserBits() {
160160
public void testLive1() throws ParsingException, IOException {
161161
final List<Record> records = parseRecords("PrometheusParserTest/testLivePrometheus1");
162162

163-
Assert.assertEquals(0, records.size());
163+
Assert.assertEquals(500, records.size());
164164
}
165165

166166
private static void testUnitParsing(final String prometheusUnit, final Unit expected) {
@@ -199,9 +199,9 @@ private static List<Record> parseRecords(
199199
}
200200

201201
private static PrometheusToRecordParser createParser() {
202-
return new PrometheusToRecordParser(true);
202+
return new PrometheusToRecordParser(true, false);
203203
}
204204
private static PrometheusToRecordParser createParserWithoutInterpreter() {
205-
return new PrometheusToRecordParser(false);
205+
return new PrometheusToRecordParser(false, false);
206206
}
207207
}

0 commit comments

Comments
 (0)