1313import reactor .core .Exceptions ;
1414import reactor .core .publisher .Flux ;
1515import reactor .core .publisher .Mono ;
16+ import reactor .core .publisher .ReplayProcessor ;
1617import reactor .ipc .netty .http .client .HttpClient ;
1718import reactor .ipc .netty .resources .PoolResources ;
1819import zipkin2 .proto3 .Span ;
@@ -21,24 +22,30 @@ public class ProteusZipkinHttpBridge implements ProteusTracingService {
2122 private static final Logger logger = LoggerFactory .getLogger (ProteusZipkinHttpBridge .class );
2223
2324 private static final String DEFAULT_ZIPKIN_SPANS_URL = "/api/v2/spans" ;
24- private static final String DEFAULT_ZIPKIN_TRACES_URL = "/api/v2/traces" ;
25+ private static final Duration DEFAULT_TRACE_RETENTION = Duration . ofSeconds ( 60 ) ;
2526
2627 private final String host ;
2728 private final int port ;
2829 private final String zipkinSpansUrl ;
30+ private final Duration retentionPeriod ;
31+ private final ReplayProcessor <Span > spansProcessor ;
2932 private HttpClient httpClient ;
30- private TracesStreamer tracesStreamer ;
33+
34+ public ProteusZipkinHttpBridge (String host , int port , String zipkinSpansUrl ) {
35+ this (host , port , zipkinSpansUrl , DEFAULT_TRACE_RETENTION );
36+ }
37+
38+ public ProteusZipkinHttpBridge (String host , int port ) {
39+ this (host , port , DEFAULT_ZIPKIN_SPANS_URL , DEFAULT_TRACE_RETENTION );
40+ }
3141
3242 public ProteusZipkinHttpBridge (
33- String host , int port , String zipkinSpansUrl , String zipkinTracesUrl ) {
43+ String host , int port , String zipkinSpansUrl , Duration retentionPeriod ) {
3444 this .host = host ;
3545 this .port = port ;
3646 this .zipkinSpansUrl = zipkinSpansUrl ;
37- this .tracesStreamer = new TracesStreamer (zipkinTracesUrl , Mono .fromCallable (this ::getClient ));
38- }
39-
40- public ProteusZipkinHttpBridge (String host , int port ) {
41- this (host , port , DEFAULT_ZIPKIN_SPANS_URL , DEFAULT_ZIPKIN_TRACES_URL );
47+ this .retentionPeriod = retentionPeriod ;
48+ this .spansProcessor = ReplayProcessor .createTimeout (this .retentionPeriod );
4249 }
4350
4451 public static void main (String ... args ) {
@@ -51,8 +58,8 @@ public static void main(String... args) {
5158 int zipkinPort = Integer .getInteger ("netifi.proteus.zipkinPort" , 9411 );
5259 String zipkinSpansUrl =
5360 System .getProperty ("netifi.proteus.zipkinSpansUrl" , DEFAULT_ZIPKIN_SPANS_URL );
54- String zipkinTracesUrl =
55- System . getProperty ( "netifi.proteus.zipkinTracesUrl " , DEFAULT_ZIPKIN_TRACES_URL );
61+ Duration traceRetentionPeriod =
62+ Duration . ofSeconds ( Integer . getInteger ( "netifi.proteus.traceRetentionPeriod " , 60 ) );
5663 long accessKey = Long .getLong ("netifi.proteus.accessKey" , 3855261330795754807L );
5764 String accessToken =
5865 System .getProperty ("netifi.authentication.accessToken" , "kTBDVtfRBO4tHOnZzSyY5ym2kfY" );
@@ -63,7 +70,7 @@ public static void main(String... args) {
6370 logger .info ("zipkin host - {}" , zipkinHost );
6471 logger .info ("zipkin port - {}" , zipkinPort );
6572 logger .info ("zipkin spans url - {}" , zipkinSpansUrl );
66- logger .info ("zipkin traces url - {}" , zipkinTracesUrl );
73+ logger .info ("traces retention period - {}" , traceRetentionPeriod );
6774 logger .info ("access key - {}" , accessKey );
6875
6976 Proteus proteus =
@@ -78,7 +85,8 @@ public static void main(String... args) {
7885
7986 proteus .addService (
8087 new ProteusTracingServiceServer (
81- new ProteusZipkinHttpBridge (zipkinHost , zipkinPort , zipkinSpansUrl , zipkinTracesUrl ),
88+ new ProteusZipkinHttpBridge (
89+ zipkinHost , zipkinPort , zipkinSpansUrl , traceRetentionPeriod ),
8290 Optional .empty (),
8391 Optional .empty ()));
8492
@@ -112,6 +120,9 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
112120 return Flux .from (messages )
113121 .map (
114122 span -> {
123+ // Retain span for replay
124+ spansProcessor .onNext (span );
125+
115126 try {
116127 String json = JsonFormat .printer ().print (span );
117128 if (logger .isTraceEnabled ()) {
@@ -151,6 +162,18 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
151162
152163 @ Override
153164 public Flux <Trace > streamTraces (TracesRequest message , ByteBuf metadata ) {
154- return tracesStreamer .streamTraces (message .getLookbackSeconds ());
165+ Duration skipDuration = retentionPeriod .minus (Duration .ofSeconds (message .getLookbackSeconds ()));
166+ // This isn't working for the moment - WIP
167+ // if (skipDuration.isNegative()) {
168+ // } else {
169+ // traces = spansProcessor.skip(skipDuration);
170+ // }
171+ return spansProcessor
172+ .filter (p -> p != null )
173+ .window (Duration .ofSeconds (1 ))
174+ .flatMap (spans -> spans .collectList ())
175+ .filter (spans -> spans .size () > 0 )
176+ .map (spans -> Trace .newBuilder ().addAllSpans (spans ).build ())
177+ .onBackpressureDrop ();
155178 }
156179}
0 commit comments