Skip to content

Commit ff90f22

Browse files
author
Ville Koskela
committed
Ported InfluxDbSink implementation to oss repository.
1 parent 1a13fb7 commit ff90f22

File tree

2 files changed

+421
-0
lines changed

2 files changed

+421
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/**
2+
* Copyright 2016 Groupon.com
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.tsdcore.sinks;
17+
18+
import akka.http.javadsl.model.HttpMethods;
19+
import com.arpnetworking.tsdcore.model.AggregatedData;
20+
import com.arpnetworking.tsdcore.model.FQDSN;
21+
import com.arpnetworking.tsdcore.model.PeriodicData;
22+
import com.google.common.collect.Lists;
23+
import com.google.common.collect.Maps;
24+
import com.ning.http.client.AsyncHttpClient;
25+
import com.ning.http.client.Request;
26+
import com.ning.http.client.RequestBuilder;
27+
import org.joda.time.format.ISOPeriodFormat;
28+
29+
import java.nio.charset.StandardCharsets;
30+
import java.util.Collection;
31+
import java.util.Map;
32+
import java.util.StringJoiner;
33+
34+
/**
35+
* Publishes to a InfluxDB endpoint. This class is thread safe.
36+
*
37+
* @author Daniel Guerrero (dguerreromartin at groupon dot com)
38+
*/
39+
public final class InfluxDbSink extends HttpPostSink {
40+
41+
42+
/**
43+
* {@inheritDoc}
44+
*/
45+
@Override
46+
protected Request createRequest(final AsyncHttpClient client, final byte[] serializedData) {
47+
return new RequestBuilder()
48+
.setUrl(getUri().toString())
49+
.setHeader("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8")
50+
.setBody(serializedData)
51+
.setMethod(HttpMethods.POST.value())
52+
.build();
53+
}
54+
55+
/**
56+
* {@inheritDoc}
57+
*/
58+
@Override
59+
protected Collection<byte[]> serialize(final PeriodicData periodicData) {
60+
final String period = periodicData.getPeriod()
61+
.toString(ISOPeriodFormat.standard());
62+
63+
final Map<String, MetricFormat> metrics = Maps.newHashMap();
64+
65+
for (final AggregatedData data : periodicData.getData()) {
66+
final String metricName = buildMetricName(period, data.getFQDSN());
67+
MetricFormat formattedData = metrics.get(metricName);
68+
69+
if (formattedData == null) {
70+
formattedData = new MetricFormat(
71+
metricName,
72+
periodicData.getStart().getMillis(),
73+
periodicData.getDimensions()
74+
)
75+
.addTag("service", data.getFQDSN().getService())
76+
.addTag("cluster", data.getFQDSN().getCluster());
77+
78+
metrics.put(metricName, formattedData);
79+
}
80+
81+
formattedData.addMetric(
82+
data.getFQDSN().getStatistic().getName(),
83+
data.getValue().getValue()
84+
);
85+
//TODO(dguerreromartin): include Conditional
86+
}
87+
88+
final StringJoiner dataList = new StringJoiner("\n");
89+
for (MetricFormat metric : metrics.values()) {
90+
dataList.add(metric.buildMetricString());
91+
}
92+
93+
return Lists.newArrayList(dataList.toString().getBytes(StandardCharsets.UTF_8));
94+
}
95+
96+
97+
private String buildMetricName(final String period, final FQDSN fqdsn) {
98+
return new StringBuilder()
99+
.append(period).append(".")
100+
.append(fqdsn.getMetric())
101+
.toString();
102+
}
103+
104+
/**
105+
* Implementation of output format for <code>InfluxDB</code> metrics.
106+
* The format follow the pattern (https://docs.influxdata.com/influxdb/v0.10/write_protocols/write_syntax/):
107+
* measurement[,tag_key1=tag_value1...] field_key=field_value[,field_key2=field_value2] [timestamp]
108+
*
109+
* The spaces, comma and = will be escaped from the measurement,tags and fields
110+
*
111+
* @author Daniel Guerrero (dguerreromartin at groupon dot com)
112+
*/
113+
private static class MetricFormat {
114+
115+
public MetricFormat addTag(final String tagName, final String value) {
116+
this._tags.put(encode(tagName), encode(value));
117+
return this;
118+
}
119+
120+
public MetricFormat addMetric(final String statisticName, final Double value) {
121+
this._values.put(encode(statisticName), value);
122+
return this;
123+
}
124+
125+
public String buildMetricString() {
126+
final StringJoiner metricName = new StringJoiner(",");
127+
metricName.add(_metric);
128+
129+
for (final Map.Entry<String, String> entryTag : this._tags.entrySet()) {
130+
metricName.add(String.format("%s=%s", entryTag.getKey(), entryTag.getValue()));
131+
}
132+
133+
final StringJoiner valuesJoiner = new StringJoiner(",");
134+
135+
for (final Map.Entry<String, Double> entryValue : this._values.entrySet()) {
136+
valuesJoiner.add(String.format("%s=%s", entryValue.getKey(), entryValue.getValue()));
137+
}
138+
139+
return String.format("%s %s %d", metricName.toString(), valuesJoiner.toString(), _timestamp);
140+
}
141+
142+
MetricFormat(final String metric, final long timestamp, final Map<String, String> tags) {
143+
this._metric = encode(metric);
144+
this._timestamp = timestamp;
145+
for (final Map.Entry<String, String> tag : tags.entrySet()) {
146+
this._tags.put(encode(tag.getKey()), encode(tag.getValue()));
147+
}
148+
}
149+
150+
private String encode(final String name) {
151+
return name
152+
.replace(",", "\\,")
153+
.replace(" ", "\\ ")
154+
.replace("=", "_");
155+
}
156+
157+
private final String _metric;
158+
private final long _timestamp;
159+
private final Map<String, Double> _values = Maps.newHashMap();
160+
private final Map<String, String> _tags = Maps.newHashMap();
161+
162+
}
163+
164+
/**
165+
* Private constructor.
166+
*
167+
* @param builder Instance of <code>Builder</code>.
168+
*/
169+
private InfluxDbSink(final Builder builder) {
170+
super(builder);
171+
}
172+
173+
/**
174+
* Implementation of builder pattern for <code>InfluxDbSink</code>.
175+
*
176+
* @author Daniel Guerrero (dguerreromartin at groupon dot com)
177+
*/
178+
public static final class Builder extends HttpPostSink.Builder<Builder, InfluxDbSink> {
179+
180+
/**
181+
* Public constructor.
182+
*/
183+
public Builder() {
184+
super(InfluxDbSink::new);
185+
}
186+
187+
/**
188+
* {@inheritDoc}
189+
*/
190+
@Override
191+
protected Builder self() {
192+
return this;
193+
}
194+
}
195+
196+
}

0 commit comments

Comments
 (0)