1010import io .opentelemetry .sdk .metrics .data .MetricData ;
1111import io .opentelemetry .sdk .trace .data .SpanData ;
1212import io .opentelemetry .testing .internal .armeria .client .WebClient ;
13- import java .nio .charset .StandardCharsets ;
13+ import io .opentelemetry .testing .internal .jackson .core .JsonProcessingException ;
14+ import io .opentelemetry .testing .internal .jackson .databind .ObjectMapper ;
15+ import io .opentelemetry .testing .internal .proto .collector .trace .v1 .ExportTraceServiceRequest ;
16+ import io .opentelemetry .testing .internal .protobuf .GeneratedMessage ;
17+ import io .opentelemetry .testing .internal .protobuf .InvalidProtocolBufferException ;
18+ import io .opentelemetry .testing .internal .protobuf .util .JsonFormat ;
1419import java .util .Collection ;
1520import java .util .Collections ;
1621import java .util .List ;
1722import java .util .concurrent .TimeUnit ;
23+ import java .util .function .Supplier ;
24+ import java .util .stream .Collectors ;
1825
1926public class JavaTelemetryRetriever {
27+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
2028 private final WebClient client ;
2129
2230 public JavaTelemetryRetriever (int backendPort ) {
@@ -28,49 +36,76 @@ public void clearTelemetry() {
2836 }
2937
3038 public List <SpanData > waitForTraces () {
31- try {
32- return TelemetryConverter . getSpanData (
33- Collections . singletonList ( waitForContent ( "get-traces" )));
34- } catch ( InterruptedException e ) {
35- throw new RuntimeException ( e );
36- }
39+ Collection < ExportTraceServiceRequest > requests =
40+ waitForTelemetry ( "get-traces" , () -> ExportTraceServiceRequest . newBuilder ());
41+ return TelemetryConverter . getSpanData (
42+ requests . stream ()
43+ . flatMap ( r -> r . getResourceSpansList (). stream ())
44+ . collect ( Collectors . toList ()));
3745 }
3846
3947 public Collection <MetricData > waitForMetrics () {
40- try {
41- return TelemetryConverter .getMetricsData (
42- Collections .singletonList (waitForContent ("get-metrics" )));
43- } catch (InterruptedException e ) {
44- throw new RuntimeException (e );
45- }
48+ // try {
49+ // return TelemetryConverter.getMetricsData(singletonList(waitForContent("get-metrics")));
50+ // } catch (InterruptedException e) {
51+ // throw new RuntimeException(e);
52+ // }
53+ // todo
54+ return Collections .emptyList ();
4655 }
4756
4857 public Collection <LogRecordData > waitForLogs () {
58+ // try {
59+ // return TelemetryConverter.getLogRecordData(singletonList(waitForContent("get-logs")));
60+ // } catch (InterruptedException e) {
61+ // throw new RuntimeException(e);
62+ // }
63+ // todo
64+ return Collections .emptyList ();
65+ }
66+
67+ @ SuppressWarnings ({"unchecked" , "rawtypes" })
68+ private <T extends GeneratedMessage , B extends GeneratedMessage .Builder >
69+ Collection <T > waitForTelemetry (String path , Supplier <B > builderConstructor ) {
4970 try {
50- return TelemetryConverter .getLogRecordData (
51- Collections .singletonList (waitForContent ("get-logs" )));
52- } catch (InterruptedException e ) {
71+ return OBJECT_MAPPER
72+ .readTree (waitForContent (path ))
73+ .valueStream ()
74+ .map (
75+ it -> {
76+ B builder = builderConstructor .get ();
77+ // TODO: Register parser into object mapper to avoid de -> re -> deserialize.
78+ try {
79+ JsonFormat .parser ().merge (OBJECT_MAPPER .writeValueAsString (it ), builder );
80+ return (T ) builder .build ();
81+ } catch (InvalidProtocolBufferException | JsonProcessingException e ) {
82+ throw new RuntimeException (e );
83+ }
84+ })
85+ .collect (Collectors .toList ());
86+ } catch (InterruptedException
87+ | io .opentelemetry .testing .internal .jackson .core .JsonProcessingException e ) {
5388 throw new RuntimeException (e );
5489 }
5590 }
5691
57- private byte [] waitForContent (String path ) throws InterruptedException {
92+ private String waitForContent (String path ) throws InterruptedException {
5893 long previousSize = 0 ;
5994 long deadline = System .currentTimeMillis () + TimeUnit .SECONDS .toMillis (30 );
60- byte [] content = "[]" . getBytes ( StandardCharsets . UTF_8 ) ;
95+ String content = "[]" ;
6196 while (System .currentTimeMillis () < deadline ) {
62- content = client .get (path ).aggregate ().join ().content (). array ();
63- if (content .length > 2 && content .length == previousSize ) {
97+ content = client .get (path ).aggregate ().join ().contentUtf8 ();
98+ if (content .length () > 2 && content .length () == previousSize ) {
6499 break ;
65100 }
66101
67- previousSize = content .length ;
102+ previousSize = content .length () ;
68103 System .out .println ("Current content size " + previousSize );
69104 TimeUnit .MILLISECONDS .sleep (500 );
70105 }
71106
72107 // todo remove debug
73- System .out .println (new String (content , StandardCharsets .UTF_8 ));
108+ // System.out.println(new String(content, StandardCharsets.UTF_8));
74109
75110 return content ;
76111 }
0 commit comments