@@ -761,6 +761,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
761761
762762 private ConsumerRecords <K , V > pendingRecordsAfterError ;
763763
764+ private boolean pauseForPending ;
765+
764766 private volatile boolean consumerPaused ;
765767
766768 private volatile Thread consumerThread ;
@@ -1547,7 +1549,10 @@ private ConsumerRecords<K, V> doPoll() {
15471549 + "after an error; emergency stop invoked to avoid message loss" , howManyRecords ));
15481550 KafkaMessageListenerContainer .this .emergencyStop .run ();
15491551 }
1550- if (!isPartitionPaused (this .pendingRecordsAfterError .partitions ().iterator ().next ())) {
1552+ TopicPartition firstPart = this .pendingRecordsAfterError .partitions ().iterator ().next ();
1553+ boolean isPaused = isPartitionPauseRequested (firstPart );
1554+ this .logger .debug (() -> "First pending after error: " + firstPart + "; paused: " + isPaused );
1555+ if (!isPaused ) {
15511556 records = this .pendingRecordsAfterError ;
15521557 this .pendingRecordsAfterError = null ;
15531558 }
@@ -1663,10 +1668,11 @@ private void doPauseConsumerIfNecessary() {
16631668 this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
16641669 }
16651670 if (!this .consumerPaused && (isPaused () || this .pausedForAsyncAcks )
1666- || this .pendingRecordsAfterError != null ) {
1671+ || this .pauseForPending ) {
16671672
16681673 this .consumer .pause (this .consumer .assignment ());
16691674 this .consumerPaused = true ;
1675+ this .pauseForPending = false ;
16701676 this .logger .debug (() -> "Paused consumption from: " + this .consumer .paused ());
16711677 publishConsumerPausedEvent (this .consumer .assignment ());
16721678 }
@@ -2381,6 +2387,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23812387 () -> invokeBatchOnMessageWithRecordsOrList (records , list ));
23822388 if (!afterHandling .isEmpty ()) {
23832389 this .pendingRecordsAfterError = afterHandling ;
2390+ this .pauseForPending = true ;
23842391 }
23852392 }
23862393 }
@@ -2786,6 +2793,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27862793 }
27872794 if (records .size () > 0 ) {
27882795 this .pendingRecordsAfterError = new ConsumerRecords <>(records );
2796+ this .pauseForPending = true ;
27892797 }
27902798 }
27912799 }
0 commit comments