
Commit cad5646

garyrussell authored and artembilan committed
GH-1340: Support returning a collection
Resolves #1340

Previously, a reply type of `Collection` always split the collection into discrete replies. There is now an option to send the entire collection in the reply.

* Fix cast.
* Fix doc link.
1 parent a41da35 commit cad5646
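
To illustrate the new behavior, here is a minimal sketch of a replying batch listener that uses the new attribute; the topic names, the `batchJsonReplyFactory` bean name, and the class wrapper are illustrative (the commit's own test uses the `annotated38` topics), and the reply template behind the factory needs a value serializer that can write a `List`, such as `JsonSerializer`:

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;

public class CollectionReplyListener {

	@KafkaListener(id = "wholeCollectionReply", topics = "requests",
			containerFactory = "batchJsonReplyFactory", splitIterables = false)
	@SendTo("replies")
	public Collection<String> upperCase(List<String> in) {
		// With splitIterables = false, the whole list becomes the value of a single
		// reply record instead of one record per element.
		return in.stream().map(String::toUpperCase).collect(Collectors.toList());
	}

}
```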

File tree

9 files changed: +176 −15 lines

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListener.java (9 additions, 0 deletions)

```diff
@@ -244,4 +244,13 @@
 	 */
 	String[] properties() default {};
 
+	/**
+	 * When false and the return type is an {@link Iterable}, return the result as the
+	 * value of a single reply record instead of individual records for each element.
+	 * Default true. Ignored if the reply is of type {@code Iterable<Message<?>>}.
+	 * @return false to create a single reply record.
+	 * @since 2.3.5
+	 */
+	boolean splitIterables() default true;
+
 }
```

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java (1 addition, 0 deletions)

```diff
@@ -437,6 +437,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
 			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
 		}
 		resolveKafkaProperties(endpoint, kafkaListener.properties());
+		endpoint.setSplitIterables(kafkaListener.splitIterables());
 
 		KafkaListenerContainerFactory<?> factory = null;
 		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
```

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java (18 additions, 0 deletions)

```diff
@@ -114,6 +114,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
 
 	private Properties consumerProperties;
 
+	private boolean splitIterables = true;
+
 	@Override
 	public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
 		this.beanFactory = beanFactory;
@@ -432,6 +434,21 @@ public void setConsumerProperties(Properties consumerProperties) {
 		this.consumerProperties = consumerProperties;
 	}
 
+	@Override
+	public boolean isSplitIterables() {
+		return this.splitIterables;
+	}
+
+	/**
+	 * Set to false to disable splitting {@link Iterable} reply values into separate
+	 * records.
+	 * @param splitIterables false to disable; default true.
+	 * @since 2.3.5
+	 */
+	public void setSplitIterables(boolean splitIterables) {
+		this.splitIterables = splitIterables;
+	}
+
 	@Override
 	public void afterPropertiesSet() {
 		boolean topicsEmpty = getTopics().isEmpty();
@@ -470,6 +487,7 @@ private void setupMessageListener(MessageListenerContainer container, MessageCon
 		if (this.replyHeadersConfigurer != null) {
 			adapter.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
 		}
+		adapter.setSplitIterables(this.splitIterables);
 		Object messageListener = adapter;
 		Assert.state(messageListener != null,
 				() -> "Endpoint [" + this + "] must provide a non null message listener");
```

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpoint.java (7 additions, 0 deletions)

```diff
@@ -144,4 +144,11 @@ default Properties getConsumerProperties() {
 	 */
 	void setupListenerContainer(MessageListenerContainer listenerContainer, MessageConverter messageConverter);
 
+	/**
+	 * When true, {@link Iterable} return results will be split into discrete records.
+	 * @return true to split.
+	 * @since 2.3.5
+	 */
+	boolean isSplitIterables();
+
 }
```

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointAdapter.java (5 additions, 0 deletions)

```diff
@@ -87,4 +87,9 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
 			MessageConverter messageConverter) {
 	}
 
+	@Override
+	public boolean isSplitIterables() {
+		return true;
+	}
+
 }
```

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java (41 additions, 9 deletions)

```diff
@@ -23,6 +23,7 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -123,6 +124,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
 
 	private ReplyHeadersConfigurer replyHeadersConfigurer;
 
+	private boolean splitIterables = true;
+
 	public MessagingMessageListenerAdapter(Object bean, Method method) {
 		this.bean = bean;
 		this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
@@ -252,6 +255,25 @@ public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigu
 		this.replyHeadersConfigurer = replyHeadersConfigurer;
 	}
 
+	/**
+	 * When true, {@link Iterable} return results will be split into discrete records.
+	 * @return true to split.
+	 * @since 2.3.5
+	 */
+	protected boolean isSplitIterables() {
+		return this.splitIterables;
+	}
+
+	/**
+	 * Set to false to disable splitting {@link Iterable} reply values into separate
+	 * records.
+	 * @param splitIterables false to disable; default true.
+	 * @since 2.3.5
+	 */
+	public void setSplitIterables(boolean splitIterables) {
+		this.splitIterables = splitIterables;
+	}
+
 	@Override
 	public void registerSeekCallback(ConsumerSeekCallback callback) {
 		if (this.bean instanceof ConsumerSeekAware) {
@@ -406,15 +428,25 @@ else if (result instanceof Message) {
 			this.replyTemplate.send((Message<?>) result);
 		}
 		else {
-			if (result instanceof Collection) {
-				((Collection<V>) result).forEach(v -> {
-					if (v instanceof Message) {
-						this.replyTemplate.send((Message<?>) v);
-					}
-					else {
-						this.replyTemplate.send(topic, v);
-					}
-				});
+			if (result instanceof Iterable) {
+				Iterator<?> iterator = ((Iterable<?>) result).iterator();
+				boolean iterableOfMessages = false;
+				if (iterator.hasNext()) {
+					iterableOfMessages = iterator.next() instanceof Message;
+				}
+				if (iterableOfMessages || this.splitIterables) {
+					((Iterable<V>) result).forEach(v -> {
+						if (v instanceof Message) {
+							this.replyTemplate.send((Message<?>) v);
+						}
+						else {
+							this.replyTemplate.send(topic, v);
+						}
+					});
+				}
+				else {
+					sendSingleResult(result, topic, source);
+				}
 			}
 			else {
 				sendSingleResult(result, topic, source);
```
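
The last hunk above is the behavioral core of the change: the adapter peeks at the first element of an `Iterable` reply and still sends discrete records when the elements are `Message<?>` instances, or when `splitIterables` is left at its default. A sketch of a listener that keeps per-element replies even with `splitIterables = false` (topic names, the factory bean name, and the class wrapper are illustrative, not part of this commit):

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

public class MessageCollectionReplyListener {

	// splitIterables = false is ignored here: the reply elements are Message<?>
	// instances, so the adapter sends each one as its own record.
	@KafkaListener(id = "messageCollectionReply", topics = "requests",
			containerFactory = "batchFactory", splitIterables = false)
	@SendTo("replies")
	public Collection<Message<?>> perElementReplies(List<String> in) {
		return in.stream()
				.<Message<?>>map(s -> MessageBuilder.withPayload(s.toUpperCase())
						.setHeader(KafkaHeaders.TOPIC, "replies")
						.build())
				.collect(Collectors.toList());
	}

}
```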

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java (84 additions, 6 deletions)

```diff
@@ -107,6 +107,8 @@
 import org.springframework.kafka.support.converter.ProjectingMessageConverter;
 import org.springframework.kafka.support.converter.RecordMessageConverter;
 import org.springframework.kafka.support.converter.StringJsonMessageConverter;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -153,7 +155,8 @@
 		"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
 		"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
 		"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
-		"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle" })
+		"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
+		"annotated38", "annotated38reply" })
 public class EnableKafkaIntegrationTests {
 
 	private static final String DEFAULT_TEST_GROUP_ID = "testAnnot";
@@ -803,6 +806,40 @@ public void testSeekToLastOnIdle() throws InterruptedException {
 		assertThat(KafkaTestUtils.getPropertyValue(this.seekOnIdleListener, "callbacks", Map.class)).hasSize(0);
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	public void testReplyingBatchListenerReturnCollection() {
+		Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
+		consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplyingBatchListenerReturnCollection");
+		consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
+		ConsumerFactory<Integer, Object> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
+		Consumer<Integer, Object> consumer = cf.createConsumer();
+		this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated38reply");
+		template.send("annotated38", 0, 0, "FoO");
+		template.send("annotated38", 0, 0, "BaR");
+		template.flush();
+		ConsumerRecords replies = KafkaTestUtils.getRecords(consumer);
+		assertThat(replies.count()).isGreaterThanOrEqualTo(1);
+		Iterator<ConsumerRecord<?, ?>> iterator = replies.iterator();
+		Object value = iterator.next().value();
+		assertThat(value).isInstanceOf(List.class);
+		List list = (List) value;
+		assertThat(list).hasSizeGreaterThanOrEqualTo(1);
+		assertThat(list.get(0)).isEqualTo("FOO");
+		if (list.size() > 1) {
+			assertThat(list.get(1)).isEqualTo("BAR");
+		}
+		else {
+			replies = KafkaTestUtils.getRecords(consumer);
+			assertThat(replies.count()).isGreaterThanOrEqualTo(1);
+			iterator = replies.iterator();
+			list = (List) iterator.next().value();
+			assertThat(list).hasSize(1);
+			assertThat(list.get(0)).isEqualTo("BAR");
+		}
+		consumer.close();
+	}
+
 	@Configuration
 	@EnableKafka
 	@EnableTransactionManagement(proxyTargetClass = true)
@@ -851,7 +888,7 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
 				new ConcurrentKafkaListenerContainerFactory<>();
 		factory.setConsumerFactory(consumerFactory());
 		factory.setRecordFilterStrategy(recordFilter());
-		factory.setReplyTemplate(partitionZeroReplyingTemplate());
+		factory.setReplyTemplate(partitionZeroReplyTemplate());
 		factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
 			this.globalErrorThrowable = t;
 			c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
@@ -868,7 +905,7 @@ public ChainedKafkaTransactionManager<Integer, String> cktm() {
 				new ConcurrentKafkaListenerContainerFactory<>();
 		factory.setConsumerFactory(consumerFactory());
 		factory.setRecordFilterStrategy(recordFilter());
-		factory.setReplyTemplate(partitionZeroReplyingTemplate());
+		factory.setReplyTemplate(partitionZeroReplyTemplate());
 		factory.setErrorHandler((ConsumerAwareErrorHandler) (t, d, c) -> {
 			this.globalErrorThrowable = t;
 			c.seek(new org.apache.kafka.common.TopicPartition(d.topic(), d.partition()), d.offset());
@@ -988,7 +1025,19 @@ public KafkaListenerContainerFactory<?> batchFactory() {
 		factory.setBatchListener(true);
 		factory.setRecordFilterStrategy(recordFilter());
 		// always send to the same partition so the replies are in order for the test
-		factory.setReplyTemplate(partitionZeroReplyingTemplate());
+		factory.setReplyTemplate(partitionZeroReplyTemplate());
+		return factory;
+	}
+
+	@Bean
+	public KafkaListenerContainerFactory<?> batchJsonReplyFactory() {
+		ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
+				new ConcurrentKafkaListenerContainerFactory<>();
+		factory.setConsumerFactory(consumerFactory());
+		factory.setBatchListener(true);
+		factory.setRecordFilterStrategy(recordFilter());
+		// always send to the same partition so the replies are in order for the test
+		factory.setReplyTemplate(partitionZeroReplyJsonTemplate());
 		return factory;
 	}
 
@@ -1018,7 +1067,7 @@ public KafkaListenerContainerFactory<?> batchSpyFactory() {
 		factory.setBatchListener(true);
 		factory.setRecordFilterStrategy(recordFilter());
 		// always send to the same partition so the replies are in order for the test
-		factory.setReplyTemplate(partitionZeroReplyingTemplate());
+		factory.setReplyTemplate(partitionZeroReplyTemplate());
 		factory.setMissingTopicsFatal(false);
 		return factory;
 	}
@@ -1168,6 +1217,13 @@ public ProducerFactory<Integer, String> producerFactory() {
 		return new DefaultKafkaProducerFactory<>(producerConfigs());
 	}
 
+	@Bean
+	public ProducerFactory<Integer, Object> jsonProducerFactory() {
+		Map<String, Object> producerConfigs = producerConfigs();
+		producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+		return new DefaultKafkaProducerFactory<>(producerConfigs);
+	}
+
 	@Bean
 	public ProducerFactory<Integer, String> txProducerFactory() {
 		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
@@ -1198,7 +1254,7 @@ public KafkaTemplate<byte[], String> bytesKeyTemplate() {
 	}
 
 	@Bean
-	public KafkaTemplate<Integer, String> partitionZeroReplyingTemplate() {
+	public KafkaTemplate<Integer, String> partitionZeroReplyTemplate() {
 		// reply always uses the no-partition, no-key method; subclasses can be used
 		return new KafkaTemplate<Integer, String>(producerFactory(), true) {
 
@@ -1210,6 +1266,19 @@ public ListenableFuture<SendResult<Integer, String>> send(String topic, String d
 		};
 	}
 
+	@Bean
+	public KafkaTemplate<Integer, Object> partitionZeroReplyJsonTemplate() {
+		// reply always uses the no-partition, no-key method; subclasses can be used
+		return new KafkaTemplate<Integer, Object>(jsonProducerFactory(), true) {
+
+			@Override
+			public ListenableFuture<SendResult<Integer, Object>> send(String topic, Object data) {
+				return super.send(topic, 0, null, data);
+			}
+
+		};
+	}
+
 	@Bean
 	public KafkaTemplate<Integer, String> kafkaJsonTemplate() {
 		KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
@@ -1731,6 +1800,15 @@ public Collection<String> replyingBatchListenerWithErrorHandler(List<String> in)
 		throw new RuntimeException("return this");
 	}
 
+	@KafkaListener(id = "replyingBatchListenerCollection", topics = "annotated38",
+			containerFactory = "batchJsonReplyFactory", splitIterables = false)
+	@SendTo("annotated38reply")
+	public Collection<String> replyingBatchListenerCollection(List<String> in) {
+		return in.stream()
+				.map(String::toUpperCase)
+				.collect(Collectors.toList());
+	}
+
 	@KafkaListener(id = "batchAckListener", topics = { "annotated26", "annotated27" },
 			containerFactory = "batchFactory")
 	public void batchAckListener(@SuppressWarnings("unused") List<String> in,
```

src/reference/asciidoc/kafka.adoc (5 additions, 0 deletions)

```diff
@@ -1744,6 +1744,11 @@ public KafkaListenerErrorHandler voidSendToErrorHandler() {
 See <<annotation-error-handling>> for more information.
 ====
 
+NOTE: If a listener method returns an `Iterable`, by default a record is sent with each element as the value.
+Starting with version 2.3.5, you can set the `splitIterables` property on `@KafkaListener` to `false`; the entire result is then sent as the value of a single `ProducerRecord`.
+This requires a suitable serializer in the reply template's producer configuration.
+However, if the reply is `Iterable<Message<?>>`, the property is ignored and each message is sent separately.
+
 ===== Filtering Messages
 
 In certain scenarios, such as rebalancing, a message that has already been processed may be redelivered.
```
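
The "suitable serializer" requirement mentioned in the note can be met with a reply template along the following lines; this is only a sketch (the bean names and the `localhost:9092` address are illustrative), mirroring the JSON-based reply template the commit adds to its integration test:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class ReplyTemplateConfig {

	@Bean
	public ProducerFactory<Integer, Object> jsonReplyProducerFactory() {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
		// JsonSerializer can write the whole Collection as the value of one record
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
		return new DefaultKafkaProducerFactory<>(props);
	}

	@Bean
	public KafkaTemplate<Integer, Object> jsonReplyTemplate() {
		// Wire this template into the listener container factory via setReplyTemplate(...)
		return new KafkaTemplate<>(jsonReplyProducerFactory());
	}

}
```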

src/reference/asciidoc/whats-new.adoc (6 additions, 0 deletions)

```diff
@@ -38,6 +38,12 @@ See <<aggregating-request-reply>> for more information.
 The `ContainerProperties` provides an `authorizationExceptionRetryInterval` option to let the listener container to retry after any `AuthorizationException` is thrown by the `KafkaConsumer`.
 See its JavaDocs and <<kafka-container>> for more information.
 
+==== @KafkaListener
+
+The `@KafkaListener` annotation has a new property `splitIterables`; default true.
+When a replying listener returns an `Iterable`, this property controls whether the result is sent as a single record or as a record for each element.
+See <<annotation-send-to>> for more information.
+
 === Migration Guide
 
 * This release is essentially the same as the 2.3.x line, except it has been compiled against the 2.4 `kafka-clients` jar, due to a binary incompatibility.
```
