@@ -1829,22 +1829,7 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
18291829 TransactionSupport
18301830 .setTransactionIdSuffix (zombieFenceTxIdSuffix (record .topic (), record .partition ()));
18311831 }
1832- this .transactionTemplate .execute (new TransactionCallbackWithoutResult () {
1833-
1834- @ Override
1835- public void doInTransactionWithoutResult (TransactionStatus s ) {
1836- if (ListenerConsumer .this .kafkaTxManager != null ) {
1837- ListenerConsumer .this .producer = ((KafkaResourceHolder ) TransactionSynchronizationManager
1838- .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
1839- .getProducer (); // NOSONAR
1840- }
1841- RuntimeException aborted = doInvokeRecordListener (record , iterator );
1842- if (aborted != null ) {
1843- throw aborted ;
1844- }
1845- }
1846-
1847- });
1832+ invokeInTransaction (iterator , record );
18481833 }
18491834 catch (ProducerFencedException | FencedInstanceIdException e ) {
18501835 this .logger .error (e , "Producer or 'group.instance.id' fenced during transaction" );
@@ -1870,6 +1855,25 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
18701855 }
18711856 }
18721857
1858+ private void invokeInTransaction (Iterator <ConsumerRecord <K , V >> iterator , final ConsumerRecord <K , V > record ) {
1859+ this .transactionTemplate .execute (new TransactionCallbackWithoutResult () {
1860+
1861+ @ Override
1862+ public void doInTransactionWithoutResult (TransactionStatus s ) {
1863+ if (ListenerConsumer .this .kafkaTxManager != null ) {
1864+ ListenerConsumer .this .producer = ((KafkaResourceHolder ) TransactionSynchronizationManager
1865+ .getResource (ListenerConsumer .this .kafkaTxManager .getProducerFactory ()))
1866+ .getProducer (); // NOSONAR
1867+ }
1868+ RuntimeException aborted = doInvokeRecordListener (record , iterator );
1869+ if (aborted != null ) {
1870+ throw aborted ;
1871+ }
1872+ }
1873+
1874+ });
1875+ }
1876+
18731877 private void recordAfterRollback (Iterator <ConsumerRecord <K , V >> iterator , final ConsumerRecord <K , V > record ,
18741878 RuntimeException e ) {
18751879
0 commit comments