Skip to content

Commit 909af16

Browse files
garyrussellartembilan
authored andcommitted
GH-1356: Add BatchToRecordAdapter
Resolves #1356 Also fix deprecation warning in `LoggingProducerListener`.
1 parent 5716bf0 commit 909af16

File tree

10 files changed

+405
-16
lines changed

10 files changed

+405
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@
4040
import org.springframework.kafka.listener.ErrorHandler;
4141
import org.springframework.kafka.listener.GenericErrorHandler;
4242
import org.springframework.kafka.listener.RecordInterceptor;
43+
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
4344
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4445
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
4546
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
@@ -105,6 +106,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe
105106

106107
private RecordInterceptor<K, V> recordInterceptor;
107108

109+
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
110+
108111
private ApplicationContext applicationContext;
109112

110113
private ContainerCustomizer<K, V, C> containerCustomizer;
@@ -306,6 +309,15 @@ public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
306309
this.recordInterceptor = recordInterceptor;
307310
}
308311

312+
/**
313+
* Set a {@link BatchToRecordAdapter}.
314+
* @param batchToRecordAdapter the adapter.
315+
* @since 2.4.2
316+
*/
317+
public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter) {
318+
this.batchToRecordAdapter = batchToRecordAdapter;
319+
}
320+
309321
/**
310322
* Set a customizer used to further configure a container after it has been created.
311323
* @param containerCustomizer the customizer.
@@ -356,7 +368,8 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
356368
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
357369
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
358370
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
359-
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer);
371+
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer)
372+
.acceptIfNotNull(this.batchToRecordAdapter, aklEndpoint::setBatchToRecordAdapter);
360373
}
361374

362375
/**

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
import org.springframework.kafka.listener.BatchMessageListener;
4040
import org.springframework.kafka.listener.MessageListener;
4141
import org.springframework.kafka.listener.MessageListenerContainer;
42+
import org.springframework.kafka.listener.adapter.BatchToRecordAdapter;
4243
import org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter;
4344
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
4445
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
@@ -116,6 +117,8 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
116117

117118
private boolean splitIterables = true;
118119

120+
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
121+
119122
@Override
120123
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
121124
this.beanFactory = beanFactory;
@@ -449,6 +452,19 @@ public void setSplitIterables(boolean splitIterables) {
449452
this.splitIterables = splitIterables;
450453
}
451454

455+
protected BatchToRecordAdapter<K, V> getBatchToRecordAdapter() {
456+
return this.batchToRecordAdapter;
457+
}
458+
459+
/**
460+
* Set a {@link BatchToRecordAdapter}.
461+
* @param batchToRecordAdapter the adapter.
462+
* @since 2.4.2
463+
*/
464+
public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter) {
465+
this.batchToRecordAdapter = batchToRecordAdapter;
466+
}
467+
452468
@Override
453469
public void afterPropertiesSet() {
454470
boolean topicsEmpty = getTopics().isEmpty();

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -184,6 +184,9 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(Me
184184
if (isBatchListener()) {
185185
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
186186
this.bean, this.method, this.errorHandler);
187+
if (getBatchToRecordAdapter() != null) {
188+
messageListener.setBatchToRecordAdapter(getBatchToRecordAdapter());
189+
}
187190
if (messageConverter instanceof BatchMessageConverter) {
188191
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
189192
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -62,10 +62,23 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage
6262

6363
private KafkaListenerErrorHandler errorHandler;
6464

65+
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
66+
67+
/**
68+
* Create an instance with the provided parameters.
69+
* @param bean the listener bean.
70+
* @param method the listener method.
71+
*/
6572
public BatchMessagingMessageListenerAdapter(Object bean, Method method) {
6673
this(bean, method, null);
6774
}
6875

76+
/**
77+
* Create an instance with the provided parameters.
78+
* @param bean the listener bean.
79+
* @param method the listener method.
80+
* @param errorHandler the error handler.
81+
*/
6982
public BatchMessagingMessageListenerAdapter(Object bean, Method method, KafkaListenerErrorHandler errorHandler) {
7083
super(bean, method);
7184
this.errorHandler = errorHandler;
@@ -82,6 +95,15 @@ public void setBatchMessageConverter(BatchMessageConverter messageConverter) {
8295
}
8396
}
8497

98+
/**
99+
* Set a {@link BatchToRecordAdapter}.
100+
* @param batchToRecordAdapter the adapter.
101+
* @since 2.4.2
102+
*/
103+
public void setBatchToRecordAdapter(BatchToRecordAdapter<K, V> batchToRecordAdapter) {
104+
this.batchToRecordAdapter = batchToRecordAdapter;
105+
}
106+
85107
/**
86108
* Return the {@link BatchMessagingMessageConverter} for this listener,
87109
* being able to convert {@link org.springframework.messaging.Message}.
@@ -115,12 +137,19 @@ public void onMessage(ConsumerRecords<K, V> records, Acknowledgment acknowledgme
115137
public void onMessage(List<ConsumerRecord<K, V>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
116138
Message<?> message;
117139
if (!isConsumerRecordList()) {
118-
if (isMessageList()) {
140+
if (isMessageList() || this.batchToRecordAdapter != null) {
119141
List<Message<?>> messages = new ArrayList<>(records.size());
120142
for (ConsumerRecord<K, V> record : records) {
121143
messages.add(toMessagingMessage(record, acknowledgment, consumer));
122144
}
123-
message = MessageBuilder.withPayload(messages).build();
145+
if (this.batchToRecordAdapter == null) {
146+
message = MessageBuilder.withPayload(messages).build();
147+
}
148+
else {
149+
logger.debug(() -> "Processing " + messages);
150+
this.batchToRecordAdapter.adapt(messages, records, acknowledgment, consumer, this::invoke);
151+
return;
152+
}
124153
}
125154
else {
126155
message = toMessagingMessage(records, acknowledgment, consumer);
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
24+
import org.springframework.kafka.support.Acknowledgment;
25+
import org.springframework.messaging.Message;
26+
27+
/**
28+
* An adapter that adapts a batch listener to a record listener method. Use this, for
29+
* example, if you want a batch to be processed in a single transaction but wish to invoke
30+
* the listener with each message individually.
31+
*
32+
* @param <K> the key type.
33+
* @param <V> the value type.
34+
*
35+
* @author Gary Russell
36+
* @since 2.4.2
37+
*
38+
*/
39+
@FunctionalInterface
40+
public interface BatchToRecordAdapter<K, V> {
41+
42+
/**
43+
* Adapt the list and invoke the callback for each message.
44+
* @param messages the messages.
45+
* @param records the records.
46+
* @param ack the acknowledgment.
47+
* @param consumer the consumer.
48+
* @param callback the callback.
49+
*/
50+
void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
51+
Consumer<?, ?> consumer, Callback<K, V> callback);
52+
53+
/**
54+
* A callback for each message.
55+
*
56+
* @param <K> the key type.
57+
* @param <V> the value type.
58+
*/
59+
@FunctionalInterface
60+
interface Callback<K, V> {
61+
62+
/**
63+
* Handle each message.
64+
* @param records the records.
65+
* @param ack the acknowledgment.
66+
* @param consumer the consumer.
67+
* @param message the message.
68+
*/
69+
void invoke(List<ConsumerRecord<K, V>> records, Acknowledgment ack, Consumer<?, ?> consumer,
70+
Message<?> message);
71+
72+
}
73+
74+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
import java.util.List;
20+
21+
import org.apache.kafka.clients.consumer.Consumer;
22+
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
24+
import org.springframework.core.log.LogAccessor;
25+
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
26+
import org.springframework.kafka.support.Acknowledgment;
27+
import org.springframework.messaging.Message;
28+
import org.springframework.util.Assert;
29+
30+
/**
31+
* The default {@link BatchToRecordAdapter} implementation; if the supplied recoverer
32+
* throws an exception, the batch will be aborted; otherwise the next record will be
33+
* processed.
34+
*
35+
* @param <K> the key type.
36+
* @param <V> the value type.
37+
*
38+
* @author Gary Russell
39+
* @since 2.4.2
40+
*/
41+
public class DefaultBatchToRecordAdapter<K, V> implements BatchToRecordAdapter<K, V> {
42+
43+
private static final LogAccessor LOGGER = new LogAccessor(DefaultBatchToRecordAdapter.class);
44+
45+
private final ConsumerRecordRecoverer recoverer;
46+
47+
/**
48+
* Construct an instance with the default recoverer which simply logs the failed
49+
* record.
50+
*/
51+
public DefaultBatchToRecordAdapter() {
52+
this((record, ex) -> LOGGER.error(ex, () -> "Failed to process " + record));
53+
}
54+
55+
/**
56+
* Construct an instance with the provided recoverer.
57+
* @param recoverer the recoverer.
58+
*/
59+
public DefaultBatchToRecordAdapter(ConsumerRecordRecoverer recoverer) {
60+
Assert.notNull(recoverer, "'recoverer' cannot be null");
61+
this.recoverer = recoverer;
62+
}
63+
64+
@Override
65+
public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
66+
Consumer<?, ?> consumer, Callback<K, V> callback) {
67+
68+
for (int i = 0; i < messages.size(); i++) {
69+
Message<?> message = messages.get(i);
70+
try {
71+
callback.invoke(records, ack, consumer, message);
72+
}
73+
catch (Exception e) {
74+
this.recoverer.accept(records.get(i), e);
75+
}
76+
}
77+
78+
}
79+
80+
}

spring-kafka/src/main/java/org/springframework/kafka/support/LoggingProducerListener.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.support;
1818

1919
import org.apache.commons.logging.LogFactory;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
2021

2122
import org.springframework.core.log.LogAccessor;
2223
import org.springframework.util.ObjectUtils;
@@ -64,21 +65,21 @@ public void setMaxContentLogged(int maxContentLogged) {
6465
}
6566

6667
@Override
67-
public void onError(String topic, Integer partition, K key, V value, Exception exception) {
68+
public void onError(ProducerRecord<K, V> record, Exception exception) {
6869
LOGGER.error(exception, () -> {
6970
StringBuffer logOutput = new StringBuffer();
7071
logOutput.append("Exception thrown when sending a message");
7172
if (this.includeContents) {
7273
logOutput.append(" with key='")
73-
.append(toDisplayString(ObjectUtils.nullSafeToString(key), this.maxContentLogged))
74+
.append(toDisplayString(ObjectUtils.nullSafeToString(record.key()), this.maxContentLogged))
7475
.append("'")
7576
.append(" and payload='")
76-
.append(toDisplayString(ObjectUtils.nullSafeToString(value), this.maxContentLogged))
77+
.append(toDisplayString(ObjectUtils.nullSafeToString(record.value()), this.maxContentLogged))
7778
.append("'");
7879
}
79-
logOutput.append(" to topic ").append(topic);
80-
if (partition != null) {
81-
logOutput.append(" and partition ").append(partition);
80+
logOutput.append(" to topic ").append(record.topic());
81+
if (record.partition() != null) {
82+
logOutput.append(" and partition ").append(record.partition());
8283
}
8384
logOutput.append(":");
8485
return logOutput.toString();

0 commit comments

Comments
 (0)