Commit c240737
GH-1621: Fix nack() with Tx Batch Listener
Resolves #1621

When using transactions, the remaining records in the batch after a `nack()` were incorrectly added to the commit list.

**I will back-port - conflicts expected**
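For readers unfamiliar with the batch `nack()` contract, the sketch below shows the scenario this fix targets. It is illustrative only: the topic, group, and `process()` helper are placeholders, not part of this commit. With a batch listener and `AckMode.MANUAL`, `Acknowledgment.nack(index, sleep)` acknowledges the records before `index` and asks the container to seek back so that the record at `index` and everything after it are redelivered; inside a transaction, only the acknowledged offsets should reach `sendOffsetsToTransaction`.

```java
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

// Illustrative only: topic, group, and process() are placeholders, not part of
// this commit. Assumes a batch listener container with AckMode.MANUAL and a
// transaction manager configured.
public class BatchNackExample {

	@KafkaListener(topics = "someTopic", groupId = "someGroup")
	public void listen(List<String> batch, Acknowledgment ack) {
		for (int i = 0; i < batch.size(); i++) {
			if (!process(batch.get(i))) {
				// Acknowledge records 0..i-1 only; the container seeks back so
				// the record at i and the rest of the batch are redelivered.
				// Before this fix, the offsets of those remaining records were
				// still added to the commit list and sent to the transaction.
				ack.nack(i, 0);
				return;
			}
		}
		ack.acknowledge(); // whole batch processed; commit all offsets
	}

	private boolean process(String record) { // stand-in for real work
		return !record.isEmpty();
	}

}
```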
1 parent 3fef4d4

2 files changed: +264 -2 lines
spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 2 deletions
```diff
@@ -1550,8 +1550,10 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONAR
 				}
 			}
 			if (producer != null || (!this.isAnyManualAck && !this.autoCommit)) {
-				for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
-					this.acks.put(record);
+				if (this.nackSleep < 0) {
+					for (ConsumerRecord<K, V> record : getHighestOffsetRecords(records)) {
+						this.acks.put(record);
+					}
 				}
 				if (producer != null) {
 					sendOffsetsToTransaction(producer);
```
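For context on the new guard: `nackSleep` is the container's record that the listener nacked the current batch; it rests at a negative value and is set to the requested sleep time when `nack()` is called, roughly as sketched below. This is an abridged sketch under that assumption, not the literal container source.

```java
// Abridged sketch: the batch Acknowledgment records a nack by flipping
// nackSleep from its resting value of -1 to the requested sleep time; the
// commit path in the diff above checks that flag and skips queueing the
// batch's highest offsets when it is non-negative.
private int nackIndex = -1;   // assumed resting values
private long nackSleep = -1;

public void nack(int index, long sleep) {
	this.nackIndex = index; // first record to be redelivered after the seeks
	this.nackSleep = sleep; // >= 0 signals that the current batch was nacked
}
```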
spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java

Lines changed: 260 additions & 0 deletions (new file)
```java
/*
 * Copyright 2017-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

/**
 * @author Gary Russell
 * @since 2.3
 *
 */
@SpringJUnitConfig
@DirtiesContext
public class ManualNackRecordTxTests {

	@SuppressWarnings("rawtypes")
	@Autowired
	private Consumer consumer;

	@SuppressWarnings("rawtypes")
	@Autowired
	private Producer producer;

	@Autowired
	private Config config;

	@Autowired
	private KafkaListenerEndpointRegistry registry;

	/*
	 * Deliver 6 records from three partitions, fail on the second record second
	 * partition, first attempt; verify partition 0,1 committed and a total of 7 records
	 * handled after seek.
	 */
	@SuppressWarnings({ "unchecked" })
	@Test
	public void discardRemainingRecordsFromPollAndSeek() throws Exception {
		assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
		this.registry.stop();
		assertThat(this.config.closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
		InOrder inOrder = inOrder(this.consumer, this.producer);
		inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
		HashMap<TopicPartition, OffsetAndMetadata> commit1 = new HashMap<>();
		commit1.put(new TopicPartition("foo", 0), new OffsetAndMetadata(2L));
		commit1.put(new TopicPartition("foo", 1), new OffsetAndMetadata(1L));
		HashMap<TopicPartition, OffsetAndMetadata> commit2 = new HashMap<>();
		commit2.put(new TopicPartition("foo", 1), new OffsetAndMetadata(2L));
		commit2.put(new TopicPartition("foo", 2), new OffsetAndMetadata(2L));
		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit1), any(ConsumerGroupMetadata.class));
		inOrder.verify(this.consumer).seek(new TopicPartition("foo", 1), 1L);
		inOrder.verify(this.consumer).seek(new TopicPartition("foo", 2), 0L);
		inOrder.verify(this.consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
		inOrder.verify(this.producer).sendOffsetsToTransaction(eq(commit2), any(ConsumerGroupMetadata.class));
		assertThat(this.config.count).isEqualTo(2);
		assertThat(this.config.contents.toString()).isEqualTo("[[foo, bar, baz, qux, fiz, buz], [qux, fiz, buz]]");
	}

	@Configuration
	@EnableKafka
	public static class Config {

		private final List<List<String>> contents = new ArrayList<>();

		private final CountDownLatch pollLatch = new CountDownLatch(3);

		private final CountDownLatch deliveryLatch = new CountDownLatch(2);

		private final CountDownLatch closeLatch = new CountDownLatch(1);

		private final CountDownLatch commitLatch = new CountDownLatch(2);

		private int count;

		@KafkaListener(topics = "foo", groupId = "grp")
		public void foo(List<String> in, Acknowledgment ack) {
			this.contents.add(in);
			this.deliveryLatch.countDown();
			if (++this.count == 1) { // part 1, offset 1, first time
				ack.nack(3, 0);
			}
			else {
				ack.acknowledge();
			}
		}

		@SuppressWarnings({ "rawtypes" })
		@Bean
		public ConsumerFactory consumerFactory() {
			ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
			final Consumer consumer = consumer();
			given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
					.willReturn(consumer);
			return consumerFactory;
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public Consumer consumer() {
			final Consumer consumer = mock(Consumer.class);
			final TopicPartition topicPartition0 = new TopicPartition("foo", 0);
			final TopicPartition topicPartition1 = new TopicPartition("foo", 1);
			final TopicPartition topicPartition2 = new TopicPartition("foo", 2);
			willAnswer(i -> {
				((ConsumerRebalanceListener) i.getArgument(1)).onPartitionsAssigned(
						Collections.singletonList(topicPartition1));
				return null;
			}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
			Map<TopicPartition, List<ConsumerRecord>> records1 = new LinkedHashMap<>();
			records1.put(topicPartition0, Arrays.asList(
					new ConsumerRecord("foo", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "foo"),
					new ConsumerRecord("foo", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "bar")));
			records1.put(topicPartition1, Arrays.asList(
					new ConsumerRecord("foo", 1, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "baz"),
					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "qux")));
			records1.put(topicPartition2, Arrays.asList(
					new ConsumerRecord("foo", 2, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "fiz"),
					new ConsumerRecord("foo", 2, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "buz")));
			Map<TopicPartition, List<ConsumerRecord>> records2 = new LinkedHashMap<>(records1);
			records2.remove(topicPartition0);
			records2.put(topicPartition1, Arrays.asList(
					new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, 0, null, "qux")));
			final AtomicInteger which = new AtomicInteger();
			willAnswer(i -> {
				this.pollLatch.countDown();
				switch (which.getAndIncrement()) {
					case 0:
						return new ConsumerRecords(records1);
					case 1:
						return new ConsumerRecords(records2);
					default:
						try {
							Thread.sleep(1000);
						}
						catch (InterruptedException e) {
							Thread.currentThread().interrupt();
						}
						return new ConsumerRecords(Collections.emptyMap());
				}
			}).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
			willAnswer(i -> {
				this.commitLatch.countDown();
				return null;
			}).given(consumer).commitSync(anyMap(), any());
			willAnswer(i -> {
				this.closeLatch.countDown();
				return null;
			}).given(consumer).close();
			given(consumer.groupMetadata()).willReturn(mock(ConsumerGroupMetadata.class));
			return consumer;
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		Producer producer() {
			Producer producer = mock(Producer.class, withSettings().verboseLogging());
			willAnswer(inv -> {
				this.commitLatch.countDown();
				return null;
			}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
			return producer;
		}

		@SuppressWarnings("rawtypes")
		@Bean
		ProducerFactory pf() {
			ProducerFactory pf = mock(ProducerFactory.class, withSettings().verboseLogging());
			given(pf.createProducer(isNull())).willReturn(producer());
			given(pf.transactionCapable()).willReturn(true);
			return pf;
		}

		@SuppressWarnings("rawtypes")
		@Bean
		KafkaTransactionManager tm() {
			return new KafkaTransactionManager<>(pf());
		}

		@SuppressWarnings({ "rawtypes", "unchecked" })
		@Bean
		public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
			ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
			factory.setConsumerFactory(consumerFactory());
			factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
			factory.getContainerProperties().setAckMode(AckMode.MANUAL);
			factory.getContainerProperties().setMissingTopicsFatal(false);
			factory.getContainerProperties().setTransactionManager(tm());
			factory.setBatchListener(true);
			return factory;
		}

	}

}
```
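Note how the test's assertions line up with the fix: the first `sendOffsetsToTransaction` is verified with exactly `commit1` (foo-0 at 2, foo-1 at 1, i.e. only the records acknowledged by `ack.nack(3, 0)`) before the seeks, and `commit2` after the second poll. Before this change, the first transaction would presumably also have carried the highest offsets of the remaining, nacked records (foo-1 and foo-2 at offset 2), wrongly committing records that the seeks then redeliver.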
