@@ -662,6 +662,14 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
662662
663663 private final boolean stopImmediate = this .containerProperties .isStopImmediate ();
664664
665+ private final Set <TopicPartition > pausedPartitions = new HashSet <>();
666+
667+ private final Map <TopicPartition , Long > lastReceivePartition ;
668+
669+ private final Map <TopicPartition , Long > lastAlertPartition ;
670+
671+ private final Map <TopicPartition , Boolean > wasIdlePartition ;
672+
665673 private Map <TopicPartition , OffsetMetadata > definedPartitions ;
666674
667675 private int count ;
@@ -676,10 +684,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
676684
677685 private long lastAlertAt = this .lastReceive ;
678686
679- private final Map <TopicPartition , Long > lastReceivePartition ;
680-
681- private final Map <TopicPartition , Long > lastAlertPartition ;
682-
683687 private long nackSleep = -1 ;
684688
685689 private int nackIndex ;
@@ -696,8 +700,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
696700
697701 private boolean wasIdle ;
698702
699- private final Map <TopicPartition , Boolean > wasIdlePartition ;
700-
701703 private boolean batchFailed ;
702704
703705 private volatile boolean consumerPaused ;
@@ -706,8 +708,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
706708
707709 private volatile long lastPoll = System .currentTimeMillis ();
708710
709- private final Set <TopicPartition > pausedPartitions ;
710-
711711 @ SuppressWarnings (UNCHECKED )
712712 ListenerConsumer (GenericMessageListener <?> listener , ListenerType listenerType ) {
713713 Properties consumerProperties = propertiesFromProperties ();
@@ -795,7 +795,6 @@ else if (listener instanceof MessageListener) {
795795 this .lastReceivePartition = new HashMap <>();
796796 this .lastAlertPartition = new HashMap <>();
797797 this .wasIdlePartition = new HashMap <>();
798- this .pausedPartitions = new HashSet <>();
799798 }
800799
801800 private Properties propertiesFromProperties () {
@@ -2918,12 +2917,15 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
29182917 this .userListener instanceof ConsumerAwareRebalanceListener
29192918 ? (ConsumerAwareRebalanceListener ) this .userListener : null ;
29202919
2920+ private final Collection <TopicPartition > revoked = new LinkedList <>();
2921+
29212922 ListenerConsumerRebalanceListener () {
29222923 }
29232924
29242925 @ Override
29252926 public void onPartitionsRevoked (Collection <TopicPartition > partitions ) {
29262927 try {
2928+ this .revoked .addAll (partitions );
29272929 if (this .consumerAwareListener != null ) {
29282930 this .consumerAwareListener .onPartitionsRevokedBeforeCommit (ListenerConsumer .this .consumer ,
29292931 partitions );
@@ -2961,11 +2963,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
29612963
29622964 @ Override
29632965 public void onPartitionsAssigned (Collection <TopicPartition > partitions ) {
2964- if (ListenerConsumer .this .consumerPaused ) {
2965- ListenerConsumer .this .consumer .pause (partitions );
2966- ListenerConsumer .this .logger .warn ("Paused consumer resumed by Kafka due to rebalance; "
2967- + "consumer paused again, so the initial poll() will never return any records" );
2968- }
2966+ repauseIfNeeded (partitions );
29692967 ListenerConsumer .this .assignedPartitions .addAll (partitions );
29702968 if (ListenerConsumer .this .commitCurrentOnAssignment
29712969 && !collectAndCommitIfNecessary (partitions )) {
@@ -2982,6 +2980,27 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
29822980 }
29832981 }
29842982
2983+ private void repauseIfNeeded (Collection <TopicPartition > partitions ) {
2984+ if (ListenerConsumer .this .consumerPaused ) {
2985+ ListenerConsumer .this .consumer .pause (partitions );
2986+ ListenerConsumer .this .logger .warn ("Paused consumer resumed by Kafka due to rebalance; "
2987+ + "consumer paused again, so the initial poll() will never return any records" );
2988+ }
2989+ Collection <TopicPartition > toRepause = new LinkedList <>();
2990+ partitions .forEach (tp -> {
2991+ if (isPartitionPauseRequested (tp )) {
2992+ toRepause .add (tp );
2993+ }
2994+ });
2995+ if (!ListenerConsumer .this .consumerPaused && toRepause .size () > 0 ) {
2996+ ListenerConsumer .this .consumer .pause (toRepause );
2997+ }
2998+ this .revoked .removeAll (toRepause );
2999+ this .revoked .forEach (tp -> resumePartition (tp ));
3000+ ListenerConsumer .this .pausedPartitions .removeAll (this .revoked );
3001+ this .revoked .clear ();
3002+ }
3003+
29853004 private boolean collectAndCommitIfNecessary (Collection <TopicPartition > partitions ) {
29863005 // Commit initial positions - this is generally redundant but
29873006 // it protects us from the case when another consumer starts
0 commit comments