Skip to content

Commit 3d2f1ed

Browse files
garyrussellartembilan
authored andcommitted
GH-859: Fix nested transactions
Resolves #859 When using `executeInTransaction` on a transactional container thread, we cannot use the existing transaction - clear the TL to allow a new Producer to be allocated. Invalid state transition (in-trans to in-trans). **cherry-pick to 2.1.x, 2.0.x, backport needed for 1.3.x** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
1 parent e895eb2 commit 3d2f1ed

File tree

4 files changed

+88
-1
lines changed

4 files changed

+88
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,16 @@ public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition
167167
this.producerPerConsumerPartition = producerPerConsumerPartition;
168168
}
169169

170+
/**
171+
* Return the producerPerConsumerPartition.
172+
* @return the producerPerConsumerPartition.
173+
* @since 1.3.8
174+
*/
175+
@Override
176+
public boolean isProducerPerConsumerPartition() {
177+
return this.producerPerConsumerPartition;
178+
}
179+
170180
/**
171181
* Return an unmodifiable reference to the configuration map for this factory.
172182
* Useful for cloning to make a similar factory.
@@ -259,7 +269,7 @@ protected Producer<K, V> createKafkaProducer() {
259269
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
260270
}
261271

262-
private Producer<K, V> createTransactionalProducerForPartition() {
272+
Producer<K, V> createTransactionalProducerForPartition() {
263273
String suffix = TransactionSupport.getTransactionIdSuffix();
264274
if (suffix == null) {
265275
return createTransactionalProducer();

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.kafka.support.LoggingProducerListener;
3636
import org.springframework.kafka.support.ProducerListener;
3737
import org.springframework.kafka.support.SendResult;
38+
import org.springframework.kafka.support.TransactionSupport;
3839
import org.springframework.kafka.support.converter.MessageConverter;
3940
import org.springframework.kafka.support.converter.MessagingMessageConverter;
4041
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -259,6 +260,15 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
259260
Assert.state(this.transactional, "Producer factory does not support transactions");
260261
Producer<K, V> producer = this.producers.get();
261262
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
263+
String transactionIdSuffix;
264+
if (this.producerFactory.isProducerPerConsumerPartition()) {
265+
transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();
266+
TransactionSupport.clearTransactionIdSuffix();
267+
}
268+
else {
269+
transactionIdSuffix = null;
270+
}
271+
262272
producer = this.producerFactory.createProducer();
263273

264274
try {
@@ -288,6 +298,9 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
288298
throw e;
289299
}
290300
finally {
301+
if (transactionIdSuffix != null) {
302+
TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);
303+
}
291304
this.producers.remove();
292305
closeProducer(producer, false);
293306
}

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,13 @@ default void closeProducerFor(String transactionIdSuffix) {
5151
// NOSONAR
5252
}
5353

54+
/**
55+
* Return the producerPerConsumerPartition.
56+
* @return the producerPerConsumerPartition.
57+
* @since 1.3.8
58+
*/
59+
default boolean isProducerPerConsumerPartition() {
60+
return false;
61+
}
62+
5463
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2929
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
3030

31+
import java.util.Collections;
3132
import java.util.Iterator;
3233
import java.util.Map;
3334
import java.util.concurrent.BlockingQueue;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3436

3537
import org.apache.kafka.clients.consumer.Consumer;
3638
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,6 +53,7 @@
5153
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5254
import org.springframework.context.annotation.Bean;
5355
import org.springframework.context.annotation.Configuration;
56+
import org.springframework.kafka.support.TransactionSupport;
5457
import org.springframework.kafka.support.transaction.ResourcelessTransactionManager;
5558
import org.springframework.kafka.test.rule.KafkaEmbedded;
5659
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -263,6 +266,58 @@ public void testTransactionSynchronizationExceptionOnCommit() {
263266
assertThat(producer.closed()).isTrue();
264267
}
265268

269+
@Test
270+
public void testExcecuteInTransactionNewInnerTx() {
271+
@SuppressWarnings("unchecked")
272+
Producer<Object, Object> producer1 = mock(Producer.class);
273+
@SuppressWarnings("unchecked")
274+
Producer<Object, Object> producer2 = mock(Producer.class);
275+
producer1.initTransactions();
276+
AtomicBoolean first = new AtomicBoolean(true);
277+
278+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<Object, Object>(
279+
Collections.emptyMap()) {
280+
281+
@Override
282+
protected Producer<Object, Object> createTransactionalProducer() {
283+
return first.getAndSet(false) ? producer1 : producer2;
284+
}
285+
286+
@Override
287+
Producer<Object, Object> createTransactionalProducerForPartition() {
288+
return createTransactionalProducer();
289+
}
290+
291+
};
292+
pf.setTransactionIdPrefix("tx.");
293+
294+
KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
295+
template.setDefaultTopic(STRING_KEY_TOPIC);
296+
297+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
298+
299+
try {
300+
TransactionSupport.setTransactionIdSuffix("testExcecuteInTransactionNewInnerTx");
301+
new TransactionTemplate(tm).execute(s -> {
302+
return template.executeInTransaction(t -> {
303+
template.sendDefault("foo", "bar");
304+
return null;
305+
});
306+
});
307+
308+
InOrder inOrder = inOrder(producer1, producer2);
309+
inOrder.verify(producer1).beginTransaction();
310+
inOrder.verify(producer2).beginTransaction();
311+
inOrder.verify(producer2).commitTransaction();
312+
inOrder.verify(producer2).close();
313+
inOrder.verify(producer1).commitTransaction();
314+
inOrder.verify(producer1).close();
315+
}
316+
finally {
317+
TransactionSupport.clearTransactionIdSuffix();
318+
}
319+
}
320+
266321
@Configuration
267322
@EnableTransactionManagement
268323
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)