From 0531648d44fad5a51fe57d52b8053772de4d77df Mon Sep 17 00:00:00 2001 From: sullis Date: Wed, 16 Apr 2025 08:33:27 -0700 Subject: [PATCH] Add integration test for @Headers annotation Signed-off-by: sullis --- .../EnableKafkaIntegrationTests.java | 75 ++++++++++++++++++- 1 file changed, 74 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java index 36f1cc6c45..552cc500aa 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java @@ -63,6 +63,7 @@ import org.apache.kafka.common.errors.TopicExistsException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.awaitility.Awaitility; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; @@ -141,6 +142,7 @@ import org.springframework.messaging.converter.AbstractMessageConverter; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; @@ -181,6 +183,7 @@ * @author Soby Chacko * @author Wang Zhiyang * @author Borahm Lee + * @author Sean Sullivan */ @SpringJUnitConfig @DirtiesContext @@ -196,7 +199,7 @@ "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33", "annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle", "annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42", - "annotated43", "annotated43reply", "seekToComputeFn"}) + "annotated43", "annotated43reply", "seekToComputeFn", "headerMapTopic"}) @TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10") public class EnableKafkaIntegrationTests { @@ -242,6 +245,9 @@ public class EnableKafkaIntegrationTests { @Autowired public MultiJsonListenerBean multiJsonListener; + @Autowired + public HeaderMapListenerBean headerMapListener; + @Autowired public MultiListenerNoDefault multiNoDefault; @@ -584,6 +590,50 @@ public void testMultiJson() throws Exception { assertThat(this.multiJsonListener.validated.valCount).isEqualTo(1); } + @Test + public void testHeadersAnnotation() throws Exception { + template.setDefaultTopic("headerMapTopic"); + + template.send(new GenericMessage<>("message1", Collections.emptyMap())); + Awaitility.await().untilAsserted(() -> { + assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(1); + }); + assertThat(this.headerMapListener.text).isEqualTo("message1"); + assertThat(this.headerMapListener.headers) + .isNotNull() + .containsOnlyKeys( + "kafka_offset", + "kafka_consumer", + "kafka_timestampType", + "kafka_receivedPartitionId", + "kafka_receivedTopic", + "kafka_receivedTimestamp", + "kafka_groupId"); + + template.send(new GenericMessage<>("message2", + Map.of("akey", "avalue", + "bkey", "bvalue"))); + Awaitility.await().untilAsserted(() -> { + assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(2); + }); + assertThat(this.headerMapListener.text).isEqualTo("message2"); + assertThat(this.headerMapListener.headers) + .isNotNull() + .containsOnlyKeys( + "kafka_offset", + "kafka_consumer", + "kafka_timestampType", + "kafka_receivedPartitionId", + "kafka_receivedTopic", + "kafka_receivedTimestamp", + "kafka_groupId", + "akey", + "bkey") + .contains( + Map.entry("akey", "avalue"), + Map.entry("bkey", "bvalue")); + } + @Test public void testMultiValidateNoDefaultHandler() throws Exception { this.kafkaJsonTemplate.setDefaultTopic("annotated40"); @@ -1545,6 +1595,11 @@ public MultiJsonListenerBean multiJsonListener() { return new MultiJsonListenerBean(); } + @Bean + public HeaderMapListenerBean headerMapListener() { + return new HeaderMapListenerBean(); + } + @Bean public MultiListenerNoDefault multiNoDefault() { return new MultiListenerNoDefault(); @@ -2742,6 +2797,24 @@ public void defaultHandler(Bar bar) { } + @KafkaListener(id = "headerMap", topics = "headerMapTopic") + static class HeaderMapListenerBean { + + final AtomicInteger invocationCount = new AtomicInteger(); + + private String text; + + private Map headers; + + @KafkaHandler(isDefault = true) + public void defaultHandler(@Payload String text, @Headers Map headers) { + this.text = text; + this.headers = headers; + this.invocationCount.incrementAndGet(); + } + + } + @KafkaListener(id = "multiNoDefault", topics = "annotated40", containerFactory = "kafkaJsonListenerContainerFactory2") static class MultiListenerNoDefault {