Skip to content

Commit cf4ddb9

Browse files
authored
Merge pull request #37 from ArpNetworking/app_http
Support application HTTP sink
2 parents 239ad7e + b8d0240 commit cf4ddb9

File tree

9 files changed

+253
-22
lines changed

9 files changed

+253
-22
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Metrics Aggregator Daemon
66
alt="License: Apache 2">
77
</a>
88
<a href="https://travis-ci.org/ArpNetworking/metrics-aggregator-daemon/">
9-
<img src="https://travis-ci.org/ArpNetworking/metrics-aggregator-daemon.png"
9+
<img src="https://travis-ci.org/ArpNetworking/metrics-aggregator-daemon.png?branch=master"
1010
alt="Travis Build">
1111
</a>
1212
<a href="http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.arpnetworking.metrics%22%20a%3A%22metrics-aggregator-daemon%22">

pom.xml

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
<arpnetworking.commons.version>1.7.1</arpnetworking.commons.version>
7979
<aspectjrt.version>1.8.9</aspectjrt.version>
8080
<cglib.version>3.2.1</cglib.version>
81+
<client.protocol.version>0.9.3</client.protocol.version>
8182
<commons.codec.version>1.10</commons.codec.version>
8283
<commons.io.version>2.4</commons.io.version>
8384
<commons.math3.version>3.3</commons.math3.version>
@@ -97,12 +98,13 @@
9798
<log4j.over.slf4j.version>1.7.12</log4j.over.slf4j.version>
9899
<metrics.aggregator.protocol.version>1.0.3</metrics.aggregator.protocol.version>
99100
<metrics.client.version>0.4.5</metrics.client.version>
101+
<metrics.client.http.version>0.1.1</metrics.client.http.version>
100102
<metrics.generator.version>1.1.0</metrics.generator.version>
101103
<metrics.jvm.extra.version>0.4.2</metrics.jvm.extra.version>
102104
<mockito.version>1.10.19</mockito.version>
103105
<oval.version>1.86</oval.version>
104106
<performance.test.version>1.1.0</performance.test.version>
105-
<protobuf.version>3.0.0-beta-2</protobuf.version>
107+
<protobuf.version>3.0.2</protobuf.version>
106108
<reflections.version>0.9.10</reflections.version>
107109
<scala.version>2.11</scala.version>
108110
<scala.java.compat.version>0.7.0</scala.java.compat.version>
@@ -411,6 +413,11 @@
411413
<artifactId>metrics-client</artifactId>
412414
<version>${metrics.client.version}</version>
413415
</dependency>
416+
<dependency>
417+
<groupId>com.arpnetworking.metrics</groupId>
418+
<artifactId>sink-http-apache</artifactId>
419+
<version>${metrics.client.http.version}</version>
420+
</dependency>
414421
<dependency>
415422
<groupId>com.arpnetworking.metrics.extras</groupId>
416423
<artifactId>jvm-extra</artifactId>
@@ -592,6 +599,11 @@
592599
<artifactId>metrics-aggregator-protocol</artifactId>
593600
<version>${metrics.aggregator.protocol.version}</version>
594601
</dependency>
602+
<dependency>
603+
<groupId>com.inscopemetrics.client</groupId>
604+
<artifactId>protocol</artifactId>
605+
<version>${client.protocol.version}</version>
606+
</dependency>
595607
<!-- Test - General -->
596608
<dependency>
597609
<groupId>junit</groupId>

