Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit a2bd88d

Browse files
committed
change from group-by mechanism to streaming
1 parent a2f78f1 commit a2bd88d

File tree

1 file changed

+51
-75
lines changed

1 file changed

+51
-75
lines changed

proteus-vizceral/src/main/java/io/netifi/proteus/vizceral/service/VizceralBridge.java

Lines changed: 51 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
1919
import reactor.core.publisher.Flux;
20-
import reactor.core.publisher.GroupedFlux;
2120
import reactor.core.publisher.Mono;
2221

2322
public class VizceralBridge implements VizceralService {
@@ -60,99 +59,76 @@ public Flux<Node> visualisations(com.google.protobuf.Empty empty, ByteBuf metada
6059

6160
return Flux.defer(
6261
() -> {
63-
Flux<GroupedFlux<Edge, Edge>> groupedEdgesPerConnection =
62+
Flux<Edge> groupedEdgesPerConnection =
6463
traceStreams
6564
.apply(traceRequest(tracesLookbackSeconds))
6665
.onErrorResume(
6766
err ->
6867
Flux.error(
6968
new IllegalStateException(
7069
"Error reading traces source stream", err)))
71-
.flatMap(this::buildEdges)
72-
.groupBy(Function.identity());
70+
.flatMap(this::buildEdges);
71+
// .groupBy(Function.identity());
7372

7473
Map<String, Vertex> vertices = new ConcurrentHashMap<>();
74+
Map<Edge, EdgeState> edgeStates = new ConcurrentHashMap<>();
7575
RootNodeFinder rootNodeFinder = new RootNodeFinder();
7676

7777
Flux<Connection> connections =
78-
groupedEdgesPerConnection.flatMap(
79-
edges -> {
80-
Edge edge = edges.key();
81-
78+
groupedEdgesPerConnection.map(
79+
edge -> {
8280
String sourceNodeName = vizceralSourceName(edge);
8381
String targetNodeName = vizceralTargetName(edge);
8482

8583
rootNodeFinder.addEdge(sourceNodeName, targetNodeName);
8684

87-
Mono<EdgeState> reducedEdge =
88-
edges
89-
.scan(
90-
new EdgeState(),
91-
(s, e) -> {
92-
if (edge.kind == Edge.Kind.SUCCESS) {
93-
s.addSuccess();
94-
} else {
95-
s.addError();
96-
}
97-
s.addService(e.getSvc());
98-
s.updateTimeStamp(e.getTimeStamp());
99-
return s;
100-
})
101-
.last();
102-
103-
Mono<Connection> connection =
104-
reducedEdge
105-
.doOnSuccess(
106-
s -> {
107-
long timeStamp = s.getTimestamp();
108-
109-
Vertex source = vertexByName(sourceNodeName, vertices);
110-
source.updatedAt(timeStamp);
111-
112-
Vertex target = vertexByName(targetNodeName, vertices);
113-
target.updatedAt(timeStamp);
114-
target.addMaxVolume(s.getTotal());
115-
})
116-
.map(
117-
s ->
118-
Connection.newBuilder()
119-
.setSource(sourceNodeName)
120-
.setTarget(targetNodeName)
121-
.setMetrics(s.metrics())
122-
.addAllNotices(s.notices())
123-
.setUpdated(s.getTimestamp())
124-
.build());
125-
126-
return connection;
85+
EdgeState s = edgeStates.computeIfAbsent(edge, e -> new EdgeState());
86+
87+
if (edge.getKind() == Edge.Kind.SUCCESS) {
88+
s.addSuccess();
89+
} else {
90+
s.addError();
91+
}
92+
s.addService(edge.getSvc());
93+
s.updateTimeStamp(edge.getTimeStamp());
94+
long timeStamp = s.getTimestamp();
95+
96+
Vertex source = vertexByName(sourceNodeName, vertices);
97+
source.updatedAt(timeStamp);
98+
99+
Vertex target = vertexByName(targetNodeName, vertices);
100+
target.updatedAt(timeStamp);
101+
target.addMaxVolume(s.getTotal());
102+
103+
return Connection.newBuilder()
104+
.setSource(sourceNodeName)
105+
.setTarget(targetNodeName)
106+
.setMetrics(s.metrics())
107+
.addAllNotices(s.notices())
108+
.setUpdated(s.getTimestamp())
109+
.build();
127110
});
128111

129-
return connections
130-
.collectList()
131-
.filter(l -> !l.isEmpty())
132-
.flatMap(
133-
conns -> {
134-
List<Node> nodesCollection =
135-
vertices
136-
.values()
137-
.stream()
138-
.map(Vertex::toNode)
139-
.collect(Collectors.toList());
140-
141-
String rootNodeName = rootNodeFinder.getRootNode();
142-
143-
Node root =
144-
vertices
145-
.get(rootNodeName)
146-
.toNodeBuilder()
147-
.setMaxVolume(0)
148-
.setEntryNode(rootNodeName)
149-
.setRenderer("region")
150-
.addAllConnections(conns)
151-
.addAllNodes(nodesCollection)
152-
.build();
153-
154-
return Mono.just(root);
155-
});
112+
return connections.flatMap(
113+
conn -> {
114+
List<Node> nodesCollection =
115+
vertices.values().stream().map(Vertex::toNode).collect(Collectors.toList());
116+
117+
String rootNodeName = rootNodeFinder.getRootNode();
118+
119+
Node root =
120+
vertices
121+
.get(rootNodeName)
122+
.toNodeBuilder()
123+
.setMaxVolume(0)
124+
.setEntryNode(rootNodeName)
125+
.setRenderer("region")
126+
.addConnections(conn)
127+
.addAllNodes(nodesCollection)
128+
.build();
129+
130+
return Mono.just(root);
131+
});
156132
})
157133
.onErrorResume(new BackOff())
158134
.retry()

0 commit comments

Comments
 (0)