Skip to content

Commit a50a2a9

Browse files
garyrussellartembilan
authored andcommitted
GH-859: Fix nested transactions
Resolves #859 When using `executeInTransaction` on a transactional container thread, we cannot use the existing transaction - clear the TL to allow a new Producer to be allocated. Invalid state transition (in-trans to in-trans). **cherry-pick to 2.1.x, 2.0.x, backport needed for 1.3.x**
1 parent 9c9a200 commit a50a2a9

File tree

3 files changed

+87
-1
lines changed

3 files changed

+87
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,15 @@ public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition
153153
this.producerPerConsumerPartition = producerPerConsumerPartition;
154154
}
155155

156+
/**
157+
* Return the producerPerConsumerPartition.
158+
* @return the producerPerConsumerPartition.
159+
* @since 1.3.8
160+
*/
161+
public boolean isProducerPerConsumerPartition() {
162+
return this.producerPerConsumerPartition;
163+
}
164+
156165
/**
157166
* Return an unmodifiable reference to the configuration map for this factory.
158167
* Useful for cloning to make a similar factory.
@@ -247,7 +256,7 @@ protected Producer<K, V> createKafkaProducer() {
247256
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
248257
}
249258

250-
private Producer<K, V> createTransactionalProducerForPartition() {
259+
Producer<K, V> createTransactionalProducerForPartition() {
251260
String suffix = TransactionSupport.getTransactionIdSuffix();
252261
if (suffix == null) {
253262
return createTransactionalProducer();

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.support.LoggingProducerListener;
3535
import org.springframework.kafka.support.ProducerListener;
3636
import org.springframework.kafka.support.SendResult;
37+
import org.springframework.kafka.support.TransactionSupport;
3738
import org.springframework.kafka.support.converter.MessageConverter;
3839
import org.springframework.kafka.support.converter.MessagingMessageConverter;
3940
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -62,6 +63,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V> {
6263

6364
private final ProducerFactory<K, V> producerFactory;
6465

66+
private final DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory;
67+
6568
private final boolean autoFlush;
6669

6770
private final boolean transactional;
@@ -94,6 +97,12 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
9497
this.producerFactory = producerFactory;
9598
this.autoFlush = autoFlush;
9699
this.transactional = producerFactory.transactionCapable();
100+
if (producerFactory instanceof DefaultKafkaProducerFactory) {
101+
this.defaultKafkaProducerFactory = (DefaultKafkaProducerFactory<K, V>) producerFactory;
102+
}
103+
else {
104+
this.defaultKafkaProducerFactory = null;
105+
}
97106
}
98107

99108
/**
@@ -236,6 +245,16 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
236245
Assert.state(this.transactional, "Producer factory does not support transactions");
237246
Producer<K, V> producer = this.producers.get();
238247
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
248+
String transactionIdSuffix;
249+
if (this.defaultKafkaProducerFactory != null
250+
&& this.defaultKafkaProducerFactory.isProducerPerConsumerPartition()) {
251+
transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();
252+
TransactionSupport.clearTransactionIdSuffix();
253+
}
254+
else {
255+
transactionIdSuffix = null;
256+
}
257+
239258
producer = this.producerFactory.createProducer();
240259

241260
try {
@@ -265,6 +284,9 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
265284
producer.commitTransaction();
266285
}
267286
finally {
287+
if (transactionIdSuffix != null) {
288+
TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);
289+
}
268290
this.producers.remove();
269291
closeProducer(producer, false);
270292
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2929
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
3030

31+
import java.util.Collections;
3132
import java.util.Iterator;
3233
import java.util.Map;
3334
import java.util.concurrent.BlockingQueue;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3436

3537
import org.apache.kafka.clients.consumer.Consumer;
3638
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,6 +53,7 @@
5153
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5254
import org.springframework.context.annotation.Bean;
5355
import org.springframework.context.annotation.Configuration;
56+
import org.springframework.kafka.support.TransactionSupport;
5457
import org.springframework.kafka.support.transaction.ResourcelessTransactionManager;
5558
import org.springframework.kafka.test.rule.KafkaEmbedded;
5659
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -234,6 +237,58 @@ public void testTransactionSynchronizationExceptionOnCommit() {
234237
assertThat(producer.closed()).isTrue();
235238
}
236239

240+
@Test
241+
public void testExcecuteInTransactionNewInnerTx() {
242+
@SuppressWarnings("unchecked")
243+
Producer<Object, Object> producer1 = mock(Producer.class);
244+
@SuppressWarnings("unchecked")
245+
Producer<Object, Object> producer2 = mock(Producer.class);
246+
producer1.initTransactions();
247+
AtomicBoolean first = new AtomicBoolean(true);
248+
249+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<Object, Object>(
250+
Collections.emptyMap()) {
251+
252+
@Override
253+
protected Producer<Object, Object> createTransactionalProducer() {
254+
return first.getAndSet(false) ? producer1 : producer2;
255+
}
256+
257+
@Override
258+
Producer<Object, Object> createTransactionalProducerForPartition() {
259+
return createTransactionalProducer();
260+
}
261+
262+
};
263+
pf.setTransactionIdPrefix("tx.");
264+
265+
KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
266+
template.setDefaultTopic(STRING_KEY_TOPIC);
267+
268+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
269+
270+
try {
271+
TransactionSupport.setTransactionIdSuffix("testExcecuteInTransactionNewInnerTx");
272+
new TransactionTemplate(tm).execute(s -> {
273+
return template.executeInTransaction(t -> {
274+
template.sendDefault("foo", "bar");
275+
return null;
276+
});
277+
});
278+
279+
InOrder inOrder = inOrder(producer1, producer2);
280+
inOrder.verify(producer1).beginTransaction();
281+
inOrder.verify(producer2).beginTransaction();
282+
inOrder.verify(producer2).commitTransaction();
283+
inOrder.verify(producer2).close();
284+
inOrder.verify(producer1).commitTransaction();
285+
inOrder.verify(producer1).close();
286+
}
287+
finally {
288+
TransactionSupport.clearTransactionIdSuffix();
289+
}
290+
}
291+
237292
@Configuration
238293
@EnableTransactionManagement
239294
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)