11/*
2- * Copyright 2017-2020 the original author or authors.
2+ * Copyright 2017-2025 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1818
1919import java .time .Duration ;
2020
21+ import org .jspecify .annotations .Nullable ;
22+
2123import org .springframework .kafka .core .KafkaResourceHolder ;
2224import org .springframework .kafka .core .ProducerFactory ;
2325import org .springframework .kafka .core .ProducerFactoryUtils ;
4547 * <p>
4648 * Application code is required to retrieve the transactional Kafka resources via
4749 * {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory, String, java.time.Duration)}.
48- * Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto
49- * detect a thread-bound Producer and automatically participate in it.
50+ * Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto-detect
51+ * a thread-bound Producer and automatically participate in it.
5052 *
5153 * <p>
5254 * <b>The use of {@link org.springframework.kafka.core.DefaultKafkaProducerFactory
6365 * @param <V> the value type.
6466 *
6567 * @author Gary Russell
68+ * @author Soby Chacko
6669 */
6770@ SuppressWarnings ("serial" )
6871public class KafkaTransactionManager <K , V > extends AbstractPlatformTransactionManager
@@ -72,7 +75,7 @@ public class KafkaTransactionManager<K, V> extends AbstractPlatformTransactionMa
7275
7376 private final ProducerFactory <K , V > producerFactory ;
7477
75- private String transactionIdPrefix ;
78+ private @ Nullable String transactionIdPrefix ;
7679
7780 private Duration closeTimeout = ProducerFactoryUtils .DEFAULT_CLOSE_TIMEOUT ;
7881
@@ -121,7 +124,7 @@ public void setCloseTimeout(Duration closeTimeout) {
121124 @ SuppressWarnings (UNCHECKED )
122125 @ Override
123126 protected Object doGetTransaction () {
124- KafkaTransactionObject <K , V > txObject = new KafkaTransactionObject <K , V >();
127+ KafkaTransactionObject <K , V > txObject = new KafkaTransactionObject <>();
125128 txObject .setResourceHolder ((KafkaResourceHolder <K , V >) TransactionSynchronizationManager
126129 .getResource (getProducerFactory ()));
127130 return txObject ;
@@ -149,10 +152,10 @@ protected void doBegin(Object transaction, TransactionDefinition definition) {
149152 logger .debug ("Created Kafka transaction on producer [" + resourceHolder .getProducer () + "]" );
150153 }
151154 txObject .setResourceHolder (resourceHolder );
152- txObject . getResourceHolder () .setSynchronizedWithTransaction (true );
155+ resourceHolder .setSynchronizedWithTransaction (true );
153156 int timeout = determineTimeout (definition );
154157 if (timeout != TransactionDefinition .TIMEOUT_DEFAULT ) {
155- txObject . getResourceHolder () .setTimeoutInSeconds (timeout );
158+ resourceHolder .setTimeoutInSeconds (timeout );
156159 }
157160 }
158161 catch (Exception ex ) {
@@ -172,9 +175,13 @@ protected Object doSuspend(Object transaction) {
172175 }
173176
174177 @ Override
175- protected void doResume ( Object transaction , Object suspendedResources ) {
176- @ SuppressWarnings ( UNCHECKED )
178+ @ SuppressWarnings ( UNCHECKED )
179+ protected void doResume ( @ Nullable Object transaction , Object suspendedResources ) {
177180 KafkaResourceHolder <K , V > producerHolder = (KafkaResourceHolder <K , V >) suspendedResources ;
181+ if (transaction != null ) {
182+ KafkaTransactionObject <K , V > txObject = (KafkaTransactionObject <K , V >) transaction ;
183+ txObject .setResourceHolder (producerHolder );
184+ }
178185 TransactionSynchronizationManager .bindResource (getProducerFactory (), producerHolder );
179186 }
180187
@@ -183,31 +190,41 @@ protected void doCommit(DefaultTransactionStatus status) {
183190 @ SuppressWarnings (UNCHECKED )
184191 KafkaTransactionObject <K , V > txObject = (KafkaTransactionObject <K , V >) status .getTransaction ();
185192 KafkaResourceHolder <K , V > resourceHolder = txObject .getResourceHolder ();
186- resourceHolder .commit ();
193+ if (resourceHolder != null ) {
194+ resourceHolder .commit ();
195+ }
187196 }
188197
189198 @ Override
190199 protected void doRollback (DefaultTransactionStatus status ) {
191200 @ SuppressWarnings (UNCHECKED )
192201 KafkaTransactionObject <K , V > txObject = (KafkaTransactionObject <K , V >) status .getTransaction ();
193202 KafkaResourceHolder <K , V > resourceHolder = txObject .getResourceHolder ();
194- resourceHolder .rollback ();
203+ if (resourceHolder != null ) {
204+ resourceHolder .rollback ();
205+ }
195206 }
196207
197208 @ Override
198209 protected void doSetRollbackOnly (DefaultTransactionStatus status ) {
199210 @ SuppressWarnings (UNCHECKED )
200211 KafkaTransactionObject <K , V > txObject = (KafkaTransactionObject <K , V >) status .getTransaction ();
201- txObject .getResourceHolder ().setRollbackOnly ();
212+ KafkaResourceHolder <K , V > kafkaResourceHolder = txObject .getResourceHolder ();
213+ if (kafkaResourceHolder != null ) {
214+ kafkaResourceHolder .setRollbackOnly ();
215+ }
202216 }
203217
204218 @ Override
205219 protected void doCleanupAfterCompletion (Object transaction ) {
206220 @ SuppressWarnings (UNCHECKED )
207221 KafkaTransactionObject <K , V > txObject = (KafkaTransactionObject <K , V >) transaction ;
208222 TransactionSynchronizationManager .unbindResource (getProducerFactory ());
209- txObject .getResourceHolder ().close ();
210- txObject .getResourceHolder ().clear ();
223+ KafkaResourceHolder <K , V > kafkaResourceHolder = txObject .getResourceHolder ();
224+ if (kafkaResourceHolder != null ) {
225+ kafkaResourceHolder .close ();
226+ kafkaResourceHolder .clear ();
227+ }
211228 }
212229
213230 /**
@@ -217,22 +234,22 @@ protected void doCleanupAfterCompletion(Object transaction) {
217234 */
218235 private static class KafkaTransactionObject <K , V > implements SmartTransactionObject {
219236
220- private KafkaResourceHolder <K , V > resourceHolder ;
237+ private @ Nullable KafkaResourceHolder <K , V > resourceHolder ;
221238
222239 KafkaTransactionObject () {
223240 }
224241
225- public void setResourceHolder (KafkaResourceHolder <K , V > resourceHolder ) {
242+ public void setResourceHolder (@ Nullable KafkaResourceHolder <K , V > resourceHolder ) {
226243 this .resourceHolder = resourceHolder ;
227244 }
228245
229- public KafkaResourceHolder <K , V > getResourceHolder () {
246+ public @ Nullable KafkaResourceHolder <K , V > getResourceHolder () {
230247 return this .resourceHolder ;
231248 }
232249
233250 @ Override
234251 public boolean isRollbackOnly () {
235- return this .resourceHolder .isRollbackOnly ();
252+ return this .resourceHolder != null && this . resourceHolder .isRollbackOnly ();
236253 }
237254
238255 @ Override
0 commit comments