Skip to content

Commit 2b6400d

Browse files
committed
Fix commits on partition revocation
Complete the remainder of `onPartitionsRevoked()` if the commits fail fatally. It is possible that commits might fail when using manual acks; we must complete the remainder of the method if the commits fail. **I will backport as needed**
1 parent 978868e commit 2b6400d

File tree

2 files changed

+101
-2
lines changed

2 files changed

+101
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2267,8 +2267,14 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
22672267
else {
22682268
this.userListener.onPartitionsRevoked(partitions);
22692269
}
2270-
// Wait until now to commit, in case the user listener added acks
2271-
commitPendingAcks();
2270+
try {
2271+
// Wait until now to commit, in case the user listener added acks
2272+
commitPendingAcks();
2273+
}
2274+
catch (Exception e) {
2275+
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
2276+
+ partitions);
2277+
}
22722278
if (this.consumerAwareListener != null) {
22732279
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
22742280
partitions);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.stream.Collectors;
5959

6060
import org.apache.commons.logging.LogFactory;
61+
import org.apache.kafka.clients.consumer.CommitFailedException;
6162
import org.apache.kafka.clients.consumer.Consumer;
6263
import org.apache.kafka.clients.consumer.ConsumerConfig;
6364
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -2747,6 +2748,98 @@ void testFatalErrorOnFencedInstanceException() throws Exception {
27472748
container.stop();
27482749
}
27492750

2751+
@SuppressWarnings({ "unchecked", "rawtypes" })
2752+
@Test
2753+
void testCommitFailsOnRevoke() throws Exception {
2754+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2755+
Consumer<Integer, String> consumer = mock(Consumer.class);
2756+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2757+
Map<String, Object> cfProps = new LinkedHashMap<>();
2758+
given(cf.getConfigurationProperties()).willReturn(cfProps);
2759+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
2760+
TopicPartition topicPartition0 = new TopicPartition("foo", 0);
2761+
records.put(topicPartition0, Arrays.asList(
2762+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
2763+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
2764+
records.put(new TopicPartition("foo", 1), Arrays.asList(
2765+
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
2766+
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
2767+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
2768+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2769+
AtomicBoolean first = new AtomicBoolean(true);
2770+
AtomicInteger rebalance = new AtomicInteger();
2771+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2772+
CountDownLatch latch = new CountDownLatch(2);
2773+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2774+
Thread.sleep(50);
2775+
int call = rebalance.getAndIncrement();
2776+
if (call == 0) {
2777+
rebal.get().onPartitionsRevoked(Collections.emptyList());
2778+
rebal.get().onPartitionsAssigned(records.keySet());
2779+
}
2780+
else if (call == 1) {
2781+
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
2782+
rebal.get().onPartitionsAssigned(Collections.emptyList());
2783+
}
2784+
latch.countDown();
2785+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2786+
});
2787+
willAnswer(invoc -> {
2788+
rebal.set(invoc.getArgument(1));
2789+
return null;
2790+
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
2791+
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
2792+
AtomicBoolean firstCommit = new AtomicBoolean(true);
2793+
AtomicInteger commitCount = new AtomicInteger();
2794+
willAnswer(invoc -> {
2795+
commits.add(invoc.getArgument(0, Map.class));
2796+
if (!firstCommit.getAndSet(false)) {
2797+
throw new CommitFailedException();
2798+
}
2799+
return null;
2800+
}).given(consumer).commitSync(any(), any());
2801+
ContainerProperties containerProps = new ContainerProperties("foo");
2802+
containerProps.setGroupId("grp");
2803+
containerProps.setAckMode(AckMode.MANUAL);
2804+
containerProps.setClientId("clientId");
2805+
containerProps.setIdleEventInterval(100L);
2806+
AtomicReference<Acknowledgment> acknowledgment = new AtomicReference<>();
2807+
class AckListener implements AcknowledgingMessageListener {
2808+
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
2809+
2810+
@Override
2811+
public void onMessage(ConsumerRecord data, Acknowledgment ack) {
2812+
acknowledgment.set(ack);
2813+
}
2814+
2815+
@Override
2816+
public void onMessage(Object data) {
2817+
}
2818+
2819+
}
2820+
containerProps.setMessageListener(new AckListener());
2821+
containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
2822+
2823+
@Override
2824+
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
2825+
Collection<TopicPartition> partitions) {
2826+
2827+
if (acknowledgment.get() != null) {
2828+
acknowledgment.get().acknowledge();
2829+
}
2830+
}
2831+
2832+
});
2833+
Properties consumerProps = new Properties();
2834+
containerProps.setKafkaConsumerProperties(consumerProps);
2835+
KafkaMessageListenerContainer<Integer, String> container =
2836+
new KafkaMessageListenerContainer<>(cf, containerProps);
2837+
container.start();
2838+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
2839+
assertThat(container.getAssignedPartitions()).hasSize(1);
2840+
container.stop();
2841+
}
2842+
27502843
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
27512844
Consumer<?, ?> consumer = spy(
27522845
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments
 (0)