1717package org .springframework .kafka .support .micrometer ;
1818
1919import java .nio .charset .StandardCharsets ;
20+ import java .time .Duration ;
2021import java .util .Arrays ;
2122import java .util .Deque ;
2223import java .util .List ;
5960import org .apache .kafka .common .header .Header ;
6061import org .apache .kafka .common .header .Headers ;
6162import org .apache .kafka .common .header .internals .RecordHeader ;
63+ import org .awaitility .Awaitility ;
6264import org .jspecify .annotations .Nullable ;
6365import org .junit .jupiter .api .Test ;
6466import reactor .core .publisher .Mono ;
8183import org .springframework .kafka .core .ProducerFactory ;
8284import org .springframework .kafka .listener .MessageListenerContainer ;
8385import org .springframework .kafka .listener .RecordInterceptor ;
86+ import org .springframework .kafka .requestreply .ReplyingKafkaTemplate ;
8487import org .springframework .kafka .support .ProducerListener ;
8588import org .springframework .kafka .support .micrometer .KafkaListenerObservation .DefaultKafkaListenerObservationConvention ;
8689import org .springframework .kafka .support .micrometer .KafkaTemplateObservation .DefaultKafkaTemplateObservationConvention ;
8790import org .springframework .kafka .test .EmbeddedKafkaBroker ;
8891import org .springframework .kafka .test .context .EmbeddedKafka ;
8992import org .springframework .kafka .test .utils .KafkaTestUtils ;
93+ import org .springframework .messaging .handler .annotation .SendTo ;
9094import org .springframework .test .annotation .DirtiesContext ;
9195import org .springframework .test .context .junit .jupiter .SpringJUnitConfig ;
9296import org .springframework .util .StringUtils ;
107111 */
108112@ SpringJUnitConfig
109113@ EmbeddedKafka (topics = { ObservationTests .OBSERVATION_TEST_1 , ObservationTests .OBSERVATION_TEST_2 ,
110- ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_RUNTIME_EXCEPTION ,
111- ObservationTests .OBSERVATION_ERROR , ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
114+ ObservationTests .OBSERVATION_TEST_3 , ObservationTests .OBSERVATION_TEST_4 , ObservationTests .OBSERVATION_REPLY ,
115+ ObservationTests .OBSERVATION_RUNTIME_EXCEPTION , ObservationTests .OBSERVATION_ERROR ,
116+ ObservationTests .OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1 )
112117@ DirtiesContext
113118public class ObservationTests {
114119
@@ -118,6 +123,10 @@ public class ObservationTests {
118123
119124 public final static String OBSERVATION_TEST_3 = "observation.testT3" ;
120125
126+ public final static String OBSERVATION_TEST_4 = "observation.testT4" ;
127+
128+ public final static String OBSERVATION_REPLY = "observation.reply" ;
129+
121130 public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception" ;
122131
123132 public final static String OBSERVATION_ERROR = "observation.error.sync" ;
@@ -511,6 +520,20 @@ public void onSuccess(ProducerRecord<Integer, String> producerRecord, RecordMeta
511520 tracer .getSpans ().clear ();
512521 }
513522
523+ @ Test
524+ void testReplyingKafkaTemplateObservation (
525+ @ Autowired ReplyingKafkaTemplate <Integer , String , String > template ,
526+ @ Autowired ObservationRegistry observationRegistry ) {
527+ AtomicReference <KafkaRecordReceiverContext > replyObservationContext = new AtomicReference <>();
528+ template .sendAndReceive (new ProducerRecord <>(OBSERVATION_TEST_4 , "test" )).thenAccept (replyRecord -> {
529+ Observation .Context observationContext = observationRegistry .getCurrentObservation ().getContext ();
530+ assertThat (observationContext ).isInstanceOf (KafkaRecordReceiverContext .class );
531+ replyObservationContext .set ((KafkaRecordReceiverContext ) observationContext );
532+ });
533+ Awaitility .await ().atMost (Duration .ofSeconds (60 )).until (() ->
534+ replyObservationContext .get () != null && "spring.kafka.listener" .equals (replyObservationContext .get ().getName ()));
535+ }
536+
514537 @ Configuration
515538 @ EnableKafka
516539 public static class Config {
@@ -584,13 +607,22 @@ KafkaTemplate<Integer, String> reuseAdminBeanKafkaTemplate(
584607 return template ;
585608 }
586609
610+ @ Bean
611+ ReplyingKafkaTemplate <Integer , String , String > replyingKafkaTemplate (ProducerFactory <Integer , String > pf , ConcurrentKafkaListenerContainerFactory <Integer , String > containerFactory ) {
612+ ReplyingKafkaTemplate <Integer , String , String > kafkaTemplate = new ReplyingKafkaTemplate <>(pf , containerFactory .createContainer (OBSERVATION_REPLY ));
613+ kafkaTemplate .setObservationEnabled (true );
614+ return kafkaTemplate ;
615+ }
616+
587617 @ Bean
588618 ConcurrentKafkaListenerContainerFactory <Integer , String > kafkaListenerContainerFactory (
589- ConsumerFactory <Integer , String > cf , ObservationRegistry observationRegistry ) {
619+ ConsumerFactory <Integer , String > cf , ObservationRegistry observationRegistry ,
620+ KafkaTemplate <Integer , String > kafkaTemplate ) {
590621
591622 ConcurrentKafkaListenerContainerFactory <Integer , String > factory =
592623 new ConcurrentKafkaListenerContainerFactory <>();
593624 factory .setConsumerFactory (cf );
625+ factory .setReplyTemplate (kafkaTemplate );
594626 factory .getContainerProperties ().setObservationEnabled (true );
595627 factory .setContainerCustomizer (container -> {
596628 if (container .getListenerId ().equals ("obs3" )) {
@@ -721,6 +753,11 @@ void listen2(ConsumerRecord<?, ?> in) {
721753 void listen3 (ConsumerRecord <Integer , String > in ) {
722754 }
723755
756+ @ KafkaListener (id = "obsReply" , topics = OBSERVATION_TEST_4 )
757+ @ SendTo // default REPLY_TOPIC header
758+ public String replyListener (ConsumerRecord <Integer , String > in ) {
759+ return in .value ().toUpperCase ();
760+ }
724761 }
725762
726763 public static class ExceptionListener {
0 commit comments