Skip to content

Commit 95f7c67

Browse files
garyrussellartembilan
authored andcommitted
Fix close producers after a rebalance
With a transactional container if committing the offsets fails during a rebalance, we would leave the producers open. Move the close to a finally block.
1 parent 0a7f4a9 commit 95f7c67

File tree

1 file changed

+18
-14
lines changed

1 file changed

+18
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1679,21 +1679,25 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
16791679

16801680
@Override
16811681
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
1682-
if (this.consumerAwareListener != null) {
1683-
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
1684-
partitions);
1685-
}
1686-
else {
1687-
this.userListener.onPartitionsRevoked(partitions);
1688-
}
1689-
// Wait until now to commit, in case the user listener added acks
1690-
commitPendingAcks();
1691-
if (this.consumerAwareListener != null) {
1692-
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
1693-
partitions);
1682+
try {
1683+
if (this.consumerAwareListener != null) {
1684+
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
1685+
partitions);
1686+
}
1687+
else {
1688+
this.userListener.onPartitionsRevoked(partitions);
1689+
}
1690+
// Wait until now to commit, in case the user listener added acks
1691+
commitPendingAcks();
1692+
if (this.consumerAwareListener != null) {
1693+
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
1694+
partitions);
1695+
}
16941696
}
1695-
if (ListenerConsumer.this.kafkaTxManager != null) {
1696-
closeProducers(partitions);
1697+
finally {
1698+
if (ListenerConsumer.this.kafkaTxManager != null) {
1699+
closeProducers(partitions);
1700+
}
16971701
}
16981702
}
16991703

0 commit comments

Comments
 (0)