Skip to content

Commit 726c485

Browse files
garyrussell authored and artembilan committed
GH-1866: Fix Pause/Resume
Resolves #1866.

The new retryable topic feature pauses/resumes individual partitions. This broke normal container pause/resume by incorrectly resuming partitions that were paused by the container pause operation. Similarly, if individual partitions were paused and then the container was paused and resumed, the container resumed all partitions.

Decouple the functionality to prevent this cross-talk:
- Do not resume any individually paused partitions when the container is in a paused state.
- Do not resume any individually paused partitions when the container is resumed.

Also:
- Use a `ConcurrentHashMap.newKeySet()` instead of synchronization on partition pause requests.
- Use `getAssignedPartitions()` to allow the retry topic feature to work with manual assignments.
- Add tests to verify no cross-talk between pausing individual partitions and the container.

* Fix race in test.

# Conflicts:
#   spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent 6cbfe2a commit 726c485

File tree

3 files changed

+85
-27
lines changed

3 files changed

+85
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 5 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,11 +18,11 @@
1818

1919
import java.util.Arrays;
2020
import java.util.Collection;
21-
import java.util.HashSet;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Map.Entry;
2524
import java.util.Set;
25+
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.regex.Pattern;
@@ -109,7 +109,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
109109

110110
private ApplicationContext applicationContext;
111111

112-
private final Set<TopicPartition> pauseRequestedPartitions;
112+
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
113113

