6060import org .apache .kafka .common .header .Header ;
6161import org .apache .kafka .common .header .Headers ;
6262import org .apache .kafka .common .header .internals .RecordHeader ;
63- import org .awaitility .Awaitility ;
6463import org .jspecify .annotations .Nullable ;
6564import org .junit .jupiter .api .Test ;
6665import reactor .core .publisher .Mono ;
111110 * @since 3.0
112111 */
113112@ SpringJUnitConfig
114- @ EmbeddedKafka (topics = { ObservationTests .OBSERVATION_TEST_1 , ObservationTests .OBSERVATION_TEST_2 ,
113+ @ EmbeddedKafka (topics = {ObservationTests .OBSERVATION_TEST_1 , ObservationTests .OBSERVATION_TEST_2 ,
115114 ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_TEST_4 , ObservationTests .OBSERVATION_REPLY ,
116115 ObservationTests .OBSERVATION_RUNTIME_EXCEPTION , ObservationTests .OBSERVATION_ERROR ,
117- ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
116+ ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
118117@ DirtiesContext
119118public class ObservationTests {
120119
@@ -145,11 +144,12 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
145144 @ Autowired KafkaListenerEndpointRegistry endpointRegistry , @ Autowired KafkaAdmin admin ,
146145 @ Autowired @ Qualifier ("customTemplate" ) KafkaTemplate <Integer , String > customTemplate ,
147146 @ Autowired Config config )
148- throws InterruptedException , ExecutionException , TimeoutException {
147+ throws InterruptedException , ExecutionException , TimeoutException {
149148
150149 AtomicReference <SimpleSpan > spanFromCallback = new AtomicReference <>();
151150
152151 template .setProducerInterceptor (new ProducerInterceptor <>() {
152+
153153 @ Override
154154 public ProducerRecord <Integer , String > onSend (ProducerRecord <Integer , String > record ) {
155155 tracer .currentSpanCustomizer ().tag ("key" , "value" );
@@ -337,10 +337,10 @@ private void assertThatTemplateHasTimerWithNameAndTags(MeterRegistryAssert meter
337337
338338 meterRegistryAssert .hasTimerWithNameAndTags ("spring.kafka.template" ,
339339 KeyValues .of ("spring.kafka.template.name" , "template" ,
340- "messaging.operation" , "publish" ,
341- "messaging.system" , "kafka" ,
342- "messaging.destination.kind" , "topic" ,
343- "messaging.destination.name" , destName )
340+ "messaging.operation" , "publish" ,
341+ "messaging.system" , "kafka" ,
342+ "messaging.destination.kind" , "topic" ,
343+ "messaging.destination.name" , destName )
344344 .and (keyValues ));
345345 }
346346
@@ -349,12 +349,12 @@ private void assertThatListenerHasTimerWithNameAndTags(MeterRegistryAssert meter
349349
350350 meterRegistryAssert .hasTimerWithNameAndTags ("spring.kafka.listener" ,
351351 KeyValues .of (
352- "messaging.kafka.consumer.group" , consumerGroup ,
353- "messaging.operation" , "receive" ,
354- "messaging.source.kind" , "topic" ,
355- "messaging.source.name" , destName ,
356- "messaging.system" , "kafka" ,
357- "spring.kafka.listener.id" , listenerId )
352+ "messaging.kafka.consumer.group" , consumerGroup ,
353+ "messaging.operation" , "receive" ,
354+ "messaging.source.kind" , "topic" ,
355+ "messaging.source.name" , destName ,
356+ "messaging.system" , "kafka" ,
357+ "spring.kafka.listener.id" , listenerId )
358358 .and (keyValues ));
359359 }
360360
@@ -404,7 +404,7 @@ void observationRuntimeException(@Autowired ExceptionListener listener, @Autowir
404404 void observationErrorException (@ Autowired ExceptionListener listener , @ Autowired SimpleTracer tracer ,
405405 @ Autowired @ Qualifier ("throwableTemplate" ) KafkaTemplate <Integer , String > errorTemplate ,
406406 @ Autowired KafkaListenerEndpointRegistry endpointRegistry )
407- throws ExecutionException , InterruptedException , TimeoutException {
407+ throws ExecutionException , InterruptedException , TimeoutException {
408408
409409 errorTemplate .send (OBSERVATION_ERROR , "testError" ).get (10 , TimeUnit .SECONDS );
410410 assertThat (listener .latch5 .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -495,6 +495,7 @@ void verifyTraceParentHeader(@Autowired KafkaTemplate<Integer, String> template,
495495 @ Autowired SimpleTracer tracer ) throws Exception {
496496 CompletableFuture <ProducerRecord <Integer , String >> producerRecordFuture = new CompletableFuture <>();
497497 template .setProducerListener (new ProducerListener <>() {
498+
498499 @ Override
499500 public void onSuccess (ProducerRecord <Integer , String > producerRecord , RecordMetadata recordMetadata ) {
500501 producerRecordFuture .complete (producerRecord );
@@ -525,14 +526,12 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
525526 void testReplyingKafkaTemplateObservation (
526527 @ Autowired ReplyingKafkaTemplate <Integer , String , String > template ,
527528 @ Autowired ObservationRegistry observationRegistry ) {
528- AtomicReference <KafkaRecordReceiverContext > replyObservationContext = new AtomicReference <>();
529- template .sendAndReceive (new ProducerRecord <>(OBSERVATION_TEST_4 , "test" )).thenAccept (replyRecord -> {
530- Observation .Context observationContext = observationRegistry .getCurrentObservation ().getContext ();
531- assertThat (observationContext ).isInstanceOf (KafkaRecordReceiverContext .class );
532- replyObservationContext .set ((KafkaRecordReceiverContext ) observationContext );
533- });
534- Awaitility .await ().atMost (Duration .ofSeconds (60 )).until (() ->
535- replyObservationContext .get () != null && "spring.kafka.listener" .equals (replyObservationContext .get ().getName ()));
529+ assertThat (template .sendAndReceive (new ProducerRecord <>(OBSERVATION_TEST_4 , "test" ))
530+ // the current observation must be retrieved from the consumer thread of the reply
531+ .thenApply (replyRecord -> observationRegistry .getCurrentObservation ().getContext ()))
532+ .isCompletedWithValueMatchingWithin (observationContext ->
533+ observationContext instanceof KafkaRecordReceiverContext
534+ && "spring.kafka.listener" .equals (observationContext .getName ()), Duration .ofSeconds (30 ));
536535 }
537536
538537 @ Configuration
@@ -759,6 +758,7 @@ void listen3(ConsumerRecord<Integer, String> in) {
759758 public String replyListener (ConsumerRecord <Integer , String > in ) {
760759 return in .value ().toUpperCase ();
761760 }
761+
762762 }
763763
764764 public static class ExceptionListener {
0 commit comments