@@ -177,19 +177,18 @@ public static void setup() throws IOException {
177177 backend .start ();
178178
179179 // Initialize TelemetryRetriever after backend starts with custom error handling
180- telemetryRetriever = new TelemetryRetriever (
181- backend .getMappedPort (BACKEND_PORT ),
182- Duration .ofMinutes (2 )) {
183- @ Override
184- public void clearTelemetry () {
185- try {
186- super .clearTelemetry ();
187- } catch (RuntimeException e ) {
188- // Ignore cleanup errors - backend might already be stopped
189- logger .debug ("Failed to clear telemetry: {}" , e .getMessage ());
190- }
191- }
192- };
180+ telemetryRetriever =
181+ new TelemetryRetriever (backend .getMappedPort (BACKEND_PORT ), Duration .ofMinutes (2 )) {
182+ @ Override
183+ public void clearTelemetry () {
184+ try {
185+ super .clearTelemetry ();
186+ } catch (RuntimeException e ) {
187+ // Ignore cleanup errors - backend might already be stopped
188+ logger .debug ("Failed to clear telemetry: {}" , e .getMessage ());
189+ }
190+ }
191+ };
193192
194193 zookeeper =
195194 new GenericContainer <>("confluentinc/cp-zookeeper:" + CONFLUENT_VERSION )
@@ -327,7 +326,6 @@ public void reset() {
327326 clearTelemetryGracefully ();
328327 }
329328
330-
331329 @ Test
332330 public void testKafkaConnectMongoSinkTaskInstrumentation ()
333331 throws IOException , InterruptedException {
@@ -358,48 +356,70 @@ public void testKafkaConnectMongoSinkTaskInstrumentation()
358356
359357 // Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
360358 // Wait for traces and then find the specific trace we want
361- await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
362- List <List <SpanData >> traces = testing .waitForTraces (1 );
363-
364- // Find the trace that contains our Kafka Connect Consumer span and database INSERT span
365- List <SpanData > targetTrace = traces .stream ()
366- .filter (trace -> {
367- boolean hasKafkaConnectSpan = trace .stream ()
368- .anyMatch (span -> span .getName ().contains (uniqueTopicName ) &&
369- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER );
370-
371- boolean hasInsertSpan = trace .stream ()
372- .anyMatch (span -> span .getName ().contains ("update" ) &&
373- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT );
374-
375- return hasKafkaConnectSpan && hasInsertSpan ;
376- })
377- .findFirst ()
378- .orElse (null );
379-
380- // Assert that we found the target trace
381- assertThat (targetTrace ).isNotNull ();
382-
383- // Assert on the spans in the target trace (should have at least 2 spans: Kafka Connect Consumer + database operations)
384- assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
385-
386- // Find and assert the Kafka Connect Consumer span
387- SpanData kafkaConnectSpan = targetTrace .stream ()
388- .filter (span -> span .getName ().contains (uniqueTopicName ) &&
389- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
390- .findFirst ()
391- .orElse (null );
392- assertThat (kafkaConnectSpan ).isNotNull ();
393- assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
394-
395- // Find and assert the database UPDATE span
396- SpanData insertSpan = targetTrace .stream ()
397- .filter (span -> span .getName ().contains ("update" ) &&
398- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
399- .findFirst ()
400- .orElse (null );
401- assertThat (insertSpan ).isNotNull ();
402- });
359+ await ()
360+ .atMost (Duration .ofSeconds (30 ))
361+ .untilAsserted (
362+ () -> {
363+ List <List <SpanData >> traces = testing .waitForTraces (1 );
364+
365+ // Find the trace that contains our Kafka Connect Consumer span and database INSERT
366+ // span
367+ List <SpanData > targetTrace =
368+ traces .stream ()
369+ .filter (
370+ trace -> {
371+ boolean hasKafkaConnectSpan =
372+ trace .stream ()
373+ .anyMatch (
374+ span ->
375+ span .getName ().contains (uniqueTopicName )
376+ && span .getKind ()
377+ == io .opentelemetry .api .trace .SpanKind
378+ .CONSUMER );
379+
380+ boolean hasInsertSpan =
381+ trace .stream ()
382+ .anyMatch (
383+ span ->
384+ span .getName ().contains ("update" )
385+ && span .getKind ()
386+ == io .opentelemetry .api .trace .SpanKind .CLIENT );
387+
388+ return hasKafkaConnectSpan && hasInsertSpan ;
389+ })
390+ .findFirst ()
391+ .orElse (null );
392+
393+ // Assert that we found the target trace
394+ assertThat (targetTrace ).isNotNull ();
395+
396+ // Assert on the spans in the target trace (should have at least 2 spans: Kafka
397+ // Connect Consumer + database operations)
398+ assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
399+
400+ // Find and assert the Kafka Connect Consumer span
401+ SpanData kafkaConnectSpan =
402+ targetTrace .stream ()
403+ .filter (
404+ span ->
405+ span .getName ().contains (uniqueTopicName )
406+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
407+ .findFirst ()
408+ .orElse (null );
409+ assertThat (kafkaConnectSpan ).isNotNull ();
410+ assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
411+
412+ // Find and assert the database UPDATE span
413+ SpanData insertSpan =
414+ targetTrace .stream ()
415+ .filter (
416+ span ->
417+ span .getName ().contains ("update" )
418+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
419+ .findFirst ()
420+ .orElse (null );
421+ assertThat (insertSpan ).isNotNull ();
422+ });
403423 }
404424
405425 @ Test
@@ -446,52 +466,74 @@ public void testKafkaConnectMongoSinkTaskMultiTopicInstrumentation()
446466
447467 // Use SmokeTestInstrumentationExtension's testing framework to wait for and assert traces
448468 // Wait for traces and then find the specific trace we want
449- await ().atMost (Duration .ofSeconds (30 )).untilAsserted (() -> {
450- List <List <SpanData >> traces = testing .waitForTraces (1 );
451-
452- // Find the trace that contains our Kafka Connect Consumer span and database INSERT span
453- List <SpanData > targetTrace = traces .stream ()
454- .filter (trace -> {
455- boolean hasKafkaConnectSpan = trace .stream ()
456- .anyMatch (span -> (span .getName ().contains (topicName1 ) ||
457- span .getName ().contains (topicName2 ) ||
458- span .getName ().contains (topicName3 )) &&
459- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER );
460-
461- boolean hasInsertSpan = trace .stream ()
462- .anyMatch (span -> span .getName ().contains ("update" ) &&
463- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT );
464-
465- return hasKafkaConnectSpan && hasInsertSpan ;
466- })
467- .findFirst ()
468- .orElse (null );
469-
470- // Assert that we found the target trace
471- assertThat (targetTrace ).isNotNull ();
472-
473- // Assert on the spans in the target trace (should have at least 2 spans: Kafka Connect Consumer + database operations)
474- assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
475-
476- // Find and assert the Kafka Connect Consumer span (multi-topic span)
477- SpanData kafkaConnectSpan = targetTrace .stream ()
478- .filter (span -> (span .getName ().contains (topicName1 ) ||
479- span .getName ().contains (topicName2 ) ||
480- span .getName ().contains (topicName3 )) &&
481- span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
482- .findFirst ()
483- .orElse (null );
484- assertThat (kafkaConnectSpan ).isNotNull ();
485- assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
486-
487- // Find and assert the database UPDATE span
488- SpanData insertSpan = targetTrace .stream ()
489- .filter (span -> span .getName ().contains ("update" ) &&
490- span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
491- .findFirst ()
492- .orElse (null );
493- assertThat (insertSpan ).isNotNull ();
494- });
469+ await ()
470+ .atMost (Duration .ofSeconds (30 ))
471+ .untilAsserted (
472+ () -> {
473+ List <List <SpanData >> traces = testing .waitForTraces (1 );
474+
475+ // Find the trace that contains our Kafka Connect Consumer span and database INSERT
476+ // span
477+ List <SpanData > targetTrace =
478+ traces .stream ()
479+ .filter (
480+ trace -> {
481+ boolean hasKafkaConnectSpan =
482+ trace .stream ()
483+ .anyMatch (
484+ span ->
485+ (span .getName ().contains (topicName1 )
486+ || span .getName ().contains (topicName2 )
487+ || span .getName ().contains (topicName3 ))
488+ && span .getKind ()
489+ == io .opentelemetry .api .trace .SpanKind
490+ .CONSUMER );
491+
492+ boolean hasInsertSpan =
493+ trace .stream ()
494+ .anyMatch (
495+ span ->
496+ span .getName ().contains ("update" )
497+ && span .getKind ()
498+ == io .opentelemetry .api .trace .SpanKind .CLIENT );
499+
500+ return hasKafkaConnectSpan && hasInsertSpan ;
501+ })
502+ .findFirst ()
503+ .orElse (null );
504+
505+ // Assert that we found the target trace
506+ assertThat (targetTrace ).isNotNull ();
507+
508+ // Assert on the spans in the target trace (should have at least 2 spans: Kafka
509+ // Connect Consumer + database operations)
510+ assertThat (targetTrace ).hasSizeGreaterThanOrEqualTo (2 );
511+
512+ // Find and assert the Kafka Connect Consumer span (multi-topic span)
513+ SpanData kafkaConnectSpan =
514+ targetTrace .stream ()
515+ .filter (
516+ span ->
517+ (span .getName ().contains (topicName1 )
518+ || span .getName ().contains (topicName2 )
519+ || span .getName ().contains (topicName3 ))
520+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CONSUMER )
521+ .findFirst ()
522+ .orElse (null );
523+ assertThat (kafkaConnectSpan ).isNotNull ();
524+ assertThat (kafkaConnectSpan .getParentSpanContext ().isValid ()).isFalse (); // No parent
525+
526+ // Find and assert the database UPDATE span
527+ SpanData insertSpan =
528+ targetTrace .stream ()
529+ .filter (
530+ span ->
531+ span .getName ().contains ("update" )
532+ && span .getKind () == io .opentelemetry .api .trace .SpanKind .CLIENT )
533+ .findFirst ()
534+ .orElse (null );
535+ assertThat (insertSpan ).isNotNull ();
536+ });
495537 }
496538
497539 // Private methods
@@ -695,6 +737,4 @@ public static void cleanup() {
695737 }
696738 }
697739 }
698-
699-
700740}
0 commit comments