Skip to content

Commit 5f3b208

Browse files
vjkoskelaBrandonArp
authored andcommitted
Add support for a Graphite plaintext source. (#67)
1 parent c54539a commit 5f3b208

File tree

14 files changed

+1080
-361
lines changed

14 files changed

+1080
-361
lines changed

src/main/java/com/arpnetworking/metrics/common/parsers/exceptions/ParsingException.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.arpnetworking.metrics.common.parsers.exceptions;
1717

18+
import com.arpnetworking.logback.annotations.Loggable;
1819
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1920

2021
/**
@@ -23,6 +24,7 @@
2324
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
2425
*/
2526
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
27+
@Loggable
2628
public class ParsingException extends Exception {
2729
/**
2830
* Public constructor with a description.
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/**
2+
* Copyright 2017 Inscope Metrics, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import akka.actor.ActorRef;
19+
import akka.actor.Props;
20+
import akka.actor.UntypedActor;
21+
import akka.io.Tcp;
22+
import akka.io.TcpMessage;
23+
import com.arpnetworking.steno.Logger;
24+
import com.arpnetworking.steno.LoggerFactory;
25+
import net.sf.oval.constraint.Min;
26+
import net.sf.oval.constraint.NotEmpty;
27+
import net.sf.oval.constraint.NotNull;
28+
import net.sf.oval.constraint.Range;
29+
30+
import java.net.InetSocketAddress;
31+
import java.util.Objects;
32+
import java.util.function.Function;
33+
34+
/**
35+
* Base source that listens on a tcp port. Subclasses should set appropriate
36+
* defaults on the abstract builder.
37+
*
38+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
39+
*/
40+
public abstract class BaseTcpSource extends ActorSource {
41+
42+
/**
43+
* Protected constructor.
44+
*
45+
* @param builder Instance of <code>Builder</code>.
46+
*/
47+
protected BaseTcpSource(final Builder<?, ?> builder) {
48+
super(builder);
49+
_host = builder._host;
50+
_port = builder._port;
51+
_acceptQueue = builder._acceptQueue;
52+
}
53+
54+
private final String _host;
55+
private final int _port;
56+
private final int _acceptQueue;
57+
58+
private static final Logger LOGGER = LoggerFactory.getLogger(BaseTcpSource.class);
59+
60+
/**
61+
* Internal actor to process requests.
62+
*/
63+
/* package private */ abstract static class BaseTcpListenerActor extends UntypedActor {
64+
/**
65+
* Creates a {@link Props} for this actor.
66+
*
67+
* @param source The {@link BaseTcpSource} to send notifications through.
68+
* @return A new {@link Props}
69+
*/
70+
/* package private */ static Props props(final BaseTcpSource source) {
71+
return Props.create(BaseTcpListenerActor.class, source);
72+
}
73+
74+
@Override
75+
public void preStart() {
76+
_tcpManager.tell(
77+
TcpMessage.bind(
78+
getSelf(),
79+
new InetSocketAddress(_host, _port),
80+
_acceptQueue),
81+
getSelf());
82+
}
83+
84+
@Override
85+
public void onReceive(final Object message) throws Exception {
86+
if (Objects.equals(IS_READY, message)) {
87+
getSender().tell(_isReady, getSelf());
88+
} else if (message instanceof Tcp.Bound) {
89+
final Tcp.Bound tcpBound = (Tcp.Bound) message;
90+
_isReady = true;
91+
_tcpManager.tell(message, getSelf());
92+
LOGGER.info()
93+
.setMessage("Tcp server binding complete")
94+
.addData("name", _sink.getName())
95+
.addData("address", tcpBound.localAddress().getAddress().getHostAddress())
96+
.addData("port", tcpBound.localAddress().getPort())
97+
.log();
98+
} else if (message instanceof Tcp.CommandFailed) {
99+
getContext().stop(getSelf());
100+
LOGGER.warn()
101+
.setMessage("Tcp server bad command")
102+
.addData("name", _sink.getName())
103+
.log();
104+
} else if (message instanceof Tcp.Connected) {
105+
final Tcp.Connected tcpConnected = (Tcp.Connected) message;
106+
_tcpManager.tell(message, getSelf());
107+
LOGGER.debug()
108+
.setMessage("Tcp connection established")
109+
.addData("name", _sink.getName())
110+
.addData("remoteAddress", tcpConnected.remoteAddress().getAddress().getHostAddress())
111+
.addData("remotePort", tcpConnected.remoteAddress().getPort())
112+
.log();
113+
114+
final ActorRef handler = createHandler(_sink, tcpConnected);
115+
getSender().tell(TcpMessage.register(handler), getSelf());
116+
} else {
117+
unhandled(message);
118+
}
119+
}
120+
121+
/**
122+
* Abstract method to create tcp message actor instance for each connection.
123+
*
124+
* @param sink the source to bind the actor to
125+
* @param connected the connected message
126+
* @return the actor reference
127+
*/
128+
protected abstract ActorRef createHandler(final BaseTcpSource sink, final Tcp.Connected connected);
129+
130+
protected BaseTcpSource getSink() {
131+
return _sink;
132+
}
133+
134+
/**
135+
* Constructor.
136+
*
137+
* @param source The {@link BaseTcpSource} to send notifications through.
138+
*/
139+
protected BaseTcpListenerActor(final BaseTcpSource source) {
140+
_sink = source;
141+
_host = source._host;
142+
_port = source._port;
143+
_acceptQueue = source._acceptQueue;
144+
145+
_tcpManager = Tcp.get(getContext().system()).manager();
146+
}
147+
148+
private boolean _isReady = false;
149+
private final BaseTcpSource _sink;
150+
private final String _host;
151+
private final int _port;
152+
private final int _acceptQueue;
153+
private final ActorRef _tcpManager;
154+
155+
private static final String IS_READY = "IsReady";
156+
}
157+
158+
/**
159+
* BaseTcpSource {@link Builder} implementation.
160+
*
161+
* @param <B> the builder type
162+
* @param <S> the source type
163+
*
164+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot com)
165+
*/
166+
public abstract static class Builder<B extends Builder<B, S>, S extends BaseTcpSource>
167+
extends ActorSource.Builder<B, S> {
168+
169+
/**
170+
* Public constructor.
171+
*
172+
* @param targetConstructor the concrete source constructor to build through
173+
*/
174+
protected Builder(final Function<B, S> targetConstructor) {
175+
super(targetConstructor);
176+
}
177+
178+
/**
179+
* Sets the host to bind to. Optional. Cannot be null or empty.
180+
*
181+
* @param value the port to listen on
182+
* @return This builder
183+
*/
184+
public B setHost(final String value) {
185+
_host = value;
186+
return self();
187+
}
188+
189+
/**
190+
* Sets the port to listen on. Required. Cannot be null. Must be
191+
* between 1 and 65535 (inclusive). Subclasses may set a default
192+
* port, in which case this field is effectively optional.
193+
*
194+
* @param value the port to listen on
195+
* @return This builder
196+
*/
197+
public B setPort(final Integer value) {
198+
_port = value;
199+
return self();
200+
}
201+
202+
/**
203+
* Sets the accept queue length. Optional. Cannot be null. Must be at
204+
* least 0. Default is 100.
205+
*
206+
* @param value the port to listen on
207+
* @return This builder
208+
*/
209+
public B setAcceptQueue(final Integer value) {
210+
_acceptQueue = value;
211+
return self();
212+
}
213+
214+
@NotNull
215+
@NotEmpty
216+
private String _host = "localhost";
217+
@NotNull
218+
@Range(min = 1, max = 65535)
219+
private Integer _port;
220+
@NotNull
221+
@Min(0)
222+
private Integer _acceptQueue = 100;
223+
}
224+
}

0 commit comments

Comments
 (0)