Skip to content
This repository was archived by the owner on May 4, 2019. It is now read-only.

Commit b81d835

Browse files
committed
VizceralBridge: graphs streaming, tweak vizceral related fields
1 parent 6bf85c1 commit b81d835

File tree

2 files changed

+218
-158
lines changed

2 files changed

+218
-158
lines changed

proteus-vizceral/src/main/java/io/netifi/proteus/vizceral/service/VizceralBridge.java

Lines changed: 135 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -4,114 +4,140 @@
44
import io.netifi.proteus.tracing.ProteusTraceStreamsSupplier;
55
import io.netifi.proteus.tracing.Trace;
66
import io.netifi.proteus.tracing.TracesRequest;
7-
import io.netifi.proteus.vizceral.*;
7+
import io.netifi.proteus.viz.*;
88
import io.netty.buffer.ByteBuf;
9+
import reactor.core.publisher.Flux;
10+
import reactor.core.publisher.GroupedFlux;
11+
import reactor.core.publisher.Mono;
12+
13+
import java.time.Duration;
914
import java.util.*;
1015
import java.util.concurrent.ConcurrentHashMap;
1116
import java.util.function.Function;
1217
import java.util.stream.Collectors;
13-
import reactor.core.publisher.Flux;
14-
import reactor.core.publisher.GroupedFlux;
15-
import reactor.core.publisher.Mono;
1618

