Skip to content

Commit a521a78

Browse files
garyrussellartembilan
authored andcommitted
GH-1424: Abort delivery on ProducerFencedException
Resolves #1424 Do not deliver remaining records after a `ProducerFencedException` - the partitions will have been reassigned to another instance. **cherry-pick to 2.4.x, 2.3.x**
1 parent bb1c279 commit a521a78

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1559,6 +1559,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
15591559
}
15601560
catch (ProducerFencedException e) {
15611561
this.logger.error(e, "Producer fenced during transaction");
1562+
break;
15621563
}
15631564
catch (RuntimeException e) {
15641565
this.logger.error(e, "Transaction rolled back");

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.concurrent.CountDownLatch;
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicBoolean;
49+
import java.util.concurrent.atomic.AtomicInteger;
4950
import java.util.concurrent.atomic.AtomicReference;
5051
import java.util.function.BiConsumer;
5152

@@ -715,7 +716,9 @@ void testNoAfterRollbackWhenFenced() throws Exception {
715716
recordMap.put(topicPartition1, Collections.singletonList(new ConsumerRecord<>("foo", 1, 0, "key", "value")));
716717
ConsumerRecords records = new ConsumerRecords(recordMap);
717718
final AtomicBoolean done = new AtomicBoolean();
719+
final CountDownLatch pollLatch = new CountDownLatch(2);
718720
willAnswer(i -> {
721+
pollLatch.countDown();
719722
if (done.compareAndSet(false, true)) {
720723
return records;
721724
}
@@ -727,11 +730,6 @@ void testNoAfterRollbackWhenFenced() throws Exception {
727730
ConsumerFactory cf = mock(ConsumerFactory.class);
728731
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
729732
Producer producer = mock(Producer.class);
730-
final CountDownLatch closeLatch = new CountDownLatch(1);
731-
willAnswer(i -> {
732-
closeLatch.countDown();
733-
return null;
734-
}).given(producer).close(any());
735733
willThrow(new ProducerFencedException("test")).given(producer).commitTransaction();
736734
ProducerFactory pf = mock(ProducerFactory.class);
737735
given(pf.transactionCapable()).willReturn(true);
@@ -741,19 +739,23 @@ void testNoAfterRollbackWhenFenced() throws Exception {
741739
new TopicPartitionOffset("foo", 1));
742740
props.setGroupId("group");
743741
props.setTransactionManager(tm);
742+
AtomicInteger deliveryCount = new AtomicInteger();
744743
props.setMessageListener((MessageListener) m -> {
744+
deliveryCount.incrementAndGet();
745745
});
746746
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
747747
AfterRollbackProcessor arp = mock(AfterRollbackProcessor.class);
748748
given(arp.isProcessInTransaction()).willReturn(true);
749749
container.setAfterRollbackProcessor(arp);
750750
container.setBeanName("rollback");
751751
container.start();
752-
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
752+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
753753
InOrder inOrder = inOrder(producer);
754754
inOrder.verify(producer).beginTransaction();
755755
inOrder.verify(producer).commitTransaction();
756756
inOrder.verify(producer).close(any());
757+
inOrder.verifyNoMoreInteractions();
758+
assertThat(deliveryCount.get()).isEqualTo(1);
757759

758760
verify(arp, never()).process(any(), any(), any(), anyBoolean());
759761

0 commit comments

Comments
 (0)