Skip to content

Commit 31d08b4

Browse files
garyrussellartembilan
authored andcommitted
Fix BatchToRecordAdapter
Incorrectly passed the whole message list to the callback, which caused listeners with a `ConsumerRecord<?, ?>` argument to fail. Changes a public API but there is no choice and it was only added a couple of weeks ago.
1 parent 5c34d6a commit 31d08b4

File tree

4 files changed

+34
-7
lines changed

4 files changed

+34
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ interface Callback<K, V> {
6161

6262
/**
6363
* Handle each message.
64-
* @param records the records.
64+
* @param record the record.
6565
* @param ack the acknowledgment.
6666
* @param consumer the consumer.
6767
* @param message the message.
6868
*/
69-
void invoke(List<ConsumerRecord<K, V>> records, Acknowledgment ack, Consumer<?, ?> consumer,
69+
void invoke(ConsumerRecord<K, V> record, Acknowledgment ack, Consumer<?, ?> consumer,
7070
Message<?> message);
7171

7272
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records,
6868
for (int i = 0; i < messages.size(); i++) {
6969
Message<?> message = messages.get(i);
7070
try {
71-
callback.invoke(records, ack, consumer, message);
71+
callback.invoke(records.get(i), ack, consumer, message);
7272
}
7373
catch (Exception e) {
7474
this.recoverer.accept(records.get(i), e);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> record, Acknowledgm
318318
*/
319319
protected final Object invokeHandler(Object data, Acknowledgment acknowledgment, Message<?> message,
320320
Consumer<?, ?> consumer) {
321+
321322
try {
322323
if (data instanceof List && !this.isConsumerRecordList) {
323324
return this.handlerMethod.invoke(message, acknowledgment, consumer);

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchListenerWithRecordAdapterTests.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,48 @@ void test(@Autowired KafkaListenerEndpointRegistry registry, @Autowired TestList
5757
records.add(barRecord);
5858
records.add(new ConsumerRecord<String, String>("foo", 0, 2, null, "baz"));
5959
adapter.onMessage(records, null, null);
60-
assertThat(foo.values).contains("foo", "bar", "baz");
60+
assertThat(foo.values1).contains("foo", "bar", "baz");
6161
assertThat(config.failed).isSameAs(barRecord);
6262
}
6363

64+
@SuppressWarnings("unchecked")
65+
@Test
66+
void testFullRecord(@Autowired KafkaListenerEndpointRegistry registry, @Autowired TestListener foo,
67+
@Autowired Config config) {
68+
69+
config.failed = null;
70+
BatchMessagingMessageListenerAdapter<String, String> adapter =
71+
(BatchMessagingMessageListenerAdapter<String, String>) registry
72+
.getListenerContainer("batchRecordAdapterFullRecord").getContainerProperties().getMessageListener();
73+
List<ConsumerRecord<String, String>> records = new ArrayList<>();
74+
records.add(new ConsumerRecord<String, String>("foo", 0, 0, null, "foo"));
75+
ConsumerRecord<String, String> barRecord = new ConsumerRecord<String, String>("foo", 0, 1, null, "bar");
76+
records.add(barRecord);
77+
records.add(new ConsumerRecord<String, String>("foo", 0, 2, null, "baz"));
78+
adapter.onMessage(records, null, null);
79+
assertThat(foo.values2).contains("foo", "bar", "baz");
80+
assertThat(config.failed).isNull();
81+
}
82+
6483
public static class TestListener {
6584

66-
final List<String> values = new ArrayList<>();
85+
final List<String> values1 = new ArrayList<>();
86+
87+
final List<String> values2 = new ArrayList<>();
6788

6889
@KafkaListener(id = "batchRecordAdapter", topics = "foo", autoStartup = "false")
69-
public void listen(String data) {
70-
values.add(data);
90+
public void listen1(String data) {
91+
values1.add(data);
7192
if ("bar".equals(data)) {
7293
throw new RuntimeException("reject partial");
7394
}
7495
}
7596

97+
@KafkaListener(id = "batchRecordAdapterFullRecord", topics = "foo", autoStartup = "false")
98+
public void listen2(ConsumerRecord<Integer, String> data) {
99+
values2.add(data.value());
100+
}
101+
76102
}
77103

78104
@Configuration

0 commit comments

Comments
 (0)