66import io .netifi .proteus .tracing .TracesRequest ;
77import io .netifi .proteus .vizceral .*;
88import io .netty .buffer .ByteBuf ;
9+ import java .time .Duration ;
910import java .util .*;
1011import java .util .concurrent .ConcurrentHashMap ;
12+ import java .util .concurrent .atomic .AtomicInteger ;
13+ import java .util .concurrent .atomic .AtomicLong ;
1114import java .util .function .Function ;
1215import java .util .stream .Collectors ;
16+ import org .reactivestreams .Publisher ;
17+ import org .slf4j .Logger ;
18+ import org .slf4j .LoggerFactory ;
1319import reactor .core .publisher .Flux ;
14- import reactor .core .publisher .GroupedFlux ;
1520import reactor .core .publisher .Mono ;
1621
1722public class VizceralBridge implements VizceralService {
23+ private static final Logger logger = LoggerFactory .getLogger (VizceralBridge .class );
1824
1925 private final Function <TracesRequest , Flux <Trace >> traceStreams ;
26+ private int repeatIntervalSeconds ;
27+ private int tracesLookbackSeconds ;
28+
29+ public VizceralBridge (
30+ Proteus proteus ,
31+ Optional <String > tracingGroup ,
32+ int intervalSeconds ,
33+ int tracesLookbackSeconds ) {
34+ this (
35+ new ProteusTraceStreamsSupplier (proteus , tracingGroup ),
36+ intervalSeconds ,
37+ tracesLookbackSeconds );
38+ }
2039
2140 public VizceralBridge (Proteus proteus , Optional <String > tracingGroup ) {
22- this (new ProteusTraceStreamsSupplier (proteus , tracingGroup ));
41+ this (proteus , tracingGroup , 5 , 10 );
42+ }
43+
44+ public VizceralBridge (Proteus proteus ) {
45+ this (proteus , Optional .empty (), 5 , 10 );
2346 }
2447
25- VizceralBridge (Function <TracesRequest , Flux <Trace >> traceStreams ) {
48+ VizceralBridge (
49+ Function <TracesRequest , Flux <Trace >> traceStreams ,
50+ int repeatIntervalSeconds ,
51+ int tracesLookbackSeconds ) {
2652 this .traceStreams = traceStreams ;
53+ this .repeatIntervalSeconds = repeatIntervalSeconds ;
54+ this .tracesLookbackSeconds = tracesLookbackSeconds ;
2755 }
2856
2957 @ Override
30- public Flux <Node > visualisations (VisualisationRequest message , ByteBuf metadata ) {
31- return Flux .defer (
32- () -> {
33- Flux <GroupedFlux <Edge , Edge >> groupedEdgesPerConnection =
34- traceStreams
35- .apply (tracesFor (message ))
36- .onErrorResume (
37- err ->
38- Flux .error (
39- new IllegalStateException ("Error reading traces source stream" , err )))
40- .flatMap (this ::buildEdges )
41- .groupBy (Function .identity ());
42-
43- Map <String , Vertex > vertices = new ConcurrentHashMap <>();
44- Flux <Connection > connections =
45- groupedEdgesPerConnection .flatMap (
46- edges -> {
47- Edge edge = edges .key ();
48-
49- String sourceNodeName = vizceralSourceName (edge );
50- String targetNodeName = vizceralTargetName (edge );
51-
52- Mono <EdgeState > reducedEdge =
53- edges
54- .scan (
55- new EdgeState (),
56- (s , e ) -> {
57- if (edge .kind == Edge .Kind .SUCCESS ) {
58- s .addSuccess ();
59- } else {
60- s .addError ();
61- }
62- s .addService (e .getSvc ());
63- s .updateTimeStamp (e .getTimeStamp ());
64- return s ;
65- })
66- .last ();
67-
68- Mono <Connection > connection =
69- reducedEdge
70- .doOnSuccess (
71- s -> {
72- long timeStamp = s .getTimestamp ();
73-
74- Vertex source = vertexByName (sourceNodeName , vertices );
75- source .updatedAt (timeStamp );
76-
77- Vertex target = vertexByName (targetNodeName , vertices );
78- target .updatedAt (timeStamp );
79- target .addMaxVolume (s .getTotal ());
80- })
81- .map (
82- s ->
83- Connection .newBuilder ()
84- .setSource (sourceNodeName )
85- .setTarget (targetNodeName )
86- .setMetrics (s .metrics ())
87- .addAllNotices (s .notices ())
88- .build ());
89-
90- return connection ;
91- });
58+ public Flux <Node > visualisations (com .google .protobuf .Empty empty , ByteBuf metadata ) {
9259
93- return connections
94- .collectList ()
95- .map (
96- conns -> {
60+ return Flux .defer (
61+ () -> {
62+ Flux <Edge > groupedEdgesPerConnection =
63+ traceStreams
64+ .apply (traceRequest (tracesLookbackSeconds ))
65+ .onErrorResume (
66+ err ->
67+ Flux .error (
68+ new IllegalStateException (
69+ "Error reading traces source stream" , err )))
70+ .flatMap (this ::buildEdges );
71+ // .groupBy(Function.identity());
72+
73+ Map <String , Vertex > vertices = new ConcurrentHashMap <>();
74+ Map <Edge , EdgeState > edgeStates = new ConcurrentHashMap <>();
75+ RootNodeFinder rootNodeFinder = new RootNodeFinder ();
76+
77+ Flux <Connection > connections =
78+ groupedEdgesPerConnection .map (
79+ edge -> {
80+ String sourceNodeName = vizceralSourceName (edge );
81+ String targetNodeName = vizceralTargetName (edge );
82+
83+ rootNodeFinder .addEdge (sourceNodeName , targetNodeName );
84+
85+ EdgeState s = edgeStates .computeIfAbsent (edge , e -> new EdgeState ());
86+
87+ if (edge .getKind () == Edge .Kind .SUCCESS ) {
88+ s .addSuccess ();
89+ } else {
90+ s .addError ();
91+ }
92+ s .addService (edge .getSvc ());
93+ s .updateTimeStamp (edge .getTimeStamp ());
94+ long timeStamp = s .getTimestamp ();
95+
96+ Vertex source = vertexByName (sourceNodeName , vertices );
97+ source .updatedAt (timeStamp );
98+
99+ Vertex target = vertexByName (targetNodeName , vertices );
100+ target .updatedAt (timeStamp );
101+ target .addMaxVolume (s .getTotal ());
102+
103+ return Connection .newBuilder ()
104+ .setSource (sourceNodeName )
105+ .setTarget (targetNodeName )
106+ .setMetrics (s .metrics ())
107+ .addAllNotices (s .notices ())
108+ .setUpdated (s .getTimestamp ())
109+ .build ();
110+ });
111+
112+ return connections .flatMap (
113+ conn -> {
97114 List <Node > nodesCollection =
98115 vertices .values ().stream ().map (Vertex ::toNode ).collect (Collectors .toList ());
99116
100- String rootNodeName = message .getRootNode ();
117+ String rootNodeName = rootNodeFinder .getRootNode ();
118+
119+ Node root =
120+ vertices
121+ .get (rootNodeName )
122+ .toNodeBuilder ()
123+ .setMaxVolume (0 )
124+ .setEntryNode (rootNodeName )
125+ .setRenderer ("region" )
126+ .addConnections (conn )
127+ .addAllNodes (nodesCollection )
128+ .build ();
101129
102- return Node .newBuilder ()
103- .setRenderer ("region" )
104- .setName (rootNodeName )
105- .setEntryNode (rootNodeName )
106- .addAllConnections (conns )
107- .addAllNodes (nodesCollection )
108- .build ();
130+ return Mono .just (root );
109131 });
110- });
132+ })
133+ .onErrorResume (new BackOff ())
134+ .retry ()
135+ .repeatWhen (
136+ flux -> flux .flatMap (v -> Mono .delay (Duration .ofSeconds (repeatIntervalSeconds ))));
111137 }
112138
113- private TracesRequest tracesFor ( VisualisationRequest message ) {
114- return TracesRequest .newBuilder ().setLookbackSeconds (message . getDataLookbackSeconds () ).build ();
139+ private static TracesRequest traceRequest ( int tracesLookbackSeconds ) {
140+ return TracesRequest .newBuilder ().setLookbackSeconds (tracesLookbackSeconds ).build ();
115141 }
116142
117143 private Vertex vertexByName (String name , Map <String , Vertex > map ) {
@@ -134,6 +160,25 @@ private Flux<Edge> buildEdges(Trace trace) {
134160 return Flux .create (new EdgesBuilder (trace .getSpansList ()));
135161 }
136162
163+ static class RootNodeFinder {
164+ private final Set <String > responders = new HashSet <>();
165+ private final Set <String > requesters = new HashSet <>();
166+
167+ public void addEdge (String source , String target ) {
168+ responders .add (target );
169+ requesters .remove (target );
170+ if (!responders .contains (source )) {
171+ requesters .add (source );
172+ }
173+ }
174+
175+ public String getRootNode () {
176+ Set <String > target =
177+ !requesters .isEmpty () ? requesters : !responders .isEmpty () ? responders : null ;
178+ return Optional .ofNullable (target ).map (v -> v .iterator ().next ()).orElse ("" );
179+ }
180+ }
181+
137182 static class EdgeState {
138183 private int success ;
139184 private int error ;
@@ -183,7 +228,7 @@ public long getTimestamp() {
183228 public Collection <Notice > notices () {
184229 return services
185230 .stream ()
186- .map (s -> Notice .newBuilder ().setTitle (s ).build ())
231+ .map (s -> Notice .newBuilder ().setTitle (s ).setSeverity ( 1 ). build ())
187232 .collect (Collectors .toList ());
188233 }
189234 }
@@ -208,13 +253,16 @@ public void addMaxVolume(long maxVolume) {
208253 }
209254
210255 public Node toNode () {
256+ return toNodeBuilder ().build ();
257+ }
258+
259+ public Node .Builder toNodeBuilder () {
211260 return Node .newBuilder ()
212261 .setName (name )
213262 .setDisplayName (name )
214263 .setClass_ ("normal" )
215264 .setMaxVolume (maxVolume )
216- .setUpdated (updated )
217- .build ();
265+ .setUpdated (updated );
218266 }
219267 }
220268
@@ -293,4 +341,20 @@ public int hashCode() {
293341 return Objects .hash (sourceGroup , sourceDest , targetGroup , targetDest );
294342 }
295343 }
344+
345+ private static class BackOff implements Function <Throwable , Publisher <? extends Node >> {
346+ AtomicLong lastRetry = new AtomicLong (System .currentTimeMillis ());
347+ AtomicInteger count = new AtomicInteger ();
348+
349+ @ Override
350+ public Publisher <? extends Node > apply (Throwable throwable ) {
351+ if (System .currentTimeMillis () - lastRetry .getAndSet (System .currentTimeMillis ()) > 30_000 ) {
352+ count .set (0 );
353+ }
354+
355+ int i = Math .min (30 , count .incrementAndGet ());
356+ logger .error ("error getting vizceral data" , throwable );
357+ return Mono .delay (Duration .ofSeconds (i )).then (Mono .error (throwable ));
358+ }
359+ }
296360}
0 commit comments