Skip to content

Commit 56f43a8

Browse files
garyrussellartembilan
authored andcommitted
GH-1168: Fix sendOffsets in local transaction
Fixes #1168 `sendOffsetsToTransaction` assumed the transaction was started by a `KafkaTransactionManager` and using `executeInTransaction` failed. Look for a local transactional producer before checking for a thread-bound transaction. **cherry-pick to all supported branches** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
1 parent 8451480 commit 56f43a8

File tree

2 files changed

+37
-8
lines changed

2 files changed

+37
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,13 +318,15 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
318318

319319
@Override
320320
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) {
321-
@SuppressWarnings("unchecked")
322-
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
323-
.getResource(this.producerFactory);
324-
Assert.isTrue(resourceHolder != null, "No transaction in process");
325-
if (resourceHolder.getProducer() != null) {
326-
resourceHolder.getProducer().sendOffsetsToTransaction(offsets, consumerGroupId);
321+
Producer<K, V> producer = this.producers.get();
322+
if (producer == null) {
323+
@SuppressWarnings("unchecked")
324+
KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager
325+
.getResource(this.producerFactory);
326+
Assert.isTrue(resourceHolder != null, "No transaction in process");
327+
producer = resourceHolder.getProducer();
327328
}
329+
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
328330
}
329331

330332
protected void closeProducer(Producer<K, V> producer, boolean inLocalTx) {

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
3030

3131
import java.util.Collections;
32+
import java.util.HashMap;
3233
import java.util.Iterator;
3334
import java.util.Map;
3435
import java.util.concurrent.BlockingQueue;
@@ -38,11 +39,13 @@
3839
import org.apache.kafka.clients.consumer.ConsumerConfig;
3940
import org.apache.kafka.clients.consumer.ConsumerRecord;
4041
import org.apache.kafka.clients.consumer.ConsumerRecords;
42+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4143
import org.apache.kafka.clients.producer.Callback;
4244
import org.apache.kafka.clients.producer.MockProducer;
4345
import org.apache.kafka.clients.producer.Producer;
4446
import org.apache.kafka.clients.producer.ProducerConfig;
4547
import org.apache.kafka.clients.producer.ProducerRecord;
48+
import org.apache.kafka.common.TopicPartition;
4649
import org.apache.kafka.common.serialization.StringDeserializer;
4750
import org.apache.kafka.common.serialization.StringSerializer;
4851
import org.assertj.core.api.Assertions;
@@ -64,6 +67,8 @@
6467
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
6568
import org.springframework.transaction.support.TransactionTemplate;
6669

70+
import kafka.server.KafkaConfig;
71+
6772
/**
6873
* @author Gary Russell
6974
* @since 1.3
@@ -73,8 +78,19 @@ public class KafkaTemplateTransactionTests {
7378

7479
private static final String STRING_KEY_TOPIC = "stringKeyTopic";
7580

81+
private static final String LOCAL_TX_IN_TOPIC = "localTxInTopic";
82+
83+
private static final Map<String, String> BROKER_PROPERTIES = new HashMap<>();
84+
85+
static {
86+
BROKER_PROPERTIES.put(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1");
87+
BROKER_PROPERTIES.put(KafkaConfig.TransactionsTopicMinISRProp(), "1");
88+
}
89+
7690
@ClassRule
77-
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(3, true, STRING_KEY_TOPIC);
91+
public static KafkaEmbedded embeddedKafka =
92+
new KafkaEmbedded(1, true, STRING_KEY_TOPIC, LOCAL_TX_IN_TOPIC)
93+
.brokerProperties(BROKER_PROPERTIES);
7894

7995
@Test
8096
public void testLocalTransaction() throws Exception {
@@ -87,13 +103,22 @@ public void testLocalTransaction() throws Exception {
87103
template.setDefaultTopic(STRING_KEY_TOPIC);
88104
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testTxString", "false", embeddedKafka);
89105
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
106+
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
90107
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
91108
cf.setKeyDeserializer(new StringDeserializer());
92109
Consumer<String, String> consumer = cf.createConsumer();
93-
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, STRING_KEY_TOPIC);
110+
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
111+
template.executeInTransaction(kt -> kt.send(LOCAL_TX_IN_TOPIC, "one"));
112+
ConsumerRecord<String, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, LOCAL_TX_IN_TOPIC);
94113
template.executeInTransaction(t -> {
95114
t.sendDefault("foo", "bar");
96115
t.sendDefault("baz", "qux");
116+
t.sendOffsetsToTransaction(Collections.singletonMap(
117+
new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()),
118+
new OffsetAndMetadata(singleRecord.offset() + 1L)), "testLocalTx");
119+
assertThat(KafkaTestUtils.getPropertyValue(
120+
KafkaTestUtils.getPropertyValue(template, "producers", ThreadLocal.class).get(),
121+
"delegate.transactionManager.transactionalId")).isEqualTo("my.transaction.0");
97122
return null;
98123
});
99124
ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
@@ -106,6 +131,8 @@ public void testLocalTransaction() throws Exception {
106131
}
107132
record = iterator.next();
108133
assertThat(record).has(Assertions.<ConsumerRecord<String, String>>allOf(key("baz"), value("qux")));
134+
// 2 log slots, 1 for the record, 1 for the commit
135+
assertThat(consumer.position(new TopicPartition(LOCAL_TX_IN_TOPIC, singleRecord.partition()))).isEqualTo(2L);
109136
consumer.close();
110137
assertThat(KafkaTestUtils.getPropertyValue(pf, "cache", BlockingQueue.class).size()).isEqualTo(1);
111138
pf.destroy();

0 commit comments

Comments
 (0)