Skip to content

Commit 2143168

Browse files
garyrussellartembilan
authored andcommitted
@KafkaListener Add BytesToStringConverter
Currently a `byte[]` gets converted to a String using an `ArrayToStringConverter`, which is not very useful. Add a `BytesToStringConverter`. * Remove obsolete getter.
1 parent 265a4d3 commit 2143168

File tree

2 files changed

+86
-2
lines changed

2 files changed

+86
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.kafka.annotation;
1818

1919
import java.lang.reflect.Method;
20+
import java.nio.charset.Charset;
21+
import java.nio.charset.StandardCharsets;
2022
import java.util.ArrayList;
2123
import java.util.Arrays;
2224
import java.util.Collection;
@@ -152,6 +154,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
152154

153155
private BeanExpressionContext expressionContext;
154156

157+
private Charset charset = StandardCharsets.UTF_8;
158+
155159
@Override
156160
public int getOrder() {
157161
return LOWEST_PRECEDENCE;
@@ -204,6 +208,16 @@ public void setBeanFactory(BeanFactory beanFactory) {
204208
}
205209
}
206210

211+
/**
212+
* Set a charset to use when converting byte[] to String in method arguments.
213+
* Default UTF-8.
214+
* @param charset the charset.
215+
* @since 2.2
216+
*/
217+
public void setCharset(Charset charset) {
218+
Assert.notNull(charset, "'charset' cannot be null");
219+
this.charset = charset;
220+
}
207221

