Skip to content

Commit b00f542

Browse files
artembilangaryrussell
authored andcommitted
GH-1059: Add SpEL support for KafkaListener.id()
Fixes #1059 * Code style polishing in the `KafkaListenerAnnotationBeanPostProcessor` and `EnableKafkaIntegrationTests` **Cherry-pick to 2.2.x**
1 parent 535c573 commit b00f542

File tree

2 files changed

+41
-42
lines changed

2 files changed

+41
-42
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -278,14 +278,9 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
278278
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
279279
final List<Method> multiMethods = new ArrayList<>();
280280
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
281-
new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {
282-
283-
@Override
284-
public Set<KafkaListener> inspect(Method method) {
285-
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
286-
return (!listenerMethods.isEmpty() ? listenerMethods : null);
287-
}
288-
281+
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
282+
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
283+
return (!listenerMethods.isEmpty() ? listenerMethods : null);
289284
});
290285
if (hasClassLevelListeners) {
291286
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
@@ -428,8 +423,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
428423
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
429424
endpoint.setTopics(resolveTopics(kafkaListener));
430425
endpoint.setTopicPattern(resolvePattern(kafkaListener));
431-
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(),
432-
"clientIdPrefix"));
426+
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
433427
String group = kafkaListener.containerGroup();
434428
if (StringUtils.hasText(group)) {
435429
Object resolvedGroup = resolveExpression(group);
@@ -476,11 +470,14 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint,
476470
if (propertyStrings.length > 0) {
477471
Properties properties = new Properties();
478472
for (String property : propertyStrings) {
479-
try {
480-
properties.load(new StringReader(resolveExpressionAsString(property, "property")));
481-
}
482-
catch (IOException e) {
483-
this.logger.error("Failed to load property " + property + ", continuing...", e);
473+
String value = resolveExpressionAsString(property, "property");
474+
if (value != null) {
475+
try {
476+
properties.load(new StringReader(value));
477+
}
478+
catch (IOException e) {
479+
this.logger.error("Failed to load property " + property + ", continuing...", e);
480+
}
484481
}
485482
}
486483
endpoint.setConsumerProperties(properties);
@@ -489,7 +486,7 @@ private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint,
489486

490487
private String getEndpointId(KafkaListener kafkaListener) {
491488
if (StringUtils.hasText(kafkaListener.id())) {
492-
return resolve(kafkaListener.id());
489+
return resolveExpressionAsString(kafkaListener.id(), "id");
493490
}
494491
else {
495492
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
@@ -515,19 +512,19 @@ private TopicPartitionInitialOffset[] resolveTopicPartitions(KafkaListener kafka
515512
result.addAll(resolveTopicPartitionsList(topicPartition));
516513
}
517514
}
518-
return result.toArray(new TopicPartitionInitialOffset[result.size()]);
515+
return result.toArray(new TopicPartitionInitialOffset[0]);
519516
}
520517

521518
private String[] resolveTopics(KafkaListener kafkaListener) {
522519
String[] topics = kafkaListener.topics();
523520
List<String> result = new ArrayList<>();
524521
if (topics.length > 0) {
525-
for (int i = 0; i < topics.length; i++) {
526-
Object topic = resolveExpression(topics[i]);
522+
for (String topic1 : topics) {
523+
Object topic = resolveExpression(topic1);
527524
resolveAsString(topic, result);
528525
}
529526
}
530-
return result.toArray(new String[result.size()]);
527+
return result.toArray(new String[0]);
531528
}
532529

533530
private Pattern resolvePattern(KafkaListener kafkaListener) {
@@ -559,8 +556,8 @@ private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartit
559556
Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
560557
"At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
561558
List<TopicPartitionInitialOffset> result = new ArrayList<>();
562-
for (int i = 0; i < partitions.length; i++) {
563-
resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
559+
for (String partition : partitions) {
560+
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
564561
}
565562

566563
for (PartitionOffset partitionOffset : partitionOffsets) {
@@ -929,4 +926,5 @@ protected boolean isEmptyPayload(Object payload) {
929926
}
930927

931928
}
929+
932930
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.springframework.kafka.annotation;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyMap;
2323
import static org.mockito.ArgumentMatchers.anyString;
@@ -257,7 +257,7 @@ public void testSimple() throws Exception {
257257
"listenerConsumer.consumer"));
258258
assertThat(
259259
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, "fetcher.maxPollRecords", Integer.class))
260-
.isEqualTo(100);
260+
.isEqualTo(100);
261261
assertThat(this.quxGroup).hasSize(1);
262262
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
263263
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
@@ -311,14 +311,12 @@ public void testSimple() throws Exception {
311311
.isNotEqualTo("rebalanceListener");
312312
String clientId = KafkaTestUtils.getPropertyValue(rebalanceContainer, "listenerConsumer.consumer.clientId",
313313
String.class);
314-
assertThat(
315-
clientId)
316-
.startsWith("rebal-");
314+
assertThat(clientId).startsWith("rebal-");
317315
assertThat(clientId.indexOf('-')).isEqualTo(clientId.lastIndexOf('-'));
318316
}
319317

320318
@Test
321-
public void testAutoStartup() throws Exception {
319+
public void testAutoStartup() {
322320
MessageListenerContainer listenerContainer = registry.getListenerContainer("manualStart");
323321
assertThat(listenerContainer).isNotNull();
324322
assertThat(listenerContainer.isRunning()).isFalse();
@@ -401,7 +399,6 @@ public void testJson() throws Exception {
401399
}
402400

403401
@Test
404-
@DirtiesContext
405402
public void testJsonHeaders() throws Exception {
406403
ConcurrentMessageListenerContainer<?, ?> container =
407404
(ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer("jsonHeaders");
@@ -549,7 +546,7 @@ public void testValidation() throws Exception {
549546
}
550547

551548
@Test
552-
public void testReplyingListener() throws Exception {
549+
public void testReplyingListener() {
553550
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
554551
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplying");
555552
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -563,7 +560,7 @@ public void testReplyingListener() throws Exception {
563560
}
564561

565562
@Test
566-
public void testReplyingBatchListener() throws Exception {
563+
public void testReplyingBatchListener() {
567564
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
568565
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testBatchReplying");
569566
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -589,7 +586,7 @@ public void testReplyingBatchListener() throws Exception {
589586
}
590587

591588
@Test
592-
public void testReplyingListenerWithErrorHandler() throws Exception {
589+
public void testReplyingListenerWithErrorHandler() {
593590
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
594591
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerReplying");
595592
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -603,7 +600,7 @@ public void testReplyingListenerWithErrorHandler() throws Exception {
603600
}
604601

605602
@Test
606-
public void testVoidListenerWithReplyingErrorHandler() throws Exception {
603+
public void testVoidListenerWithReplyingErrorHandler() {
607604
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
608605
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testVoidWithErrorHandlerReplying");
609606
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -617,7 +614,7 @@ public void testVoidListenerWithReplyingErrorHandler() throws Exception {
617614
}
618615

619616
@Test
620-
public void testReplyingBatchListenerWithErrorHandler() throws Exception {
617+
public void testReplyingBatchListenerWithErrorHandler() {
621618
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
622619
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testErrorHandlerBatchReplying");
623620
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -643,7 +640,7 @@ public void testReplyingBatchListenerWithErrorHandler() throws Exception {
643640
}
644641

645642
@Test
646-
public void testMultiReplyTo() throws Exception {
643+
public void testMultiReplyTo() {
647644
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
648645
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
649646
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -712,10 +709,12 @@ public void testAddingTopics() {
712709
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 1);
713710
embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1));
714711
assertThat(embeddedKafka.getTopics().size()).isEqualTo(count + 2);
715-
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
716-
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("exists");
717-
assertThatThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
718-
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("replication");
712+
assertThatIllegalArgumentException()
713+
.isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions", 10, (short) 1)))
714+
.withMessageContaining("exists");
715+
assertThatIllegalArgumentException()
716+
.isThrownBy(() -> embeddedKafka.addTopics(new NewTopic("morePartitions2", 10, (short) 2)))
717+
.withMessageContaining("replication");
719718
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
720719
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testMultiReplying");
721720
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
@@ -736,7 +735,7 @@ public void testReceivePollResults() throws Exception {
736735
@Test
737736
public void testAutoConfigTm() {
738737
assertThat(this.transactionalFactory.getContainerProperties().getTransactionManager())
739-
.isInstanceOf(ChainedKafkaTransactionManager.class);
738+
.isInstanceOf(ChainedKafkaTransactionManager.class);
740739
}
741740

742741
@Test
@@ -1424,8 +1423,9 @@ public void listen3(ConsumerRecord<?, ?> record) {
14241423
this.latch3.countDown();
14251424
}
14261425

1427-
@KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
1428-
containerGroup = "qux#{'Group'}", properties = {
1426+
@KafkaListener(id = "#{'qux'}", topics = "annotated4",
1427+
containerFactory = "kafkaManualAckListenerContainerFactory", containerGroup = "qux#{'Group'}",
1428+
properties = {
14291429
"max.poll.interval.ms:#{'${poll.interval:60000}'}",
14301430
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=#{'${poll.recs:100}'}"
14311431
})
@@ -1888,6 +1888,7 @@ public void setDelegate(
18881888
public Foo convert(String source) {
18891889
return delegate.convert(source);
18901890
}
1891+
18911892
}
18921893

18931894
public static class ValidatedClass {

0 commit comments

Comments
 (0)