Skip to content

Commit edf2b37

Browse files
garyrussellartembilan
authored andcommitted
GH-2525: Fix Paused Partition Resume (Rebalance)
Resolves #2525 If a paused partition is revoked and re-assigned during a retry topic pause delay, the partition is not resumed because it no longer exists in the `pausedPartitions` field. When re-pausing a paused partition, it must be re-added to the paused partitions so that it will not be filtered out of the resume logic. **cherry-pick to 2.9.x**
1 parent b4a7190 commit edf2b37

File tree

2 files changed

+91
-1
lines changed

2 files changed

+91
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -3531,6 +3531,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35313531
ListenerConsumer.this.consumer.pause(toRepause);
35323532
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
35333533
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
3534+
ListenerConsumer.this.pausedPartitions.addAll(toRepause);
35343535
}
35353536
this.revoked.removeAll(toRepause);
35363537
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2842,6 +2842,95 @@ public void rePausePartitionAfterRebalance() throws Exception {
28422842
container.stop();
28432843
}
28442844

2845+
@SuppressWarnings({ "unchecked" })
2846+
@Test
2847+
public void resumePartitionAfterRevokeAndReAssign() throws Exception {
2848+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2849+
Consumer<Integer, String> consumer = mock(Consumer.class);
2850+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2851+
AtomicBoolean first = new AtomicBoolean(true);
2852+
TopicPartition tp0 = new TopicPartition("foo", 0);
2853+
TopicPartition tp1 = new TopicPartition("foo", 1);
2854+
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
2855+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2856+
final CountDownLatch suspendConsumerThread = new CountDownLatch(1);
2857+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2858+
Thread testThread = Thread.currentThread();
2859+
AtomicBoolean paused = new AtomicBoolean();
2860+
willAnswer(i -> {
2861+
pausedParts.clear();
2862+
pausedParts.addAll(i.getArgument(0));
2863+
if (!Thread.currentThread().equals(testThread)) {
2864+
paused.set(true);
2865+
}
2866+
return null;
2867+
}).given(consumer).pause(any());
2868+
given(consumer.paused()).willReturn(pausedParts);
2869+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2870+
if (paused.get()) {
2871+
pauseLatch1.countDown();
2872+
// hold up the consumer thread while we revoke/assign partitions on the test thread
2873+
suspendConsumerThread.await(10, TimeUnit.SECONDS);
2874+
}
2875+
Thread.sleep(50);
2876+
return ConsumerRecords.empty();
2877+
});
2878+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2879+
Collection<String> foos = new ArrayList<>();
2880+
foos.add("foo");
2881+
willAnswer(inv -> {
2882+
rebal.set(inv.getArgument(1));
2883+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2884+
return null;
2885+
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
2886+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2887+
willAnswer(i -> {
2888+
pausedParts.removeAll(i.getArgument(0));
2889+
resumeLatch.countDown();
2890+
return null;
2891+
}).given(consumer).resume(any());
2892+
ContainerProperties containerProps = new ContainerProperties("foo");
2893+
containerProps.setGroupId("grp");
2894+
containerProps.setAckMode(AckMode.RECORD);
2895+
containerProps.setClientId("clientId");
2896+
containerProps.setIdleEventInterval(100L);
2897+
containerProps.setMessageListener((MessageListener) rec -> { });
2898+
containerProps.setMissingTopicsFatal(false);
2899+
KafkaMessageListenerContainer<Integer, String> container =
2900+
new KafkaMessageListenerContainer<>(cf, containerProps);
2901+
container.start();
2902+
container.pausePartition(tp0);
2903+
container.pausePartition(tp1);
2904+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2905+
assertThat(pausedParts).hasSize(2)
2906+
.contains(tp0, tp1);
2907+
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
2908+
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2909+
rebal.get().onPartitionsRevoked(Set.of(tp0));
2910+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2911+
assertThat(pausedParts).hasSize(2)
2912+
.contains(tp0, tp1);
2913+
assertThat(container).extracting("listenerConsumer")
2914+
.extracting("pausedPartitions")
2915+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2916+
.hasSize(2)
2917+
.contains(tp0, tp1);
2918+
assertThat(container)
2919+
.extracting("pauseRequestedPartitions")
2920+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2921+
.hasSize(2)
2922+
.contains(tp0, tp1);
2923+
container.resumePartition(tp0);
2924+
container.resumePartition(tp1);
2925+
suspendConsumerThread.countDown();
2926+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
2927+
assertThat(pausedParts).hasSize(0);
2928+
ArgumentCaptor<List<TopicPartition>> resumed = ArgumentCaptor.forClass(List.class);
2929+
verify(consumer).resume(resumed.capture());
2930+
assertThat(resumed.getValue()).contains(tp0, tp1);
2931+
container.stop();
2932+
}
2933+
28452934
@SuppressWarnings({ "unchecked", "rawtypes" })
28462935
@Test
28472936
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)