Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit 3210e5e

Browse files
committed
Merge branch 'develop' into release/0.8.10
2 parents 2ff91be + 13b2f62 commit 3210e5e

File tree

18 files changed

+2161
-13
lines changed

18 files changed

+2161
-13
lines changed

build.gradle

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,17 @@ project(':tracing-idl') {
8888

8989
apply from: file('../gradle/java.gradle')
9090
}
91+
92+
project(':viz-idl') {
93+
description = 'Netifi Proteus Vizceral IDL'
94+
ext.artifactName = 'viz-idl'
95+
96+
apply from: file('../gradle/java.gradle')
97+
}
98+
99+
project(':vizceral') {
100+
description = 'Netifi Proteus Vizceral Service'
101+
ext.artifactName = 'vizceral'
102+
103+
apply from: file('../gradle/java.gradle')
104+
}

client/src/main/java/io/netifi/proteus/presence/BrokerInfoPresenceNotifier.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public void watch(String group) {
5555
}
5656
}
5757
})
58+
.onErrorResume(err -> Mono.delay(Duration.ofMillis(500)).then(Mono.error(err)))
5859
.retry()
5960
.subscribe(this::joinEvent));
6061
}

settings.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ include 'metrics-micrometer'
99
include 'metrics-prometheus'
1010
include 'tracing-openzipkin'
1111
include 'tracing-idl'
12+
include 'viz-idl'
13+
include 'vizceral'

tracing-idl/src/main/proto/proteus/tracing.proto

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,29 @@ syntax = "proto3";
33
package io.netifi.proteus.tracing;
44

55
import "zipkin/proto3/zipkin.proto";
6+
import "google/protobuf/empty.proto";
67

78
option java_package = "io.netifi.proteus.tracing";
89
option java_outer_classname = "ProteusTracingServices";
910
option java_multiple_files = true;
1011

1112
message Ack {}
1213

14+
message Trace {
15+
repeated zipkin.proto3.Span spans = 1;
16+
}
17+
18+
message Traces {
19+
repeated Trace traces = 1;
20+
}
21+
22+
message TracesRequest {
23+
int32 lookbackSeconds = 1;
24+
}
25+
1326
service ProteusTracingService {
27+
1428
rpc StreamSpans (stream zipkin.proto3.Span) returns (Ack) {}
29+
30+
rpc StreamTraces (TracesRequest) returns (stream Trace) {}
1531
}

