Skip to content

Commit 6331d89

Browse files
garyrussellartembilan
authored andcommitted
Fix close producers after a rebalance
With a transactional container if committing the offsets fails during a rebalance, we would leave the producers open. Move the close to a finally block.
1 parent 73e71e5 commit 6331d89

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -487,11 +487,15 @@ public ConsumerRebalanceListener createRebalanceListener(final Consumer<K, V> co
487487

488488
@Override
489489
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
490-
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
491-
// Wait until now to commit, in case the user listener added acks
492-
commitPendingAcks();
493-
if (ListenerConsumer.this.kafkaTxManager != null) {
494-
closeProducers(partitions);
490+
try {
491+
getContainerProperties().getConsumerRebalanceListener().onPartitionsRevoked(partitions);
492+
// Wait until now to commit, in case the user listener added acks
493+
commitPendingAcks();
494+
}
495+
finally {
496+
if (ListenerConsumer.this.kafkaTxManager != null) {
497+
closeProducers(partitions);
498+
}
495499
}
496500
}
497501

0 commit comments

Comments
 (0)