@@ -232,17 +232,16 @@ public void configure(Map<String, ?> configs) {
232232 MessageListenerContainer listenerContainer1 = rler .getListenerContainer ("obs1" );
233233 listenerContainer1 .stop ();
234234
235- template .send (OBSERVATION_TEST_1 , "test" )
236- .thenAccept ((sendResult ) -> spanFromCallback .set (tracer .currentSpan ()))
237- .get ( 10 , TimeUnit . SECONDS );
235+ assertThat ( template .send (OBSERVATION_TEST_1 , "test" )
236+ .thenAccept ((sendResult ) -> spanFromCallback .set (tracer .currentSpan ())))
237+ .succeedsWithin ( Duration . ofSeconds ( 20 ) );
238238
239239 Deque <SimpleSpan > spans = tracer .getSpans ();
240240 assertThat (spans ).hasSize (1 );
241241
242242 SimpleSpan templateSpan = spans .peek ();
243243 assertThat (templateSpan ).isNotNull ();
244- assertThat (templateSpan .getTags ()).containsAllEntriesOf (Map .of (
245- "key" , "value" ));
244+ assertThat (templateSpan .getTags ()).containsAllEntriesOf (Map .of ("key" , "value" ));
246245
247246 assertThat (spanFromCallback .get ()).isNotNull ();
248247 listenerContainer1 .start ();
@@ -352,7 +351,7 @@ private void assertThatTemplateSpanTags(Deque<SimpleSpan> spans, int tagSize, St
352351 "messaging.system" , "kafka" ,
353352 "messaging.destination.kind" , "topic" ,
354353 "messaging.destination.name" , destName ));
355- if (keyValues != null && keyValues .length > 0 ) {
354+ if (keyValues .length > 0 ) {
356355 Arrays .stream (keyValues ).forEach (entry -> assertThat (span .getTags ()).contains (entry ));
357356 }
358357 assertThat (span .getName ()).isEqualTo (destName + " send" );
@@ -382,7 +381,7 @@ private SimpleSpan assertThatListenerSpanTags(Deque<SimpleSpan> spans, int tagSi
382381 Map .entry ("messaging.source.kind" , "topic" ),
383382 Map .entry ("messaging.source.name" , sourceName ),
384383 Map .entry ("messaging.system" , "kafka" )));
385- if (keyValues != null && keyValues .length > 0 ) {
384+ if (keyValues .length > 0 ) {
386385 Arrays .stream (keyValues ).forEach (entry -> assertThat (span .getTags ()).contains (entry ));
387386 }
388387 assertThat (span .getName ()).isEqualTo (sourceName + " receive" );
@@ -479,7 +478,8 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
479478 }
480479
481480 @ Test
482- void observationErrorExceptionWhenCompletableFutureReturned (@ Autowired ExceptionListener listener , @ Autowired SimpleTracer tracer ,
481+ void observationErrorExceptionWhenCompletableFutureReturned (@ Autowired ExceptionListener listener ,
482+ @ Autowired SimpleTracer tracer ,
483483 @ Autowired @ Qualifier ("throwableTemplate" ) KafkaTemplate <Integer , String > errorTemplate ,
484484 @ Autowired KafkaListenerEndpointRegistry endpointRegistry )
485485 throws ExecutionException , InterruptedException , TimeoutException {
@@ -665,8 +665,11 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
665665 }
666666
667667 @ Bean
668- ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (ProducerFactory <Integer , String > pf , ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
669- ReplyingKafkaTemplate <Integer , String , String > kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
668+ ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (
669+ ProducerFactory <Integer , String > pf ,
670+ ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
671+
672+ var kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
670673 kafkaTemplate .setObservationEnabled (true );
671674 return kafkaTemplate ;
672675 }
@@ -734,7 +737,8 @@ ObservationRegistry observationRegistry(Tracer tracer, Propagator propagator, Me
734737 new PropagatingSenderTracingObservationHandler <>(tracer , propagator ),
735738 // This is responsible for creating a default span
736739 new DefaultTracingObservationHandler (tracer )))
737- .observationHandler (new TracingAwareMeterObservationHandler <>(new DefaultMeterObservationHandler (meterRegistry ), tracer ));
740+ .observationHandler (new TracingAwareMeterObservationHandler <>(
741+ new DefaultMeterObservationHandler (meterRegistry ), tracer ));
738742 return observationRegistry ;
739743 }
740744
@@ -803,6 +807,7 @@ AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
803807 public TaskScheduler taskExecutor () {
804808 return new ThreadPoolTaskScheduler ();
805809 }
810+
806811 }
807812
808813 public static class Listener {
@@ -932,6 +937,7 @@ void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
932937 this .capturedSpanInDlt = this .tracer .currentSpan ();
933938 this .asyncFailureLatch .countDown ();
934939 }
940+
935941 }
936942
937943}
0 commit comments