208222
@Override
209223
public void afterSingletonsInstantiated() {
@@ -729,7 +743,8 @@ private <T> Collection<T> getBeansOfType(Class<T> type) {
729743
*/
730744
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
731745

732-
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
746+
private final DefaultFormattingConversionService defaultFormattingConversionService =
747+
new DefaultFormattingConversionService();
733748

734749
private MessageHandlerMethodFactory messageHandlerMethodFactory;
735750

@@ -762,6 +777,9 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
762777
(ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory : null);
763778

764779

780+
this.defaultFormattingConversionService.addConverter(
781+
new BytesToStringConverter(KafkaListenerAnnotationBeanPostProcessor.this.charset));
782+
765783
defaultFactory.setConversionService(this.defaultFormattingConversionService);
766784

767785
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
@@ -789,6 +807,22 @@ protected boolean isEmptyPayload(Object payload) {
789807

790808
}
791809

810+
private static class BytesToStringConverter implements Converter<byte[], String> {
811+
812+
813+
private final Charset charset;
814+
815+
BytesToStringConverter(Charset charset) {
816+
this.charset = charset;
817+
}
818+
819+
@Override
820+
public String convert(byte[] source) {
821+
return new String(source, this.charset);
822+
}
823+
824+
}
825+
792826
private static class ListenerScope implements Scope {
793827

794828
private final Map<String, Object> listeners = new HashMap<>();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import org.apache.kafka.clients.consumer.ConsumerRecord;
4747
import org.apache.kafka.clients.consumer.ConsumerRecords;
4848
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
49+
import org.apache.kafka.clients.producer.ProducerConfig;
50+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
51+
import org.apache.kafka.common.serialization.ByteArraySerializer;
4952
import org.junit.ClassRule;
5053
import org.junit.Test;
5154
import org.junit.runner.RunWith;
@@ -142,7 +145,7 @@ public class EnableKafkaIntegrationTests {
142145
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
143146
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
144147
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
145-
"annotated34", "annotated35");
148+
"annotated34", "annotated35", "annotated36");
146149

147150
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
148151

@@ -167,6 +170,9 @@ public class EnableKafkaIntegrationTests {
167170
@Autowired
168171
public KafkaTemplate<Integer, String> kafkaJsonTemplate;
169172

173+
@Autowired
174+
public KafkaTemplate<byte[], String> bytesKeyTemplate;
175+
170176
@Autowired
171177
public KafkaListenerEndpointRegistry registry;
172178

@@ -697,6 +703,13 @@ public void testAutoConfigTm() {
697703
.isInstanceOf(ChainedKafkaTransactionManager.class);
698704
}
699705

706+
@Test
707+
public void testKeyConversion() throws Exception {
708+
this.bytesKeyTemplate.send("annotated36", "foo".getBytes(), "bar");
709+
assertThat(this.listener.keyLatch.await(10, TimeUnit.SECONDS)).isTrue();
710+
assertThat(this.listener.convertedKey).isEqualTo("foo");
711+
}
712+
700713
@Configuration
701714
@EnableKafka
702715
@EnableTransactionManagement(proxyTargetClass = true)
@@ -807,6 +820,14 @@ public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory2() {
807820
return factory;
808821
}
809822

823+
@Bean
824+
public KafkaListenerContainerFactory<?> bytesStringListenerContainerFactory() {
825+
ConcurrentKafkaListenerContainerFactory<byte[], String> factory =
826+
new ConcurrentKafkaListenerContainerFactory<>();
827+
factory.setConsumerFactory(bytesStringConsumerFactory());
828+
return factory;
829+
}
830+
810831
@Bean
811832
public KafkaListenerContainerFactory<?> batchFactory() {
812833
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
@@ -935,6 +956,13 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
935956
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
936957
}
937958

959+
@Bean
960+
public DefaultKafkaConsumerFactory<byte[], String> bytesStringConsumerFactory() {
961+
Map<String, Object> configs = consumerConfigs();
962+
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
963+
return new DefaultKafkaConsumerFactory<>(configs);
964+
}
965+
938966
private ConsumerFactory<Integer, String> configuredConsumerFactory(String clientAndGroupId) {
939967
Map<String, Object> configs = consumerConfigs();
940968
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
@@ -988,6 +1016,13 @@ public ProducerFactory<Integer, String> txProducerFactory() {
9881016
return pf;
9891017
}
9901018

1019+
@Bean
1020+
public ProducerFactory<byte[], String> bytesStringProducerFactory() {
1021+
Map<String, Object> configs = producerConfigs();
1022+
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
1023+
return new DefaultKafkaProducerFactory<>(configs);
1024+
}
1025+
9911026
@Bean
9921027
public Map<String, Object> producerConfigs() {
9931028
return KafkaTestUtils.producerProps(embeddedKafka);
@@ -998,6 +1033,11 @@ public KafkaTemplate<Integer, String> template() {
9981033
return new KafkaTemplate<>(producerFactory());
9991034
}
10001035

1036+
@Bean
1037+
public KafkaTemplate<byte[], String> bytesKeyTemplate() {
1038+
return new KafkaTemplate<>(bytesStringProducerFactory());
1039+
}
1040+
10011041
@Bean
10021042
public KafkaTemplate<Integer, String> partitionZeroReplyingTemplate() {
10031043
// reply always uses the no-partition, no-key method; subclasses can be used
@@ -1255,6 +1295,10 @@ static class Listener implements ConsumerSeekAware {
12551295

12561296
private final CountDownLatch eventLatch = new CountDownLatch(1);
12571297

1298+
private final CountDownLatch keyLatch = new CountDownLatch(1);
1299+
1300+
private String convertedKey;
1301+
12581302
private volatile Integer partition;
12591303

12601304
private volatile ConsumerRecord<?, ?> record;
@@ -1525,6 +1569,12 @@ public void ackWithAutoContainerListener(String payload, Acknowledgment ack) {
15251569
// empty
15261570
}
15271571

1572+
@KafkaListener(topics = "annotated36", containerFactory = "bytesStringListenerContainerFactory")
1573+
public void bytesKey(String in, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
1574+
this.convertedKey = key;
1575+
this.keyLatch.countDown();
1576+
}
1577+
15281578
@KafkaListener(topics = "annotated29")
15291579
public void anonymousListener(String in) {
15301580
}

0 commit comments

Comments
 (0)