Skip to content

Commit d3a06ff

Browse files
vjkoskelaBrandonArp
authored andcommitted
Add statsd protocol compatible udp source. (#62)
* Add statsd protocol compatible udp source.
1 parent 37284f7 commit d3a06ff

File tree

6 files changed

+851
-6
lines changed

6 files changed

+851
-6
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
<scala.java.compat.version>0.7.0</scala.java.compat.version>
112112
<scala.library.version>2.11.7</scala.library.version>
113113
<slf4j.version>1.7.12</slf4j.version>
114+
<statsd.client.timgroup>3.0.1</statsd.client.timgroup>
114115
<typesafe.config.version>1.3.1</typesafe.config.version>
115116
<vertx.core.version>2.1.6</vertx.core.version>
116117
<wiremock.version>1.57</wiremock.version>
@@ -673,6 +674,12 @@
673674
<version>${commons.math3.version}</version>
674675
<scope>test</scope>
675676
</dependency>
677+
<dependency>
678+
<groupId>com.timgroup</groupId>
679+
<artifactId>java-statsd-client</artifactId>
680+
<version>${statsd.client.timgroup}</version>
681+
<scope>test</scope>
682+
</dependency>
676683
</dependencies>
677684

678685
<profiles>
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/**
2+
* Copyright 2017 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import akka.actor.ActorRef;
19+
import akka.actor.Props;
20+
import akka.actor.UntypedActor;
21+
import akka.io.Udp;
22+
import akka.io.UdpMessage;
23+
import com.arpnetworking.metrics.common.parsers.Parser;
24+
import com.arpnetworking.metrics.mad.model.Record;
25+
import com.arpnetworking.metrics.mad.parsers.StatsdToRecordParser;
26+
import com.arpnetworking.steno.Logger;
27+
import com.arpnetworking.steno.LoggerFactory;
28+
import jdk.nashorn.internal.runtime.ParserException;
29+
import net.sf.oval.constraint.NotEmpty;
30+
import net.sf.oval.constraint.NotNull;
31+
import net.sf.oval.constraint.Range;
32+
33+
import java.net.InetSocketAddress;
34+
import java.nio.ByteBuffer;
35+
import java.time.Duration;
36+
import java.util.List;
37+
import java.util.Objects;
38+
39+
/**
40+
* Source that uses Statsd as input.
41+
*
42+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
43+
*/
44+
public final class StatsdSource extends ActorSource {
45+
46+
@Override
47+
protected Props createProps() {
48+
return Actor.props(this);
49+
}
50+
51+
/**
52+
* Protected constructor.
53+
*
54+
* @param builder Instance of <code>Builder</code>.
55+
*/
56+
private StatsdSource(final Builder builder) {
57+
super(builder);
58+
_host = builder._host;
59+
_port = builder._port;
60+
}
61+
62+
private final String _host;
63+
private final int _port;
64+
65+
private static final Logger LOGGER = LoggerFactory.getLogger(StatsdSource.class);
66+
private static final Parser<List<Record>, ByteBuffer> PARSER = new StatsdToRecordParser();
67+
68+
/**
69+
* Internal actor to process requests.
70+
*/
71+
/* package private */ static final class Actor extends UntypedActor {
72+
/**
73+
* Creates a {@link Props} for this actor.
74+
*
75+
* @param source The {@link StatsdSource} to send notifications through.
76+
* @return A new {@link Props}
77+
*/
78+
/* package private */ static Props props(final StatsdSource source) {
79+
return Props.create(Actor.class, source);
80+
}
81+
82+
@Override
83+
public void onReceive(final Object message) throws Exception {
84+
if (Objects.equals(IS_READY, message)) {
85+
getSender().tell(_isReady, getSelf());
86+
} else if (message instanceof Udp.Bound) {
87+
final Udp.Bound updBound = (Udp.Bound) message;
88+
LOGGER.debug()
89+
.setMessage("Statsd server binding complete")
90+
.addData("address", updBound.localAddress().getAddress().getHostAddress())
91+
.addData("port", updBound.localAddress().getPort())
92+
.log();
93+
_socket = getSender();
94+
_isReady = true;
95+
} else if (message instanceof Udp.Received) {
96+
final Udp.Received updReceived = (Udp.Received) message;
97+
LOGGER.trace()
98+
.setMessage("Statsd received datagram")
99+
.addData("bytes", updReceived.data().size())
100+
.log();
101+
102+
try {
103+
// NOTE: The parsing occurs in the actor itself which can become a bottleneck
104+
// if there are more records to be parsed then a single thread can handle.
105+
final List<Record> records = PARSER.parse(updReceived.data().toByteBuffer());
106+
records.forEach(_sink::notify);
107+
} catch (final ParserException e) {
108+
BAD_REQUEST_LOGGER.warn()
109+
.setMessage("Error handling statsd datagram")
110+
.setThrowable(e)
111+
.log();
112+
}
113+
114+
} else if (Objects.equals(message, UdpMessage.unbind())) {
115+
_socket.tell(message, getSelf());
116+
117+
} else if (message instanceof Udp.Unbound) {
118+
getContext().stop(getSelf());
119+
120+
} else {
121+
unhandled(message);
122+
}
123+
}
124+
125+
/**
126+
* Constructor.
127+
*
128+
* @param source The {@link StatsdSource} to send notifications through.
129+
*/
130+
/* package private */ Actor(final StatsdSource source) {
131+
_sink = source;
132+
_host = source._host;
133+
_port = source._port;
134+
135+
final ActorRef udpManager = Udp.get(getContext().system()).getManager();
136+
udpManager.tell(
137+
UdpMessage.bind(getSelf(), new InetSocketAddress(_host, _port)),
138+
getSelf());
139+
}
140+
141+
private boolean _isReady = false;
142+
private ActorRef _socket;
143+
private final StatsdSource _sink;
144+
private final String _host;
145+
private final int _port;
146+
147+
private static final String IS_READY = "IsReady";
148+
private static final Logger BAD_REQUEST_LOGGER =
149+
LoggerFactory.getRateLimitLogger(StatsdSource.class, Duration.ofSeconds(30));
150+
}
151+
152+
/**
153+
* StatsdSource {@link BaseSource.Builder} implementation.
154+
*
155+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
156+
*/
157+
public static final class Builder extends ActorSource.Builder<Builder, StatsdSource> {
158+
159+
/**
160+
* Public constructor.
161+
*/
162+
public Builder() {
163+
super(StatsdSource::new);
164+
}
165+
166+
/**
167+
* Sets the host to bind to. Optional. Cannot be null or empty.
168+
*
169+
* @param value the port to listen on
170+
* @return This builder
171+
*/
172+
public Builder setHost(final String value) {
173+
_host = value;
174+
return self();
175+
}
176+
177+
/**
178+
* Sets the port to listen on. Optional. Cannot be null. Must be
179+
* between 1 and 65535 (inclusive). Default is 8125.
180+
*
181+
* @param value the port to listen on
182+
* @return This builder
183+
*/
184+
public Builder setPort(final Integer value) {
185+
_port = value;
186+
return self();
187+
}
188+
189+
@Override
190+
protected Builder self() {
191+
return this;
192+
}
193+
194+
@NotNull
195+
@NotEmpty
196+
private String _host = "localhost";
197+
@NotNull
198+
@Range(min = 1, max = 65535)
199+
private Integer _port = 8125;
200+
}
201+
}

src/main/java/com/arpnetworking/metrics/mad/Main.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,12 +144,8 @@ public static void main(final String[] args) {
144144
} catch (final InterruptedException e) {
145145
throw new RuntimeException(e);
146146
} finally {
147-
if (configurator.isPresent()) {
148-
configurator.get().shutdown();
149-
}
150-
if (configuration.isPresent()) {
151-
configuration.get().shutdown();
152-
}
147+
configurator.ifPresent(Configurator::shutdown);
148+
configuration.ifPresent(DynamicConfiguration::shutdown);
153149
// Notify the shutdown that we're done
154150
SHUTDOWN_SEMAPHORE.release();
155151
}

0 commit comments

Comments
 (0)