114114
/**
115115
* Construct an instance with the provided factory and properties.
@@ -159,8 +159,6 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
159159
if (this.containerProperties.getConsumerRebalanceListener() == null) {
160160
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
161161
}
162-
163-
this.pauseRequestedPartitions = new HashSet<>();
164162
}
165163

166164
@Override
@@ -263,23 +261,17 @@ protected boolean isPaused() {
263261

264262
@Override
265263
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
266-
synchronized (this.pauseRequestedPartitions) {
267-
return this.pauseRequestedPartitions.contains(topicPartition);
268-
}
264+
return this.pauseRequestedPartitions.contains(topicPartition);
269265
}
270266

271267
@Override
272268
public void pausePartition(TopicPartition topicPartition) {
273-
synchronized (this.pauseRequestedPartitions) {
274-
this.pauseRequestedPartitions.add(topicPartition);
275-
}
269+
this.pauseRequestedPartitions.add(topicPartition);
276270
}
277271

278272
@Override
279273
public void resumePartition(TopicPartition topicPartition) {
280-
synchronized (this.pauseRequestedPartitions) {
281-
this.pauseRequestedPartitions.remove(topicPartition);
282-
}
274+
this.pauseRequestedPartitions.remove(topicPartition);
283275
}
284276

285277
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1258,7 +1258,9 @@ protected void pollAndInvoke() {
12581258
return;
12591259
}
12601260
resumeConsumerIfNeccessary();
1261-
resumePartitionsIfNecessary();
1261+
if (!this.consumerPaused) {
1262+
resumePartitionsIfNecessary();
1263+
}
12621264
debugRecords(records);
12631265

12641266
invokeIfHaveRecords(records);
@@ -1466,7 +1468,8 @@ private void pauseConsumerIfNecessary() {
14661468
private void resumeConsumerIfNeccessary() {
14671469
if (this.consumerPaused && !isPaused()) {
14681470
this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
1469-
Set<TopicPartition> paused = this.consumer.paused();
1471+
Collection<TopicPartition> paused = new LinkedList<>(this.consumer.paused());
1472+
paused.removeAll(this.pausedPartitions);
14701473
this.consumer.resume(paused);
14711474
this.consumerPaused = false;
14721475
publishConsumerResumedEvent(paused);
@@ -1475,8 +1478,7 @@ private void resumeConsumerIfNeccessary() {
14751478

14761479
private void pausePartitionsIfNecessary() {
14771480
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
1478-
List<TopicPartition> partitionsToPause = this
1479-
.assignedPartitions
1481+
List<TopicPartition> partitionsToPause = getAssignedPartitions()
14801482
.stream()
14811483
.filter(tp -> isPartitionPauseRequested(tp)
14821484
&& !pausedConsumerPartitions.contains(tp))

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 74 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,8 @@
4747
import java.util.Map;
4848
import java.util.Map.Entry;
4949
import java.util.Properties;
50+
import java.util.Set;
51+
import java.util.concurrent.ConcurrentHashMap;
5052
import java.util.concurrent.CountDownLatch;
5153
import java.util.concurrent.Executors;
5254
import java.util.concurrent.TimeUnit;
@@ -2338,14 +2340,6 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
23382340
AtomicBoolean first = new AtomicBoolean(true);
23392341
AtomicBoolean rebalance = new AtomicBoolean(true);
23402342
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2341-
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2342-
Thread.sleep(50);
2343-
if (rebalance.getAndSet(false)) {
2344-
rebal.get().onPartitionsRevoked(Collections.emptyList());
2345-
rebal.get().onPartitionsAssigned(records.keySet());
2346-
}
2347-
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2348-
});
23492343
final CountDownLatch seekLatch = new CountDownLatch(7);
23502344
willAnswer(i -> {
23512345
seekLatch.countDown();
@@ -2354,17 +2348,32 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
23542348
given(consumer.assignment()).willReturn(records.keySet());
23552349
final CountDownLatch pauseLatch1 = new CountDownLatch(2); // consumer, event publisher
23562350
final CountDownLatch pauseLatch2 = new CountDownLatch(2); // consumer, consumer
2351+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
23572352
willAnswer(i -> {
2353+
pausedParts.addAll(i.getArgument(0));
23582354
pauseLatch1.countDown();
23592355
pauseLatch2.countDown();
23602356
return null;
23612357
}).given(consumer).pause(records.keySet());
2362-
given(consumer.paused()).willReturn(records.keySet());
2358+
given(consumer.paused()).willReturn(pausedParts);
2359+
CountDownLatch pollWhilePausedLatch = new CountDownLatch(2);
2360+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2361+
Thread.sleep(50);
2362+
if (pauseLatch1.getCount() == 0) {
2363+
pollWhilePausedLatch.countDown();
2364+
}
2365+
if (rebalance.getAndSet(false)) {
2366+
rebal.get().onPartitionsRevoked(Collections.emptyList());
2367+
rebal.get().onPartitionsAssigned(records.keySet());
2368+
}
2369+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2370+
});
23632371
final CountDownLatch resumeLatch = new CountDownLatch(2);
23642372
willAnswer(i -> {
2373+
pausedParts.removeAll(i.getArgument(0));
23652374
resumeLatch.countDown();
23662375
return null;
2367-
}).given(consumer).resume(records.keySet());
2376+
}).given(consumer).resume(any());
23682377
willAnswer(invoc -> {
23692378
rebal.set(invoc.getArgument(1));
23702379
return null;
@@ -2456,6 +2465,8 @@ else if (e instanceof ConsumerStoppedEvent) {
24562465
assertThat(container.isPaused()).isTrue();
24572466
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
24582467
assertThat(container.isContainerPaused()).isTrue();
2468+
assertThat(pollWhilePausedLatch.await(10, TimeUnit.SECONDS)).isTrue();
2469+
verify(consumer, never()).resume(any());
24592470
rebalance.set(true); // force a re-pause
24602471
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
24612472
container.resume();
@@ -2465,6 +2476,59 @@ else if (e instanceof ConsumerStoppedEvent) {
24652476
verify(consumer, times(6)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
24662477
}
24672478

2479+
@SuppressWarnings({ "unchecked" })
2480+
@Test
2481+
public void dontResumePausedPartition() throws Exception {
2482+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2483+
Consumer<Integer, String> consumer = mock(Consumer.class);
2484+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2485+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2486+
AtomicBoolean first = new AtomicBoolean(true);
2487+
given(consumer.assignment()).willReturn(Set.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
2488+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2489+
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2490+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2491+
willAnswer(i -> {
2492+
pausedParts.addAll(i.getArgument(0));
2493+
pauseLatch1.countDown();
2494+
pauseLatch2.countDown();
2495+
return null;
2496+
}).given(consumer).pause(any());
2497+
given(consumer.paused()).willReturn(pausedParts);
2498+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2499+
Thread.sleep(50);
2500+
return emptyRecords;
2501+
});
2502+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2503+
willAnswer(i -> {
2504+
pausedParts.removeAll(i.getArgument(0));
2505+
resumeLatch.countDown();
2506+
return null;
2507+
}).given(consumer).resume(any());
2508+
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0),
2509+
new TopicPartitionOffset("foo", 1));
2510+
containerProps.setGroupId("grp");
2511+
containerProps.setAckMode(AckMode.RECORD);
2512+
containerProps.setClientId("clientId");
2513+
containerProps.setIdleEventInterval(100L);
2514+
containerProps.setMessageListener((MessageListener) rec -> { });
2515+
containerProps.setMissingTopicsFatal(false);
2516+
KafkaMessageListenerContainer<Integer, String> container =
2517+
new KafkaMessageListenerContainer<>(cf, containerProps);
2518+
container.start();
2519+
InOrder inOrder = inOrder(consumer);
2520+
container.pausePartition(new TopicPartition("foo", 1));
2521+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2522+
assertThat(pausedParts).hasSize(1);
2523+
container.pause();
2524+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
2525+
assertThat(pausedParts).hasSize(2);
2526+
container.resume();
2527+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
2528+
assertThat(pausedParts).hasSize(1);
2529+
container.stop();
2530+
}
2531+
24682532
@SuppressWarnings({ "unchecked", "rawtypes" })
24692533
@Test
24702534
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)