Skip to content

Commit fce744a

Browse files
vjkoskelaBrandonArp
authored andcommitted
Telegraf batch format (#102)
* Remove serviceName from documentation as it's no longer read by mad. * Add support for batch telegraf json output format.
1 parent 3c0024b commit fce744a

File tree

4 files changed

+197
-51
lines changed

4 files changed

+197
-51
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ For example:
125125
```json
126126
{
127127
"name": "MyApplicationPipeline",
128-
"serviceName": "MyApplication",
129128
"sources":
130129
[
131130
{

src/main/java/com/arpnetworking/metrics/mad/parsers/TelegrafJsonToRecordParser.java

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import com.arpnetworking.metrics.mad.model.json.Telegraf;
2929
import com.arpnetworking.tsdcore.model.MetricType;
3030
import com.arpnetworking.tsdcore.model.Quantity;
31+
import com.fasterxml.jackson.core.type.TypeReference;
32+
import com.fasterxml.jackson.databind.JsonNode;
3133
import com.fasterxml.jackson.databind.ObjectMapper;
3234
import com.google.common.collect.ImmutableList;
3335
import com.google.common.collect.ImmutableMap;
@@ -40,7 +42,6 @@
4042
import java.time.Instant;
4143
import java.time.ZoneOffset;
4244
import java.time.ZonedDateTime;
43-
import java.util.Collections;
4445
import java.util.List;
4546
import java.util.Map;
4647
import javax.annotation.Nullable;
@@ -110,30 +111,53 @@ public final class TelegrafJsonToRecordParser implements Parser<List<Record>, By
110111
*/
111112
public List<Record> parse(final ByteBuffer record) throws ParsingException {
112113
try {
113-
final Telegraf telegraf = OBJECT_MAPPER.readValue(record.array(), Telegraf.class);
114-
final ImmutableMap.Builder<String, Metric> metrics = ImmutableMap.builder();
115-
for (final Map.Entry<String, String> entry : telegraf.getFields().entrySet()) {
116-
final @Nullable Double value = parseValue(entry.getValue());
117-
if (value != null) {
118-
metrics.put(
119-
telegraf.getName().isEmpty() ? entry.getKey() : telegraf.getName() + "." + entry.getKey(),
120-
ThreadLocalBuilder.build(
121-
DefaultMetric.Builder.class,
122-
b1 -> b1.setType(MetricType.TIMER)
123-
.setValues(ImmutableList.of(
124-
ThreadLocalBuilder.build(
125-
Quantity.Builder.class,
126-
b2 -> b2.setValue(value))))));
114+
// Parse into abstract json node structure to determine between
115+
// batch and single metric telegraf json formats.
116+
final JsonNode jsonNode;
117+
try {
118+
jsonNode = OBJECT_MAPPER.readTree(record.array());
119+
} catch (final IOException e) {
120+
throw new ParsingException("Invalid json", record.array(), e);
121+
}
122+
123+
final ImmutableList<Telegraf> telegrafList;
124+
if (jsonNode.has(METRICS_JSON_KEY)) {
125+
// Convoluted; see: https://github.com/FasterXML/jackson-databind/issues/1294
126+
telegrafList = OBJECT_MAPPER.readValue(
127+
OBJECT_MAPPER.treeAsTokens(jsonNode.get(METRICS_JSON_KEY)),
128+
OBJECT_MAPPER.getTypeFactory().constructType(TELEGRAF_LIST_TYPE_REFERENCE));
129+
} else {
130+
final Telegraf telegraf = OBJECT_MAPPER.treeToValue(jsonNode, Telegraf.class);
131+
telegrafList = ImmutableList.of(telegraf);
132+
}
133+
134+
final ImmutableList.Builder<Record> records = ImmutableList.builder();
135+
for (final Telegraf telegraf : telegrafList) {
136+
final ImmutableMap.Builder<String, Metric> metrics = ImmutableMap.builder();
137+
for (final Map.Entry<String, String> entry : telegraf.getFields().entrySet()) {
138+
final @Nullable Double value = parseValue(entry.getValue());
139+
if (value != null) {
140+
metrics.put(
141+
telegraf.getName().isEmpty() ? entry.getKey() : telegraf.getName() + "." + entry.getKey(),
142+
ThreadLocalBuilder.build(
143+
DefaultMetric.Builder.class,
144+
b1 -> b1.setType(MetricType.TIMER)
145+
.setValues(ImmutableList.of(
146+
ThreadLocalBuilder.build(
147+
Quantity.Builder.class,
148+
b2 -> b2.setValue(value))))));
149+
}
127150
}
151+
final ZonedDateTime timestamp = _timestampUnit.create(telegraf.getTimestamp());
152+
records.add(
153+
ThreadLocalBuilder.build(
154+
DefaultRecord.Builder.class,
155+
b -> b.setId(UUID_FACTORY.create().toString())
156+
.setMetrics(metrics.build())
157+
.setDimensions(telegraf.getTags())
158+
.setTime(timestamp)));
128159
}
129-
final ZonedDateTime timestamp = _timestampUnit.create(telegraf.getTimestamp());
130-
return Collections.singletonList(
131-
ThreadLocalBuilder.build(
132-
DefaultRecord.Builder.class,
133-
b -> b.setId(UUID_FACTORY.create().toString())
134-
.setMetrics(metrics.build())
135-
.setDimensions(telegraf.getTags())
136-
.setTime(timestamp)));
160+
return records.build();
137161
} catch (final IOException e) {
138162
throw new ParsingException("Invalid json", record.array(), e);
139163
}
@@ -161,6 +185,9 @@ private TelegrafJsonToRecordParser(final Builder builder) {
161185
private static final UuidFactory UUID_FACTORY = new SplittableRandomUuidFactory();
162186
private static final ThreadLocal<NumberFormat> NUMBER_FORMAT = ThreadLocal.withInitial(NumberFormat::getInstance);
163187
private static final ObjectMapper OBJECT_MAPPER = ObjectMapperFactory.getInstance();
188+
private static final TypeReference<ImmutableList<Telegraf>> TELEGRAF_LIST_TYPE_REFERENCE =
189+
new TypeReference<ImmutableList<Telegraf>>() {};
190+
private static final String METRICS_JSON_KEY = "metrics";
164191

165192
/**
166193
* Implementation of <code>Builder</code> for {@link TelegrafJsonToRecordParser}.

src/test/java/com/arpnetworking/metrics/mad/parsers/TelegrafJsonToRecordParserTest.java

Lines changed: 107 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.arpnetworking.tsdcore.model.Key;
2323
import com.arpnetworking.tsdcore.model.MetricType;
2424
import com.arpnetworking.tsdcore.model.Quantity;
25-
import com.google.common.collect.Iterables;
2625
import com.google.common.io.Resources;
2726
import org.junit.Assert;
2827
import org.junit.Test;
@@ -32,6 +31,8 @@
3231
import java.time.Instant;
3332
import java.time.ZoneOffset;
3433
import java.time.ZonedDateTime;
34+
import java.util.Collection;
35+
import java.util.Iterator;
3536
import java.util.List;
3637
import java.util.Map;
3738

@@ -44,10 +45,69 @@ public class TelegrafJsonToRecordParserTest {
4445

4546
@Test
4647
public void testParse() throws ParsingException, IOException {
47-
final Record record = parseRecord("TelegrafJsonParserTest/testParse.json");
48+
final Collection<Record> records = parseRecord("TelegrafJsonParserTest/testParse.json");
49+
50+
Assert.assertNotNull(records);
51+
Assert.assertEquals(1, records.size());
52+
53+
final Record record = records.iterator().next();
54+
55+
verifyRecordOne(record);
56+
}
57+
58+
@Test
59+
public void testParseBatch() throws ParsingException, IOException {
60+
final Collection<Record> records = parseRecord("TelegrafJsonParserTest/testBatch.json");
61+
62+
Assert.assertNotNull(records);
63+
Assert.assertEquals(2, records.size());
64+
65+
final Iterator<Record> iterator = records.iterator();
66+
final Record recordOne = iterator.next();
67+
final Record recordTwo = iterator.next();
68+
69+
verifyRecordOne(recordOne);
70+
verifyRecordTwo(recordTwo);
71+
}
72+
73+
@Test
74+
public void tesBlankName() throws ParsingException, IOException {
75+
final Collection<Record> records = parseRecord("TelegrafJsonParserTest/testBlankName.json");
76+
77+
Assert.assertNotNull(records);
78+
Assert.assertEquals(1, records.size());
79+
80+
final Record record = records.iterator().next();
4881

4982
Assert.assertNotNull(record);
5083

84+
Assert.assertEquals(3, record.getDimensions().size());
85+
Assert.assertEquals("MyHost", record.getDimensions().get(Key.HOST_DIMENSION_KEY));
86+
Assert.assertEquals("MyService", record.getDimensions().get(Key.SERVICE_DIMENSION_KEY));
87+
Assert.assertEquals("MyCluster", record.getDimensions().get(Key.CLUSTER_DIMENSION_KEY));
88+
89+
Assert.assertEquals(0, record.getAnnotations().size());
90+
91+
final Map<String, ? extends Metric> map = record.getMetrics();
92+
Assert.assertEquals(2, map.size());
93+
94+
final Metric t1 = map.get("foo.t1");
95+
List<Quantity> vals = t1.getValues();
96+
Assert.assertEquals(1, vals.size());
97+
Assert.assertEquals(123d, vals.get(0).getValue(), 0.001);
98+
Assert.assertFalse(vals.get(0).getUnit().isPresent());
99+
Assert.assertEquals(MetricType.TIMER, t1.getType());
100+
101+
final Metric t2 = map.get("bar.t2");
102+
vals = t2.getValues();
103+
Assert.assertEquals(1, vals.size());
104+
Assert.assertEquals(1.23d, vals.get(0).getValue(), 0.001);
105+
Assert.assertFalse(vals.get(0).getUnit().isPresent());
106+
Assert.assertEquals(MetricType.TIMER, t2.getType());
107+
}
108+
109+
110+
private void verifyRecordOne(final Record record) {
51111
Assert.assertNotNull(record.getAnnotations());
52112
Assert.assertEquals(0, record.getAnnotations().size());
53113

@@ -99,45 +159,65 @@ public void testParse() throws ParsingException, IOException {
99159
Assert.assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli((long) (1458229140 * 1000d)), ZoneOffset.UTC), record.getTime());
100160
}
101161

102-
@Test
103-
public void tesBlankName() throws ParsingException, IOException {
104-
final Record record = parseRecord("TelegrafJsonParserTest/testBlankName.json");
105-
106-
Assert.assertNotNull(record);
107-
108-
Assert.assertEquals(3, record.getDimensions().size());
109-
Assert.assertEquals("MyHost", record.getDimensions().get(Key.HOST_DIMENSION_KEY));
110-
Assert.assertEquals("MyService", record.getDimensions().get(Key.SERVICE_DIMENSION_KEY));
111-
Assert.assertEquals("MyCluster", record.getDimensions().get(Key.CLUSTER_DIMENSION_KEY));
112-
162+
private void verifyRecordTwo(final Record record) {
163+
Assert.assertNotNull(record.getAnnotations());
113164
Assert.assertEquals(0, record.getAnnotations().size());
114165

166+
Assert.assertEquals(5, record.getDimensions().size());
167+
Assert.assertEquals("MyCluster2", record.getDimensions().get(Key.CLUSTER_DIMENSION_KEY));
168+
Assert.assertEquals("MyService2", record.getDimensions().get(Key.SERVICE_DIMENSION_KEY));
169+
Assert.assertEquals("MyHost2", record.getDimensions().get(Key.HOST_DIMENSION_KEY));
170+
Assert.assertEquals("CA", record.getDimensions().get("region"));
171+
Assert.assertEquals("foo", record.getDimensions().get("bar"));
172+
115173
final Map<String, ? extends Metric> map = record.getMetrics();
116-
Assert.assertEquals(2, map.size());
174+
Assert.assertEquals(5, map.size());
117175

118-
final Metric t1 = map.get("foo.t1");
176+
final Metric t1 = map.get("MyName2.t1");
119177
List<Quantity> vals = t1.getValues();
120178
Assert.assertEquals(1, vals.size());
121-
Assert.assertEquals(123d, vals.get(0).getValue(), 0.001);
179+
Assert.assertEquals(456d, vals.get(0).getValue(), 0.001);
122180
Assert.assertFalse(vals.get(0).getUnit().isPresent());
123181
Assert.assertEquals(MetricType.TIMER, t1.getType());
124182

125-
final Metric t2 = map.get("bar.t2");
183+
final Metric t2 = map.get("MyName2.t2");
126184
vals = t2.getValues();
127185
Assert.assertEquals(1, vals.size());
128-
Assert.assertEquals(1.23d, vals.get(0).getValue(), 0.001);
186+
Assert.assertEquals(4.56d, vals.get(0).getValue(), 0.001);
129187
Assert.assertFalse(vals.get(0).getUnit().isPresent());
130188
Assert.assertEquals(MetricType.TIMER, t2.getType());
189+
190+
final Metric g1 = map.get("MyName2.g1");
191+
vals = g1.getValues();
192+
Assert.assertEquals(1, vals.size());
193+
Assert.assertEquals(482d, vals.get(0).getValue(), 0.001);
194+
Assert.assertFalse(vals.get(0).getUnit().isPresent());
195+
Assert.assertEquals(MetricType.TIMER, g1.getType());
196+
197+
final Metric g2 = map.get("MyName2.g2");
198+
vals = g2.getValues();
199+
Assert.assertEquals(1, vals.size());
200+
Assert.assertEquals(4.82d, vals.get(0).getValue(), 0.001);
201+
Assert.assertFalse(vals.get(0).getUnit().isPresent());
202+
Assert.assertEquals(MetricType.TIMER, g2.getType());
203+
204+
final Metric c1 = map.get("MyName2.c1");
205+
vals = c1.getValues();
206+
Assert.assertEquals(1, vals.size());
207+
Assert.assertEquals(2d, vals.get(0).getValue(), 0.001);
208+
Assert.assertFalse(vals.get(0).getUnit().isPresent());
209+
Assert.assertEquals(MetricType.TIMER, c1.getType());
210+
211+
Assert.assertEquals(ZonedDateTime.ofInstant(Instant.ofEpochMilli((long) (1458229140 * 1000d)), ZoneOffset.UTC), record.getTime());
131212
}
132213

133-
private static Record parseRecord(final String fileName) throws ParsingException, IOException {
134-
return Iterables.getOnlyElement(
135-
new TelegrafJsonToRecordParser.Builder()
136-
.build()
137-
.parse(ByteBuffer.wrap(
138-
Resources.toByteArray(
139-
Resources.getResource(
140-
TelegrafJsonToRecordParserTest.class,
141-
fileName)))));
214+
private static Collection<Record> parseRecord(final String fileName) throws ParsingException, IOException {
215+
return new TelegrafJsonToRecordParser.Builder()
216+
.build()
217+
.parse(ByteBuffer.wrap(
218+
Resources.toByteArray(
219+
Resources.getResource(
220+
TelegrafJsonToRecordParserTest.class,
221+
fileName))));
142222
}
143223
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
{
2+
"metrics": [
3+
{
4+
"fields":{
5+
"t1":123,
6+
"t2":1.23,
7+
"g1":246,
8+
"g2":2.46,
9+
"c1":1
10+
},
11+
"name":"MyName",
12+
"tags":{
13+
"cluster":"MyCluster",
14+
"service":"MyService",
15+
"host":"MyHost",
16+
"region":"US",
17+
"foo":"bar"
18+
},
19+
"timestamp":1458229140
20+
},
21+
{
22+
"fields":{
23+
"t1":456,
24+
"t2":4.56,
25+
"g1":482,
26+
"g2":4.82,
27+
"c1":2
28+
},
29+
"name":"MyName2",
30+
"tags":{
31+
"cluster":"MyCluster2",
32+
"service":"MyService2",
33+
"host":"MyHost2",
34+
"region":"CA",
35+
"bar":"foo"
36+
},
37+
"timestamp":1458229140
38+
}
39+
]
40+
}

0 commit comments

Comments
 (0)