Skip to content

Commit 3da65a8

Browse files
garyrussellartembilan
authored andcommitted
GH-2001: Propagate Exception from TX Sync Commit
Resolves #2001 Previously, synchronized transaction commits (for producer initiated transactions) were not propagated to the caller. **cherry-pick to all supported branches** * Fix typo in doc
1 parent 1361a36 commit 3da65a8

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3465,6 +3465,10 @@ If you wish the commits to be performed in the reverse order (Kafka first), use
34653465

34663466
See <<ex-jdbc-sync>> for examples of an application that synchronizes JDBC and Kafka transactions in Kafka-first or DB-first configurations.
34673467

3468+
NOTE: Starting with versions 2.5.17, 2.6.12, 2.7.9 and 2.8.0, if the commit fails on the synchronized transaction (after the primary transaction has committed), the exception will be thrown to the caller.
3469+
Previously, this was silently ignored (logged at debug).
3470+
Applications should take remedial action, if necessary, to compensate for the committed primary transaction.
3471+
34683472
[[container-transaction-manager]]
34693473
===== Using Consumer-Initiated Transactions
34703474

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -145,13 +145,15 @@ protected boolean shouldReleaseBeforeCompletion() {
145145
return false;
146146
}
147147

148+
@Override
149+
protected void processResourceAfterCommit(KafkaResourceHolder<K, V> resourceHolder) {
150+
resourceHolder.commit();
151+
}
152+
148153
@Override
149154
public void afterCompletion(int status) {
150155
try {
151-
if (status == TransactionSynchronization.STATUS_COMMITTED) {
152-
this.resourceHolder.commit();
153-
}
154-
else {
156+
if (status != TransactionSynchronization.STATUS_COMMITTED) {
155157
this.resourceHolder.rollback();
156158
}
157159
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,13 @@
7272
import org.springframework.kafka.test.context.EmbeddedKafka;
7373
import org.springframework.kafka.test.utils.KafkaTestUtils;
7474
import org.springframework.kafka.transaction.KafkaTransactionManager;
75+
import org.springframework.transaction.TransactionDefinition;
76+
import org.springframework.transaction.TransactionException;
7577
import org.springframework.transaction.annotation.EnableTransactionManagement;
7678
import org.springframework.transaction.annotation.Propagation;
7779
import org.springframework.transaction.annotation.Transactional;
7880
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
81+
import org.springframework.transaction.support.DefaultTransactionStatus;
7982
import org.springframework.transaction.support.TransactionTemplate;
8083
import org.springframework.util.concurrent.SettableListenableFuture;
8184

@@ -295,14 +298,15 @@ public void testTransactionSynchronizationExceptionOnCommit() {
295298

296299
ResourcelessTransactionManager tm = new ResourcelessTransactionManager();
297300

298-
new TransactionTemplate(tm)
299-
.execute(s -> {
300-
template.sendDefault("foo", "bar");
301+
assertThatExceptionOfType(ProducerFencedException.class).isThrownBy(() ->
302+
new TransactionTemplate(tm)
303+
.execute(s -> {
304+
template.sendDefault("foo", "bar");
301305

302-
// Mark the mock producer as fenced so it throws when committing the transaction
303-
producer.fenceProducer();
304-
return null;
305-
});
306+
// Mark the mock producer as fenced so it throws when committing the transaction
307+
producer.fenceProducer();
308+
return null;
309+
}));
306310

307311
assertThat(producer.transactionCommitted()).isFalse();
308312
assertThat(producer.closed()).isTrue();
@@ -573,6 +577,28 @@ void testNonTxWithTx() {
573577
pf.destroy();
574578
}
575579

580+
@Test
581+
void syncCommitFails() {
582+
DummyTM tm = new DummyTM();
583+
MockProducer<String, String> producer =
584+
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
585+
producer.initTransactions();
586+
producer.commitTransactionException = new IllegalStateException();
587+
588+
@SuppressWarnings("unchecked")
589+
ProducerFactory<String, String> pf = mock(ProducerFactory.class);
590+
given(pf.transactionCapable()).willReturn(true);
591+
given(pf.createProducer(isNull())).willReturn(producer);
592+
593+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
594+
template.setDefaultTopic(STRING_KEY_TOPIC);
595+
596+
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
597+
new TransactionTemplate(tm).execute(status -> template.sendDefault("foo")));
598+
599+
assertThat(tm.committed).isTrue();
600+
}
601+
576602
@Configuration
577603
@EnableTransactionManagement
578604
public static class DeclarativeConfig {
@@ -680,4 +706,29 @@ public void anotherTxMethod() {
680706

681707
}
682708

709+
@SuppressWarnings("serial")
710+
private static final class DummyTM extends AbstractPlatformTransactionManager {
711+
712+
boolean committed;
713+
714+
@Override
715+
protected Object doGetTransaction() throws TransactionException {
716+
return new Object();
717+
}
718+
719+
@Override
720+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
721+
}
722+
723+
@Override
724+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
725+
this.committed = true;
726+
}
727+
728+
@Override
729+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
730+
}
731+
732+
}
733+
683734
}

0 commit comments

Comments
 (0)