tracing-openzipkin/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ dependencies {
1515

1616
compile 'io.opentracing:opentracing-api:0.31.0'
1717
compile 'io.opentracing.brave:brave-opentracing:0.31.2'
18+
compile group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.6'
19+
compile group: 'com.hubspot.jackson', name: 'jackson-datatype-protobuf', version: '0.9.10-jackson2.9-proto3'
1820

1921
testCompile 'org.apache.logging.log4j:log4j-api:2.9.0'
2022
testCompile 'org.apache.logging.log4j:log4j-core:2.9.0'
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import io.netifi.proteus.Proteus;
4+
import reactor.core.publisher.Flux;
5+
6+
import javax.inject.Inject;
7+
import java.util.Optional;
8+
import java.util.function.Function;
9+
10+
public class ProteusTraceStreamsSupplier implements Function<TracesRequest, Flux<Trace>> {
11+
12+
private final ProteusTracingServiceClient client;
13+
14+
@Inject
15+
public ProteusTraceStreamsSupplier(Proteus proteus, Optional<String> tracingGroup) {
16+
client = new ProteusTracingServiceClient(proteus.group(tracingGroup.orElse("com.netifi.proteus.tracing")));
17+
}
18+
19+
@Override
20+
public Flux<Trace> apply(TracesRequest message) {
21+
return client.streamTraces(message);
22+
}
23+
}

tracing-openzipkin/src/main/java/io/netifi/proteus/tracing/ProteusZipkinHttpBridge.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,22 @@
11
package io.netifi.proteus.tracing;
22

3+
import com.fasterxml.jackson.core.JsonParser;
4+
import com.fasterxml.jackson.core.JsonProcessingException;
5+
import com.fasterxml.jackson.core.JsonToken;
6+
import com.fasterxml.jackson.core.TreeNode;
7+
import com.fasterxml.jackson.core.type.TypeReference;
8+
import com.fasterxml.jackson.databind.DeserializationContext;
9+
import com.fasterxml.jackson.databind.DeserializationFeature;
10+
import com.fasterxml.jackson.databind.JsonNode;
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
13+
import com.fasterxml.jackson.databind.module.SimpleDeserializers;
14+
import com.fasterxml.jackson.databind.module.SimpleModule;
15+
import com.fasterxml.jackson.databind.type.CollectionType;
16+
import com.google.protobuf.Empty;
317
import com.google.protobuf.InvalidProtocolBufferException;
18+
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
19+
import com.hubspot.jackson.datatype.protobuf.builtin.deserializers.ListValueDeserializer;
420
import io.netifi.proteus.Proteus;
521
import io.netty.buffer.ByteBuf;
622
import io.netty.channel.ChannelOption;
@@ -11,33 +27,41 @@
1127
import reactor.core.publisher.Flux;
1228
import reactor.core.publisher.Mono;
1329
import reactor.ipc.netty.http.client.HttpClient;
30+
import reactor.ipc.netty.http.client.HttpClientResponse;
1431
import reactor.ipc.netty.resources.PoolResources;
1532
import zipkin2.proto3.Span;
1633

34+
import java.io.IOException;
1735
import java.time.Duration;
36+
import java.util.List;
1837
import java.util.Optional;
1938
import java.util.StringJoiner;
39+
import java.util.function.Function;
2040

2141
public class ProteusZipkinHttpBridge implements ProteusTracingService {
2242
private static final Logger logger = LoggerFactory.getLogger(ProteusZipkinHttpBridge.class);
2343

24-
private static final String DEFAULT_ZIPKIN_URL = "/api/v2/spans";
44+
private static final String DEFAULT_ZIPKIN_SPANS_URL = "/api/v2/spans";
45+
private static final String DEFAULT_ZIPKIN_TRACES_URL = "/api/v2/traces";
2546

2647
private final String host;
27-
2848
private final int port;
29-
30-
private final String zipkinUrl;
49+
private final String zipkinSpansUrl;
3150
private HttpClient httpClient;
51+
private TracesStreamer tracesStreamer;
3252

33-
public ProteusZipkinHttpBridge(String host, int port, String zipkinUrl) {
34-
this.zipkinUrl = zipkinUrl;
53+
public ProteusZipkinHttpBridge(String host,
54+
int port,
55+
String zipkinSpansUrl,
56+
String zipkinTracesUrl) {
3557
this.host = host;
3658
this.port = port;
59+
this.zipkinSpansUrl = zipkinSpansUrl;
60+
this.tracesStreamer = new TracesStreamer(zipkinTracesUrl, Mono.fromCallable(this::getClient));
3761
}
3862

3963
public ProteusZipkinHttpBridge(String host, int port) {
40-
this(host, port, DEFAULT_ZIPKIN_URL);
64+
this(host, port, DEFAULT_ZIPKIN_SPANS_URL, DEFAULT_ZIPKIN_TRACES_URL);
4165
}
4266

4367
public static void main(String... args) {
@@ -48,7 +72,8 @@ public static void main(String... args) {
4872
int brokerPort = Integer.getInteger("netifi.proteus.port", 8001);
4973
String zipkinHost = System.getProperty("netifi.proteus.zipkinHost", "localhost");
5074
int zipkinPort = Integer.getInteger("netifi.proteus.zipkinPort", 9411);
51-
String zipkinUrl = System.getProperty("netifi.proteus.zipkinUrl", DEFAULT_ZIPKIN_URL);
75+
String zipkinSpansUrl = System.getProperty("netifi.proteus.zipkinSpansUrl", DEFAULT_ZIPKIN_SPANS_URL);
76+
String zipkinTracesUrl = System.getProperty("netifi.proteus.zipkinTracesUrl", DEFAULT_ZIPKIN_TRACES_URL);
5277
long accessKey = Long.getLong("netifi.proteus.accessKey", 3855261330795754807L);
5378
String accessToken =
5479
System.getProperty("netifi.authentication.accessToken", "kTBDVtfRBO4tHOnZzSyY5ym2kfY");
@@ -58,7 +83,8 @@ public static void main(String... args) {
5883
logger.info("broker port - {}", brokerPort);
5984
logger.info("zipkin host - {}", zipkinHost);
6085
logger.info("zipkin port - {}", zipkinPort);
61-
logger.info("zipkin url - {}", zipkinUrl);
86+
logger.info("zipkin spans url - {}", zipkinSpansUrl);
87+
logger.info("zipkin traces url - {}", zipkinTracesUrl);
6288
logger.info("access key - {}", accessKey);
6389

6490
Proteus proteus =
@@ -73,7 +99,11 @@ public static void main(String... args) {
7399

74100
proteus.addService(
75101
new ProteusTracingServiceServer(
76-
new ProteusZipkinHttpBridge(zipkinHost, zipkinPort, zipkinUrl),
102+
new ProteusZipkinHttpBridge(
103+
zipkinHost,
104+
zipkinPort,
105+
zipkinSpansUrl,
106+
zipkinTracesUrl),
77107
Optional.empty(),
78108
Optional.empty()));
79109

@@ -124,12 +154,12 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
124154
.reduce(new StringJoiner(","), StringJoiner::add)
125155
.map(stringJoiner -> "[" + stringJoiner.toString() + "]"))
126156
.onBackpressureBuffer(1 << 16)
127-
.flatMap(stringMono -> stringMono)
157+
.flatMap(stringMono -> stringMono)
128158
.concatMap(
129159
spans ->
130160
getClient()
131161
.post(
132-
zipkinUrl,
162+
zipkinSpansUrl,
133163
request -> {
134164
request.addHeader("Content-Type", "application/json");
135165
return request.sendString(Mono.just(spans));
@@ -139,7 +169,12 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
139169
8)
140170
.doOnError(
141171
throwable ->
142-
logger.error("error sending data to tracing data to url " + zipkinUrl, throwable))
172+
logger.error("error sending data to tracing data to url " + zipkinSpansUrl, throwable))
143173
.then(Mono.never());
144174
}
175+
176+
@Override
177+
public Flux<Trace> streamTraces(TracesRequest message, ByteBuf metadata) {
178+
return tracesStreamer.streamTraces(message.getLookbackSeconds());
179+
}
145180
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import com.fasterxml.jackson.core.JsonParser;
4+
import com.fasterxml.jackson.core.JsonToken;
5+
import com.fasterxml.jackson.core.type.TypeReference;
6+
import com.fasterxml.jackson.databind.DeserializationContext;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
9+
import com.fasterxml.jackson.databind.module.SimpleDeserializers;
10+
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
11+
import reactor.core.Exceptions;
12+
import reactor.core.publisher.Flux;
13+
import reactor.core.publisher.Mono;
14+
import reactor.ipc.netty.http.client.HttpClient;
15+
import zipkin2.proto3.Span;
16+
17+
import java.io.IOException;
18+
import java.io.InputStream;
19+
import java.util.concurrent.TimeUnit;
20+
import java.util.function.Function;
21+
22+
public class TracesStreamer {
23+
24+
private final ObjectMapper objectMapper = protoMapper();
25+
private Function<Integer, Mono<InputStream>> inputSource;
26+
27+
public TracesStreamer(String zipkinUrl,
28+
Mono<HttpClient> client) {
29+
this.inputSource = zipkinServerStream(zipkinUrl, client);
30+
}
31+
32+
public TracesStreamer(Mono<InputStream> inputSource) {
33+
this.inputSource = v -> inputSource;
34+
}
35+
36+
public Flux<Trace> streamTraces(int lookbackSeconds) {
37+
return streamTraces(inputSource.apply(lookbackSeconds));
38+
}
39+
40+
Flux<Trace> streamTraces(Mono<InputStream> input) {
41+
return input.map(is -> {
42+
try {
43+
return objectMapper.<Traces>readValue(
44+
is,
45+
new TypeReference<Traces>() {
46+
});
47+
} catch (IOException e) {
48+
throw Exceptions.propagate(e);
49+
}
50+
}).flatMapIterable(Traces::getTracesList)
51+
;
52+
}
53+
54+
private static Function<Integer, Mono<InputStream>> zipkinServerStream(String zipkinUrl,
55+
Mono<HttpClient> client) {
56+
return lookbackSeconds -> client
57+
.flatMap(c -> c
58+
.get(zipkinQuery(zipkinUrl, lookbackSeconds))
59+
.flatMap(resp ->
60+
resp.receive()
61+
.aggregate()
62+
.asInputStream()));
63+
}
64+
65+
private static String zipkinQuery(String zipkinUrl, int lookbackSeconds) {
66+
long lookbackMicros = TimeUnit.SECONDS.toMillis(lookbackSeconds);
67+
return zipkinUrl + "?lookback=" + lookbackMicros + "&limit=100000";
68+
}
69+
70+
private ObjectMapper protoMapper() {
71+
ObjectMapper mapper = new ObjectMapper();
72+
ProtobufModule module = new CustomProtoModule();
73+
mapper.registerModule(module);
74+
return mapper;
75+
}
76+
77+
public static class CustomProtoModule extends ProtobufModule {
78+
@Override
79+
public void setupModule(SetupContext context) {
80+
super.setupModule(context);
81+
SimpleDeserializers deser = new SimpleDeserializers();
82+
deser.addDeserializer(Traces.class, new TracersDeserializer());
83+
context.addDeserializers(deser);
84+
}
85+
}
86+
87+
public static class TracersDeserializer extends StdDeserializer<Traces> {
88+
89+
public TracersDeserializer() {
90+
this(null);
91+
}
92+
93+
protected TracersDeserializer(Class<?> vc) {
94+
super(vc);
95+
}
96+
97+
@Override
98+
public Traces deserialize(JsonParser p,
99+
DeserializationContext ctxt) throws IOException {
100+
p.nextToken();
101+
Traces.Builder tracesBuilder = Traces.newBuilder();
102+
while (p.currentToken() != JsonToken.END_ARRAY) {
103+
tracesBuilder.addTraces(nextTrace(p, ctxt));
104+
}
105+
p.nextToken();
106+
return tracesBuilder.build();
107+
}
108+
109+
private Trace nextTrace(JsonParser p,
110+
DeserializationContext ctxt) throws IOException {
111+
Trace.Builder traceBuilder = Trace.newBuilder();
112+
p.nextToken();
113+
while (p.currentToken() != JsonToken.END_ARRAY) {
114+
traceBuilder.addSpans(ctxt.readValue(p, Span.class));
115+
p.nextToken();
116+
}
117+
p.nextToken();
118+
return traceBuilder.build();
119+
}
120+
}
121+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import org.junit.Assert;
4+
import org.junit.Before;
5+
import org.junit.Test;
6+
import reactor.core.publisher.Mono;
7+
8+
import java.io.InputStream;
9+
import java.time.Duration;
10+
import java.util.List;
11+
12+
public class ZipkinTracesStreamerTest {
13+
14+
private TracesStreamer tracesStreamer;
15+
16+
@Before
17+
public void setUp() {
18+
tracesStreamer = new TracesStreamer(zipkinSource());
19+
}
20+
21+
@Test
22+
public void zipkinServerTracesStreaming() {
23+
List<Trace> traces = tracesStreamer
24+
.streamTraces(42)
25+
.collectList()
26+
.block(Duration.ofSeconds(10));
27+
Assert.assertFalse(traces.isEmpty());
28+
}
29+
30+
private Mono<InputStream> zipkinSource() {
31+
return Mono.fromCallable(() ->
32+
getClass().getClassLoader().getResourceAsStream("zipkin.json"));
33+
}
34+
}

0 commit comments

Comments
 (0)