Skip to content

Commit 1cf314d

Browse files
authored
Telemetry Integration Test (#166)
* Add integration test for telemetry. * Addressed code review feedback.
1 parent 151cb44 commit 1cf314d

File tree

4 files changed

+460
-30
lines changed

4 files changed

+460
-30
lines changed

config/pipelines/pipeline.conf

Lines changed: 76 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ sources=[
4646
type="com.arpnetworking.metrics.common.sources.ClientHttpSourceV2"
4747
name="http_v2_source"
4848
}
49+
{
50+
type="com.arpnetworking.metrics.common.sources.ClientHttpSourceV3"
51+
name="http_v3_source"
52+
}
4953
{
5054
type="com.arpnetworking.metrics.common.sources.PrometheusHttpSource"
5155
name="prometheus_source"
@@ -60,7 +64,7 @@ sources=[
6064
actorName="graphite-plaintext-tcp-source"
6165
name="graphite_plaintext_source"
6266
host="0.0.0.0"
63-
port="8094"
67+
port="2003"
6468
parser={
6569
type="com.arpnetworking.metrics.mad.parsers.GraphitePlaintextToRecordParser"
6670
#globalTags={}
@@ -86,8 +90,8 @@ sources=[
8690
}
8791
}
8892
{
89-
type="com.arpnetworking.metrics.mad.sources.MappingSource"
90-
name="collectd_mapping_source"
93+
type = "com.arpnetworking.metrics.mad.sources.MappingSource"
94+
name = "collectd_mapping_source"
9195
"findAndReplace": {
9296
"^cpu/([\\d]+)/(cpu|percent)/([^/]+)(/value)?": ["cpu/$3", "cpu/by_core/$1/$3"],
9397
"^snmp/cpu_detailed/([\\d]+)/([^/]+)(/value)?": ["snmp/cpu/$2", "snmp/cpu/by_core/$1/$2"],
@@ -119,47 +123,90 @@ sources=[
119123
"^memcached/ps_count/.*": [],
120124
"^memcached/ps_cputime/.*": [],
121125
"^uptime/uptime(/value)?": ["uptime/value"]
122-
},
126+
}
123127
"source": {
124-
type="com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1"
125-
name="collectd_http_source"
126-
},
127-
{
128-
type="com.arpnetworking.metrics.common.sources.KafkaSource"
129-
name="kafka_source"
130-
consumer={
131-
type="org.apache.kafka.clients.consumer.Consumer"
132-
topics=["topic"]
133-
configs={
134-
# Set any properties defined at: https://kafka.apache.org/documentation/#consumerconfigs
135-
bootstrap.servers="localhost:9092"
136-
group.id="group0"
137-
client.id="consumer0"
138-
key.deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
139-
value.deserializer="org.apache.kafka.common.serialization.ByteArrayDeserializer"
140-
auto.offset.reset="earliest"
141-
}
142-
}
143-
parser={
144-
type="com.arpnetworking.metrics.mad.parsers.ProtobufV2bytesToRecordParser"
128+
type = "com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1"
129+
name = "collectd_http_source"
130+
}
131+
}
132+
{
133+
type="com.arpnetworking.metrics.common.sources.KafkaSource"
134+
name="kafka_source"
135+
consumer={
136+
type="org.apache.kafka.clients.consumer.Consumer"
137+
topics=["topic"]
138+
configs={
139+
# Set any properties defined at: https://kafka.apache.org/documentation/#consumerconfigs
140+
"bootstrap.servers"="localhost:9092"
141+
"group.id"="group0"
142+
"client.id"="consumer0"
143+
"key.deserializer"="org.apache.kafka.common.serialization.ByteArrayDeserializer"
144+
"value.deserializer"="org.apache.kafka.common.serialization.ByteArrayDeserializer"
145+
"auto.offset.reset"="earliest"
145146
}
146-
pollTime="PT1S"
147-
shutdownAwaitTime="PT10S"
148-
backoffTime="PT1S"
149147
}
148+
parser={
149+
type="com.arpnetworking.metrics.mad.parsers.ProtobufV2bytesToRecordParser"
150+
}
151+
pollTime="PT1S"
152+
shutdownAwaitTime="PT10S"
153+
backoffTime="PT1S"
150154
}
151155
]
152156

153157
# Sinks
154158
# ~~~~
159+
periodic_statistics_interval="1000"
160+
periodic_statistics_interval=${?PERIODIC_STATISTICS_INTERVAL}
161+
162+
telemetry_min_period=null
163+
telemetry_min_period=${?TELEMETRY_MIN_PERIOD}
164+
telemetry_max_period="PT1S"
165+
telemetry_max_period=${?TELEMETRY_MAX_PERIOD}
166+
167+
cluster_aggregator_min_period="PT1M"
168+
cluster_aggregator_min_period=${?CLUSTER_AGGREGATOR_MIN_PERIOD}
169+
cluster_aggregator_max_period=null
170+
cluster_aggregator_max_period=${?CLUSTER_AGGREGATOR_MAX_PERIOD}
171+
172+
cluster_aggregator_host="localhost"
173+
cluster_aggregator_host=${?CLUSTER_AGGREGATOR_HOST}
174+
cluster_aggregator_port="7066"
175+
cluster_aggregator_port=${?CLUSTER_AGGREGATOR_PORT}
176+
155177
sinks=[
178+
{
179+
type="com.arpnetworking.tsdcore.sinks.PeriodicStatisticsSink"
180+
name="periodic_statistics_sink"
181+
intervalInMilliseconds=${periodic_statistics_interval}
182+
}
156183
{
157184
type="com.arpnetworking.tsdcore.sinks.PeriodFilteringSink"
158185
name="telemetry_period_filtering_sink"
159-
excludeGreaterThan="PT1S"
186+
excludeLessThan=${telemetry_min_period}
187+
excludeGreaterThan=${telemetry_max_period}
160188
sink={
161189
type="com.arpnetworking.tsdcore.sinks.TelemetrySink"
162190
name="telemetry_sink"
191+
# TODO(ville): enable with mad-2.0
192+
#histogramStatistics = [
193+
# "p25",
194+
# "p50",
195+
# "p75",
196+
# "p90",
197+
# "p99"
198+
#]
199+
}
200+
}
201+
{
202+
type="com.arpnetworking.tsdcore.sinks.PeriodFilteringSink"
203+
name="cluster_http_period_filtering_sink"
204+
excludeLessThan=${cluster_aggregator_min_period}
205+
excludeGreaterThan=${cluster_aggregator_max_period}
206+
sink={
207+
type="com.arpnetworking.tsdcore.sinks.AggregationServerHttpSink"
208+
name="cluster_http_sink"
209+
uri="http://"${cluster_aggregator_host}":"${cluster_aggregator_port}"/metrics/v1/data/persist"
163210
}
164211
}
165212
]

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
<cglib.version>3.2.10</cglib.version>
8282
<client.protocol.version>0.11.1</client.protocol.version>
8383
<fastutil.version>8.2.2</fastutil.version>
84-
<spotbugs.annotations.version>3.1.12</spotbugs.annotations.version>
8584
<guava.version>25.1-jre</guava.version>
8685
<guice.version>4.2.2</guice.version>
8786
<jackson.version>2.9.8</jackson.version>
@@ -103,6 +102,7 @@
103102
<scala.library.version>2.11.12</scala.library.version>
104103
<slf4j.version>1.7.25</slf4j.version>
105104
<snappy.version>1.1.7.2</snappy.version>
105+
<spotbugs.annotations.version>3.1.12</spotbugs.annotations.version>
106106
<typesafe.config.version>1.3.3</typesafe.config.version>
107107
<vertx.core.version>2.1.6</vertx.core.version>
108108

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2019 Dropbox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.mad.integration;
17+
18+
import com.arpnetworking.metrics.Metrics;
19+
import com.arpnetworking.metrics.MetricsFactory;
20+
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
21+
import com.arpnetworking.metrics.mad.model.statistics.Statistic;
22+
import com.arpnetworking.metrics.mad.model.statistics.StatisticFactory;
23+
import com.arpnetworking.test.TelemetryClient;
24+
import org.junit.Assert;
25+
import org.junit.Test;
26+
import org.junit.runner.RunWith;
27+
import org.junit.runners.Parameterized;
28+
29+
import java.util.Arrays;
30+
import java.util.Collection;
31+
import java.util.UUID;
32+
import java.util.concurrent.CompletableFuture;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
36+
37+
/**
38+
* Integration test for streaming telemetry.
39+
*
40+
* @author Ville Koskela (ville dot koskela at inscopemetrics dot io)
41+
*/
42+
@RunWith(Parameterized.class)
43+
public final class TelemetryIT {
44+
45+
public TelemetryIT(final Statistic statistic, final double expectedResult) {
46+
_statistic = statistic;
47+
_expectedResult = expectedResult;
48+
_telemetryClient = TelemetryClient.getInstance();
49+
_metricName = UUID.randomUUID().toString();
50+
}
51+
52+
@Parameterized.Parameters(name = "{0}")
53+
public static Collection<Object[]> createParameters() {
54+
return Arrays.asList(
55+
new Object[]{STATISTIC_FACTORY.getStatistic("count"), 10.0d},
56+
new Object[]{STATISTIC_FACTORY.getStatistic("sum"), 423.5d},
57+
new Object[]{STATISTIC_FACTORY.getStatistic("mean"), 42.35d},
58+
new Object[]{STATISTIC_FACTORY.getStatistic("max"), 110d},
59+
new Object[]{STATISTIC_FACTORY.getStatistic("min"), 1.1d});
60+
// TODO(ville): enable with mad-2.0
61+
//new Object[]{STATISTIC_FACTORY.getStatistic("median"), 33.55d},
62+
//new Object[]{STATISTIC_FACTORY.getStatistic("p25"), 9.9d},
63+
//new Object[]{STATISTIC_FACTORY.getStatistic("p75"), 70.4d});
64+
}
65+
66+
@Test
67+
public void testFromSamples() throws InterruptedException, ExecutionException, TimeoutException {
68+
final CompletableFuture<Double> future = new CompletableFuture<>();
69+
70+
try {
71+
_telemetryClient.subscribe(
72+
"TelemetryIT",
73+
_metricName,
74+
_statistic,
75+
future::complete);
76+
77+
try (Metrics metrics = METRICS_FACTORY.create()) {
78+
for (int i = 1; i <= 10; ++i) {
79+
metrics.setGauge(_metricName, i * i * 1.1d);
80+
}
81+
}
82+
83+
Assert.assertEquals(_expectedResult, future.get(5, TimeUnit.SECONDS), 0.0001);
84+
} finally {
85+
_telemetryClient.unsubscribe(
86+
"TelemetryIT",
87+
_metricName,
88+
_statistic);
89+
}
90+
}
91+
92+
private final Statistic _statistic;
93+
private final double _expectedResult;
94+
private final TelemetryClient _telemetryClient;
95+
private final String _metricName;
96+
97+
private static final MetricsFactory METRICS_FACTORY = TsdMetricsFactory.newInstance("TelemetryIT", "test");
98+
private static final StatisticFactory STATISTIC_FACTORY = new StatisticFactory();
99+
}

0 commit comments

Comments
 (0)