1919import com .mongodb .client .MongoCollection ;
2020import com .mongodb .client .MongoDatabase ;
2121import io .restassured .http .ContentType ;
22- import java .io .File ;
2322import java .io .IOException ;
2423import java .net .ServerSocket ;
2524import java .time .Duration ;
6160
6261@ Testcontainers
6362// Suppressing warnings for test dependencies and deprecated Testcontainers API
64- @ SuppressWarnings ({"rawtypes" , "unchecked" , " deprecation" , "unused " })
63+ @ SuppressWarnings ({"deprecation" })
6564class MongoKafkaConnectSinkTaskTest {
6665
6766 private static final Logger logger = LoggerFactory .getLogger (MongoKafkaConnectSinkTaskTest .class );
@@ -105,7 +104,6 @@ class MongoKafkaConnectSinkTaskTest {
105104 private static MongoDBContainer mongoDB ;
106105 private static int kafkaExposedPort ;
107106
108- private static AdminClient adminClient ;
109107
110108 // Static methods
111109
@@ -209,7 +207,6 @@ public static void setup() throws IOException {
209207 throw new IllegalStateException (
210208 "Agent path not found. Make sure the shadowJar task is configured correctly." );
211209 }
212- File agentFile = new File (agentPath );
213210
214211 kafkaConnect =
215212 new GenericContainer <>("confluentinc/cp-kafka-connect:" + CONFLUENT_VERSION )
@@ -679,14 +676,7 @@ private static void deleteConnectorIfExists() {
679676
680677 @ AfterAll
681678 public static void cleanup () {
682- // Close AdminClient first to release Kafka connections
683- if (adminClient != null ) {
684- try {
685- adminClient .close ();
686- } catch (RuntimeException e ) {
687- logger .error ("Error closing AdminClient: " + e .getMessage ());
688- }
689- }
679+ // AdminClient connections are managed locally in methods
690680
691681 // Stop all containers in reverse order of startup to ensure clean shutdown
692682 if (kafkaConnect != null ) {
@@ -747,7 +737,6 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
747737 assertThat (rootNode .isArray ()).as ("Traces JSON should be an array" ).isTrue ();
748738
749739 // Extract all spans and organize by type
750- SpanInfo kafkaProducerSpan = null ;
751740 SpanInfo kafkaConnectConsumerSpan = null ;
752741 SpanInfo databaseSpan = null ;
753742 SpanLinkInfo extractedSpanLink = null ;
@@ -790,16 +779,11 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
790779 String spanKind = spanNode .get ("kind" ) != null ? spanNode .get ("kind" ).asText () : "" ;
791780
792781 // Identify spans in our end-to-end flow
793- if (scopeName .contains ("kafka-clients" )
794- && spanName .contains (expectedTopicName )
795- && spanKind .equals ("SPAN_KIND_PRODUCER" )) {
796- kafkaProducerSpan =
797- new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
798- } else if (scopeName .contains (KAFKA_CONNECT_SCOPE )
782+ if (scopeName .contains (KAFKA_CONNECT_SCOPE )
799783 && spanName .contains (expectedTopicName )
800784 && spanKind .equals ("SPAN_KIND_CONSUMER" )) {
801785 kafkaConnectConsumerSpan =
802- new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
786+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind );
803787
804788 // Extract span link information for verification
805789 JsonNode linksArray = spanNode .get ("links" );
@@ -809,21 +793,20 @@ private static TracingData deserializeAndExtractSpans(String tracesJson, String
809793 firstLink .get ("traceId" ) != null ? firstLink .get ("traceId" ).asText () : "" ;
810794 String linkedSpanId =
811795 firstLink .get ("spanId" ) != null ? firstLink .get ("spanId" ).asText () : "" ;
812- int flags = firstLink .get ("flags" ) != null ? firstLink .get ("flags" ).asInt () : 0 ;
813796
814- extractedSpanLink = new SpanLinkInfo (linkedTraceId , linkedSpanId , flags );
797+ extractedSpanLink = new SpanLinkInfo (linkedTraceId , linkedSpanId );
815798 }
816799 } else if (scopeName .contains ("mongo" ) && spanName .contains ("testdb.person" )) {
817800 databaseSpan =
818- new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
801+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind );
819802 }
820803 }
821804 }
822805 }
823806 }
824807
825808 return new TracingData (
826- kafkaProducerSpan , kafkaConnectConsumerSpan , databaseSpan , extractedSpanLink );
809+ kafkaConnectConsumerSpan , databaseSpan , extractedSpanLink );
827810 }
828811
829812 /** Deserialize traces JSON and extract span information for multi-topic scenarios */
@@ -889,7 +872,7 @@ private static MultiTopicTracingData deserializeAndExtractMultiTopicSpans(
889872
890873 if (containsExpectedTopics ) {
891874 kafkaConnectConsumerSpan =
892- new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
875+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind );
893876
894877 // Extract span link information for verification
895878 JsonNode linksArray = spanNode .get ("links" );
@@ -899,14 +882,13 @@ private static MultiTopicTracingData deserializeAndExtractMultiTopicSpans(
899882 firstLink .get ("traceId" ) != null ? firstLink .get ("traceId" ).asText () : "" ;
900883 String linkedSpanId =
901884 firstLink .get ("spanId" ) != null ? firstLink .get ("spanId" ).asText () : "" ;
902- int flags = firstLink .get ("flags" ) != null ? firstLink .get ("flags" ).asInt () : 0 ;
903885
904- extractedSpanLink = new SpanLinkInfo (linkedTraceId , linkedSpanId , flags );
886+ extractedSpanLink = new SpanLinkInfo (linkedTraceId , linkedSpanId );
905887 }
906888 }
907889 } else if (scopeName .contains ("mongo" ) && spanName .contains ("testdb.person" )) {
908890 databaseSpan =
909- new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind , scopeName );
891+ new SpanInfo (spanName , traceId , spanId , parentSpanId , spanKind );
910892 }
911893 }
912894 }
@@ -932,17 +914,14 @@ private static class MultiTopicTracingData {
932914
933915 /** Helper class to hold all extracted tracing data */
934916 private static class TracingData {
935- final SpanInfo kafkaProducerSpan ;
936917 final SpanInfo kafkaConnectConsumerSpan ;
937918 final SpanInfo databaseSpan ;
938919 final SpanLinkInfo extractedSpanLink ;
939920
940921 TracingData (
941- SpanInfo kafkaProducerSpan ,
942922 SpanInfo kafkaConnectConsumerSpan ,
943923 SpanInfo databaseSpan ,
944924 SpanLinkInfo extractedSpanLink ) {
945- this .kafkaProducerSpan = kafkaProducerSpan ;
946925 this .kafkaConnectConsumerSpan = kafkaConnectConsumerSpan ;
947926 this .databaseSpan = databaseSpan ;
948927 this .extractedSpanLink = extractedSpanLink ;
@@ -953,12 +932,10 @@ private static class TracingData {
953932 private static class SpanLinkInfo {
954933 final String linkedTraceId ;
955934 final String linkedSpanId ;
956- final int flags ;
957935
958- SpanLinkInfo (String linkedTraceId , String linkedSpanId , int flags ) {
936+ SpanLinkInfo (String linkedTraceId , String linkedSpanId ) {
959937 this .linkedTraceId = linkedTraceId ;
960938 this .linkedSpanId = linkedSpanId ;
961- this .flags = flags ;
962939 }
963940 }
964941
@@ -969,21 +946,18 @@ private static class SpanInfo {
969946 final String spanId ;
970947 final String parentSpanId ;
971948 final String kind ;
972- final String scope ;
973949
974950 SpanInfo (
975951 String name ,
976952 String traceId ,
977953 String spanId ,
978954 String parentSpanId ,
979- String kind ,
980- String scope ) {
955+ String kind ) {
981956 this .name = name ;
982957 this .traceId = traceId ;
983958 this .spanId = spanId ;
984959 this .parentSpanId = parentSpanId ;
985960 this .kind = kind ;
986- this .scope = scope ;
987961 }
988962 }
989963}
0 commit comments