1313import reactor .core .Exceptions ;
1414import reactor .core .publisher .Flux ;
1515import reactor .core .publisher .Mono ;
16- import reactor .core .publisher .ReplayProcessor ;
1716import reactor .ipc .netty .http .client .HttpClient ;
1817import reactor .ipc .netty .resources .PoolResources ;
1918import zipkin2 .proto3 .Span ;
@@ -22,30 +21,24 @@ public class ProteusZipkinHttpBridge implements ProteusTracingService {
2221 private static final Logger logger = LoggerFactory .getLogger (ProteusZipkinHttpBridge .class );
2322
2423 private static final String DEFAULT_ZIPKIN_SPANS_URL = "/api/v2/spans" ;
25- private static final Duration DEFAULT_TRACE_RETENTION = Duration . ofSeconds ( 60 ) ;
24+ private static final String DEFAULT_ZIPKIN_TRACES_URL = "/api/v2/traces" ;
2625
2726 private final String host ;
2827 private final int port ;
2928 private final String zipkinSpansUrl ;
30- private final Duration retentionPeriod ;
31- private final ReplayProcessor <Span > spansProcessor ;
3229 private HttpClient httpClient ;
33-
34- public ProteusZipkinHttpBridge (String host , int port , String zipkinSpansUrl ) {
35- this (host , port , zipkinSpansUrl , DEFAULT_TRACE_RETENTION );
36- }
37-
38- public ProteusZipkinHttpBridge (String host , int port ) {
39- this (host , port , DEFAULT_ZIPKIN_SPANS_URL , DEFAULT_TRACE_RETENTION );
40- }
30+ private TracesStreamer tracesStreamer ;
4131
4232 public ProteusZipkinHttpBridge (
43- String host , int port , String zipkinSpansUrl , Duration retentionPeriod ) {
33+ String host , int port , String zipkinSpansUrl , String zipkinTracesUrl ) {
4434 this .host = host ;
4535 this .port = port ;
4636 this .zipkinSpansUrl = zipkinSpansUrl ;
47- this .retentionPeriod = retentionPeriod ;
48- this .spansProcessor = ReplayProcessor .createTimeout (this .retentionPeriod );
37+ this .tracesStreamer = new TracesStreamer (zipkinTracesUrl , Mono .fromCallable (this ::getClient ));
38+ }
39+
40+ public ProteusZipkinHttpBridge (String host , int port ) {
41+ this (host , port , DEFAULT_ZIPKIN_SPANS_URL , DEFAULT_ZIPKIN_TRACES_URL );
4942 }
5043
5144 public static void main (String ... args ) {
@@ -58,8 +51,8 @@ public static void main(String... args) {
5851 int zipkinPort = Integer .getInteger ("netifi.proteus.zipkinPort" , 9411 );
5952 String zipkinSpansUrl =
6053 System .getProperty ("netifi.proteus.zipkinSpansUrl" , DEFAULT_ZIPKIN_SPANS_URL );
61- Duration traceRetentionPeriod =
62- Duration . ofSeconds ( Integer . getInteger ( "netifi.proteus.traceRetentionPeriod " , 60 ) );
54+ String zipkinTracesUrl =
55+ System . getProperty ( "netifi.proteus.zipkinTracesUrl " , DEFAULT_ZIPKIN_TRACES_URL );
6356 long accessKey = Long .getLong ("netifi.proteus.accessKey" , 3855261330795754807L );
6457 String accessToken =
6558 System .getProperty ("netifi.authentication.accessToken" , "kTBDVtfRBO4tHOnZzSyY5ym2kfY" );
@@ -70,7 +63,7 @@ public static void main(String... args) {
7063 logger .info ("zipkin host - {}" , zipkinHost );
7164 logger .info ("zipkin port - {}" , zipkinPort );
7265 logger .info ("zipkin spans url - {}" , zipkinSpansUrl );
73- logger .info ("traces retention period - {}" , traceRetentionPeriod );
66+ logger .info ("zipkin traces url - {}" , zipkinTracesUrl );
7467 logger .info ("access key - {}" , accessKey );
7568
7669 Proteus proteus =
@@ -85,8 +78,7 @@ public static void main(String... args) {
8578
8679 proteus .addService (
8780 new ProteusTracingServiceServer (
88- new ProteusZipkinHttpBridge (
89- zipkinHost , zipkinPort , zipkinSpansUrl , traceRetentionPeriod ),
81+ new ProteusZipkinHttpBridge (zipkinHost , zipkinPort , zipkinSpansUrl , zipkinTracesUrl ),
9082 Optional .empty (),
9183 Optional .empty ()));
9284
@@ -120,9 +112,6 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
120112 return Flux .from (messages )
121113 .map (
122114 span -> {
123- // Retain span for replay
124- spansProcessor .onNext (span );
125-
126115 try {
127116 String json = JsonFormat .printer ().print (span );
128117 if (logger .isTraceEnabled ()) {
@@ -162,18 +151,6 @@ public Mono<Ack> streamSpans(Publisher<Span> messages, ByteBuf metadata) {
162151
163152 @ Override
164153 public Flux <Trace > streamTraces (TracesRequest message , ByteBuf metadata ) {
165- Duration skipDuration = retentionPeriod .minus (Duration .ofSeconds (message .getLookbackSeconds ()));
166- // This isn't working for the moment - WIP
167- // if (skipDuration.isNegative()) {
168- // } else {
169- // traces = spansProcessor.skip(skipDuration);
170- // }
171- return spansProcessor
172- .filter (p -> p != null )
173- .window (Duration .ofSeconds (1 ))
174- .flatMap (spans -> spans .collectList ())
175- .filter (spans -> spans .size () > 0 )
176- .map (spans -> Trace .newBuilder ().addAllSpans (spans ).build ())
177- .onBackpressureDrop ();
154+ return tracesStreamer .streamTraces (message .getLookbackSeconds ());
178155 }
179156}
0 commit comments