src/main/docker/Dockerfile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ ENV APP_PARAMS="/opt/mad/config/config.json"
3434
RUN mkdir -p /opt/mad/lib/ext && \
3535
mkdir -p /opt/mad/logs && \
3636
mkdir -p /opt/mad/config/pipelines
37-
ADD deps/* /opt/mad/lib/
38-
ADD bin/* /opt/mad/bin/
39-
ADD config/* /opt/mad/config/
40-
ADD lib/* /opt/mad/lib/
37+
ADD deps /opt/mad/lib/
38+
ADD bin /opt/mad/bin/
39+
ADD config /opt/mad/config/
40+
ADD lib /opt/mad/lib/
4141

4242
# Entry point
4343
CMD JAVA_OPTS="${LOGBACK_CONFIG} ${ADDITIONAL_JAVA_OPTS}" /opt/mad/bin/mad ${APP_PARAMS} ${ADDITIONAL_APP_PARAMS}

src/main/java/com/arpnetworking/http/Routes.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import com.arpnetworking.metrics.Metrics;
4242
import com.arpnetworking.metrics.MetricsFactory;
4343
import com.arpnetworking.metrics.Timer;
44+
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV1;
45+
import com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1;
4446
import com.arpnetworking.metrics.mad.actors.Status;
4547
import com.arpnetworking.metrics.proxy.actors.Connection;
4648
import com.arpnetworking.metrics.proxy.models.messages.Connect;
@@ -156,22 +158,31 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
156158
.withStatus(StatusCodes.OK)
157159
.withEntity(JSON_CONTENT_TYPE, ByteString.fromString(STATUS_JSON)));
158160
}
159-
} else if ((HttpMethods.POST.equals(request.method()))
160-
&& path.equals(COLLECTD_V1_SOURCE_PREFIX)) {
161-
final Future<ActorRef> refFuture = _actorSystem.actorSelection("/user/collectdv1")
162-
.resolveOne(FiniteDuration.create(1, TimeUnit.SECONDS));
163-
return FutureConverters.toJava(refFuture).thenCompose(
164-
ref -> {
165-
final CompletableFuture<HttpResponse> response = new CompletableFuture<>();
166-
ref.tell(new RequestReply(request, response), ActorRef.noSender());
167-
return response;
168-
})
169-
.exceptionally(err -> HttpResponse.create().withStatus(404));
161+
} else if (HttpMethods.POST.equals(request.method())) {
162+
if (path.equals(COLLECTD_V1_SOURCE_PREFIX)) {
163+
return dispatchHttpRequest(request, ACTOR_COLLECTD_V1);
164+
} else if (path.equals(APP_V1_SOURCE_PREFIX)) {
165+
return dispatchHttpRequest(request, ACTOR_APP_V1);
166+
}
170167
}
171168

172169
return CompletableFuture.completedFuture(HttpResponse.create().withStatus(404));
173170
}
174171

172+
private CompletionStage<HttpResponse> dispatchHttpRequest(final HttpRequest request, final String actorName) {
173+
final Future<ActorRef> refFuture = _actorSystem.actorSelection(actorName)
174+
.resolveOne(FiniteDuration.create(1, TimeUnit.SECONDS));
175+
return FutureConverters.toJava(refFuture).thenCompose(
176+
ref -> {
177+
final CompletableFuture<HttpResponse> response = new CompletableFuture<>();
178+
ref.tell(new RequestReply(request, response), ActorRef.noSender());
179+
return response;
180+
})
181+
// We return 404 here since actor startup is controlled by config and
182+
// the actors may not be running.
183+
.exceptionally(err -> HttpResponse.create().withStatus(404));
184+
}
185+
175186
private CompletionStage<HttpResponse> getHttpResponseForTelemetry(
176187
final HttpRequest request,
177188
final MessageProcessorsFactory messageProcessorsFactory) {
@@ -245,6 +256,9 @@ private String createTimerName(final HttpRequest request) {
245256
private static final String TELEMETRY_STREAM_V1_PATH = "/telemetry/v1/stream";
246257
private static final String TELEMETRY_STREAM_V2_PATH = "/telemetry/v2/stream";
247258
private static final String COLLECTD_V1_SOURCE_PREFIX = "/metrics/v1/collectd";
259+
private static final String APP_V1_SOURCE_PREFIX = "/metrics/v1/application";
260+
private static final String ACTOR_COLLECTD_V1 = "/user/" + CollectdHttpSourceV1.ACTOR_NAME;
261+
private static final String ACTOR_APP_V1 = "/user/" + ClientHttpSourceV1.ACTOR_NAME;
248262

249263
// Ping
250264
private static final HttpHeader PING_CACHE_CONTROL_HEADER = CacheControl.create(
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright 2016 Inscope Metrics, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import com.arpnetworking.metrics.mad.parsers.ProtobufToRecordParser;
19+
20+
/**
21+
* Processes HTTP requests from the metrics client, extracts data and emits metrics.
22+
*
23+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
24+
*/
25+
public final class ClientHttpSourceV1 extends HttpSource {
26+
27+
/**
28+
* Protected constructor.
29+
*
30+
* @param builder Instance of <code>Builder</code>.
31+
*/
32+
private ClientHttpSourceV1(final Builder builder) {
33+
super(builder);
34+
}
35+
36+
/**
37+
* Name of the actor created to receive the HTTP Posts.
38+
*/
39+
public static final String ACTOR_NAME = "appv1";
40+
41+
/**
42+
* ClientHttpSourceV1 {@link BaseSource.Builder} implementation.
43+
*
44+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
45+
*/
46+
public static final class Builder extends HttpSource.Builder<Builder, ClientHttpSourceV1> {
47+
/**
48+
* Public constructor.
49+
*/
50+
public Builder() {
51+
super(ClientHttpSourceV1::new);
52+
setActorName(ACTOR_NAME);
53+
setParser(new ProtobufToRecordParser());
54+
}
55+
56+
/**
57+
* {@inheritDoc}
58+
*/
59+
@Override
60+
protected Builder self() {
61+
return this;
62+
}
63+
}
64+
65+
}

