Skip to content

Commit 2dde0e3

Browse files
vjkoskelaBrandonArp
authored andcommitted
Support multiline statsd datagrams and single datagrams ending with a newline. (#64)
1 parent 3fb7ad1 commit 2dde0e3

File tree

4 files changed

+48
-28
lines changed

4 files changed

+48
-28
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,7 @@
356356
<ports>
357357
<port>${debugJavaPort}:${debugJavaPort}</port>
358358
<port>7090:7090</port>
359+
<port>8125:8125/udp</port>
359360
</ports>
360361
<volumes>
361362
<bind>

src/main/java/com/arpnetworking/metrics/common/parsers/exceptions/ParsingException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public byte[] getOffendingData() {
5151
return _offendingData;
5252
}
5353

54-
// TODO(barp): change this into a List or similar struture to ensure no modifications
54+
// TODO(barp): change this into a List or similar structure to ensure no modifications
5555
private final byte[] _offendingData;
5656

5757
private static final long serialVersionUID = 1L;

src/main/java/com/arpnetworking/metrics/common/sources/StatsdSource.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private StatsdSource(final Builder builder) {
6464

6565
private static final Logger LOGGER = LoggerFactory.getLogger(StatsdSource.class);
6666
private static final Parser<List<Record>, ByteBuffer> PARSER = new StatsdToRecordParser();
67-
67+
6868
/**
6969
* Name of the actor created to receive the Statsd datagrams.
7070
*/
@@ -90,18 +90,20 @@ public void onReceive(final Object message) throws Exception {
9090
getSender().tell(_isReady, getSelf());
9191
} else if (message instanceof Udp.Bound) {
9292
final Udp.Bound updBound = (Udp.Bound) message;
93-
LOGGER.debug()
93+
_socket = getSender();
94+
_isReady = true;
95+
LOGGER.info()
9496
.setMessage("Statsd server binding complete")
9597
.addData("address", updBound.localAddress().getAddress().getHostAddress())
9698
.addData("port", updBound.localAddress().getPort())
99+
.addData("socket", _socket)
97100
.log();
98-
_socket = getSender();
99-
_isReady = true;
100101
} else if (message instanceof Udp.Received) {
101102
final Udp.Received updReceived = (Udp.Received) message;
102103
LOGGER.trace()
103104
.setMessage("Statsd received datagram")
104105
.addData("bytes", updReceived.data().size())
106+
.addData("socket", _socket)
105107
.log();
106108

107109
try {
@@ -112,14 +114,23 @@ public void onReceive(final Object message) throws Exception {
112114
} catch (final ParserException e) {
113115
BAD_REQUEST_LOGGER.warn()
114116
.setMessage("Error handling statsd datagram")
117+
.addData("socket", _socket)
115118
.setThrowable(e)
116119
.log();
117120
}
118121

119122
} else if (Objects.equals(message, UdpMessage.unbind())) {
123+
LOGGER.debug()
124+
.setMessage("Statsd unbind")
125+
.addData("socket", _socket)
126+
.log();
120127
_socket.tell(message, getSelf());
121128

122129
} else if (message instanceof Udp.Unbound) {
130+
LOGGER.debug()
131+
.setMessage("Statsd unbound")
132+
.addData("socket", _socket)
133+
.log();
123134
getContext().stop(getSelf());
124135

125136
} else {

src/main/java/com/arpnetworking/metrics/mad/parsers/StatsdToRecordParser.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import com.arpnetworking.tsdcore.model.Quantity;
2525
import com.arpnetworking.tsdcore.model.Unit;
2626
import com.google.common.base.Charsets;
27+
import com.google.common.base.Splitter;
2728
import com.google.common.base.Strings;
29+
import com.google.common.collect.ImmutableList;
2830
import com.google.common.collect.ImmutableMap;
2931
import com.google.common.collect.ImmutableSet;
3032
import com.google.common.collect.Maps;
@@ -77,38 +79,43 @@ public final class StatsdToRecordParser implements Parser<List<Record>, ByteBuff
7779
public List<Record> parse(final ByteBuffer datagram) throws ParsingException {
7880
// CHECKSTYLE.OFF: IllegalInstantiation - This is the recommended way
7981
final String datagramAsString = new String(datagram.array(), Charsets.UTF_8);
80-
// CHECKSTYLE.ON: IllegalInstantiation
81-
final Matcher matcher = STATSD_PATTERN.matcher(datagramAsString);
82-
if (!matcher.matches()) {
83-
throw new ParsingException("Invalid statsd datagram", datagram.array());
84-
}
82+
final ImmutableList.Builder<Record> recordListBuilder = ImmutableList.builder();
83+
for (final String line : LINE_SPLITTER.split(datagramAsString)) {
84+
// CHECKSTYLE.ON: IllegalInstantiation
85+
final Matcher matcher = STATSD_PATTERN.matcher(line);
86+
if (!matcher.matches()) {
87+
throw new ParsingException("Invalid statsd line", line.getBytes(Charsets.UTF_8));
88+
}
8589

86-
// Parse the name
87-
final String name = parseName(datagram, matcher.group("NAME"));
90+
// Parse the name
91+
final String name = parseName(datagram, matcher.group("NAME"));
8892

89-
// Parse the _metricType
90-
final StatsdType type = parseStatsdType(datagram, matcher.group("TYPE"));
93+
// Parse the _metricType
94+
final StatsdType type = parseStatsdType(datagram, matcher.group("TYPE"));
9195

92-
// Parse the value
93-
final Number value = parseValue(datagram, matcher.group("VALUE"), type);
96+
// Parse the value
97+
final Number value = parseValue(datagram, matcher.group("VALUE"), type);
9498

95-
// Parse the value
96-
final Optional<Double> sampleRate = parseSampleRate(datagram, matcher.group("SAMPLERATE"), type);
99+
// Parse the value
100+
final Optional<Double> sampleRate = parseSampleRate(datagram, matcher.group("SAMPLERATE"), type);
97101

98-
// Parse the tags
99-
final ImmutableMap<String, String> annotations = parseTags(matcher.group("TAGS"));
102+
// Parse the tags
103+
final ImmutableMap<String, String> annotations = parseTags(matcher.group("TAGS"));
100104

101-
// Enforce sampling
102-
if (sampleRate.isPresent() && sampleRate.get().compareTo(1.0) != 0) {
103-
if (sampleRate.get().compareTo(0.0) == 0) {
104-
return Collections.emptyList();
105-
}
106-
if (Double.compare(_randomSupplier.get().nextDouble(), sampleRate.get()) > 0) {
107-
return Collections.emptyList();
105+
// Enforce sampling
106+
if (sampleRate.isPresent() && sampleRate.get().compareTo(1.0) != 0) {
107+
if (sampleRate.get().compareTo(0.0) == 0) {
108+
return Collections.emptyList();
109+
}
110+
if (Double.compare(_randomSupplier.get().nextDouble(), sampleRate.get()) > 0) {
111+
return Collections.emptyList();
112+
}
108113
}
114+
115+
recordListBuilder.add(createRecord(name, value, type, annotations));
109116
}
110117

111-
return Collections.singletonList(createRecord(name, value, type, annotations));
118+
return recordListBuilder.build();
112119
}
113120

114121
private StatsdType parseStatsdType(final ByteBuffer datagram, final @Nullable String statsdTypeAsString) throws ParsingException {
@@ -225,6 +232,7 @@ public StatsdToRecordParser() {
225232
StatsdType.COUNTER,
226233
StatsdType.HISTOGRAM,
227234
StatsdType.TIMER);
235+
private static final Splitter LINE_SPLITTER = Splitter.on('\n').omitEmptyStrings();
228236
private static final ThreadLocal<NumberFormat> NUMBER_FORMAT = ThreadLocal.withInitial(NumberFormat::getInstance);
229237
private static final Pattern STATSD_PATTERN = Pattern.compile(
230238
"^(?<NAME>[^:@|]+):(?<VALUE>[^|]+)\\|(?<TYPE>[^|]+)(\\|@(?<SAMPLERATE>[^|]+))?(\\|#(?<TAGS>.+))?$");

0 commit comments

Comments
 (0)