Skip to content

Commit 1b7d622

Browse files
garyrussellartembilan
authored andcommitted
GH-3600: Fix Kafka Tx Synchronization
Resolves #3600 Previously, the `KafkaProducerMessageHandler` did not synchronize a transaction with some other transaction (e.g. JDBC); it published in a local transaction instead. Also configure the Gradle Kotlin Daemon to work with JDK 16. **cherry-pick to 5.4.x**
1 parent 1951d52 commit 1b7d622

File tree

3 files changed

+62
-7
lines changed

3 files changed

+62
-7
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
version=5.5.3-SNAPSHOT
2-
org.gradle.jvmargs=-Xms512m -Xmx4g -XX:MaxPermSize=1024m -XX:MaxMetaspaceSize=1g -Dkotlin.daemon.jvm.options="-Xmx1g" -Dfile.encoding=UTF-8
2+
org.gradle.jvmargs=-Xms512m -Xmx4g -XX:MaxPermSize=1024m -XX:MaxMetaspaceSize=1g -Dkotlin.daemon.jvm.options="-Xmx1g --illegal-access=permit" -Dfile.encoding=UTF-8
33
org.gradle.caching=true
44
org.gradle.parallel=true
55
kotlin.stdlib.default.dependency=false

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2020 the original author or authors.
2+
* Copyright 2013-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,7 +67,6 @@
6767
import org.springframework.messaging.MessageChannel;
6868
import org.springframework.messaging.MessageHandlingException;
6969
import org.springframework.messaging.MessageHeaders;
70-
import org.springframework.transaction.support.TransactionSynchronizationManager;
7170
import org.springframework.util.Assert;
7271
import org.springframework.util.StringUtils;
7372
import org.springframework.util.concurrent.ListenableFuture;
@@ -491,9 +490,7 @@ protected Object handleRequestMessage(final Message<?> message) {
491490
sendFuture = gatewayFuture.getSendFuture();
492491
}
493492
else {
494-
if (this.transactional
495-
&& TransactionSynchronizationManager.getResource(this.kafkaTemplate.getProducerFactory()) == null
496-
&& !this.allowNonTransactional) {
493+
if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) {
497494
sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord));
498495
}
499496
else {

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -100,6 +100,11 @@
100100
import org.springframework.messaging.support.ErrorMessage;
101101
import org.springframework.messaging.support.GenericMessage;
102102
import org.springframework.transaction.PlatformTransactionManager;
103+
import org.springframework.transaction.TransactionDefinition;
104+
import org.springframework.transaction.TransactionException;
105+
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
106+
import org.springframework.transaction.support.DefaultTransactionStatus;
107+
import org.springframework.transaction.support.TransactionTemplate;
103108
import org.springframework.util.concurrent.ListenableFuture;
104109
import org.springframework.util.concurrent.SettableListenableFuture;
105110

@@ -598,6 +603,37 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
598603
assertThat(txId.get()).isEqualTo("overridden.tx.id.");
599604
}
600605

606+
@SuppressWarnings({ "rawtypes", "unchecked" })
607+
@Test
608+
void testTransactionSynch() {
609+
Producer producer = mock(Producer.class);
610+
ProducerFactory pf = mock(ProducerFactory.class);
611+
given(pf.transactionCapable()).willReturn(true);
612+
given(pf.createProducer(isNull())).willReturn(producer);
613+
ListenableFuture future = mock(ListenableFuture.class);
614+
willReturn(future).given(producer).send(any(ProducerRecord.class), any(Callback.class));
615+
KafkaTemplate template = new KafkaTemplate(pf);
616+
KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template);
617+
handler.setTopicExpression(new LiteralExpression("bar"));
618+
handler.setBeanFactory(mock(BeanFactory.class));
619+
handler.afterPropertiesSet();
620+
handler.start();
621+
try {
622+
new TransactionTemplate(new SomeOtherTransactionManager()).executeWithoutResult(status -> {
623+
handler.handleMessage(new GenericMessage<>("foo"));
624+
throw new IllegalStateException("test");
625+
});
626+
}
627+
catch (IllegalStateException ex) {
628+
}
629+
handler.stop();
630+
verify(producer).beginTransaction();
631+
verify(producer).send(any(ProducerRecord.class), any(Callback.class));
632+
verify(producer).abortTransaction();
633+
verify(producer).close(any());
634+
verifyNoMoreInteractions(producer);
635+
}
636+
601637
@SuppressWarnings({ "unchecked", "rawtypes" })
602638
@Test
603639
void testConsumeAndProduceTransactionTxIdOverride() throws Exception {
@@ -747,4 +783,26 @@ void testNoFlush() {
747783
handler.stop();
748784
}
749785

786+
@SuppressWarnings("serial")
787+
static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager {
788+
789+
@Override
790+
protected Object doGetTransaction() throws TransactionException {
791+
return new Object();
792+
}
793+
794+
@Override
795+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
796+
}
797+
798+
@Override
799+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
800+
}
801+
802+
@Override
803+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
804+
}
805+
806+
}
807+
750808
}

0 commit comments

Comments
 (0)