src/main/java/com/arpnetworking/metrics/common/sources/CollectdHttpSourceV1.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ private CollectdHttpSourceV1(final Builder builder) {
3333
super(builder);
3434
}
3535

36+
/**
37+
* Name of the actor created to receive the HTTP Posts.
38+
*/
39+
public static final String ACTOR_NAME = "collectdv1";
40+
3641
/**
3742
* CollectdHttpSourceV1 {@link BaseSource.Builder} implementation.
3843
*
@@ -45,6 +50,7 @@ public static final class Builder extends HttpSource.Builder<Builder, CollectdHt
4550
public Builder() {
4651
super(CollectdHttpSourceV1::new);
4752
setParser(new CollectdJsonToRecordParser());
53+
setActorName(ACTOR_NAME);
4854
}
4955

5056
@Override

src/main/java/com/arpnetworking/metrics/common/sources/HttpSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void onReceive(final Object message) throws Exception {
114114
responseFuture.complete(HttpResponse.create().withStatus(200));
115115
} else {
116116
BAD_REQUEST_LOGGER.warn()
117-
.setMessage("Error handling collectd post")
117+
.setMessage("Error handling http post")
118118
.setThrowable(err)
119119
.log();
120120
if (err instanceof ParsingException) {

src/main/java/com/arpnetworking/metrics/mad/Main.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.arpnetworking.configuration.triggers.FileTrigger;
3232
import com.arpnetworking.http.Routes;
3333
import com.arpnetworking.metrics.MetricsFactory;
34-
import com.arpnetworking.metrics.impl.TsdLogSink;
34+
import com.arpnetworking.metrics.impl.ApacheHttpSink;
3535
import com.arpnetworking.metrics.impl.TsdMetricsFactory;
3636
import com.arpnetworking.metrics.jvm.JvmMetricsRunnable;
3737
import com.arpnetworking.metrics.mad.actors.Status;
@@ -250,14 +250,15 @@ private Injector launchGuice(final ActorSystem actorSystem) {
250250
}
251251

252252
// Instantiate the metrics factory
253+
final String sinkHost = "0.0.0.0".equals(_configuration.getHttpHost()) ? "localhost" : _configuration.getHttpHost();
254+
final String sinkUrl = "http://" + sinkHost + ":" + _configuration.getHttpPort() + "/metrics/v1/application";
253255
final MetricsFactory metricsFactory = new TsdMetricsFactory.Builder()
254256
.setClusterName(_configuration.getMonitoringCluster())
255257
.setServiceName("mad")
256258
.setSinks(
257259
Collections.singletonList(
258-
new TsdLogSink.Builder()
259-
.setDirectory(_configuration.getLogDirectory())
260-
.setName("mad-query")
260+
new ApacheHttpSink.Builder()
261+
.setUri(sinkUrl)
261262
.build()))
262263
.build();
263264

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright 2016 Inscope Metrics, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.mad.parsers;
17+
18+
import com.arpnetworking.metrics.common.parsers.Parser;
19+
import com.arpnetworking.metrics.common.parsers.exceptions.ParsingException;
20+
import com.arpnetworking.metrics.mad.model.DefaultMetric;
21+
import com.arpnetworking.metrics.mad.model.DefaultRecord;
22+
import com.arpnetworking.metrics.mad.model.HttpRequest;
23+
import com.arpnetworking.metrics.mad.model.Metric;
24+
import com.arpnetworking.metrics.mad.model.Record;
25+
import com.arpnetworking.tsdcore.model.MetricType;
26+
import com.arpnetworking.tsdcore.model.Quantity;
27+
import com.arpnetworking.tsdcore.model.Unit;
28+
import com.google.common.collect.ImmutableMap;
29+
import com.google.common.collect.Lists;
30+
import com.google.protobuf.InvalidProtocolBufferException;
31+
import com.inscopemetrics.client.protocol.ClientV1;
32+
import net.sf.oval.exception.ConstraintsViolatedException;
33+
import org.joda.time.DateTime;
34+
35+
import java.nio.ByteBuffer;
36+
import java.util.List;
37+
import java.util.UUID;
38+
import javax.annotation.Nullable;
39+
40+
/**
41+
* Parses the Inscope Metrics protobuf binary protocol into records.
42+
*
43+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
44+
*/
45+
public class ProtobufToRecordParser implements Parser<List<Record>, HttpRequest> {
46+
/**
47+
* {@inheritDoc}
48+
*/
49+
@Override
50+
public List<Record> parse(final HttpRequest data) throws ParsingException {
51+
try {
52+
final ClientV1.RecordSet request = ClientV1.RecordSet.parseFrom(data.getBody());
53+
final List<Record> records = Lists.newArrayList();
54+
for (final ClientV1.Record record : request.getRecordsList()) {
55+
final ByteBuffer byteBuffer = ByteBuffer.wrap(record.getId().toByteArray());
56+
final Long high = byteBuffer.getLong();
57+
final Long low = byteBuffer.getLong();
58+
final DefaultRecord.Builder builder = new DefaultRecord.Builder()
59+
.setId(new UUID(high, low).toString())
60+
.setTime(new DateTime(record.getEndMillisSinceEpoch()))
61+
.setAnnotations(buildAnnotations(record))
62+
.setMetrics(buildMetrics(record));
63+
64+
records.add(builder.build());
65+
}
66+
return records;
67+
} catch (final InvalidProtocolBufferException e) {
68+
throw new ParsingException("Could not create Request message from data", data.getBody(), e);
69+
} catch (final ConstraintsViolatedException | IllegalArgumentException e) {
70+
throw new ParsingException("Could not build record", data.getBody(), e);
71+
}
72+
}
73+
74+
private ImmutableMap<String, ? extends Metric> buildMetrics(final ClientV1.Record record) {
75+
final ImmutableMap.Builder<String, Metric> metrics = ImmutableMap.builder();
76+
processEntries(metrics, record.getCountersList(), MetricType.COUNTER);
77+
processEntries(metrics, record.getTimersList(), MetricType.TIMER);
78+
processEntries(metrics, record.getGaugesList(), MetricType.GAUGE);
79+
return metrics.build();
80+
}
81+
82+
private void processEntries(
83+
final ImmutableMap.Builder<String, Metric> metrics,
84+
final List<ClientV1.MetricEntry> entries,
85+
final MetricType metricType) {
86+
for (final ClientV1.MetricEntry metricEntry : entries) {
87+
final DefaultMetric.Builder metricBuilder = new DefaultMetric.Builder()
88+
.setType(metricType);
89+
final List<Quantity> quantities = Lists.newArrayListWithExpectedSize(metricEntry.getSamplesCount());
90+
for (final ClientV1.DoubleQuantity quantity : metricEntry.getSamplesList()) {
91+
quantities.add(
92+
new Quantity.Builder()
93+
.setUnit(baseUnit(quantity.getUnit()))
94+
.setValue(quantity.getValue())
95+
.build());
96+
}
97+
98+
metricBuilder.setValues(quantities);
99+
metrics.put(metricEntry.getName(), metricBuilder.build());
100+
}
101+
}
102+
103+
@Nullable
104+
private Unit baseUnit(final ClientV1.CompoundUnit compoundUnit) {
105+
if (!compoundUnit.getNumeratorList().isEmpty()) {
106+
final ClientV1.Unit selectedUnit = compoundUnit.getNumerator(0);
107+
if (ClientV1.Unit.Type.UNRECOGNIZED.equals(selectedUnit.getType())) {
108+
return null;
109+
}
110+
final String unitName;
111+
if (!ClientV1.Unit.Scale.UNIT.equals(selectedUnit.getScale())
112+
&& !ClientV1.Unit.Scale.UNRECOGNIZED.equals(selectedUnit.getScale())) {
113+
unitName = selectedUnit.getScale().name() + selectedUnit.getType().name();
114+
} else {
115+
unitName = selectedUnit.getType().name();
116+
}
117+
118+
return Unit.valueOf(unitName);
119+
}
120+
return null;
121+
}
122+
123+
private ImmutableMap<String, String> buildAnnotations(final ClientV1.Record record) {
124+
final ImmutableMap.Builder<String, String> annotations = ImmutableMap.builder();
125+
for (final ClientV1.AnnotationEntry annotationEntry : record.getAnnotationsList()) {
126+
annotations.put(annotationEntry.getName(), annotationEntry.getValue());
127+
}
128+
for (final ClientV1.DimensionEntry dimensionEntry : record.getDimensionsList()) {
129+
annotations.put(dimensionEntry.getName(), dimensionEntry.getValue());
130+
}
131+
return annotations.build();
132+
}
133+
}

0 commit comments

Comments
 (0)