1719
public class VizceralBridge implements VizceralService {
1820

1921
private final Function<TracesRequest, Flux<Trace>> traceStreams;
22+
private int repeatIntervalSeconds;
2023

21-
public VizceralBridge(Proteus proteus, Optional<String> tracingGroup) {
22-
this(new ProteusTraceStreamsSupplier(proteus, tracingGroup));
24+
public VizceralBridge(Proteus proteus,
25+
Optional<String> tracingGroup,
26+
int repeatIntervalSeconds) {
27+
this(new ProteusTraceStreamsSupplier(proteus, tracingGroup),
28+
repeatIntervalSeconds);
2329
}
2430

25-
VizceralBridge(Function<TracesRequest, Flux<Trace>> traceStreams) {
31+
public VizceralBridge(Proteus proteus,
32+
Optional<String> tracingGroup) {
33+
this(proteus, tracingGroup, 2);
34+
}
35+
36+
VizceralBridge(Function<TracesRequest, Flux<Trace>> traceStreams,
37+
int repeatIntervalSeconds) {
2638
this.traceStreams = traceStreams;
39+
this.repeatIntervalSeconds = repeatIntervalSeconds;
2740
}
2841

2942
@Override
3043
public Flux<Node> visualisations(VisualisationRequest message, ByteBuf metadata) {
31-
return Flux.defer(
32-
() -> {
33-
Flux<GroupedFlux<Edge, Edge>> groupedEdgesPerConnection =
34-
traceStreams
35-
.apply(tracesFor(message))
36-
.onErrorResume(
37-
err ->
38-
Flux.error(
39-
new IllegalStateException("Error reading traces source stream", err)))
40-
.flatMap(this::buildEdges)
41-
.groupBy(Function.identity());
42-
43-
Map<String, Vertex> vertices = new ConcurrentHashMap<>();
44-
Flux<Connection> connections =
45-
groupedEdgesPerConnection.flatMap(
46-
edges -> {
47-
Edge edge = edges.key();
48-
49-
String sourceNodeName = vizceralSourceName(edge);
50-
String targetNodeName = vizceralTargetName(edge);
51-
52-
Mono<EdgeState> reducedEdge =
53-
edges
54-
.scan(
55-
new EdgeState(),
56-
(s, e) -> {
57-
if (edge.kind == Edge.Kind.SUCCESS) {
58-
s.addSuccess();
59-
} else {
60-
s.addError();
61-
}
62-
s.addService(e.getSvc());
63-
s.updateTimeStamp(e.getTimeStamp());
64-
return s;
65-
})
66-
.last();
67-
68-
Mono<Connection> connection =
69-
reducedEdge
70-
.doOnSuccess(
71-
s -> {
72-
long timeStamp = s.getTimestamp();
73-
74-
Vertex source = vertexByName(sourceNodeName, vertices);
75-
source.updatedAt(timeStamp);
76-
77-
Vertex target = vertexByName(targetNodeName, vertices);
78-
target.updatedAt(timeStamp);
79-
target.addMaxVolume(s.getTotal());
80-
})
81-
.map(
82-
s ->
83-
Connection.newBuilder()
84-
.setSource(sourceNodeName)
85-
.setTarget(targetNodeName)
86-
.setMetrics(s.metrics())
87-
.addAllNotices(s.notices())
88-
.build());
89-
90-
return connection;
91-
});
92-
93-
return connections
94-
.collectList()
95-
.map(
96-
conns -> {
97-
List<Node> nodesCollection =
98-
vertices.values().stream().map(Vertex::toNode).collect(Collectors.toList());
99-
100-
String rootNodeName = message.getRootNode();
101-
102-
return Node.newBuilder()
44+
return Flux.defer(() -> {
45+
Flux<GroupedFlux<Edge, Edge>> groupedEdgesPerConnection =
46+
traceStreams.apply(tracesFor(message))
47+
.onErrorResume(err ->
48+
Flux.error(
49+
new IllegalStateException("Error reading traces source stream", err)))
50+
.flatMap(this::buildEdges)
51+
.groupBy(Function.identity());
52+
53+
Map<String, Vertex> vertices = new ConcurrentHashMap<>();
54+
Flux<Connection> connections =
55+
groupedEdgesPerConnection
56+
.flatMap(edges -> {
57+
58+
Edge edge = edges.key();
59+
60+
String sourceNodeName = vizceralSourceName(edge);
61+
String targetNodeName = vizceralTargetName(edge);
62+
63+
Mono<EdgeState> reducedEdge = edges
64+
.scan(new EdgeState(), (s, e) -> {
65+
if (edge.kind == Edge.Kind.SUCCESS) {
66+
s.addSuccess();
67+
} else {
68+
s.addError();
69+
}
70+
s.addService(e.getSvc());
71+
s.updateTimeStamp(e.getTimeStamp());
72+
return s;
73+
}).last();
74+
75+
Mono<Connection> connection = reducedEdge
76+
.doOnSuccess(s -> {
77+
long timeStamp = s.getTimestamp();
78+
79+
Vertex source = vertexByName(sourceNodeName, vertices);
80+
source.updatedAt(timeStamp);
81+
82+
Vertex target = vertexByName(targetNodeName, vertices);
83+
target.updatedAt(timeStamp);
84+
target.addMaxVolume(s.getTotal());
85+
})
86+
.map(s -> Connection.newBuilder()
87+
.setSource(sourceNodeName)
88+
.setTarget(targetNodeName)
89+
.setMetrics(s.metrics())
90+
.addAllNotices(s.notices())
91+
.setUpdated(s.getTimestamp())
92+
.build());
93+
94+
return connection;
95+
});
96+
97+
return connections
98+
.collectList()
99+
.map(conns -> {
100+
101+
List<Node> nodesCollection = vertices
102+
.values()
103+
.stream()
104+
.map(Vertex::toNode)
105+
.collect(Collectors.toList());
106+
107+
String rootNodeName = message.getRootNode();
108+
109+
Node root = Optional.ofNullable(
110+
vertices.get(rootNodeName))
111+
.map(vertex -> vertex
112+
.toNodeBuilder()
113+
.setEntryNode(rootNodeName)
114+
.setRenderer("region"))
115+
.orElseGet(
116+
() -> Node.newBuilder()
103117
.setRenderer("region")
118+
.setClass_("normal")
104119
.setName(rootNodeName)
105-
.setEntryNode(rootNodeName)
106-
.addAllConnections(conns)
107-
.addAllNodes(nodesCollection)
108-
.build();
109-
});
110-
});
120+
.setDisplayName(rootNodeName)
121+
.setUpdated(
122+
nodesCollection
123+
.stream()
124+
.map(Node::getUpdated)
125+
.max(Comparator.naturalOrder()).get()))
126+
.addAllConnections(conns)
127+
.addAllNodes(nodesCollection)
128+
.build();
129+
130+
return root;
131+
});
132+
}).repeatWhen(flux -> flux.flatMap(v ->
133+
Mono.delay(Duration.ofSeconds(repeatIntervalSeconds))));
111134
}
112135

113136
private TracesRequest tracesFor(VisualisationRequest message) {
114-
return TracesRequest.newBuilder().setLookbackSeconds(message.getDataLookbackSeconds()).build();
137+
return TracesRequest
138+
.newBuilder()
139+
.setLookbackSeconds(message.getDataLookbackSeconds())
140+
.build();
115141
}
116142

117143
private Vertex vertexByName(String name, Map<String, Vertex> map) {
@@ -157,7 +183,10 @@ public void updateTimeStamp(long timestamp) {
157183
}
158184

159185
public Metrics metrics() {
160-
return Metrics.newBuilder().setNormal(successRate()).setDanger(errorRate()).build();
186+
return Metrics.newBuilder()
187+
.setNormal(successRate())
188+
.setDanger(errorRate())
189+
.build();
161190
}
162191

163192
public int getTotal() {
@@ -183,7 +212,7 @@ public long getTimestamp() {
183212
public Collection<Notice> notices() {
184213
return services
185214
.stream()
186-
.map(s -> Notice.newBuilder().setTitle(s).build())
215+
.map(s -> Notice.newBuilder().setTitle(s).setSeverity(1).build())
187216
.collect(Collectors.toList());
188217
}
189218
}
@@ -208,13 +237,16 @@ public void addMaxVolume(long maxVolume) {
208237
}
209238

210239
public Node toNode() {
240+
return toNodeBuilder().build();
241+
}
242+
243+
public Node.Builder toNodeBuilder() {
211244
return Node.newBuilder()
212245
.setName(name)
213246
.setDisplayName(name)
214247
.setClass_("normal")
215248
.setMaxVolume(maxVolume)
216-
.setUpdated(updated)
217-
.build();
249+
.setUpdated(updated);
218250
}
219251
}
220252

@@ -227,14 +259,13 @@ static class Edge {
227259
private final Kind kind;
228260
private final long timeStamp;
229261

230-
public Edge(
231-
String sourceGroup,
232-
String sourceDest,
233-
String targetGroup,
234-
String targetDest,
235-
String svc,
236-
Kind kind,
237-
long timeStamp) {
262+
public Edge(String sourceGroup,
263+
String sourceDest,
264+
String targetGroup,
265+
String targetDest,
266+
String svc,
267+
Kind kind,
268+
long timeStamp) {
238269
this.sourceGroup = sourceGroup;
239270
this.sourceDest = sourceDest;
240271
this.targetGroup = targetGroup;
@@ -273,19 +304,18 @@ public Kind getKind() {
273304
}
274305

275306
enum Kind {
276-
SUCCESS,
277-
ERROR
307+
SUCCESS, ERROR
278308
}
279309

280310
@Override
281311
public boolean equals(Object o) {
282312
if (this == o) return true;
283313
if (o == null || getClass() != o.getClass()) return false;
284314
Edge edge = (Edge) o;
285-
return Objects.equals(sourceGroup, edge.sourceGroup)
286-
&& Objects.equals(sourceDest, edge.sourceDest)
287-
&& Objects.equals(targetGroup, edge.targetGroup)
288-
&& Objects.equals(targetDest, edge.targetDest);
315+
return Objects.equals(sourceGroup, edge.sourceGroup) &&
316+
Objects.equals(sourceDest, edge.sourceDest) &&
317+
Objects.equals(targetGroup, edge.targetGroup) &&
318+
Objects.equals(targetDest, edge.targetDest);
289319
}
290320

291321
@Override
@@ -294,3 +324,4 @@ public int hashCode() {
294324
}
295325
}
296326
}
327+

0 commit comments

Comments
 (0)