Skip to content

Commit 90c4c24

Browse files
committed
Fix commits on partition revocation
Complete the remainder of `onPartitionsRevoked()` if the commits fail fatally. It is possible that commits might fail when using manual acks; we must complete the remainder of the method if the commits fail. **I will backport as needed**
1 parent 3c334f2 commit 90c4c24

File tree

2 files changed

+101
-2
lines changed

2 files changed

+101
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2268,8 +2268,14 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
22682268
else {
22692269
this.userListener.onPartitionsRevoked(partitions);
22702270
}
2271-
// Wait until now to commit, in case the user listener added acks
2272-
commitPendingAcks();
2271+
try {
2272+
// Wait until now to commit, in case the user listener added acks
2273+
commitPendingAcks();
2274+
}
2275+
catch (Exception e) {
2276+
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
2277+
+ partitions);
2278+
}
22732279
if (this.consumerAwareListener != null) {
22742280
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
22752281
partitions);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.stream.Collectors;
5959

6060
import org.apache.commons.logging.LogFactory;
61+
import org.apache.kafka.clients.consumer.CommitFailedException;
6162
import org.apache.kafka.clients.consumer.Consumer;
6263
import org.apache.kafka.clients.consumer.ConsumerConfig;
6364
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -2837,6 +2838,98 @@ public void testCooperativeRebalance() throws Exception {
28372838
container.stop();
28382839
}
28392840

2841+
@SuppressWarnings({ "unchecked", "rawtypes" })
2842+
@Test
2843+
void testCommitFailsOnRevoke() throws Exception {
2844+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2845+
Consumer<Integer, String> consumer = mock(Consumer.class);
2846+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2847+
Map<String, Object> cfProps = new LinkedHashMap<>();
2848+
given(cf.getConfigurationProperties()).willReturn(cfProps);
2849+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
2850+
TopicPartition topicPartition0 = new TopicPartition("foo", 0);
2851+
records.put(topicPartition0, Arrays.asList(
2852+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
2853+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
2854+
records.put(new TopicPartition("foo", 1), Arrays.asList(
2855+
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
2856+
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
2857+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
2858+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2859+
AtomicBoolean first = new AtomicBoolean(true);
2860+
AtomicInteger rebalance = new AtomicInteger();
2861+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2862+
CountDownLatch latch = new CountDownLatch(2);
2863+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2864+
Thread.sleep(50);
2865+
int call = rebalance.getAndIncrement();
2866+
if (call == 0) {
2867+
rebal.get().onPartitionsRevoked(Collections.emptyList());
2868+
rebal.get().onPartitionsAssigned(records.keySet());
2869+
}
2870+
else if (call == 1) {
2871+
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
2872+
rebal.get().onPartitionsAssigned(Collections.emptyList());
2873+
}
2874+
latch.countDown();
2875+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2876+
});
2877+
willAnswer(invoc -> {
2878+
rebal.set(invoc.getArgument(1));
2879+
return null;
2880+
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
2881+
List<Map<TopicPartition, OffsetAndMetadata>> commits = new ArrayList<>();
2882+
AtomicBoolean firstCommit = new AtomicBoolean(true);
2883+
AtomicInteger commitCount = new AtomicInteger();
2884+
willAnswer(invoc -> {
2885+
commits.add(invoc.getArgument(0, Map.class));
2886+
if (!firstCommit.getAndSet(false)) {
2887+
throw new CommitFailedException();
2888+
}
2889+
return null;
2890+
}).given(consumer).commitSync(any(), any());
2891+
ContainerProperties containerProps = new ContainerProperties("foo");
2892+
containerProps.setGroupId("grp");
2893+
containerProps.setAckMode(AckMode.MANUAL);
2894+
containerProps.setClientId("clientId");
2895+
containerProps.setIdleEventInterval(100L);
2896+
AtomicReference<Acknowledgment> acknowledgment = new AtomicReference<>();
2897+
class AckListener implements AcknowledgingMessageListener {
2898+
// not a lambda https://bugs.openjdk.java.net/browse/JDK-8074381
2899+
2900+
@Override
2901+
public void onMessage(ConsumerRecord data, Acknowledgment ack) {
2902+
acknowledgment.set(ack);
2903+
}
2904+
2905+
@Override
2906+
public void onMessage(Object data) {
2907+
}
2908+
2909+
}
2910+
containerProps.setMessageListener(new AckListener());
2911+
containerProps.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
2912+
2913+
@Override
2914+
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
2915+
Collection<TopicPartition> partitions) {
2916+
2917+
if (acknowledgment.get() != null) {
2918+
acknowledgment.get().acknowledge();
2919+
}
2920+
}
2921+
2922+
});
2923+
Properties consumerProps = new Properties();
2924+
containerProps.setKafkaConsumerProperties(consumerProps);
2925+
KafkaMessageListenerContainer<Integer, String> container =
2926+
new KafkaMessageListenerContainer<>(cf, containerProps);
2927+
container.start();
2928+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
2929+
assertThat(container.getAssignedPartitions()).hasSize(1);
2930+
container.stop();
2931+
}
2932+
28402933
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
28412934
Consumer<?, ?> consumer = spy(
28422935
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments
 (0)