Skip to content

Commit 7d71c1c

Browse files
vjkoskelaBrandonArp
authored andcommitted
Add telegraf source support. (#66)
1 parent 76a46a4 commit 7d71c1c

File tree

7 files changed

+812
-2
lines changed

7 files changed

+812
-2
lines changed

src/main/java/com/arpnetworking/metrics/common/sources/StatsdSource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import akka.io.Udp;
2222
import akka.io.UdpMessage;
2323
import com.arpnetworking.metrics.common.parsers.Parser;
24+
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
2425
import com.arpnetworking.metrics.mad.model.Record;
2526
import com.arpnetworking.metrics.mad.parsers.StatsdToRecordParser;
2627
import com.arpnetworking.steno.Logger;
2728
import com.arpnetworking.steno.LoggerFactory;
28-
import jdk.nashorn.internal.runtime.ParserException;
2929
import net.sf.oval.constraint.NotEmpty;
3030
import net.sf.oval.constraint.NotNull;
3131
import net.sf.oval.constraint.Range;
@@ -111,7 +111,7 @@ public void onReceive(final Object message) throws Exception {
111111
// if there are more records to be parsed then a single thread can handle.
112112
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
113113
records.forEach(_sink::notify);
114-
} catch (final ParserException e) {
114+
} catch (final ParsingException e) {
115115
BAD_REQUEST_LOGGER.warn()
116116
.setMessage("Error handling statsd datagram")
117117
.addData("socket", _socket)
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
/**
2+
* Copyright 2017 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import akka.actor.ActorRef;
19+
import akka.actor.Props;
20+
import akka.actor.UntypedActor;
21+
import akka.io.Tcp;
22+
import akka.io.TcpMessage;
23+
import akka.util.ByteString;
24+
import akka.util.ByteStringBuilder;
25+
import com.arpnetworking.metrics.common.parsers.Parser;
26+
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
27+
import com.arpnetworking.metrics.mad.model.Record;
28+
import com.arpnetworking.metrics.mad.parsers.TelegrafJsonToRecordParser;
29+
import com.arpnetworking.steno.Logger;
30+
import com.arpnetworking.steno.LoggerFactory;
31+
import net.sf.oval.constraint.Min;
32+
import net.sf.oval.constraint.NotEmpty;
33+
import net.sf.oval.constraint.NotNull;
34+
import net.sf.oval.constraint.Range;
35+
36+
import java.net.InetSocketAddress;
37+
import java.nio.ByteBuffer;
38+
import java.time.Duration;
39+
import java.util.List;
40+
import java.util.Objects;
41+
42+
/**
43+
* Source that uses Telegraf TCP SocketWriter with JSON as input. This sink
44+
* merges the name with the field using Telegraf's standard '.' (period)
45+
* separator. You can wrap this sink with the MappingSource in order to convert
46+
* to '/' (slash) delimited metric names.
47+
*
48+
* Sample MAD configuration:
49+
* <pre>
50+
* {
51+
* type="com.arpnetworking.metrics.mad.sources.MappingSource"
52+
* name="telegraftcp_mapping_source"
53+
* findAndReplace={
54+
* "\\."=["/"]
55+
* }
56+
* source={
57+
* type="com.arpnetworking.metrics.common.sources.TelegrafTcpSource"
58+
* name="telegraftcp_source"
59+
* host="0.0.0.0"
60+
* port="8094"
61+
* }
62+
* }
63+
* </pre>
64+
*
65+
* Sample Telegraf configuration:
66+
* <pre>
67+
* [agent]
68+
* interval="1s"
69+
* flush_interval="1s"
70+
* round_interval=true
71+
* omit_hostname=false
72+
*
73+
* [global_tags]
74+
* service="telegraf"
75+
* cluster="telegraf_local"
76+
*
77+
* [[outputs.socket_writer]]
78+
* address = "tcp://127.0.0.1:8094"
79+
* data_format = "json"
80+
*
81+
* [[inputs.cpu]]
82+
* percpu = true
83+
* totalcpu = true
84+
* collect_cpu_time = false
85+
* report_active = false
86+
* </pre>
87+
*
88+
* TODO(ville): Parameterize the parser to use.
89+
*
90+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
91+
*/
92+
public final class TelegrafTcpSource extends ActorSource {
93+
94+
@Override
95+
protected Props createProps() {
96+
return Actor.props(this);
97+
}
98+
99+
/**
100+
* Protected constructor.
101+
*
102+
* @param builder Instance of <code>Builder</code>.
103+
*/
104+
private TelegrafTcpSource(final Builder builder) {
105+
super(builder);
106+
_host = builder._host;
107+
_port = builder._port;
108+
_acceptQueue = builder._acceptQueue;
109+
}
110+
111+
private final String _host;
112+
private final int _port;
113+
private final int _acceptQueue;
114+
115+
private static final Logger LOGGER = LoggerFactory.getLogger(TelegrafTcpSource.class);
116+
private static final Parser<List<Record>, ByteBuffer> PARSER = new TelegrafJsonToRecordParser.Builder().build();
117+
118+
/**
119+
* Name of the actor created to receive the Telegraf JSON.
120+
*/
121+
public static final String ACTOR_NAME = "telegraf-tcp-json";
122+
123+
/**
124+
* Internal actor to process requests.
125+
*/
126+
/* package private */ static final class Actor extends UntypedActor {
127+
/**
128+
* Creates a {@link Props} for this actor.
129+
*
130+
* @param source The {@link TelegrafTcpSource} to send notifications through.
131+
* @return A new {@link Props}
132+
*/
133+
/* package private */ static Props props(final TelegrafTcpSource source) {
134+
return Props.create(Actor.class, source);
135+
}
136+
137+
@Override
138+
public void preStart() {
139+
_tcpManager.tell(
140+
TcpMessage.bind(
141+
getSelf(),
142+
new InetSocketAddress(_host, _port),
143+
_acceptQueue),
144+
getSelf());
145+
}
146+
147+
@Override
148+
public void onReceive(final Object message) throws Exception {
149+
if (Objects.equals(IS_READY, message)) {
150+
getSender().tell(_isReady, getSelf());
151+
} else if (message instanceof Tcp.Bound) {
152+
final Tcp.Bound tcpBound = (Tcp.Bound) message;
153+
_isReady = true;
154+
_tcpManager.tell(message, getSelf());
155+
LOGGER.info()
156+
.setMessage("Telegraf tcp server binding complete")
157+
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
158+
.addData("port", tcpBound.localAddress().getPort())
159+
.log();
160+
} else if (message instanceof Tcp.CommandFailed) {
161+
getContext().stop(getSelf());
162+
LOGGER.warn()
163+
.setMessage("Telegraf tcp server bad command")
164+
.log();
165+
} else if (message instanceof Tcp.Connected) {
166+
final Tcp.Connected tcpConnected = (Tcp.Connected) message;
167+
_tcpManager.tell(message, getSelf());
168+
LOGGER.debug()
169+
.setMessage("Telegraf tcp connection established")
170+
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
171+
.addData("remotePort", tcpConnected.remoteAddress().getPort())
172+
.log();
173+
174+
final ActorRef handler = getContext().actorOf(Props.create(
175+
RequestHandlerActor.class,
176+
_sink,
177+
tcpConnected.remoteAddress()));
178+
getSender().tell(TcpMessage.register(handler), getSelf());
179+
} else {
180+
unhandled(message);
181+
}
182+
}
183+
184+
/**
185+
* Constructor.
186+
*
187+
* @param source The {@link TelegrafTcpSource} to send notifications through.
188+
*/
189+
/* package private */ Actor(final TelegrafTcpSource source) {
190+
_sink = source;
191+
_host = source._host;
192+
_port = source._port;
193+
_acceptQueue = source._acceptQueue;
194+
195+
_tcpManager = Tcp.get(getContext().system()).manager();
196+
}
197+
198+
private boolean _isReady = false;
199+
private final TelegrafTcpSource _sink;
200+
private final String _host;
201+
private final int _port;
202+
private final int _acceptQueue;
203+
private final ActorRef _tcpManager;
204+
205+
private static final String IS_READY = "IsReady";
206+
}
207+
208+
/**
209+
* Internal actor to process requests.
210+
*/
211+
/* package private */ static final class RequestHandlerActor extends UntypedActor {
212+
213+
/* package private */ RequestHandlerActor(final TelegrafTcpSource sink, final InetSocketAddress remoteAddress) {
214+
_sink = sink;
215+
_remoteAddress = remoteAddress;
216+
}
217+
218+
@Override
219+
public void onReceive(final Object message) throws Throwable {
220+
if (message instanceof Tcp.Received) {
221+
final ByteString data = ((Tcp.Received) message).data();
222+
final int indexOfNewline = data.indexOf('\012');
223+
if (indexOfNewline >= 0) {
224+
_buffer.append(data.slice(0, indexOfNewline));
225+
process(_buffer.result());
226+
_buffer.clear();
227+
_buffer.append(data.slice(indexOfNewline + 1, data.size() - 1));
228+
} else {
229+
_buffer.append(data);
230+
}
231+
232+
LOGGER.trace()
233+
.setMessage("Telegraf tcp data received")
234+
.addData("remoteAddress", _remoteAddress.getAddress().getHostAddress())
235+
.addData("remotePort", _remoteAddress.getPort())
236+
.addData("data", data)
237+
.log();
238+
} else if (message instanceof Tcp.ConnectionClosed) {
239+
getContext().stop(getSelf());
240+
LOGGER.debug()
241+
.setMessage("Telegraf tcp connection close")
242+
.addData("remoteAddress", _remoteAddress.getAddress().getHostAddress())
243+
.addData("remotePort", _remoteAddress.getPort())
244+
.log();
245+
}
246+
}
247+
248+
private void process(final ByteString data) {
249+
try {
250+
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
251+
// if there are more records to be parsed then a single thread can handle.
252+
final List<Record> records = PARSER.parse(data.toByteBuffer());
253+
records.forEach(_sink::notify);
254+
} catch (final ParsingException e) {
255+
BAD_REQUEST_LOGGER.warn()
256+
.setMessage("Error handling telegraph tcp json")
257+
.addData("remoteAddress", _remoteAddress.getAddress().getHostAddress())
258+
.addData("remotePort", _remoteAddress.getPort())
259+
.setThrowable(e)
260+
.log();
261+
}
262+
}
263+
264+
private final ByteStringBuilder _buffer = new ByteStringBuilder();
265+
private final TelegrafTcpSource _sink;
266+
private final InetSocketAddress _remoteAddress;
267+
268+
private static final Logger BAD_REQUEST_LOGGER =
269+
LoggerFactory.getRateLimitLogger(TelegrafTcpSource.class, Duration.ofSeconds(30));
270+
}
271+
272+
/**
273+
* TelegrafTcpSource {@link BaseSource.Builder} implementation.
274+
*
275+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
276+
*/
277+
public static final class Builder extends ActorSource.Builder<Builder, TelegrafTcpSource> {
278+
279+
/**
280+
* Public constructor.
281+
*/
282+
public Builder() {
283+
super(TelegrafTcpSource::new);
284+
setActorName(ACTOR_NAME);
285+
}
286+
287+
/**
288+
* Sets the host to bind to. Optional. Cannot be null or empty.
289+
*
290+
* @param value the port to listen on
291+
* @return This builder
292+
*/
293+
public Builder setHost(final String value) {
294+
_host = value;
295+
return self();
296+
}
297+
298+
/**
299+
* Sets the port to listen on. Optional. Cannot be null. Must be
300+
* between 1 and 65535 (inclusive). Default is 8094.
301+
*
302+
* @param value the port to listen on
303+
* @return This builder
304+
*/
305+
public Builder setPort(final Integer value) {
306+
_port = value;
307+
return self();
308+
}
309+
310+
/**
311+
* Sets the accept queue length. Optional. Cannot be null. Must be at
312+
* least 0. Default is 100.
313+
*
314+
* @param value the port to listen on
315+
* @return This builder
316+
*/
317+
public Builder setAcceptQueue(final Integer value) {
318+
_acceptQueue = value;
319+
return self();
320+
}
321+
322+
@Override
323+
protected Builder self() {
324+
return this;
325+
}
326+
327+
@NotNull
328+
@NotEmpty
329+
private String _host = "localhost";
330+
@NotNull
331+
@Range(min = 1, max = 65535)
332+
private Integer _port = 8094;
333+
@NotNull
334+
@Min(0)
335+
private Integer _acceptQueue = 100;
336+
}
337+
}

0 commit comments

Comments
 (0)