88import com .fasterxml .jackson .databind .deser .std .StdDeserializer ;
99import com .fasterxml .jackson .databind .module .SimpleDeserializers ;
1010import com .hubspot .jackson .datatype .protobuf .ProtobufModule ;
11+ import io .netty .handler .codec .json .JsonObjectDecoder ;
12+ import org .reactivestreams .Publisher ;
1113import reactor .core .Exceptions ;
1214import reactor .core .publisher .Flux ;
1315import reactor .core .publisher .Mono ;
2224public class TracesStreamer {
2325
2426 private final ObjectMapper objectMapper = protoMapper ();
25- private Function <Integer , Mono <InputStream >> inputSource ;
27+ private Function <Integer , Publisher <InputStream >> inputSource ;
2628
2729 public TracesStreamer (String zipkinUrl ,
2830 Mono <HttpClient > client ) {
29- this . inputSource = zipkinServerStream (zipkinUrl , client );
31+ this ( zipkinServerStream (zipkinUrl , client ) );
3032 }
3133
32- public TracesStreamer (Mono <InputStream > inputSource ) {
33- this .inputSource = v -> inputSource ;
34+ public TracesStreamer (Publisher <InputStream > tracesSource ) {
35+ this (v -> tracesSource );
36+ }
37+
38+ private TracesStreamer (Function <Integer , Publisher <InputStream >> inputSource ) {
39+ this .inputSource = inputSource ;
3440 }
3541
3642 public Flux <Trace > streamTraces (int lookbackSeconds ) {
3743 return streamTraces (inputSource .apply (lookbackSeconds ));
3844 }
3945
40- Flux <Trace > streamTraces (Mono <InputStream > input ) {
41- return input .map (is -> {
42- try {
43- return objectMapper .<Traces >readValue (
44- is ,
45- new TypeReference <Traces >() {
46- });
47- } catch (IOException e ) {
48- throw Exceptions .propagate (e );
49- }
50- }).flatMapIterable (Traces ::getTracesList )
51- ;
46+ Flux <Trace > streamTraces (Publisher <InputStream > input ) {
47+ return Flux .from (input )
48+ .filter (is -> {
49+ try {
50+ return is .available () > 0 ;
51+ } catch (IOException e ) {
52+ throw Exceptions .propagate (e );
53+ }
54+ })
55+ .map (is -> {
56+ try {
57+ return objectMapper .readValue (
58+ is ,
59+ new TypeReference <Trace >() {
60+ });
61+ } catch (IOException e ) {
62+ throw Exceptions .propagate (e );
63+ }
64+ });
5265 }
5366
54- private static Function <Integer , Mono <InputStream >> zipkinServerStream (String zipkinUrl ,
55- Mono <HttpClient > client ) {
67+ private static Function <Integer , Publisher <InputStream >> zipkinServerStream (String zipkinUrl ,
68+ Mono <HttpClient > client ) {
5669 return lookbackSeconds -> client
57- .flatMap (c -> c
58- .get (zipkinQuery (zipkinUrl , lookbackSeconds ))
59- .flatMap (resp ->
70+ .flatMapMany (c -> c
71+ .get (
72+ zipkinQuery (zipkinUrl , lookbackSeconds ),
73+ req -> {
74+ req .context ().addHandler (new JsonObjectDecoder (true ));
75+ return Mono .empty ();
76+ })
77+ .flatMapMany (resp ->
6078 resp .receive ()
61- .aggregate ()
6279 .asInputStream ()));
6380 }
6481
6582 private static String zipkinQuery (String zipkinUrl , int lookbackSeconds ) {
66- long lookbackMicros = TimeUnit .SECONDS .toMillis (lookbackSeconds );
67- return zipkinUrl + "?lookback=" + lookbackMicros + "&limit=100000" ;
83+ long lookbackMillis = TimeUnit .SECONDS .toMillis (lookbackSeconds );
84+ return zipkinUrl + "?lookback=" + lookbackMillis + "&limit=100000" ;
6885 }
6986
7087 private ObjectMapper protoMapper () {
@@ -79,12 +96,12 @@ public static class CustomProtoModule extends ProtobufModule {
7996 public void setupModule (SetupContext context ) {
8097 super .setupModule (context );
8198 SimpleDeserializers deser = new SimpleDeserializers ();
82- deser .addDeserializer (Traces .class , new TracersDeserializer ());
99+ deser .addDeserializer (Trace .class , new TracersDeserializer ());
83100 context .addDeserializers (deser );
84101 }
85102 }
86103
87- public static class TracersDeserializer extends StdDeserializer <Traces > {
104+ public static class TracersDeserializer extends StdDeserializer <Trace > {
88105
89106 public TracersDeserializer () {
90107 this (null );
@@ -95,26 +112,12 @@ protected TracersDeserializer(Class<?> vc) {
95112 }
96113
97114 @ Override
98- public Traces deserialize (JsonParser p ,
99- DeserializationContext ctxt ) throws IOException {
100- p .nextToken ();
101- Traces .Builder tracesBuilder = Traces .newBuilder ();
102- while (p .currentToken () != JsonToken .END_ARRAY ) {
103- tracesBuilder .addTraces (nextTrace (p , ctxt ));
104- }
105- p .nextToken ();
106- return tracesBuilder .build ();
107- }
108-
109- private Trace nextTrace (JsonParser p ,
110- DeserializationContext ctxt ) throws IOException {
115+ public Trace deserialize (JsonParser p ,
116+ DeserializationContext ctx ) throws IOException {
111117 Trace .Builder traceBuilder = Trace .newBuilder ();
112- p .nextToken ();
113- while (p .currentToken () != JsonToken .END_ARRAY ) {
114- traceBuilder .addSpans (ctxt .readValue (p , Span .class ));
115- p .nextToken ();
118+ while (p .nextToken () != JsonToken .END_ARRAY ) {
119+ traceBuilder .addSpans (ctx .readValue (p , Span .class ));
116120 }
117- p .nextToken ();
118121 return traceBuilder .build ();
119122 }
120123 }
0 commit comments