1212import zipkin2 .proto3 .Annotation ;
1313import zipkin2 .proto3 .Span ;
1414
15- import java .util .List ;
16- import java .util .Map ;
17- import java .util .Objects ;
18- import java .util .Optional ;
15+ import java .util .*;
1916import java .util .concurrent .ConcurrentHashMap ;
2017import java .util .function .Function ;
2118import java .util .stream .Collectors ;
@@ -35,66 +32,56 @@ public VizceralBridge(Proteus proteus, Optional<String> tracingGroup) {
3532 @ Override
3633 public Flux <Node > visualisations (VisualisationRequest message , ByteBuf metadata ) {
3734 return Flux .defer (() -> {
38- Flux <GroupedFlux <Edge , Edge >> edgesPerDestSvc =
35+ Flux <GroupedFlux <Edge , Edge >> groupedEdgesPerConnection =
3936 traceStreams .apply (tracesFor (message ))
4037 .flatMap (this ::edges )
4138 .groupBy (Function .identity ());
4239
4340 Map <String , Vertex > vertices = new ConcurrentHashMap <>();
44-
4541 Flux <Connection > connections =
46- edgesPerDestSvc
47- .flatMap (edgesPerSvc -> {
42+ groupedEdgesPerConnection
43+ .flatMap (edges -> {
4844
49- Edge edge = edgesPerSvc .key ();
45+ Edge edge = edges .key ();
5046
5147 String sourceNodeName = vizceralSourceName (edge );
5248 String targetNodeName = vizceralTargetName (edge );
5349
54- Flux <GroupedFlux <Boolean , Edge >> edgesBySuccess =
55- edgesPerSvc
56- .groupBy (e -> e .kind == Edge .Kind .SUCCESS )
57- .publish ()
58- .autoConnect (2 );
59-
60- Mono <Long > successCount = count (edgesBySuccess .filter (e -> e .key ()));
61- Mono <Long > errorCount = count (edgesBySuccess .filter (e -> !e .key ()));
62-
63- Mono <Metrics > metrics = successCount
64- .zipWith (errorCount , (succ , err ) -> {
65-
66- long total = succ + err ;
67- double errRate = err / (double ) total ;
68- double succRate = succ / (double ) total ;
69-
70- long timeStamp = edge .getTimeStamp ();
50+ Mono <EdgeState > reducedEdge = edges
51+ .scan (new EdgeState (), (s , e ) -> {
52+ if (edge .kind == Edge .Kind .SUCCESS ) {
53+ s .addSuccess ();
54+ } else {
55+ s .addError ();
56+ }
57+ s .addService (e .getSvc ());
58+ s .updateTimeStamp (e .getTimeStamp ());
59+ return s ;
60+ }).last ();
61+
62+ Mono <Connection > connection = reducedEdge
63+ .doOnSuccess (s -> {
64+ long timeStamp = s .getTimestamp ();
7165
7266 Vertex source = vertexByName (sourceNodeName , vertices );
7367 source .updatedAt (timeStamp );
7468
7569 Vertex target = vertexByName (targetNodeName , vertices );
76- target .addMaxVolume (total );
7770 target .updatedAt (timeStamp );
78-
79- return Metrics .newBuilder ()
80- .setNormal (succRate )
81- .setDanger (errRate )
82- .build ();
83- });
84-
85- Mono <Connection > connection = metrics
86- .map (m -> Connection .newBuilder ()
71+ target .addMaxVolume (s .getTotal ());
72+ })
73+ .map (s -> Connection .newBuilder ()
8774 .setSource (sourceNodeName )
8875 .setTarget (targetNodeName )
89- .setMetrics (m )
76+ .setMetrics (s .metrics ())
77+ .addAllNotices (s .notices ())
9078 .build ());
9179
9280 return connection ;
9381 });
9482
95- Mono <List <Connection >> connectionsCollection = connections .collectList ();
96-
97- Mono <Node > rootNode = connectionsCollection
83+ return connections
84+ .collectList ()
9885 .map (conns -> {
9986
10087 List <Node > nodesCollection = vertices
@@ -103,15 +90,16 @@ public Flux<Node> visualisations(VisualisationRequest message, ByteBuf metadata)
10390 .map (Vertex ::toNode )
10491 .collect (Collectors .toList ());
10592
93+ String rootNodeName = message .getRootNode ();
94+
10695 return Node .newBuilder ()
10796 .setRenderer ("region" )
108- .setName (message . getRootNode () )
109- .setEntryNode (entryNode ( nodesCollection ) )
97+ .setName (rootNodeName )
98+ .setEntryNode (rootNodeName )
11099 .addAllConnections (conns )
111100 .addAllNodes (nodesCollection )
112101 .build ();
113102 });
114- return rootNode ;
115103 });
116104 }
117105
@@ -122,17 +110,6 @@ private TracesRequest tracesFor(VisualisationRequest message) {
122110 .build ();
123111 }
124112
125- private Mono <Long > count (Flux <GroupedFlux <Boolean , Edge >> f ) {
126- return f
127- .next ()
128- .flatMap (Flux ::count )
129- .switchIfEmpty (Mono .just (0L ));
130- }
131-
132- private String entryNode (List <Node > nodesCollection ) {
133- return nodesCollection .iterator ().next ().getName ();
134- }
135-
136113 private Vertex vertexByName (String name , Map <String , Vertex > map ) {
137114 return map .computeIfAbsent (name , this ::createVertex );
138115 }
@@ -142,11 +119,11 @@ private Vertex createVertex(String name) {
142119 }
143120
144121 private String vizceralSourceName (Edge edge ) {
145- return edge .getSourceGroup () + "-" + edge .getSourceDest () + "-" + edge . getSvc () ;
122+ return edge .getSourceGroup () + "-" + edge .getSourceDest ();
146123 }
147124
148125 private String vizceralTargetName (Edge edge ) {
149- return edge .getTargetGroup () + "-" + edge .getTargetDest () + "-" + edge . getSvc () ;
126+ return edge .getTargetGroup () + "-" + edge .getTargetDest ();
150127 }
151128
152129 private Flux <Edge > edges (Trace trace ) {
@@ -213,6 +190,63 @@ private String svc(Map<String, String> tags) {
213190 return tags .get ("proteus.service" );
214191 }
215192
193+ static class EdgeState {
194+ private int success ;
195+ private int error ;
196+ private Set <String > services = new HashSet <>();
197+ private long timestamp ;
198+
199+ public void addSuccess () {
200+ success ++;
201+ }
202+
203+ public void addError () {
204+ error ++;
205+ }
206+
207+ public void addService (String svc ) {
208+ services .add (svc );
209+ }
210+
211+ public void updateTimeStamp (long timestamp ) {
212+ this .timestamp = Math .max (this .timestamp , timestamp );
213+ }
214+
215+ public Metrics metrics () {
216+ return Metrics .newBuilder ()
217+ .setNormal (successRate ())
218+ .setDanger (errorRate ())
219+ .build ();
220+ }
221+
222+ public int getTotal () {
223+ return success + error ;
224+ }
225+
226+ public double successRate () {
227+ return success / (double ) getTotal ();
228+ }
229+
230+ public double errorRate () {
231+ return error / (double ) getTotal ();
232+ }
233+
234+ public Set <String > getServices () {
235+ return new HashSet <>(services );
236+ }
237+
238+ public long getTimestamp () {
239+ return timestamp ;
240+ }
241+
242+ public Collection <Notice > notices () {
243+ return services
244+ .stream ()
245+ .map (s -> Notice .newBuilder ().setTitle (s ).build ())
246+ .collect (Collectors .toList ());
247+ }
248+ }
249+
216250 static class Vertex {
217251 /*written from single thread*/
218252 private volatile long maxVolume = 0 ;
@@ -225,9 +259,7 @@ public Vertex(String name) {
225259 }
226260
227261 public void updatedAt (long millis ) {
228- if (millis > updated ) {
229- updated = millis ;
230- }
262+ updated = Math .max (millis , updated );
231263 }
232264
233265 public void addMaxVolume (long maxVolume ) {
@@ -310,13 +342,12 @@ public boolean equals(Object o) {
310342 return Objects .equals (sourceGroup , edge .sourceGroup ) &&
311343 Objects .equals (sourceDest , edge .sourceDest ) &&
312344 Objects .equals (targetGroup , edge .targetGroup ) &&
313- Objects .equals (targetDest , edge .targetDest ) &&
314- Objects .equals (svc , edge .svc );
345+ Objects .equals (targetDest , edge .targetDest );
315346 }
316347
317348 @ Override
318349 public int hashCode () {
319- return Objects .hash (sourceGroup , sourceDest , targetGroup , targetDest , svc );
350+ return Objects .hash (sourceGroup , sourceDest , targetGroup , targetDest );
320351 }
321352 }
322353}
0 commit comments