@@ -372,7 +372,8 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
372372 // Write traces JSON to file for inspection
373373 writeTracesToFile (tracesJson , "mongo-traces.json" );
374374
375- // Use structured deserialization approach with JsonFormat and add distributed tracing assertions
375+ // Use structured deserialization approach with JsonFormat and add distributed tracing
376+ // assertions
376377 try {
377378 verifyDistributedTracingFlow (tracesJson , uniqueTopicName );
378379 } catch (Exception e ) {
@@ -561,53 +562,54 @@ public static void cleanup() {
561562 logger .info ("Test cleanup complete" );
562563 }
563564
564- /**
565- * Writes traces JSON to a file for inspection and debugging.
566- */
565+ /** Writes traces JSON to a file for inspection and debugging. */
567566 private static void writeTracesToFile (String tracesJson , String filename ) {
568567 try {
569568 File outputDir = new File (System .getProperty ("user.home" ) + "/Desktop/kafka-connect-logs" );
570569 if (!outputDir .exists ()) {
571570 outputDir .mkdirs ();
572571 }
573-
572+
574573 File tracesFile = new File (outputDir , filename );
575-
574+
576575 // Pretty print the JSON for better readability
577576 ObjectMapper objectMapper = new ObjectMapper ();
578577 objectMapper .enable (SerializationFeature .INDENT_OUTPUT );
579578 JsonNode jsonNode = objectMapper .readTree (tracesJson );
580579 String prettyJson = objectMapper .writeValueAsString (jsonNode );
581-
582- try (java .io .Writer writer = java .nio .file .Files .newBufferedWriter (tracesFile .toPath (), java .nio .charset .StandardCharsets .UTF_8 )) {
580+
581+ try (java .io .Writer writer =
582+ java .nio .file .Files .newBufferedWriter (
583+ tracesFile .toPath (), java .nio .charset .StandardCharsets .UTF_8 )) {
583584 writer .write (prettyJson );
584585 }
585-
586+
586587 logger .info ("📄 Traces JSON written to: {}" , tracesFile .getAbsolutePath ());
587588 } catch (Exception e ) {
588589 logger .warn ("Failed to write traces to file: {}" , e .getMessage ());
589590 }
590591 }
591592
592593 /**
593- * Simple approach - just parse JSON and print the trace info you want to see!
594- * Since the JSON is already in resourceSpans format, we can extract what we need directly
594+ * Simple approach - just parse JSON and print the trace info you want to see! Since the JSON is
595+ * already in resourceSpans format, we can extract what we need directly
595596 */
596- private static void deserializeOtlpDataFromJson (String tracesJson , String expectedTopicName ) throws Exception {
597+ private static void deserializeOtlpDataFromJson (String tracesJson , String expectedTopicName )
598+ throws Exception {
597599 logger .info ("🔍 Parsing JSON traces to extract span information" );
598-
600+
599601 ObjectMapper objectMapper = new ObjectMapper ();
600602 JsonNode tracesArray = objectMapper .readTree (tracesJson );
601-
603+
602604 // Process each trace in the JSON array
603605 for (JsonNode traceNode : tracesArray ) {
604606 JsonNode resourceSpansArray = traceNode .get ("resourceSpans" );
605607 if (resourceSpansArray != null && resourceSpansArray .isArray ()) {
606-
608+
607609 // Process each ResourceSpans
608610 for (JsonNode resourceSpansNode : resourceSpansArray ) {
609611 logger .info ("Processing Resource with attributes:" );
610-
612+
611613 JsonNode resourceNode = resourceSpansNode .get ("resource" );
612614 if (resourceNode != null ) {
613615 JsonNode attributes = resourceNode .get ("attributes" );
@@ -619,31 +621,31 @@ private static void deserializeOtlpDataFromJson(String tracesJson, String expect
619621 }
620622 }
621623 }
622-
624+
623625 JsonNode scopeSpansArray = resourceSpansNode .get ("scopeSpans" );
624626 if (scopeSpansArray != null && scopeSpansArray .isArray ()) {
625-
627+
626628 // Process each ScopeSpans
627629 for (JsonNode scopeSpansNode : scopeSpansArray ) {
628630 JsonNode scopeNode = scopeSpansNode .get ("scope" );
629631 String scopeName = scopeNode != null ? scopeNode .path ("name" ).asText () : "unknown" ;
630632 logger .info ("Scope name: {}" , scopeName );
631-
633+
632634 JsonNode spansArray = scopeSpansNode .get ("spans" );
633635 if (spansArray != null && spansArray .isArray ()) {
634-
636+
635637 // Process each Span
636638 for (JsonNode spanNode : spansArray ) {
637639 String spanName = spanNode .path ("name" ).asText ();
638640 String spanId = spanNode .path ("spanId" ).asText ();
639641 String traceId = spanNode .path ("traceId" ).asText ();
640642 String kind = spanNode .path ("kind" ).asText ();
641-
643+
642644 logger .info (" Span: " + spanName );
643645 logger .info (" Span ID: " + spanId );
644646 logger .info (" Trace ID: " + traceId );
645647 logger .info (" Kind: " + kind );
646-
648+
647649 // Print span attributes
648650 JsonNode spanAttributes = spanNode .path ("attributes" );
649651 if (spanAttributes .isArray ()) {
@@ -661,10 +663,8 @@ private static void deserializeOtlpDataFromJson(String tracesJson, String expect
661663 }
662664 }
663665 }
664-
665- /**
666- * Helper to extract attribute values from different types
667- */
666+
667+ /** Helper to extract attribute values from different types */
668668 private static String extractAttributeValue (JsonNode valueNode ) {
669669 if (valueNode .has ("stringValue" )) {
670670 return valueNode .path ("stringValue" ).asText ();
@@ -677,14 +677,16 @@ private static String extractAttributeValue(JsonNode valueNode) {
677677 }
678678 return valueNode .toString ();
679679 }
680-
680+
681681 /**
682- * Your exact deserializeOtlpData method using TracesData.parseFrom(otlpBytes)!
683- * Just prints trace info, no assertions
682+ * Your exact deserializeOtlpData method using TracesData.parseFrom(otlpBytes)! Just prints trace
683+ * info, no assertions
684684 */
685- private static void verifyDistributedTracingFlow (String tracesJson , String expectedTopicName ) throws IOException {
686- logger .info ("🔍 Verifying Kafka Connect distributed tracing flow for topic: {}" , expectedTopicName );
687-
685+ private static void verifyDistributedTracingFlow (String tracesJson , String expectedTopicName )
686+ throws IOException {
687+ logger .info (
688+ "🔍 Verifying Kafka Connect distributed tracing flow for topic: {}" , expectedTopicName );
689+
688690 ObjectMapper objectMapper = new ObjectMapper ();
689691 JsonNode rootNode = objectMapper .readTree (tracesJson );
690692
@@ -714,8 +716,10 @@ private static void verifyDistributedTracingFlow(String tracesJson, String expec
714716
715717 for (JsonNode scopeSpansNode : scopeSpansArray ) {
716718 JsonNode scopeNode = scopeSpansNode .get ("scope" );
717- String scopeName = scopeNode != null && scopeNode .get ("name" ) != null ?
718- scopeNode .get ("name" ).asText () : "" ;
719+ String scopeName =
720+ scopeNode != null && scopeNode .get ("name" ) != null
721+ ? scopeNode .get ("name" ).asText ()
722+ : "" ;
719723
720724 JsonNode spansArray = scopeSpansNode .get ("spans" );
721725 if (spansArray == null || !spansArray .isArray ()) {
@@ -724,31 +728,40 @@ private static void verifyDistributedTracingFlow(String tracesJson, String expec
724728
725729 for (JsonNode spanNode : spansArray ) {
726730 String spanName = spanNode .get ("name" ) != null ? spanNode .get ("name" ).asText () : "" ;
727- String traceId = spanNode .get ("traceId" ) != null ? spanNode .get ("traceId" ).asText () : "" ;
731+ String traceId =
732+ spanNode .get ("traceId" ) != null ? spanNode .get ("traceId" ).asText () : "" ;
728733 String spanId = spanNode .get ("spanId" ) != null ? spanNode .get ("spanId" ).asText () : "" ;
729- String parentSpanId = spanNode .get ("parentSpanId" ) != null && !spanNode .get ("parentSpanId" ).asText ().isEmpty () ?
730- spanNode .get ("parentSpanId" ).asText () : null ;
734+ String parentSpanId =
735+ spanNode .get ("parentSpanId" ) != null
736+ && !spanNode .get ("parentSpanId" ).asText ().isEmpty ()
737+ ? spanNode .get ("parentSpanId" ).asText ()
738+ : null ;
731739 String spanKind = spanNode .get ("kind" ) != null ? spanNode .get ("kind" ).asText () : "" ;
732-
740+
733741 // Identify spans in our Kafka Connect flow
734- if (scopeName .contains ("kafka-connect" ) && spanName .contains (expectedTopicName ) &&
735- spanKind .equals ("SPAN_KIND_CONSUMER" )) {
736- kafkaConnectConsumerSpan = new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
742+ if (scopeName .contains ("kafka-connect" )
743+ && spanName .contains (expectedTopicName )
744+ && spanKind .equals ("SPAN_KIND_CONSUMER" )) {
745+ kafkaConnectConsumerSpan =
746+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
737747 logger .info ("✅ Found Kafka Connect Consumer Span: {} ({})" , spanName , spanId );
738-
748+
739749 // Check for span links (indicating producer -> consumer linking)
740750 JsonNode linksArray = spanNode .get ("links" );
741751 if (linksArray != null && linksArray .isArray () && linksArray .size () > 0 ) {
742752 hasSpanLinks = true ;
743753 logger .info ("✅ Kafka Connect span has {} span links" , linksArray .size ());
744754 for (JsonNode linkNode : linksArray ) {
745- String linkedTraceId = linkNode .get ("traceId" ) != null ? linkNode .get ("traceId" ).asText () : "" ;
746- String linkedSpanId = linkNode .get ("spanId" ) != null ? linkNode .get ("spanId" ).asText () : "" ;
755+ String linkedTraceId =
756+ linkNode .get ("traceId" ) != null ? linkNode .get ("traceId" ).asText () : "" ;
757+ String linkedSpanId =
758+ linkNode .get ("spanId" ) != null ? linkNode .get ("spanId" ).asText () : "" ;
747759 logger .info (" 🔗 Linked to: Trace {} Span {}" , linkedTraceId , linkedSpanId );
748760 }
749761 }
750762 } else if (scopeName .contains ("mongo" ) && spanName .contains ("testdb.person" )) {
751- databaseSpan = new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
763+ databaseSpan =
764+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
752765 logger .info ("✅ Found Database Span: {} ({})" , spanName , spanId );
753766 }
754767 }
@@ -761,40 +774,59 @@ private static void verifyDistributedTracingFlow(String tracesJson, String expec
761774
762775 // Check that we found the required spans
763776 if (kafkaConnectConsumerSpan == null ) {
764- throw new AssertionError ("❌ Kafka Connect Consumer span not found for topic: " + expectedTopicName );
777+ throw new AssertionError (
778+ "❌ Kafka Connect Consumer span not found for topic: " + expectedTopicName );
765779 }
766780 if (databaseSpan == null ) {
767- throw new AssertionError ("❌ Database span not found" );
781+ throw new AssertionError ("❌ Database span not found" );
768782 }
769783
770784 // Assertion 1: Same Trace ID between Kafka Connect Consumer and Database spans
771785 if (!kafkaConnectConsumerSpan .traceId .equals (databaseSpan .traceId )) {
772- throw new AssertionError ("❌ Trace ID mismatch between Kafka Connect Consumer (" + kafkaConnectConsumerSpan .traceId +
773- ") and Database (" + databaseSpan .traceId + ")" );
786+ throw new AssertionError (
787+ "❌ Trace ID mismatch between Kafka Connect Consumer ("
788+ + kafkaConnectConsumerSpan .traceId
789+ + ") and Database ("
790+ + databaseSpan .traceId
791+ + ")" );
774792 }
775- logger .info ("✅ ASSERTION 1 PASSED: Kafka Connect and Database spans share the same trace ID: {}" , kafkaConnectConsumerSpan .traceId );
793+ logger .info (
794+ "✅ ASSERTION 1 PASSED: Kafka Connect and Database spans share the same trace ID: {}" ,
795+ kafkaConnectConsumerSpan .traceId );
776796
777- // Assertion 2: Span linking - Kafka Connect span should have span links if trace context was propagated
797+ // Assertion 2: Span linking - Kafka Connect span should have span links if trace context was
798+ // propagated
778799 if (!hasSpanLinks ) {
779- logger .info ("ℹ️ ASSERTION 2 INFO: Kafka Connect span has no span links (expected since test JVM producer is not instrumented)" );
800+ logger .info (
801+ "ℹ️ ASSERTION 2 INFO: Kafka Connect span has no span links (expected since test JVM producer is not instrumented)" );
780802 } else {
781- logger .info ("✅ ASSERTION 2 PASSED: Kafka Connect span has span links, indicating proper trace context propagation and KafkaConnectBatchProcessSpanLinksExtractor is working" );
803+ logger .info (
804+ "✅ ASSERTION 2 PASSED: Kafka Connect span has span links, indicating proper trace context propagation and KafkaConnectBatchProcessSpanLinksExtractor is working" );
782805 }
783806
784807 // Assertion 3: Parent-child relationship between Kafka Connect and Database
785808 if (databaseSpan .parentSpanId == null ) {
786809 throw new AssertionError ("❌ Database span has no parent span ID" );
787810 }
788811 if (!databaseSpan .parentSpanId .equals (kafkaConnectConsumerSpan .spanId )) {
789- throw new AssertionError ("❌ Database span parent (" + databaseSpan .parentSpanId +
790- ") does not match Kafka Connect Consumer span (" + kafkaConnectConsumerSpan .spanId + ")" );
812+ throw new AssertionError (
813+ "❌ Database span parent ("
814+ + databaseSpan .parentSpanId
815+ + ") does not match Kafka Connect Consumer span ("
816+ + kafkaConnectConsumerSpan .spanId
817+ + ")" );
791818 }
792819 logger .info ("✅ ASSERTION 3 PASSED: Database span is child of Kafka Connect Consumer span" );
793820
794821 logger .info ("🎉 KAFKA CONNECT DISTRIBUTED TRACING ASSERTIONS COMPLETED!" );
795822 logger .info ("📊 Flow Summary:" );
796- logger .info (" Consumer: {} [{}] {}" , kafkaConnectConsumerSpan .name , kafkaConnectConsumerSpan .spanId , hasSpanLinks ? "(has span links)" : "(no span links)" );
797- logger .info (" Database: {} [{}] (child of consumer)" , databaseSpan .name , databaseSpan .spanId );
823+ logger .info (
824+ " Consumer: {} [{}] {}" ,
825+ kafkaConnectConsumerSpan .name ,
826+ kafkaConnectConsumerSpan .spanId ,
827+ hasSpanLinks ? "(has span links)" : "(no span links)" );
828+ logger .info (
829+ " Database: {} [{}] (child of consumer)" , databaseSpan .name , databaseSpan .spanId );
798830 logger .info (" Trace ID: {}" , kafkaConnectConsumerSpan .traceId );
799831 }
800832
@@ -807,7 +839,13 @@ private static class SpanInfo {
807839 final String kind ;
808840 final String scope ;
809841
810- SpanInfo (String name , String traceId , String spanId , String parentSpanId , String kind , String scope ) {
842+ SpanInfo (
843+ String name ,
844+ String traceId ,
845+ String spanId ,
846+ String parentSpanId ,
847+ String kind ,
848+ String scope ) {
811849 this .name = name ;
812850 this .traceId = traceId ;
813851 this .spanId = spanId ;
@@ -816,7 +854,4 @@ private static class SpanInfo {
816854 this .scope = scope ;
817855 }
818856 }
819-
820-
821-
822857}
0 commit comments