Skip to content

Commit 535c573

Browse files
garyrussellartembilan
authored andcommitted
Fix close producers after a rebalance
With a transactional container if committing the offsets fails during a rebalance, we would leave the producers open. Move the close to a finally block.
1 parent 0d5495f commit 535c573

File tree

1 file changed

+18
-14
lines changed

1 file changed

+18
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,21 +1720,25 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
17201720

17211721
@Override
17221722
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
1723-
if (this.consumerAwareListener != null) {
1724-
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
1725-
partitions);
1726-
}
1727-
else {
1728-
this.userListener.onPartitionsRevoked(partitions);
1729-
}
1730-
// Wait until now to commit, in case the user listener added acks
1731-
commitPendingAcks();
1732-
if (this.consumerAwareListener != null) {
1733-
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
1734-
partitions);
1723+
try {
1724+
if (this.consumerAwareListener != null) {
1725+
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
1726+
partitions);
1727+
}
1728+
else {
1729+
this.userListener.onPartitionsRevoked(partitions);
1730+
}
1731+
// Wait until now to commit, in case the user listener added acks
1732+
commitPendingAcks();
1733+
if (this.consumerAwareListener != null) {
1734+
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
1735+
partitions);
1736+
}
17351737
}
1736-
if (ListenerConsumer.this.kafkaTxManager != null) {
1737-
closeProducers(partitions);
1738+
finally {
1739+
if (ListenerConsumer.this.kafkaTxManager != null) {
1740+
closeProducers(partitions);
1741+
}
17381742
}
17391743
}
17401744

0 commit comments

Comments
 (0)