Skip to content

Commit 6955429

Browse files
authored
add histogram support to kairosdb sink (#63)
1 parent 6394486 commit 6955429

File tree

2 files changed: +214 additions, −2 deletions

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
<akka.kryo.version>0.5.0</akka.kryo.version>
108108
<apache.httpclient.version>4.5.2</apache.httpclient.version>
109109
<apache.httpcore.version>4.4.5</apache.httpcore.version>
110-
<arpnetworking.commons.version>1.10.1</arpnetworking.commons.version>
110+
<arpnetworking.commons.version>1.11.0</arpnetworking.commons.version>
111111
<aspectjrt.version>1.8.10</aspectjrt.version>
112112
<cglib.version>3.2.4</cglib.version>
113113
<commons.codec.version>1.10</commons.codec.version>

src/main/java/com/arpnetworking/tsdcore/sinks/KairosDbSink.java

Lines changed: 213 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,14 @@
2222
import com.arpnetworking.steno.LoggerFactory;
2323
import com.arpnetworking.tsdcore.model.AggregatedData;
2424
import com.arpnetworking.tsdcore.model.Condition;
25+
import com.arpnetworking.tsdcore.model.FQDSN;
2526
import com.arpnetworking.tsdcore.model.PeriodicData;
27+
import com.arpnetworking.tsdcore.statistics.HistogramStatistic;
28+
import com.arpnetworking.tsdcore.statistics.MaxStatistic;
29+
import com.arpnetworking.tsdcore.statistics.MeanStatistic;
30+
import com.arpnetworking.tsdcore.statistics.MinStatistic;
31+
import com.arpnetworking.tsdcore.statistics.Statistic;
32+
import com.arpnetworking.tsdcore.statistics.SumStatistic;
2633
import com.fasterxml.jackson.core.JsonEncoding;
2734
import com.fasterxml.jackson.core.JsonGenerator;
2835
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -59,6 +66,9 @@ public Object toLogValue() {
5966
return LogValueMapFactory.builder(this)
6067
.put("super", super.toLogValue())
6168
.put("maxRequestSize", _maxRequestSize)
69+
.put("ttlSeconds", _ttlSeconds)
70+
.put("publishHistograms", _publishHistograms)
71+
.put("histogramTtlSeconds", _histogramTtlSeconds)
6272
.build();
6373
}
6474

@@ -75,11 +85,30 @@ protected Collection<byte[]> serialize(final PeriodicData periodicData) {
7585
final ImmutableMap<String, String> dimensions = periodicData.getDimensions();
7686
final Serializer serializer = new Serializer(timestamp, serializedPeriod, dimensions);
7787

88+
final KairosHistogramAdditionalData histogramAdditionalData = new KairosHistogramAdditionalData();
89+
AggregatedData histogram = null;
90+
7891
// Initialize the chunk buffer
7992
currentChunk.put(HEADER);
8093

8194
// Add aggregated data
8295
for (final AggregatedData datum : periodicData.getData()) {
96+
if (_publishHistograms) {
97+
// We need to collect the min, max, mean, and sum
98+
final Statistic statistic = datum.getFQDSN().getStatistic();
99+
if (statistic instanceof MeanStatistic) {
100+
histogramAdditionalData.setMean(datum.getValue().getValue());
101+
} else if (statistic instanceof SumStatistic) {
102+
histogramAdditionalData.setSum(datum.getValue().getValue());
103+
} else if (statistic instanceof MaxStatistic) {
104+
histogramAdditionalData.setMax(datum.getValue().getValue());
105+
} else if (statistic instanceof MinStatistic) {
106+
histogramAdditionalData.setMin(datum.getValue().getValue());
107+
} else if (statistic instanceof HistogramStatistic) {
108+
histogram = datum;
109+
}
110+
}
111+
83112
if (!datum.isSpecified()) {
84113
LOGGER.trace()
85114
.setMessage("Skipping unspecified datum")
@@ -88,7 +117,18 @@ protected Collection<byte[]> serialize(final PeriodicData periodicData) {
88117
continue;
89118
}
90119

91-
serializer.serializeDatum(completeChunks, currentChunk, chunkStream, datum);
120+
if (_publishStandardMetrics) {
121+
serializer.serializeDatum(completeChunks, currentChunk, chunkStream, datum);
122+
}
123+
}
124+
125+
if (_publishHistograms && histogram != null) {
126+
serializer.serializeHistogram(completeChunks, currentChunk, chunkStream, histogram, histogramAdditionalData);
127+
} else if (_publishHistograms) {
128+
NO_HISTOGRAM_LOGGER.warn()
129+
.setMessage("Expected to publish histogram, but none found")
130+
.addData("periodicData", periodicData)
131+
.log();
92132
}
93133

94134
// Add conditions
@@ -141,9 +181,17 @@ private void addChunk(
141181
private KairosDbSink(final Builder builder) {
142182
super(builder);
143183
_maxRequestSize = builder._maxRequestSize;
184+
_ttlSeconds = (int) builder._ttl.getSeconds();
185+
_publishStandardMetrics = builder._publishStandardMetrics;
186+
_publishHistograms = builder._publishHistograms;
187+
_histogramTtlSeconds = (int) builder._histogramTtl.getSeconds();
144188
}
145189

146190
private final int _maxRequestSize;
191+
private final int _ttlSeconds;
192+
private final boolean _publishHistograms;
193+
private final boolean _publishStandardMetrics;
194+
private final int _histogramTtlSeconds;
147195

148196
private static final byte HEADER = '[';
149197
private static final byte FOOTER = ']';
@@ -152,9 +200,50 @@ private KairosDbSink(final Builder builder) {
152200
// TODO(vkoskela): Switch to ImmutableObjectMapper. [https://github.com/ArpNetworking/commons/issues/7]
153201
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.createInstance();
154202
private static final Logger LOGGER = LoggerFactory.getLogger(KairosDbSink.class);
203+
private static final Logger NO_HISTOGRAM_LOGGER = LoggerFactory.getRateLimitLogger(KairosDbSink.class, Duration.ofSeconds(30));
155204
private static final Logger SERIALIZATION_FAILURE_LOGGER = LoggerFactory.getRateLimitLogger(KairosDbSink.class, Duration.ofSeconds(30));
156205
private static final Logger CHUNK_TOO_BIG_LOGGER = LoggerFactory.getRateLimitLogger(KairosDbSink.class, Duration.ofSeconds(30));
157206

207+
208+
private static final class KairosHistogramAdditionalData {
209+
public double getMin() {
210+
return _min;
211+
}
212+
213+
public void setMin(final double min) {
214+
_min = min;
215+
}
216+
217+
public double getMax() {
218+
return _max;
219+
}
220+
221+
public void setMax(final double max) {
222+
_max = max;
223+
}
224+
225+
public double getMean() {
226+
return _mean;
227+
}
228+
229+
public void setMean(final double mean) {
230+
_mean = mean;
231+
}
232+
233+
public double getSum() {
234+
return _sum;
235+
}
236+
237+
public void setSum(final double sum) {
238+
_sum = sum;
239+
}
240+
241+
private double _min;
242+
private double _max;
243+
private double _mean;
244+
private double _sum;
245+
}
246+
158247
private class Serializer {
159248

160249
Serializer(
@@ -180,6 +269,9 @@ public void serializeDatum(
180269
chunkGenerator.writeStartObject();
181270
chunkGenerator.writeStringField("name", name);
182271
chunkGenerator.writeNumberField("timestamp", _timestamp);
272+
if (_ttlSeconds > 0) {
273+
chunkGenerator.writeNumberField("ttl", _ttlSeconds);
274+
}
183275
chunkGenerator.writeNumberField("value", datum.getValue().getValue());
184276
chunkGenerator.writeObjectFieldStart("tags");
185277
for (Map.Entry<String, String> entry : _dimensions.entrySet()) {
@@ -206,6 +298,65 @@ public void serializeDatum(
206298
}
207299
}
208300

301+
public void serializeHistogram(
302+
final List<byte[]> completeChunks,
303+
final ByteBuffer currentChunk,
304+
final ByteArrayOutputStream chunkStream,
305+
final AggregatedData data,
306+
final KairosHistogramAdditionalData additionalData) {
307+
final FQDSN fqdsn = data.getFQDSN();
308+
309+
try {
310+
final HistogramStatistic.HistogramSnapshot bins = ((HistogramStatistic.HistogramSupportingData) data.getSupportingData())
311+
.getHistogramSnapshot();
312+
final JsonGenerator chunkGenerator = OBJECT_MAPPER.getFactory().createGenerator(chunkStream, JsonEncoding.UTF8);
313+
314+
chunkGenerator.writeStartObject();
315+
chunkGenerator.writeStringField("name", fqdsn.getMetric());
316+
chunkGenerator.writeNumberField("timestamp", _timestamp);
317+
chunkGenerator.writeStringField("type", "histogram");
318+
319+
chunkGenerator.writeObjectFieldStart("value");
320+
chunkGenerator.writeNumberField("min", additionalData.getMin());
321+
chunkGenerator.writeNumberField("max", additionalData.getMax());
322+
chunkGenerator.writeNumberField("mean", additionalData.getMean());
323+
chunkGenerator.writeNumberField("sum", additionalData.getSum());
324+
chunkGenerator.writeObjectFieldStart("bins");
325+
for (Map.Entry<Double, Integer> bin : bins.getValues()) {
326+
chunkGenerator.writeNumberField(bin.getKey().toString(), bin.getValue());
327+
}
328+
329+
chunkGenerator.writeEndObject(); //close bins
330+
chunkGenerator.writeEndObject(); //close value
331+
332+
if (_histogramTtlSeconds > 0) {
333+
chunkGenerator.writeNumberField("ttl", _histogramTtlSeconds);
334+
}
335+
chunkGenerator.writeObjectFieldStart("tags");
336+
for (Map.Entry<String, String> entry : _dimensions.entrySet()) {
337+
chunkGenerator.writeStringField(entry.getKey(), entry.getValue());
338+
}
339+
if (!_dimensions.containsKey("service")) {
340+
chunkGenerator.writeStringField("service", fqdsn.getService());
341+
}
342+
if (!_dimensions.containsKey("cluster")) {
343+
chunkGenerator.writeStringField("cluster", fqdsn.getCluster());
344+
}
345+
chunkGenerator.writeEndObject();
346+
chunkGenerator.writeEndObject();
347+
348+
chunkGenerator.close();
349+
350+
addChunk(chunkStream, currentChunk, completeChunks);
351+
} catch (final IOException e) {
352+
SERIALIZATION_FAILURE_LOGGER.error()
353+
.setMessage("Serialization failure")
354+
.addData("data", data)
355+
.setThrowable(e)
356+
.log();
357+
}
358+
}
359+
209360
public void serializeCondition(
210361
final List<byte[]> completeChunks,
211362
final ByteBuffer currentChunk,
@@ -249,6 +400,9 @@ private void serializeConditionStatus(
249400
chunkGenerator.writeStartObject();
250401
chunkGenerator.writeStringField("name", conditionStatusName);
251402
chunkGenerator.writeNumberField("timestamp", _timestamp);
403+
if (_ttlSeconds > 0) {
404+
chunkGenerator.writeNumberField("ttl", _ttlSeconds);
405+
}
252406
chunkGenerator.writeNumberField("value", condition.isTriggered().get() ? 1 : 0);
253407
chunkGenerator.writeObjectFieldStart("tags");
254408
for (Map.Entry<String, String> entry : _dimensions.entrySet()) {
@@ -335,8 +489,66 @@ public Builder setMaxRequestSize(final Integer value) {
335489
return this;
336490
}
337491

492+
/**
493+
* Sets whether or not to publish non-histogram metrics.
494+
* Optional. Defaults to true.
495+
*
496+
* @param value true to publish standard metrics
497+
* @return This instance of {@link Builder}.
498+
*/
499+
public Builder setStandardMetrics(final Boolean value) {
500+
_publishStandardMetrics = value;
501+
return this;
502+
}
503+
504+
/**
505+
* Sets whether or not to publish full histograms.
506+
* Optional. Defaults to false.
507+
*
508+
* @param value true to publish histograms
509+
* @return This instance of {@link Builder}.
510+
*/
511+
public Builder setPublishHistograms(final Boolean value) {
512+
_publishHistograms = value;
513+
return this;
514+
}
515+
516+
/**
517+
* Sets the TTL of non-histogram metrics.
518+
* NOTE: A value of 0 represents permanent.
519+
* Optional. Defaults to permanent.
520+
*
521+
* @param value the time to retain histograms
522+
* @return This instance of {@link Builder}.
523+
*/
524+
public Builder setTtl(final Duration value) {
525+
_ttl = value;
526+
return this;
527+
}
528+
529+
/**
530+
* Sets the TTL of histograms.
531+
* NOTE: A value of 0 represents permanent.
532+
* Optional. Defaults to permanent.
533+
*
534+
* @param value the time to retain histograms
535+
* @return This instance of {@link Builder}.
536+
*/
537+
public Builder setHistogramTtl(final Duration value) {
538+
_histogramTtl = value;
539+
return this;
540+
}
541+
338542
@NotNull
339543
@Min(value = 0)
340544
private Integer _maxRequestSize = 100 * 1024;
545+
@NotNull
546+
private Boolean _publishStandardMetrics = true;
547+
@NotNull
548+
private Boolean _publishHistograms = false;
549+
@NotNull
550+
private Duration _histogramTtl = Duration.ofSeconds(0);
551+
@NotNull
552+
private Duration _ttl = Duration.ofSeconds(0);
341553
}
342554
}

0 commit comments

Comments
 (0)