Skip to content

Commit 16f3d8c

Browse files
garyrussellartembilan
authored andcommitted
GH-813: Re-pause a paused consumer after rebalance
Fixes #813 Kafka resumes the consumer(s) after a rebalance, but since the `ListenerConsumer.consumerPaused` is still true, the container starts consuming again and can't be paused without first issuing a resume. On a rebalance, reset the boolean; the consumer will re-pause before the next poll, if the container's `isPaused()` is still true after the rebalance listeners exit. Tested with a stand-alone Boot app (see the referenced issue). **cherry-pick to 2.1.x** (cherry picked from commit 012215c)
1 parent 98af3e6 commit 16f3d8c

File tree

1 file changed

+6
-0
lines changed

1 file changed

+6
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,12 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
548548

549549
@Override
550550
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
551+
if (ListenerConsumer.this.consumerPaused) {
552+
ListenerConsumer.this.consumerPaused = false;
553+
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
554+
+ "the container will pause again before polling, unless the container's "
555+
+ "'paused' property is reset by a custom rebalance listener");
556+
}
551557
ListenerConsumer.this.assignedPartitions = partitions;
552558
if (!ListenerConsumer.this.autoCommit) {
553559
// Commit initial positions - this is generally redundant but

0 commit comments

Comments
 (0)