Skip to content

Commit 44626fa

Browse files
authored
support protocol v2 (#70)
1 parent b7bc70f commit 44626fa

File tree

17 files changed

+429
-36
lines changed

17 files changed

+429
-36
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
<arpnetworking.commons.version>1.9.0</arpnetworking.commons.version>
7979
<aspectjrt.version>1.8.9</aspectjrt.version>
8080
<cglib.version>3.2.5</cglib.version>
81-
<client.protocol.version>0.9.3</client.protocol.version>
81+
<client.protocol.version>0.10.0</client.protocol.version>
8282
<commons.codec.version>1.10</commons.codec.version>
8383
<commons.io.version>2.4</commons.io.version>
8484
<commons.math3.version>3.3</commons.math3.version>
@@ -105,7 +105,7 @@
105105
<mockito.version>2.12.0</mockito.version>
106106
<oval.version>1.90</oval.version>
107107
<performance.test.version>1.1.0</performance.test.version>
108-
<protobuf.version>3.0.2</protobuf.version>
108+
<protobuf.version>3.4.0</protobuf.version>
109109
<reflections.version>0.9.11</reflections.version>
110110
<scala.version>2.11</scala.version>
111111
<scala.java.compat.version>0.7.0</scala.java.compat.version>

src/main/java/com/arpnetworking/http/Routes.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import akka.util.Timeout;
4141
import com.arpnetworking.metrics.Units;
4242
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV1;
43+
import com.arpnetworking.metrics.common.sources.ClientHttpSourceV2;
4344
import com.arpnetworking.metrics.common.sources.CollectdHttpSourceV1;
4445
import com.arpnetworking.metrics.incubator.PeriodicMetrics;
4546
import com.arpnetworking.metrics.mad.actors.Status;
@@ -177,6 +178,8 @@ private CompletionStage<HttpResponse> process(final HttpRequest request) {
177178
} else if (Objects.equals(HttpMethods.POST, request.method())) {
178179
if (Objects.equals(path, COLLECTD_V1_SOURCE_PREFIX)) {
179180
return dispatchHttpRequest(request, ACTOR_COLLECTD_V1);
181+
} else if (Objects.equals(path, APP_V2_SOURCE_PREFIX)) {
182+
return dispatchHttpRequest(request, ACTOR_APP_V2);
180183
} else if (Objects.equals(path, APP_V1_SOURCE_PREFIX)) {
181184
return dispatchHttpRequest(request, ACTOR_APP_V1);
182185
}
@@ -295,8 +298,10 @@ private String createMetricName(final HttpRequest request, final String actionPa
295298
private static final String TELEMETRY_STREAM_V2_PATH = "/telemetry/v2/stream";
296299
private static final String COLLECTD_V1_SOURCE_PREFIX = "/metrics/v1/collectd";
297300
private static final String APP_V1_SOURCE_PREFIX = "/metrics/v1/application";
301+
private static final String APP_V2_SOURCE_PREFIX = "/metrics/v2/application";
298302
private static final String ACTOR_COLLECTD_V1 = "/user/" + CollectdHttpSourceV1.ACTOR_NAME;
299303
private static final String ACTOR_APP_V1 = "/user/" + ClientHttpSourceV1.ACTOR_NAME;
304+
private static final String ACTOR_APP_V2 = "/user/" + ClientHttpSourceV2.ACTOR_NAME;
300305
private static final String REST_SERVICE_METRIC_ROOT = "rest_service/";
301306
private static final String BODY_SIZE_METRIC = "body_size";
302307
private static final String REQUEST_METRIC = "request";

src/main/java/com/arpnetworking/metrics/common/sources/ClientHttpSourceV1.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.arpnetworking.metrics.common.sources;
1717

18-
import com.arpnetworking.metrics.mad.parsers.ProtobufToRecordParser;
18+
import com.arpnetworking.metrics.mad.parsers.ProtobufV1ToRecordParser;
1919

2020
/**
2121
* Processes HTTP requests from the metrics client, extracts data and emits metrics.
@@ -50,7 +50,7 @@ public static final class Builder extends HttpSource.Builder<Builder, ClientHttp
5050
public Builder() {
5151
super(ClientHttpSourceV1::new);
5252
setActorName(ACTOR_NAME);
53-
setParser(new ProtobufToRecordParser());
53+
setParser(new ProtobufV1ToRecordParser());
5454
}
5555

5656
@Override
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2017 Inscope Metrics, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.arpnetworking.metrics.common.sources;
17+
18+
import com.arpnetworking.metrics.mad.parsers.ProtobufV2ToRecordParser;
19+
20+
/**
21+
* Processes HTTP requests from the metrics client, extracts data and emits metrics.
22+
*
23+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
24+
*/
25+
public final class ClientHttpSourceV2 extends HttpSource {
26+
27+
/**
28+
* Protected constructor.
29+
*
30+
* @param builder Instance of <code>Builder</code>.
31+
*/
32+
private ClientHttpSourceV2(final Builder builder) {
33+
super(builder);
34+
}
35+
36+
/**
37+
* Name of the actor created to receive the HTTP Posts.
38+
*/
39+
public static final String ACTOR_NAME = "appv2";
40+
41+
/**
42+
* ClientHttpSourceV2 {@link BaseSource.Builder} implementation.
43+
*
44+
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
45+
*/
46+
public static final class Builder extends HttpSource.Builder<Builder, ClientHttpSourceV2> {
47+
/**
48+
* Public constructor.
49+
*/
50+
public Builder() {
51+
super(ClientHttpSourceV2::new);
52+
setActorName(ACTOR_NAME);
53+
setParser(new ProtobufV2ToRecordParser());
54+
}
55+
56+
@Override
57+
protected Builder self() {
58+
return this;
59+
}
60+
}
61+
62+
}

src/main/java/com/arpnetworking/metrics/common/sources/HttpSource.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.arpnetworking.steno.Logger;
4747
import com.arpnetworking.steno.LoggerFactory;
4848
import com.google.common.collect.ImmutableMultimap;
49-
import com.google.common.collect.Multimap;
5049
import net.sf.oval.constraint.NotNull;
5150

5251
import java.time.Duration;
@@ -138,20 +137,19 @@ public Receive createReceive() {
138137
_processGraph = GraphDSL.create(builder -> {
139138

140139
// Flows
141-
final Flow<HttpRequest, byte[], NotUsed> getBodyFlow = Flow.<HttpRequest>create()
140+
final Flow<HttpRequest, ByteString, NotUsed> getBodyFlow = Flow.<HttpRequest>create()
142141
.map(HttpRequest::entity)
143142
.flatMapConcat(RequestEntity::getDataBytes)
144143
.reduce(ByteString::concat)
145-
.map(ByteString::toArray) // Transform to array form
146144
.named("getBody");
147145

148-
final Flow<HttpRequest, Multimap<String, String>, NotUsed> getHeadersFlow = Flow.<HttpRequest>create()
146+
final Flow<HttpRequest, ImmutableMultimap<String, String>, NotUsed> getHeadersFlow = Flow.<HttpRequest>create()
149147
.map(HttpRequest::getHeaders)
150148
.map(Actor::createHeaderMultimap) // Transform to array form
151149
.named("getHeaders");
152150

153-
final Flow<Pair<byte[], Multimap<String, String>>, Record, NotUsed> createAndParseFlow =
154-
Flow.<Pair<byte[], Multimap<String, String>>>create()
151+
final Flow<Pair<ByteString, ImmutableMultimap<String, String>>, Record, NotUsed> createAndParseFlow =
152+
Flow.<Pair<ByteString, ImmutableMultimap<String, String>>>create()
155153
.map(Actor::mapModel)
156154
.mapConcat(this::parseRecords) // Parse the json string into a record builder
157155
// NOTE: this should be _parser::parse, but aspectj NPEs with that currently
@@ -160,13 +158,13 @@ public Receive createReceive() {
160158
// Shapes
161159
final UniformFanOutShape<HttpRequest, HttpRequest> split = builder.add(Broadcast.create(2));
162160

163-
final FlowShape<HttpRequest, byte[]> getBody = builder.add(getBodyFlow);
164-
final FlowShape<HttpRequest, Multimap<String, String>> getHeaders = builder.add(getHeadersFlow);
161+
final FlowShape<HttpRequest, ByteString> getBody = builder.add(getBodyFlow);
162+
final FlowShape<HttpRequest, ImmutableMultimap<String, String>> getHeaders = builder.add(getHeadersFlow);
165163
final FanInShape2<
166-
byte[],
167-
Multimap<String, String>,
168-
Pair<byte[], Multimap<String, String>>> join = builder.add(Zip.create());
169-
final FlowShape<Pair<byte[], Multimap<String, String>>, Record> createRequest =
164+
ByteString,
165+
ImmutableMultimap<String, String>,
166+
Pair<ByteString, ImmutableMultimap<String, String>>> join = builder.add(Zip.create());
167+
final FlowShape<Pair<ByteString, ImmutableMultimap<String, String>>, Record> createRequest =
170168
builder.add(createAndParseFlow);
171169

172170
// Wire the shapes
@@ -178,7 +176,7 @@ public Receive createReceive() {
178176
});
179177
}
180178

181-
private static Multimap<String, String> createHeaderMultimap(final Iterable<HttpHeader> headers) {
179+
private static ImmutableMultimap<String, String> createHeaderMultimap(final Iterable<HttpHeader> headers) {
182180
final ImmutableMultimap.Builder<String, String> headersBuilder = ImmutableMultimap.builder();
183181

184182
for (final HttpHeader httpHeader : headers) {
@@ -188,7 +186,8 @@ private static Multimap<String, String> createHeaderMultimap(final Iterable<Http
188186
return headersBuilder.build();
189187
}
190188

191-
private static com.arpnetworking.metrics.mad.model.HttpRequest mapModel(final Pair<byte[], Multimap<String, String>> pair) {
189+
private static com.arpnetworking.metrics.mad.model.HttpRequest mapModel(
190+
final Pair<ByteString, ImmutableMultimap<String, String>> pair) {
192191
return new com.arpnetworking.metrics.mad.model.HttpRequest(pair.second(), pair.first());
193192
}
194193

src/main/java/com/arpnetworking/metrics/mad/model/HttpRequest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,21 @@
1515
*/
1616
package com.arpnetworking.metrics.mad.model;
1717

18+
import akka.util.ByteString;
19+
import com.google.common.collect.ImmutableMultimap;
1820
import com.google.common.collect.Multimap;
19-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2021

2122
/**
2223
* Represents a parsable HTTP request.
2324
*
2425
* @author Brandon Arp (brandon dot arp at smartsheet dot com)
2526
*/
26-
@SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"})
2727
public final class HttpRequest {
2828
public Multimap<String, String> getHeaders() {
2929
return _headers;
3030
}
3131

32-
public byte[] getBody() {
32+
public ByteString getBody() {
3333
return _body;
3434
}
3535

@@ -39,12 +39,11 @@ public byte[] getBody() {
3939
* @param headers The headers.
4040
* @param body The body of the request.
4141
*/
42-
public HttpRequest(final Multimap<String, String> headers, final byte[] body) {
42+
public HttpRequest(final ImmutableMultimap<String, String> headers, final ByteString body) {
4343
_headers = headers;
4444
_body = body;
4545
}
4646

47-
private final Multimap<String, String> _headers;
48-
// TODO(barp): change this into a List or similar struture to ensure no modifications
49-
private final byte[] _body;
47+
private final ImmutableMultimap<String, String> _headers;
48+
private final ByteString _body;
5049
}

src/main/java/com/arpnetworking/metrics/mad/parsers/CollectdJsonToRecordParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public List<Record> parse(final HttpRequest request) throws ParsingException {
7878
}
7979
}
8080
try {
81-
final List<CollectdRecord> records = OBJECT_MAPPER.readValue(request.getBody(), COLLECTD_RECORD_LIST);
81+
final List<CollectdRecord> records = OBJECT_MAPPER.readValue(request.getBody().toArray(), COLLECTD_RECORD_LIST);
8282
final List<Record> parsedRecords = Lists.newArrayList();
8383
for (final CollectdRecord record : records) {
8484
final Multimap<String, Metric> metrics = HashMultimap.create();
@@ -118,7 +118,7 @@ public List<Record> parse(final HttpRequest request) throws ParsingException {
118118
}
119119
return parsedRecords;
120120
} catch (final IOException | ConstraintsViolatedException ex) {
121-
throw new ParsingException("Error parsing collectd json", request.getBody(), ex);
121+
throw new ParsingException("Error parsing collectd json", request.getBody().toArray(), ex);
122122
}
123123
}
124124

src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufToRecordParser.java renamed to src/main/java/com/arpnetworking/metrics/mad/parsers/ProtobufV1ToRecordParser.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,12 @@
4242
*
4343
* @author Brandon Arp (brandon dot arp at inscopemetrics dot com)
4444
*/
45-
public class ProtobufToRecordParser implements Parser<List<Record>, HttpRequest> {
45+
@SuppressWarnings("deprecation")
46+
public final class ProtobufV1ToRecordParser implements Parser<List<Record>, HttpRequest> {
4647
@Override
4748
public List<Record> parse(final HttpRequest data) throws ParsingException {
4849
try {
49-
final ClientV1.RecordSet request = ClientV1.RecordSet.parseFrom(data.getBody());
50+
final ClientV1.RecordSet request = ClientV1.RecordSet.parseFrom(data.getBody().asByteBuffer());
5051
final List<Record> records = Lists.newArrayList();
5152
for (final ClientV1.Record record : request.getRecordsList()) {
5253
final ByteBuffer byteBuffer = ByteBuffer.wrap(record.getId().toByteArray());
@@ -63,9 +64,9 @@ public List<Record> parse(final HttpRequest data) throws ParsingException {
6364
}
6465
return records;
6566
} catch (final InvalidProtocolBufferException e) {
66-
throw new ParsingException("Could not create Request message from data", data.getBody(), e);
67+
throw new ParsingException("Could not create Request message from data", data.getBody().toArray(), e);
6768
} catch (final ConstraintsViolatedException | IllegalArgumentException e) {
68-
throw new ParsingException("Could not build record", data.getBody(), e);
69+
throw new ParsingException("Could not build record", data.getBody().toArray(), e);
6970
}
7071
}
7172

0 commit comments

Comments
 (0)