|
63 | 63 | import org.apache.kafka.common.errors.TopicExistsException; |
64 | 64 | import org.apache.kafka.common.serialization.ByteArrayDeserializer; |
65 | 65 | import org.apache.kafka.common.serialization.ByteArraySerializer; |
| 66 | +import org.awaitility.Awaitility; |
66 | 67 | import org.jspecify.annotations.NonNull; |
67 | 68 | import org.jspecify.annotations.Nullable; |
68 | 69 | import org.junit.jupiter.api.Test; |
|
141 | 142 | import org.springframework.messaging.converter.AbstractMessageConverter; |
142 | 143 | import org.springframework.messaging.converter.SmartMessageConverter; |
143 | 144 | import org.springframework.messaging.handler.annotation.Header; |
| 145 | +import org.springframework.messaging.handler.annotation.Headers; |
144 | 146 | import org.springframework.messaging.handler.annotation.Payload; |
145 | 147 | import org.springframework.messaging.handler.annotation.SendTo; |
146 | 148 | import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException; |
|
181 | 183 | * @author Soby Chacko |
182 | 184 | * @author Wang Zhiyang |
183 | 185 | * @author Borahm Lee |
| 186 | + * @author Sean Sullivan |
184 | 187 | */ |
185 | 188 | @SpringJUnitConfig |
186 | 189 | @DirtiesContext |
|
196 | 199 | "annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33", |
197 | 200 | "annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle", |
198 | 201 | "annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42", |
199 | | - "annotated43", "annotated43reply", "seekToComputeFn"}) |
| 202 | + "annotated43", "annotated43reply", "seekToComputeFn", |
| 203 | + "headerMapTopic"}) |
200 | 204 | @TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10") |
201 | 205 | public class EnableKafkaIntegrationTests { |
202 | 206 |
|
@@ -242,6 +246,9 @@ public class EnableKafkaIntegrationTests { |
242 | 246 | @Autowired |
243 | 247 | public MultiJsonListenerBean multiJsonListener; |
244 | 248 |
|
| 249 | + @Autowired |
| 250 | + public HeaderMapListenerBean headerMapListener; |
| 251 | + |
245 | 252 | @Autowired |
246 | 253 | public MultiListenerNoDefault multiNoDefault; |
247 | 254 |
|
@@ -584,6 +591,50 @@ public void testMultiJson() throws Exception { |
584 | 591 | assertThat(this.multiJsonListener.validated.valCount).isEqualTo(1); |
585 | 592 | } |
586 | 593 |
|
| 594 | + @Test |
| 595 | + public void testHeadersAnnotation() throws Exception { |
| 596 | + template.setDefaultTopic("headerMapTopic"); |
| 597 | + |
| 598 | + template.send(new GenericMessage<>("message1", Collections.emptyMap())); |
| 599 | + Awaitility.await().untilAsserted(() -> { |
| 600 | + assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(1); |
| 601 | + }); |
| 602 | + assertThat(this.headerMapListener.text).isEqualTo("message1"); |
| 603 | + assertThat(this.headerMapListener.headers) |
| 604 | + .isNotNull() |
| 605 | + .containsOnlyKeys( |
| 606 | + "kafka_offset", |
| 607 | + "kafka_consumer", |
| 608 | + "kafka_timestampType", |
| 609 | + "kafka_receivedPartitionId", |
| 610 | + "kafka_receivedTopic", |
| 611 | + "kafka_receivedTimestamp", |
| 612 | + "kafka_groupId"); |
| 613 | + |
| 614 | + template.send(new GenericMessage<>("message2", |
| 615 | + Map.of("akey", "avalue", |
| 616 | + "bkey", "bvalue"))); |
| 617 | + Awaitility.await().untilAsserted(() -> { |
| 618 | + assertThat(this.headerMapListener.invocationCount.get()).isEqualTo(2); |
| 619 | + }); |
| 620 | + assertThat(this.headerMapListener.text).isEqualTo("message2"); |
| 621 | + assertThat(this.headerMapListener.headers) |
| 622 | + .isNotNull() |
| 623 | + .containsOnlyKeys( |
| 624 | + "kafka_offset", |
| 625 | + "kafka_consumer", |
| 626 | + "kafka_timestampType", |
| 627 | + "kafka_receivedPartitionId", |
| 628 | + "kafka_receivedTopic", |
| 629 | + "kafka_receivedTimestamp", |
| 630 | + "kafka_groupId", |
| 631 | + "akey", |
| 632 | + "bkey") |
| 633 | + .contains( |
| 634 | + Map.entry("akey", "avalue"), |
| 635 | + Map.entry("bkey", "bvalue")); |
| 636 | + } |
| 637 | + |
587 | 638 | @Test |
588 | 639 | public void testMultiValidateNoDefaultHandler() throws Exception { |
589 | 640 | this.kafkaJsonTemplate.setDefaultTopic("annotated40"); |
@@ -1545,6 +1596,11 @@ public MultiJsonListenerBean multiJsonListener() { |
1545 | 1596 | return new MultiJsonListenerBean(); |
1546 | 1597 | } |
1547 | 1598 |
|
| 1599 | + @Bean |
| 1600 | + public HeaderMapListenerBean headerMapListener() { |
| 1601 | + return new HeaderMapListenerBean(); |
| 1602 | + } |
| 1603 | + |
1548 | 1604 | @Bean |
1549 | 1605 | public MultiListenerNoDefault multiNoDefault() { |
1550 | 1606 | return new MultiListenerNoDefault(); |
@@ -2742,6 +2798,22 @@ public void defaultHandler(Bar bar) { |
2742 | 2798 |
|
2743 | 2799 | } |
2744 | 2800 |
|
| 2801 | + @KafkaListener(id = "headerMap", topics = "headerMapTopic") |
| 2802 | + static class HeaderMapListenerBean { |
| 2803 | + |
| 2804 | + final AtomicInteger invocationCount = new AtomicInteger(); |
| 2805 | + private String text; |
| 2806 | + private Map<String, Object> headers; |
| 2807 | + |
| 2808 | + @KafkaHandler(isDefault = true) |
| 2809 | + public void defaultHandler(@Payload String text, @Headers Map<String, Object> headers) { |
| 2810 | + this.text = text; |
| 2811 | + this.headers = headers; |
| 2812 | + this.invocationCount.incrementAndGet(); |
| 2813 | + } |
| 2814 | + |
| 2815 | + } |
| 2816 | + |
2745 | 2817 | @KafkaListener(id = "multiNoDefault", topics = "annotated40", containerFactory = "kafkaJsonListenerContainerFactory2") |
2746 | 2818 | static class MultiListenerNoDefault { |
2747 | 2819 |
|
|
0 commit comments