Commit bb1c279

Fix manual acks with transactions
- send the offset to the transaction instead of committing via the consumer.
1 parent fc949fe, commit bb1c279
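
For context on the commit message: the sketch below is not the container code itself, it only contrasts the two underlying kafka-clients calls that this change switches between when an offset is acknowledged inside a transaction. The class name, helper names, and the 30-second timeout are illustrative, not taken from the commit.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;

final class OffsetCommitSketch {

	// Before the fix: a manual-immediate ack always committed through the consumer,
	// even when the listener was running in a Kafka transaction.
	static void commitViaConsumer(Consumer<?, ?> consumer, ConsumerRecord<?, ?> record) {
		Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
				new TopicPartition(record.topic(), record.partition()),
				new OffsetAndMetadata(record.offset() + 1));
		consumer.commitSync(commits, Duration.ofSeconds(30)); // illustrative timeout
	}

	// After the fix: inside a transaction the offset is handed to the transactional
	// producer, so it commits or aborts together with any records the listener sent.
	static void commitViaTransaction(Producer<?, ?> producer, ConsumerRecord<?, ?> record, String groupId) {
		Map<TopicPartition, OffsetAndMetadata> commits = Collections.singletonMap(
				new TopicPartition(record.topic(), record.partition()),
				new OffsetAndMetadata(record.offset() + 1));
		producer.sendOffsetsToTransaction(commits, groupId);
	}

	private OffsetCommitSketch() {
	}

}
```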

2 files changed: 54 additions, 14 deletions


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 3 deletions
@@ -598,6 +598,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private ConsumerRecords<K, V> lastBatch;
 
+		private Producer<?, ?> producer;
+
 		private volatile boolean consumerPaused;
 
 		private volatile Collection<TopicPartition> assignedPartitions;
@@ -988,7 +990,7 @@ protected void pollAndInvoke() {
 			this.lastPoll = System.currentTimeMillis();
 			this.polling.set(true);
 			ConsumerRecords<K, V> records = doPoll();
-			if (!this.polling.compareAndSet(true, false)) {
+			if (!this.polling.compareAndSet(true, false) && records != null) {
 				/*
 				 * There is a small race condition where wakeIfNecessary was called between
 				 * exiting the poll and before we reset the boolean.
@@ -1241,7 +1243,10 @@ private void ackImmediate(ConsumerRecord<K, V> record) {
 					new TopicPartition(record.topic(), record.partition()),
 					new OffsetAndMetadata(record.offset() + 1));
 			this.commitLogger.log(() -> "Committing: " + commits);
-			if (this.syncCommits) {
+			if (this.producer != null) {
+				this.producer.sendOffsetsToTransaction(commits, this.consumerGroupId);
+			}
+			else if (this.syncCommits) {
 				this.consumer.commitSync(commits, this.syncCommitTimeout);
 			}
 			else {
@@ -1291,6 +1296,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 							producer = ((KafkaResourceHolder) TransactionSynchronizationManager
 									.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
 									.getProducer(); // NOSONAR nullable
+							ListenerConsumer.this.producer = producer;
 						}
 						RuntimeException aborted = doInvokeBatchListener(records, recordList, producer);
 						if (aborted != null) {
@@ -1541,6 +1547,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 							producer = ((KafkaResourceHolder) TransactionSynchronizationManager
 									.getResource(ListenerConsumer.this.kafkaTxManager.getProducerFactory()))
 									.getProducer(); // NOSONAR
+							ListenerConsumer.this.producer = producer;
 						}
 						RuntimeException aborted = doInvokeRecordListener(record, producer, iterator);
 						if (aborted != null) {
@@ -1714,7 +1721,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record,
 				checkDeser(record, ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER);
 			}
 			doInvokeOnMessage(record);
-			if (this.nackSleep < 0) {
+			if (this.nackSleep < 0 && !this.isManualImmediateAck) {
				ackCurrent(record, producer);
			}
		}
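
Taken together, these container changes mean a MANUAL_IMMEDIATE acknowledgment issued while a listener transaction is active is now routed to the transactional producer instead of being committed by the consumer. Below is a hedged usage sketch of that scenario, mirroring the test in TransactionalContainerTests further down; the topic, group id, class name, and factory parameters (cf, pf) are placeholders rather than values from the commit.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.transaction.KafkaTransactionManager;

final class ManualAckTxSketch {

	static KafkaMessageListenerContainer<String, String> build(ConsumerFactory<String, String> cf,
			ProducerFactory<String, String> pf) {

		ContainerProperties props = new ContainerProperties("someTopic"); // placeholder topic
		props.setGroupId("someGroup"); // placeholder group id
		props.setAckMode(AckMode.MANUAL_IMMEDIATE);
		props.setTransactionManager(new KafkaTransactionManager<>(pf));
		props.setMessageListener(new AcknowledgingMessageListener<String, String>() {

			@Override
			public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
				// process (and possibly publish) within the transaction, then acknowledge;
				// with this fix the acknowledged offset is sent to the transaction
				acknowledgment.acknowledge();
			}

			@Override
			public void onMessage(ConsumerRecord<String, String> data) {
				// not used in this sketch; the two-argument variant receives the records
			}

		});
		return new KafkaMessageListenerContainer<>(cf, props);
	}

	private ManualAckTxSketch() {
	}

}
```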

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 44 additions & 11 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2019 the original author or authors.
+ * Copyright 2017-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -77,6 +77,8 @@
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.core.ProducerFactoryUtils;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.TopicPartitionOffset;
@@ -128,21 +130,28 @@ public static void setup() {
 
 	@Test
 	public void testConsumeAndProduceTransactionKTM() throws Exception {
-		testConsumeAndProduceTransactionGuts(false, false);
+		testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD);
 	}
 
 	@Test
 	public void testConsumeAndProduceTransactionKCTM() throws Exception {
-		testConsumeAndProduceTransactionGuts(true, false);
+		testConsumeAndProduceTransactionGuts(true, false, AckMode.RECORD);
 	}
 
 	@Test
 	public void testConsumeAndProduceTransactionHandleError() throws Exception {
-		testConsumeAndProduceTransactionGuts(false, true);
+		testConsumeAndProduceTransactionGuts(false, true, AckMode.RECORD);
+	}
+
+	@Test
+	public void testConsumeAndProduceTransactionKTMManual() throws Exception {
+		testConsumeAndProduceTransactionGuts(false, false, AckMode.MANUAL_IMMEDIATE);
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError) throws Exception {
+	private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode)
+			throws Exception {
+
 		Consumer consumer = mock(Consumer.class);
 		final TopicPartition topicPartition = new TopicPartition("foo", 0);
 		willAnswer(i -> {
@@ -152,14 +161,15 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
 		}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
 		ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
 				Collections.singletonList(new ConsumerRecord<>("foo", 0, 0, "key", "value"))));
+		ConsumerRecords empty = new ConsumerRecords(Collections.emptyMap());
 		final AtomicBoolean done = new AtomicBoolean();
 		willAnswer(i -> {
 			if (done.compareAndSet(false, true)) {
 				return records;
 			}
 			else {
 				Thread.sleep(500);
-				return null;
+				return empty;
 			}
 		}).given(consumer).poll(any(Duration.class));
 		ConsumerFactory cf = mock(ConsumerFactory.class);
@@ -183,15 +193,38 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
 			ptm = new ChainedKafkaTransactionManager(new SomeOtherTransactionManager(), tm);
 		}
 		ContainerProperties props = new ContainerProperties("foo");
+		props.setAckMode(ackMode);
 		props.setGroupId("group");
 		props.setTransactionManager(ptm);
 		final KafkaTemplate template = new KafkaTemplate(pf);
-		props.setMessageListener((MessageListener) m -> {
-			template.send("bar", "baz");
-			if (handleError) {
-				throw new RuntimeException("fail");
+		if (AckMode.MANUAL_IMMEDIATE.equals(ackMode)) {
+			class AckListener implements AcknowledgingMessageListener {
+				// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
+
+				@Override
+				public void onMessage(ConsumerRecord data, Acknowledgment acknowledgment) {
+					template.send("bar", "baz");
+					if (handleError) {
+						throw new RuntimeException("fail");
+					}
+					acknowledgment.acknowledge();
+				}
+
+				@Override
+				public void onMessage(Object data) {
+				}
+
 			}
-		});
+			props.setMessageListener(new AckListener());
+		}
+		else {
+			props.setMessageListener((MessageListener) m -> {
+				template.send("bar", "baz");
+				if (handleError) {
+					throw new RuntimeException("fail");
+				}
+			});
+		}
 		KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, props);
 		container.setBeanName("commit");
 		if (handleError) {
