Skip to content

Commit 911c118

Browse files
garyrussellartembilan
authored andcommitted
GH-1168: Fix sendOffsets in local transaction
Fixes #1168 `sendOffsetsToTransaction` assumed the transaction was started by a `KafkaTransactionManager` and using `executeInTransaction` failed. Look for a local transactional producer before checking for a thread-bound transaction. **cherry-pick to all supported branches** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
1 parent e2167ac commit 911c118

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
343343

344344
@Override
345345
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
346-
@SuppressWarnings("unchecked")
347-
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
348-
.getResource(this.producerFactory);
349-
Assert.isTrue(resourceHolder != null, "No transaction in process");
350-
if (resourceHolder.getProducer() != null) {
351-
resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
346+
Producer<K, V> producer = this.producers.get();
347+
if (producer == null) {
348+
@SuppressWarnings("unchecked")
349+
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
350+
.getResource(this.producerFactory);
351+
Assert.isTrue(resourceHolder != null, "No transaction in process");
352+
producer = resourceHolder.getProducer();
352353
}
354+
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
353355
}
354356

355357
protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@
4040
import org.apache.kafka.clients.consumer.ConsumerConfig;
4141
import org.apache.kafka.clients.consumer.ConsumerRecord;
4242
import org.apache.kafka.clients.consumer.ConsumerRecords;
43+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4344
import org.apache.kafka.clients.producer.Callback;
4445
import org.apache.kafka.clients.producer.MockProducer;
4546
import org.apache.kafka.clients.producer.Producer;
4647
import org.apache.kafka.clients.producer.ProducerConfig;
4748
import org.apache.kafka.clients.producer.ProducerRecord;
49+
import org.apache.kafka.common.TopicPartition;
4850
import org.apache.kafka.common.errors.ProducerFencedException;
4951
import org.apache.kafka.common.serialization.StringDeserializer;
5052
import org.apache.kafka.common.serialization.StringSerializer;
@@ -82,8 +84,11 @@ public class KafkaTemplateTransactionTests {
8284

8385
private static final String STRING_KEY_TOPIC = "stringKeyTopic";
8486

87+
private static final String LOCAL_TX_IN_TOPIC = "localTxInTopic";
88+
8589
@ClassRule
86-
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, STRING_KEY_TOPIC)
90+
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, STRING_KEY_TOPIC,
91+
LOCAL_TX_IN_TOPIC)
8792
.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
8893
.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
8994

@@ -100,13 +105,22 @@ public void testLocalTransaction() throws Exception {
100105
template.setDefaultTopic(STRING_KEY_TOPIC);
101106
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testLocalTx", "false", embeddedKafka);
102107
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
108+
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
103109
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
104110
cf.setKeyDeserializer(new StringDeserializer());
105111
Consumer<String, String> consumer = cf.createConsumer();
106-
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC);
112+
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
113+
template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one"));
114+
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC);
107115
template.executeInTransaction(t -> {
108116
t.sendDefault("foo", "bar");
109117
t.sendDefault("baz", "qux");
118+
t.sendOffsetsToTransaction(Collections.singletonMap(
119+
new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()),
120+
new OffsetAndMetadata(singleRecord.offset() + 1L)), "testLocalTx");
121+
assertThat(KafkaTestUtils.getPropertyValue(
122+
KafkaTestUtils.getPropertyValue(template, "producers", ThreadLocal.class).get(),
123+
"delegate.transactionManager.transactionalId")).isEqualTo("my.transaction.0");
110124
return null;
111125
});
112126
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
@@ -119,6 +133,8 @@ public void testLocalTransaction() throws Exception {
119133
}
120134
record = iterator.next();
121135
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
136+
// 2 log slots, 1 for the record, 1 for the commit
137+
assertThat(consumer.position(new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()))).isEqualTo(2L);
122138
consumer.close();
123139
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
124140
pf.destroy();

0 commit comments

Comments
 (0)