7676import org .springframework .kafka .KafkaException ;
7777import org .springframework .kafka .core .ConsumerFactory ;
7878import org .springframework .kafka .core .KafkaResourceHolder ;
79- import org .springframework .kafka .core .ProducerFactory ;
8079import org .springframework .kafka .event .ConsumerFailedToStartEvent ;
8180import org .springframework .kafka .event .ConsumerPartitionPausedEvent ;
8281import org .springframework .kafka .event .ConsumerPartitionResumedEvent ;
102101import org .springframework .kafka .support .LogIfLevelEnabled ;
103102import org .springframework .kafka .support .TopicPartitionOffset ;
104103import org .springframework .kafka .support .TopicPartitionOffset .SeekPosition ;
105- import org .springframework .kafka .support .TransactionSupport ;
106104import org .springframework .kafka .support .micrometer .MicrometerHolder ;
107105import org .springframework .kafka .support .serializer .DeserializationException ;
108106import org .springframework .kafka .support .serializer .ErrorHandlingDeserializer ;
@@ -738,8 +736,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
738736
739737 private Producer <?, ?> producer ;
740738
741- private boolean producerPerConsumerPartition ;
742-
743739 private boolean commitRecovered ;
744740
745741 private boolean wasIdle ;
@@ -1022,10 +1018,6 @@ boolean isPartitionPaused(TopicPartition topicPartition) {
10221018
10231019 @ Nullable
10241020 private TransactionTemplate determineTransactionTemplate () {
1025- if (this .kafkaTxManager != null ) {
1026- this .producerPerConsumerPartition =
1027- this .kafkaTxManager .getProducerFactory ().isProducerPerConsumerPartition ();
1028- }
10291021 if (this .transactionManager != null ) {
10301022 TransactionTemplate template = new TransactionTemplate (this .transactionManager );
10311023 TransactionDefinition definition = this .containerProperties .getTransactionDefinition ();
@@ -1740,9 +1732,6 @@ private void wrapUp(@Nullable Throwable throwable) {
17401732 // No-op. Continue process
17411733 }
17421734 }
1743- else {
1744- closeProducers (partitions );
1745- }
17461735 }
17471736 else {
17481737 this .logger .error ("Fatal consumer exception; stopping container" );
@@ -1995,11 +1984,6 @@ private void invokeBatchListenerInTx(final ConsumerRecords<K, V> records,
19951984 @ Nullable final List <ConsumerRecord <K , V >> recordList ) {
19961985
19971986 try {
1998- if (this .subBatchPerPartition && this .producerPerConsumerPartition ) {
1999- ConsumerRecord <K , V > record = recordList == null ? records .iterator ().next () : recordList .get (0 );
2000- TransactionSupport
2001- .setTransactionIdSuffix (zombieFenceTxIdSuffix (record .topic (), record .partition ())); // NOSONAR
2002- }
20031987 this .transactionTemplate .execute (new TransactionCallbackWithoutResult () {
20041988
20051989 @ Override
@@ -2028,11 +2012,6 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
20282012 this .logger .error (e , "Transaction rolled back" );
20292013 batchRollback (records , recordList , e );
20302014 }
2031- finally {
2032- if (this .subBatchPerPartition && this .producerPerConsumerPartition ) {
2033- TransactionSupport .clearTransactionIdSuffix ();
2034- }
2035- }
20362015 }
20372016
20382017 private void batchRollback (final ConsumerRecords <K , V > records ,
@@ -2327,11 +2306,6 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23272306 this .logger .error (ex , "Transaction rolled back" );
23282307 recordAfterRollback (iterator , record , ex );
23292308 }
2330- finally {
2331- if (this .producerPerConsumerPartition ) {
2332- TransactionSupport .clearTransactionIdSuffix ();
2333- }
2334- }
23352309 if (this .commonRecordInterceptor != null ) {
23362310 this .commonRecordInterceptor .afterRecord (record , this .consumer );
23372311 }
@@ -2344,10 +2318,6 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
23442318 }
23452319
23462320 private void invokeInTransaction (Iterator <ConsumerRecord <K , V >> iterator , final ConsumerRecord <K , V > record ) {
2347- if (this .producerPerConsumerPartition ) {
2348- TransactionSupport
2349- .setTransactionIdSuffix (zombieFenceTxIdSuffix (record .topic (), record .partition ()));
2350- }
23512321 this .transactionTemplate .execute (new TransactionCallbackWithoutResult () {
23522322
23532323 @ Override
@@ -3109,25 +3079,6 @@ public String toString() {
31093079 + "\n ]" ;
31103080 }
31113081
3112- private void closeProducers (@ Nullable Collection <TopicPartition > partitions ) {
3113- if (partitions != null ) {
3114- ProducerFactory <?, ?> producerFactory = this .kafkaTxManager .getProducerFactory ();
3115- partitions .forEach (tp -> {
3116- try {
3117- producerFactory .closeProducerFor (zombieFenceTxIdSuffix (tp .topic (), tp .partition ()));
3118- }
3119- catch (Exception e ) {
3120- this .logger .error (e , () -> "Failed to close producer with transaction id suffix: "
3121- + zombieFenceTxIdSuffix (tp .topic (), tp .partition ()));
3122- }
3123- });
3124- }
3125- }
3126-
3127- private String zombieFenceTxIdSuffix (String topic , int partition ) {
3128- return this .consumerGroupId + "." + topic + "." + partition ;
3129- }
3130-
31313082 private final class ConsumerAcknowledgment implements Acknowledgment {
31323083
31333084 private final ConsumerRecord <K , V > record ;
@@ -3246,47 +3197,40 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
32463197
32473198 @ Override
32483199 public void onPartitionsRevoked (Collection <TopicPartition > partitions ) {
3200+ if (this .consumerAwareListener != null ) {
3201+ this .consumerAwareListener .onPartitionsRevokedBeforeCommit (ListenerConsumer .this .consumer ,
3202+ partitions );
3203+ }
3204+ else {
3205+ this .userListener .onPartitionsRevoked (partitions );
3206+ }
32493207 try {
3250- if (this .consumerAwareListener != null ) {
3251- this .consumerAwareListener .onPartitionsRevokedBeforeCommit (ListenerConsumer .this .consumer ,
3252- partitions );
3253- }
3254- else {
3255- this .userListener .onPartitionsRevoked (partitions );
3256- }
3257- try {
3258- // Wait until now to commit, in case the user listener added acks
3259- commitPendingAcks ();
3260- fixTxOffsetsIfNeeded ();
3261- }
3262- catch (Exception e ) {
3263- ListenerConsumer .this .logger .error (e , () -> "Fatal commit error after revocation "
3264- + partitions );
3265- }
3266- if (this .consumerAwareListener != null ) {
3267- this .consumerAwareListener .onPartitionsRevokedAfterCommit (ListenerConsumer .this .consumer ,
3268- partitions );
3269- }
3270- if (ListenerConsumer .this .consumerSeekAwareListener != null ) {
3271- ListenerConsumer .this .consumerSeekAwareListener .onPartitionsRevoked (partitions );
3272- }
3273- if (ListenerConsumer .this .assignedPartitions != null ) {
3274- ListenerConsumer .this .assignedPartitions .removeAll (partitions );
3275- }
3276- ListenerConsumer .this .pausedForNack .removeAll (partitions );
3277- partitions .forEach (tp -> ListenerConsumer .this .lastCommits .remove (tp ));
3278- synchronized (ListenerConsumer .this ) {
3279- if (ListenerConsumer .this .offsetsInThisBatch != null ) {
3280- partitions .forEach (tp -> {
3281- ListenerConsumer .this .offsetsInThisBatch .remove (tp );
3282- ListenerConsumer .this .deferredOffsets .remove (tp );
3283- });
3284- }
3285- }
3208+ // Wait until now to commit, in case the user listener added acks
3209+ commitPendingAcks ();
3210+ fixTxOffsetsIfNeeded ();
32863211 }
3287- finally {
3288- if (ListenerConsumer .this .kafkaTxManager != null ) {
3289- closeProducers (partitions );
3212+ catch (Exception e ) {
3213+ ListenerConsumer .this .logger .error (e , () -> "Fatal commit error after revocation "
3214+ + partitions );
3215+ }
3216+ if (this .consumerAwareListener != null ) {
3217+ this .consumerAwareListener .onPartitionsRevokedAfterCommit (ListenerConsumer .this .consumer ,
3218+ partitions );
3219+ }
3220+ if (ListenerConsumer .this .consumerSeekAwareListener != null ) {
3221+ ListenerConsumer .this .consumerSeekAwareListener .onPartitionsRevoked (partitions );
3222+ }
3223+ if (ListenerConsumer .this .assignedPartitions != null ) {
3224+ ListenerConsumer .this .assignedPartitions .removeAll (partitions );
3225+ }
3226+ ListenerConsumer .this .pausedForNack .removeAll (partitions );
3227+ partitions .forEach (tp -> ListenerConsumer .this .lastCommits .remove (tp ));
3228+ synchronized (ListenerConsumer .this ) {
3229+ if (ListenerConsumer .this .offsetsInThisBatch != null ) {
3230+ partitions .forEach (tp -> {
3231+ ListenerConsumer .this .offsetsInThisBatch .remove (tp );
3232+ ListenerConsumer .this .deferredOffsets .remove (tp );
3233+ });
32903234 }
32913235 }
32923236 }
@@ -3349,33 +3293,25 @@ private void commitCurrentOffsets(Map<TopicPartition, OffsetAndMetadata> offsets
33493293 if (ListenerConsumer .this .transactionTemplate != null
33503294 && ListenerConsumer .this .kafkaTxManager != null
33513295 && !AssignmentCommitOption .LATEST_ONLY_NO_TX .equals (ListenerConsumer .this .autoCommitOption )) {
3352- try {
3353- offsetsToCommit .forEach ((partition , offsetAndMetadata ) -> {
3354- if (ListenerConsumer .this .producerPerConsumerPartition ) {
3355- TransactionSupport .setTransactionIdSuffix (
3356- zombieFenceTxIdSuffix (partition .topic (), partition .partition ()));
3357- }
3358- ListenerConsumer .this .transactionTemplate
3359- .execute (new TransactionCallbackWithoutResult () {
3360-
3361- @ Override
3362- protected void doInTransactionWithoutResult (TransactionStatus status ) {
3363- KafkaResourceHolder <?, ?> holder =
3364- (KafkaResourceHolder <?, ?>) TransactionSynchronizationManager
3365- .getResource (ListenerConsumer .this .kafkaTxManager
3366- .getProducerFactory ());
3367- if (holder != null ) {
3368- doSendOffsets (holder .getProducer (),
3369- Collections .singletonMap (partition , offsetAndMetadata ));
3370- }
3296+
3297+ offsetsToCommit .forEach ((partition , offsetAndMetadata ) -> {
3298+ ListenerConsumer .this .transactionTemplate
3299+ .execute (new TransactionCallbackWithoutResult () {
3300+
3301+ @ Override
3302+ protected void doInTransactionWithoutResult (TransactionStatus status ) {
3303+ KafkaResourceHolder <?, ?> holder =
3304+ (KafkaResourceHolder <?, ?>) TransactionSynchronizationManager
3305+ .getResource (ListenerConsumer .this .kafkaTxManager
3306+ .getProducerFactory ());
3307+ if (holder != null ) {
3308+ doSendOffsets (holder .getProducer (),
3309+ Collections .singletonMap (partition , offsetAndMetadata ));
33713310 }
3311+ }
33723312
3373- });
3374- });
3375- }
3376- finally {
3377- TransactionSupport .clearTransactionIdSuffix ();
3378- }
3313+ });
3314+ });
33793315 }
33803316 else {
33813317 ContainerProperties containerProps = KafkaMessageListenerContainer .this .getContainerProperties ();
0 commit comments