Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit 277ce04

Browse files
committed
Restore deleted tracing files
1 parent 9bc5d3d commit 277ce04

File tree

4 files changed

+262
-0
lines changed

4 files changed

+262
-0
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import io.netifi.proteus.Proteus;
4+
import java.util.Optional;
5+
import java.util.function.Function;
6+
import javax.inject.Inject;
7+
import reactor.core.publisher.Flux;
8+
9+
public class ProteusTraceStreamsSupplier implements Function<TracesRequest, Flux<Trace>> {
10+
11+
private final ProteusTracingServiceClient client;
12+
13+
@Inject
14+
public ProteusTraceStreamsSupplier(Proteus proteus, Optional<String> tracingGroup) {
15+
client =
16+
new ProteusTracingServiceClient(
17+
proteus.group(tracingGroup.orElse("com.netifi.proteus.tracing")));
18+
}
19+
20+
@Override
21+
public Flux<Trace> apply(TracesRequest message) {
22+
return client.streamTraces(message);
23+
}
24+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import brave.Tracing;
4+
import brave.opentracing.BraveTracer;
5+
import io.netifi.proteus.Proteus;
6+
import io.netifi.proteus.rsocket.ProteusSocket;
7+
import io.opentracing.Tracer;
8+
import java.util.Optional;
9+
import java.util.function.Supplier;
10+
import javax.inject.Inject;
11+
import javax.inject.Named;
12+
13+
@Named("ProteusTracerSupplier")
14+
public class ProteusTracerSupplier implements Supplier<Tracer> {
15+
private final Tracer tracer;
16+
17+
@Inject
18+
public ProteusTracerSupplier(Proteus proteus, Optional<String> tracingGroup) {
19+
ProteusSocket proteusSocket = proteus.group(tracingGroup.orElse("com.netifi.proteus.tracing"));
20+
21+
ProteusTracingServiceClient client = new ProteusTracingServiceClient(proteusSocket);
22+
ProteusReporter reporter =
23+
new ProteusReporter(client, proteus.getGroupName(), proteus.getDestination());
24+
25+
Tracing tracing = Tracing.newBuilder().spanReporter(reporter).build();
26+
27+
tracer = BraveTracer.create(tracing);
28+
}
29+
30+
@Override
31+
public Tracer get() {
32+
return tracer;
33+
}
34+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import com.fasterxml.jackson.core.JsonParser;
4+
import com.fasterxml.jackson.core.JsonToken;
5+
import com.fasterxml.jackson.core.type.TypeReference;
6+
import com.fasterxml.jackson.databind.DeserializationContext;
7+
import com.fasterxml.jackson.databind.ObjectMapper;
8+
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
9+
import com.fasterxml.jackson.databind.module.SimpleDeserializers;
10+
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
11+
import io.netty.handler.codec.json.JsonObjectDecoder;
12+
import java.io.IOException;
13+
import java.io.InputStream;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.function.Function;
16+
import org.reactivestreams.Publisher;
17+
import reactor.core.Exceptions;
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
import reactor.ipc.netty.http.client.HttpClient;
21+
import zipkin2.proto3.Span;
22+
23+
public class TracesStreamer {
24+
25+
private final ObjectMapper objectMapper = protoMapper();
26+
private Function<Integer, Publisher<InputStream>> inputSource;
27+
28+
public TracesStreamer(String zipkinUrl, Mono<HttpClient> client) {
29+
this(zipkinServerStream(zipkinUrl, client));
30+
}
31+
32+
public TracesStreamer(Publisher<InputStream> tracesSource) {
33+
this(v -> tracesSource);
34+
}
35+
36+
private TracesStreamer(Function<Integer, Publisher<InputStream>> inputSource) {
37+
this.inputSource = inputSource;
38+
}
39+
40+
public Flux<Trace> streamTraces(int lookbackSeconds) {
41+
return streamTraces(inputSource.apply(lookbackSeconds));
42+
}
43+
44+
Flux<Trace> streamTraces(Publisher<InputStream> input) {
45+
return Flux.from(input)
46+
.filter(
47+
is -> {
48+
try {
49+
return is.available() > 0;
50+
} catch (IOException e) {
51+
throw Exceptions.propagate(e);
52+
}
53+
})
54+
.map(
55+
is -> {
56+
try {
57+
return objectMapper.readValue(is, new TypeReference<Trace>() {});
58+
} catch (IOException e) {
59+
throw Exceptions.propagate(e);
60+
}
61+
});
62+
}
63+
64+
private static Function<Integer, Publisher<InputStream>> zipkinServerStream(
65+
String zipkinUrl, Mono<HttpClient> client) {
66+
return lookbackSeconds ->
67+
client.flatMapMany(
68+
c ->
69+
c.get(
70+
zipkinQuery(zipkinUrl, lookbackSeconds),
71+
req -> {
72+
req.context().addHandler(new JsonObjectDecoder(true));
73+
return Mono.empty();
74+
})
75+
.flatMapMany(resp -> resp.receive().asInputStream()));
76+
}
77+
78+
private static String zipkinQuery(String zipkinUrl, int lookbackSeconds) {
79+
long lookbackMillis = TimeUnit.SECONDS.toMillis(lookbackSeconds);
80+
return zipkinUrl + "?lookback=" + lookbackMillis + "&limit=100000";
81+
}
82+
83+
private ObjectMapper protoMapper() {
84+
ObjectMapper mapper = new ObjectMapper();
85+
ProtobufModule module = new CustomProtoModule();
86+
mapper.registerModule(module);
87+
return mapper;
88+
}
89+
90+
public static class CustomProtoModule extends ProtobufModule {
91+
@Override
92+
public void setupModule(SetupContext context) {
93+
super.setupModule(context);
94+
SimpleDeserializers deser = new SimpleDeserializers();
95+
deser.addDeserializer(Trace.class, new TracersDeserializer());
96+
context.addDeserializers(deser);
97+
}
98+
}
99+
100+
public static class TracersDeserializer extends StdDeserializer<Trace> {
101+
102+
public TracersDeserializer() {
103+
this(null);
104+
}
105+
106+
protected TracersDeserializer(Class<?> vc) {
107+
super(vc);
108+
}
109+
110+
@Override
111+
public Trace deserialize(JsonParser p, DeserializationContext ctx) throws IOException {
112+
Trace.Builder traceBuilder = Trace.newBuilder();
113+
while (p.nextToken() != JsonToken.END_ARRAY) {
114+
traceBuilder.addSpans(ctx.readValue(p, Span.class));
115+
}
116+
return traceBuilder.build();
117+
}
118+
}
119+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.netifi.proteus.tracing;
2+
3+
import io.netty.channel.ChannelOption;
4+
import java.io.ByteArrayInputStream;
5+
import java.io.InputStream;
6+
import java.nio.charset.Charset;
7+
import java.nio.charset.StandardCharsets;
8+
import java.time.Duration;
9+
import java.util.List;
10+
import java.util.Scanner;
11+
import java.util.concurrent.TimeUnit;
12+
import org.junit.Assert;
13+
import org.junit.Ignore;
14+
import org.junit.Test;
15+
import reactor.core.publisher.Flux;
16+
import reactor.core.publisher.Mono;
17+
import reactor.ipc.netty.http.client.HttpClient;
18+
import reactor.ipc.netty.resources.PoolResources;
19+
20+
public class ZipkinTracesStreamerTest {
21+
22+
private static final int COUNT = 42;
23+
24+
@Test
25+
public void zipkinServerTracesStreaming() {
26+
TracesStreamer tracesStreamer = new TracesStreamer(zipkinSource(COUNT));
27+
List<Trace> traces =
28+
tracesStreamer.streamTraces(42).collectList().block(Duration.ofSeconds(10));
29+
Assert.assertEquals(COUNT, traces.size());
30+
}
31+
32+
@Test
33+
public void emptyResponse() {
34+
TracesStreamer tracesStreamer = new TracesStreamer(emptySource());
35+
List<Trace> traces =
36+
tracesStreamer.streamTraces(42).collectList().block(Duration.ofSeconds(10));
37+
Assert.assertTrue(traces.isEmpty());
38+
}
39+
40+
private Flux<InputStream> zipkinSource(int count) {
41+
return Mono.fromCallable(
42+
() -> {
43+
try (InputStream trace =
44+
getClass().getClassLoader().getResourceAsStream("zipkin_trace.json")) {
45+
Charset utf8 = StandardCharsets.UTF_8;
46+
try (java.util.Scanner s = new Scanner(trace, utf8.name())) {
47+
return s.useDelimiter("\\A").hasNext() ? s.next() : "";
48+
}
49+
}
50+
})
51+
.flatMapMany(trace -> Flux.range(1, count).map(v -> trace).map(this::asInputStream));
52+
}
53+
54+
private InputStream asInputStream(String s) {
55+
return new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));
56+
}
57+
58+
private Flux<InputStream> emptySource() {
59+
return Flux.just(asInputStream(""));
60+
}
61+
62+
@Ignore("requires local zipkin server")
63+
@Test
64+
public void streamerIntegrationTest() {
65+
TracesStreamer streamer = new TracesStreamer("/api/v2/traces", client());
66+
Flux<Trace> traces = streamer.streamTraces((int) TimeUnit.SECONDS.toSeconds(10));
67+
List<Trace> tracesList = traces.collectList().block();
68+
Assert.assertFalse(tracesList.isEmpty());
69+
}
70+
71+
private Mono<HttpClient> client() {
72+
return Mono.just(
73+
HttpClient.builder()
74+
.options(
75+
builder ->
76+
builder
77+
.compression(true)
78+
.poolResources(PoolResources.fixed("proteusZipkinBridge"))
79+
.option(ChannelOption.SO_KEEPALIVE, true)
80+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30_000)
81+
.host("127.0.0.1")
82+
.port(9411))
83+
.build());
84+
}
85+
}

0 commit comments

Comments
 (0)