Skip to content

Commit 6689bbb

Browse files
garyrussellartembilan
authored andcommitted
GH-683: Fix Transactions with ErrorHandler
Fixes #683 If the error handler handles the error, the offset should be sent to the tx. **cherry-pick to 2.0.x, 1.3.x** * Polish ackOnError Javadoc to reflect transactions # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java
1 parent 0e90111 commit 6689bbb

File tree

3 files changed

+25
-14
lines changed

3 files changed

+25
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -966,12 +966,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
966966
try {
967967
this.errorHandler.handle(e, record);
968968
if (producer != null) {
969-
try {
970-
sendOffsetsToTransaction(producer);
971-
}
972-
catch (Exception e1) {
973-
this.logger.error("Send offsets to transaction failed", e1);
974-
}
969+
ackCurrent(record, producer);
975970
}
976971
}
977972
catch (RuntimeException ee) {

spring-kafka/src/main/java/org/springframework/kafka/listener/config/ContainerProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,13 @@ public void setIdleEventInterval(Long idleEventInterval) {
317317
* offset of the failed message anyway, so this option has limited applicability.
318318
* Perhaps useful for a component that starts throwing exceptions consistently;
319319
* allowing it to resume when restarted from the last successfully processed message.
320+
* <p>
321+
* Does not apply when transactions are used - in that case, whether or not the
322+
* offsets are sent to the transaction depends on whether the transaction is committed
323+
* or rolled back. If a listener throws an exception, the transaction will normally
324+
* be rolled back unless an error handler is provided that handles the error and
325+
* exits normally; in which case the offsets are sent to the transaction and the
326+
* transaction is committed.
320327
* @param ackOnError whether the container should acknowledge messages that throw
321328
* exceptions.
322329
*/

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,7 +63,6 @@
6363
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6464
import org.springframework.kafka.core.KafkaTemplate;
6565
import org.springframework.kafka.core.ProducerFactory;
66-
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
6766
import org.springframework.kafka.listener.config.ContainerProperties;
6867
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6968
import org.springframework.kafka.test.rule.KafkaEmbedded;
@@ -93,17 +92,22 @@ public class TransactionalContainerTests {
9392
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(3, true, topic1, topic2);
9493

9594
@Test
96-
public void testConsumeAndProduceTransactionBatch() throws Exception {
97-
testConsumeAndProduceTransactionGuts(AckMode.BATCH);
95+
public void testConsumeAndProduceTransactionKTM() throws Exception {
96+
testConsumeAndProduceTransactionGuts(false, false);
9897
}
9998

10099
@Test
101-
public void testConsumeAndProduceTransactionRecord() throws Exception {
102-
testConsumeAndProduceTransactionGuts(AckMode.RECORD);
100+
public void testConsumeAndProduceTransactionKCTM() throws Exception {
101+
testConsumeAndProduceTransactionGuts(true, false);
102+
}
103+
104+
@Test
105+
public void testConsumeAndProduceTransactionHandleError() throws Exception {
106+
testConsumeAndProduceTransactionGuts(false, true);
103107
}
104108

105109
@SuppressWarnings({ "rawtypes", "unchecked" })
106-
private void testConsumeAndProduceTransactionGuts(AckMode ackMode) throws Exception {
110+
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError) throws Exception {
107111
Consumer consumer = mock(Consumer.class);
108112
final TopicPartition topicPartition = new TopicPartition("foo", 0);
109113
willAnswer(i -> {
@@ -141,8 +145,13 @@ private void testConsumeAndProduceTransactionGuts(AckMode ackMode) throws Except
141145
final KafkaTemplate template = new KafkaTemplate(pf);
142146
props.setMessageListener((MessageListener) m -> {
143147
template.send("bar", "baz");
148+
if (handleError) {
149+
throw new RuntimeException("fail");
150+
}
144151
});
145-
props.setAckMode(ackMode);
152+
if (handleError) {
153+
props.setErrorHandler((e, data) -> { });
154+
}
146155
KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
147156
container.setBeanName("commit");
148157
container.start();

0 commit comments

Comments
 (0)