5252import org .junit .jupiter .api .BeforeEach ;
5353import org .junit .jupiter .api .Test ;
5454import org .junit .jupiter .api .extension .RegisterExtension ;
55+ import org .slf4j .Logger ;
56+ import org .slf4j .LoggerFactory ;
5557import org .testcontainers .containers .FixedHostPortGenericContainer ;
5658import org .testcontainers .containers .GenericContainer ;
5759import org .testcontainers .containers .Network ;
6668@ SuppressWarnings ({"rawtypes" , "unchecked" , "deprecation" , "unused" })
6769class KafkaConnectSinkTaskTest {
6870
71+ private static final Logger logger = LoggerFactory .getLogger (KafkaConnectSinkTaskTest .class );
72+
6973 @ RegisterExtension
7074 static final InstrumentationExtension testing = AgentInstrumentationExtension .create ();
7175
@@ -230,7 +234,7 @@ public static void setup() {
230234 @ BeforeEach
231235 public void reset () throws InterruptedException {
232236 // Remove connector after each test
233- System . out . println ("Deleting connector [ " + CONNECTOR_NAME + " ] if exists" );
237+ logger . info ("Deleting connector [ {} ] if exists" , CONNECTOR_NAME );
234238 given ()
235239 .log ()
236240 .headers ()
@@ -245,23 +249,23 @@ public void reset() throws InterruptedException {
245249 // Add topic cleanup
246250 // try (AdminClient adminClient = createAdminClient()) {
247251 // adminClient.deleteTopics(Collections.singletonList(TOPIC_NAME)).all().get();
248- // System.out.println ("Deleted existing topic: " + TOPIC_NAME);
252+ // logger.info ("Deleted existing topic: " + TOPIC_NAME);
249253 // } catch (Exception e) {
250254 // if (e instanceof InterruptedException) {
251255 // Thread.currentThread().interrupt();
252256 // throw new RuntimeException("Failed to create topic: " + TOPIC_NAME, e);
253257 // } else if (e.getCause() instanceof org.apache.kafka.common.errors.TopicExistsException) {
254- // System.out.println ("Topic already exists: " + TOPIC_NAME);
258+ // logger.info ("Topic already exists: " + TOPIC_NAME);
255259 // } else {
256- // System.out.println ("Error creating topic: " + e.getMessage());
260+ // logger.info ("Error creating topic: " + e.getMessage());
257261 // throw new RuntimeException("Failed to create topic: " + TOPIC_NAME, e);
258262 // }
259263 // }
260264 }
261265
262266 @ Test
263267 public void testKafkaConnectSinkTaskInstrumentation () throws Exception {
264- System . out . println ("=== Starting Kafka Connect SinkTask instrumentation test ===" );
268+ logger . info ("=== Starting Kafka Connect SinkTask instrumentation test ===" );
265269
266270 // Create unique topic name first
267271 String uniqueTopicName = TOPIC_NAME + "-" + System .currentTimeMillis ();
@@ -270,34 +274,34 @@ public void testKafkaConnectSinkTaskInstrumentation() throws Exception {
270274 setupSinkConnector (uniqueTopicName );
271275
272276 // Create topic first
273- System . out . println ("Creating topic..." );
277+ logger . info ("Creating topic..." );
274278 createTopic (uniqueTopicName );
275- System . out . println ("Topic created" );
279+ logger . info ("Topic created" );
276280
277281 // Wait for topic to be available
278- System . out . println ("Awaiting topic creation..." );
282+ logger . info ("Awaiting topic creation..." );
279283 awaitForTopicCreation (uniqueTopicName );
280- System . out . println ("Topic creation complete" );
284+ logger . info ("Topic creation complete" );
281285
282286 // Produce messages to Kafka WITHOUT manual tracing
283287 // This ensures we only see SinkTask spans, not producer spans
284- System . out . println ("Producing messages without manual spans..." );
288+ logger . info ("Producing messages without manual spans..." );
285289 produceMessagesWithoutTracing (uniqueTopicName );
286- System . out . println ("Messages produced" );
290+ logger . info ("Messages produced" );
287291
288292 // Wait for Kafka Connect to process the messages
289- System . out . println ("Waiting for Kafka Connect to process messages..." );
293+ logger . info ("Waiting for Kafka Connect to process messages..." );
290294 await ()
291295 .atMost (Duration .ofSeconds (60 ))
292296 .pollInterval (Duration .ofSeconds (1 ))
293297 .until (
294298 () -> {
295299 try {
296300 int count = getRecordCountFromPostgres ();
297- System . out . println ("Current record count in PostgreSQL: " + count );
301+ logger . info ("Current record count in PostgreSQL: " + count );
298302 return count >= 2 ; // Expecting 2 messages
299303 } catch (Exception e ) {
300- System . out . println ("Error checking PostgreSQL: " + e .getMessage ());
304+ logger . info ("Error checking PostgreSQL: " + e .getMessage ());
301305 return false ;
302306 }
303307 });
@@ -314,29 +318,28 @@ public void testKafkaConnectSinkTaskInstrumentation() throws Exception {
314318 Thread .sleep (3000 );
315319
316320 // Debug: Print all traces to see what spans are created
317- System . out . println ("=== All Traces ===" );
321+ logger . info ("=== All Traces ===" );
318322 // Add this debugging
319- System . out . println ("=== Checking SinkTask instrumentation ===" );
323+ logger . info ("=== Checking SinkTask instrumentation ===" );
320324 try {
321325 Class .forName (
322326 "io.opentelemetry.javaagent.instrumentation.kafkaconnect.v2_6.SinkTaskInstrumentation" );
323- System . out . println ("✅ SinkTask instrumentation class found" );
327+ logger . info ("✅ SinkTask instrumentation class found" );
324328 } catch (ClassNotFoundException e ) {
325- System . out . println ("❌ SinkTask instrumentation class NOT found: " + e .getMessage ());
329+ logger . info ("❌ SinkTask instrumentation class NOT found: " + e .getMessage ());
326330 }
327331 // Use the actual number of traces found
328332 List <List <SpanData >> allTraces =
329333 testing .waitForTraces (2 ); // Wait for the 2 producer traces we know exist
330- System . out . println ("Found " + allTraces .size () + " traces" );
334+ logger . info ("Found " + allTraces .size () + " traces" );
331335
332336 allTraces .forEach (
333337 trace -> {
334- System . out . println ("Trace: " + trace .size () + " spans" );
338+ logger . info ("Trace: " + trace .size () + " spans" );
335339 trace .forEach (
336340 span -> {
337- System .out .println (" - " + span .getName () + " (" + span .getKind () + ")" );
338- System .out .println (
339- " Instrumentation: " + span .getInstrumentationScopeInfo ().getName ());
341+ logger .info (" - " + span .getName () + " (" + span .getKind () + ")" );
342+ logger .info (" Instrumentation: " + span .getInstrumentationScopeInfo ().getName ());
340343 });
341344 });
342345
@@ -351,7 +354,7 @@ public void testKafkaConnectSinkTaskInstrumentation() throws Exception {
351354 || span .getName ().contains ("KafkaConnect" ));
352355
353356 if (foundSinkTaskSpan ) {
354- System . out . println ("✅ Found SinkTask instrumentation spans!" );
357+ logger . info ("✅ Found SinkTask instrumentation spans!" );
355358
356359 // Verify SinkTask spans
357360 testing .waitAndAssertTraces (
@@ -365,11 +368,11 @@ public void testKafkaConnectSinkTaskInstrumentation() throws Exception {
365368 equalTo (MESSAGING_DESTINATION_NAME , uniqueTopicName ),
366369 equalTo (MESSAGING_OPERATION , "process" ))));
367370 } else {
368- System . out . println ("❌ No SinkTask instrumentation spans found" );
369- System . out . println ("This might indicate:" );
370- System . out . println ("1. SinkTask instrumentation is not working" );
371- System . out . println ("2. SinkTask spans are named differently" );
372- System . out . println ("3. SinkTask spans are in different traces" );
371+ logger . info ("❌ No SinkTask instrumentation spans found" );
372+ logger . info ("This might indicate:" );
373+ logger . info ("1. SinkTask instrumentation is not working" );
374+ logger . info ("2. SinkTask spans are named differently" );
375+ logger . info ("3. SinkTask spans are in different traces" );
373376
374377 // For now, just verify that the data processing worked
375378 // (even if we can't see the spans)
@@ -398,14 +401,14 @@ private static void produceMessagesWithoutTracing(String topicName) {
398401 "2" ,
399402 "{\" schema\" :{\" type\" :\" struct\" ,\" fields\" :[{\" field\" :\" id\" ,\" type\" :\" int32\" },{\" field\" :\" name\" ,\" type\" :\" string\" }]},\" payload\" :{\" id\" :2,\" name\" :\" Bob\" }}" ));
400403 producer .flush ();
401- System . out . println ("Produced 2 messages to Kafka topic: " + topicName );
404+ logger . info ("Produced 2 messages to Kafka topic: " + topicName );
402405 }
403406 }
404407
405408 // Alternative test that focuses specifically on SinkTask behavior
406409 @ Test
407410 public void testSinkTaskSpanCreation () throws Exception {
408- System . out . println ("=== Testing SinkTask span creation ===" );
411+ logger . info ("=== Testing SinkTask span creation ===" );
409412
410413 String uniqueTopicName = TOPIC_NAME + "-" + System .currentTimeMillis ();
411414 setupSinkConnector (uniqueTopicName );
@@ -436,17 +439,16 @@ public void testSinkTaskSpanCreation() throws Exception {
436439 // Collect all spans and look for SinkTask-related ones
437440 List <List <SpanData >> allTraces = testing .waitForTraces (3 );
438441
439- System . out . println ("=== Analyzing spans for SinkTask instrumentation ===" );
442+ logger . info ("=== Analyzing spans for SinkTask instrumentation ===" );
440443 allTraces .forEach (
441444 trace -> {
442445 trace .forEach (
443446 span -> {
444- System .out .println ("Span: " + span .getName ());
445- System .out .println (" Kind: " + span .getKind ());
446- System .out .println (
447- " Instrumentation: " + span .getInstrumentationScopeInfo ().getName ());
448- System .out .println (" Attributes: " + span .getAttributes ());
449- System .out .println ("---" );
447+ logger .info ("Span: " + span .getName ());
448+ logger .info (" Kind: " + span .getKind ());
449+ logger .info (" Instrumentation: " + span .getInstrumentationScopeInfo ().getName ());
450+ logger .info (" Attributes: " + span .getAttributes ());
451+ logger .info ("---" );
450452 });
451453 });
452454
@@ -465,11 +467,11 @@ public void testSinkTaskSpanCreation() throws Exception {
465467 .collect (Collectors .toList ());
466468
467469 if (!connectSpans .isEmpty ()) {
468- System . out . println ("✅ Found " + connectSpans .size () + " Kafka Connect related spans:" );
469- connectSpans .forEach (span -> System . out . println (" - " + span .getName ()));
470+ logger . info ("✅ Found " + connectSpans .size () + " Kafka Connect related spans:" );
471+ connectSpans .forEach (span -> logger . info (" - " + span .getName ()));
470472 } else {
471- System . out . println ("❌ No Kafka Connect spans found" );
472- System . out . println (
473+ logger . info ("❌ No Kafka Connect spans found" );
474+ logger . info (
473475 "Available span names: "
474476 + allSpans .stream ().map (SpanData ::getName ).collect (Collectors .toList ()));
475477 }
@@ -480,45 +482,45 @@ public void testSinkTaskSpanCreation() throws Exception {
480482
481483 @ Test
482484 public void testBasicProducerOnly () throws Exception {
483- System . out . println ("=== Starting basic producer test ===" );
485+ logger . info ("=== Starting basic producer test ===" );
484486
485487 // Create topic first
486- System . out . println ("Creating topic..." );
488+ logger . info ("Creating topic..." );
487489 String uniqueTopicName = TOPIC_NAME + "-" + System .currentTimeMillis ();
488490 createTopic (uniqueTopicName );
489- System . out . println ("Topic created" );
491+ logger . info ("Topic created" );
490492
491493 // Wait for topic to be available
492- System . out . println ("Awaiting topic creation..." );
494+ logger . info ("Awaiting topic creation..." );
493495 awaitForTopicCreation (uniqueTopicName );
494- System . out . println ("Topic creation complete" );
496+ logger . info ("Topic creation complete" );
495497
496498 // Just test the producer instrumentation
497- System . out . println ("Starting producer span..." );
499+ logger . info ("Starting producer span..." );
498500 testing .runWithSpan (
499501 "test-producer" ,
500502 () -> {
501- System . out . println ("Inside producer span, calling produceMessagesToKafka..." );
503+ logger . info ("Inside producer span, calling produceMessagesToKafka..." );
502504 produceMessagesToKafka ();
503- System . out . println ("produceMessagesToKafka complete" );
505+ logger . info ("produceMessagesToKafka complete" );
504506 });
505- System . out . println ("Producer span complete" );
507+ logger . info ("Producer span complete" );
506508
507509 // Wait a bit for traces to be processed
508510 Thread .sleep (2000 );
509511
510512 // Debug: Print all traces
511- // System.out.println ("=== All Traces ===");
513+ // logger.info ("=== All Traces ===");
512514 // List<List<SpanData>> allTraces = testing.waitForTraces(5);
513- // System.out.println ("Found " + allTraces.size() + " traces");
515+ // logger.info ("Found " + allTraces.size() + " traces");
514516 // allTraces.forEach(trace -> {
515- // System.out.println ("Trace: " + trace.size() + " spans");
517+ // logger.info ("Trace: " + trace.size() + " spans");
516518 // trace.forEach(span -> {
517- // System.out.println (" - " + span.getName() + " (" + span.getKind() + ")");
519+ // logger.info (" - " + span.getName() + " (" + span.getKind() + ")");
518520 // });
519521 // });
520522
521- System . out . println ("=== All Traces ===" );
523+ logger . info ("=== All Traces ===" );
522524 // Verify only the producer trace
523525 testing .waitAndAssertTraces (
524526 trace ->
@@ -569,9 +571,9 @@ private static void setupSinkConnector(String topicName) throws IOException {
569571 // Remove this problematic cleanup code:
570572 // try (AdminClient adminClient = createAdminClient()) {
571573 // adminClient.deleteTopics(Collections.singletonList(TOPIC_NAME)).all().get();
572- // System.out.println ("Deleted existing topic: " + TOPIC_NAME);
574+ // logger.info ("Deleted existing topic: " + TOPIC_NAME);
573575 // } catch (e instanceof InterruptedException) {
574- // System.out.println ("Topic cleanup: " + e.getMessage());
576+ // logger.info ("Topic cleanup: " + e.getMessage());
575577 // }
576578 }
577579
@@ -591,23 +593,23 @@ private static void produceMessagesToKafka() {
591593 "1" ,
592594 "{\" schema\" :{\" type\" :\" struct\" ,\" fields\" :[{\" field\" :\" id\" ,\" type\" :\" int32\" },{\" field\" :\" name\" ,\" type\" :\" string\" }]},\" payload\" :{\" id\" :1,\" name\" :\" Alice\" }}" ));
593595 producer .flush ();
594- System . out . println ("Produced 1 message to Kafka topic: " + TOPIC_NAME );
596+ logger . info ("Produced 1 message to Kafka topic: " + TOPIC_NAME );
595597 }
596598 }
597599
598600 private static void createTopic (String topicName ) {
599601 try (AdminClient adminClient = createAdminClient ()) {
600602 NewTopic newTopic = new NewTopic (topicName , 1 , (short ) 1 );
601603 adminClient .createTopics (Collections .singletonList (newTopic )).all ().get ();
602- System . out . println ("Created topic: " + topicName );
604+ logger . info ("Created topic: " + topicName );
603605 } catch (Exception e ) {
604606 if (e instanceof InterruptedException ) {
605607 Thread .currentThread ().interrupt ();
606608 throw new RuntimeException ("Failed to create topic: " + topicName , e );
607609 } else if (e .getCause () instanceof org .apache .kafka .common .errors .TopicExistsException ) {
608- System . out . println ("Topic already exists: " + topicName );
610+ logger . info ("Topic already exists: " + topicName );
609611 } else {
610- System . out . println ("Error creating topic: " + e .getMessage ());
612+ logger . info ("Error creating topic: " + e .getMessage ());
611613 throw new RuntimeException ("Failed to create topic: " + topicName , e );
612614 }
613615 }
@@ -660,7 +662,7 @@ private static int getRandomFreePort() {
660662 try (ServerSocket serverSocket = new ServerSocket (0 )) {
661663 return serverSocket .getLocalPort ();
662664 } catch (IOException e ) {
663- e . printStackTrace ( );
665+ logger . error ( "Failed to get random free port" , e );
664666 return 0 ;
665667 }
666668 }
0 commit comments