Skip to content

Commit 309d28e

Browse files
garyrussellartembilan
authored andcommitted
GH-1247: Only wake consumer while polling
Fixes #1247 Previously, stopping the container unconditionally called `consumer.wakeUp()`. It should only be woken while actually polling, otherwise operations such as `commitSync()` would fail (after committing the offset but before calling any callbacks or interceptors). Add logic to only wake the consumer while it is in the `poll()` method. **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java # spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java
1 parent a91ffcc commit 309d28e

File tree

5 files changed

+87
-7
lines changed

5 files changed

+87
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.LinkedBlockingQueue;
3434
import java.util.concurrent.ScheduledFuture;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3536
import java.util.stream.Collectors;
3637

3738
import org.apache.commons.logging.Log;
@@ -292,7 +293,7 @@ public void onSuccess(Object result) {
292293
}
293294
});
294295
setRunning(false);
295-
this.listenerConsumer.consumer.wakeup();
296+
this.listenerConsumer.wakeIfNecessary();
296297
}
297298
}
298299

@@ -405,6 +406,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
405406

406407
private volatile Map<TopicPartition, OffsetMetadata> definedPartitions;
407408

409+
private final AtomicBoolean polling = new AtomicBoolean();
410+
408411
private volatile Collection<TopicPartition> assignedPartitions;
409412

410413
private volatile Thread consumerThread;
@@ -727,8 +730,19 @@ public void run() {
727730
}
728731
publishConsumerPausedEvent(this.consumer.assignment());
729732
}
730-
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
731733
this.lastPoll = System.currentTimeMillis();
734+
this.polling.set(true);
735+
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
736+
if (!this.polling.compareAndSet(true, false)) {
737+
/*
738+
* There is a small race condition where wakeIfNecessary was called between
739+
* exiting the poll and before we reset the boolean.
740+
*/
741+
if (records.count() > 0 && this.logger.isDebugEnabled()) {
742+
this.logger.debug("Discarding polled records, container stopped: " + records.count());
743+
}
744+
return;
745+
}
732746
if (this.consumerPaused && !isPaused()) {
733747
if (this.logger.isDebugEnabled()) {
734748
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
@@ -808,6 +822,12 @@ public void run() {
808822
this.logger.info("Consumer stopped");
809823
}
810824

825+
void wakeIfNecessary() {
826+
if (this.polling.getAndSet(false)) {
827+
this.consumer.wakeup();
828+
}
829+
}
830+
811831
/**
812832
* Handle exceptions thrown by the consumer outside of message listener
813833
* invocation (e.g. commit exceptions).

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingBatchErrorHandlerTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -91,7 +91,6 @@ public void stopContainerAfterException() throws Exception {
9191
InOrder inOrder = inOrder(this.consumer);
9292
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
9393
inOrder.verify(this.consumer).poll(1000);
94-
inOrder.verify(this.consumer).wakeup();
9594
inOrder.verify(this.consumer).unsubscribe();
9695
inOrder.verify(this.consumer).close();
9796
inOrder.verifyNoMoreInteractions();

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerBatchModeTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -93,7 +93,6 @@ public void stopContainerAfterException() throws Exception {
9393
InOrder inOrder = inOrder(this.consumer);
9494
inOrder.verify(this.consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
9595
inOrder.verify(this.consumer).poll(1000);
96-
inOrder.verify(this.consumer).wakeup();
9796
inOrder.verify(this.consumer).unsubscribe();
9897
inOrder.verify(this.consumer).close();
9998
inOrder.verifyNoMoreInteractions();

spring-kafka/src/test/java/org/springframework/kafka/listener/ContainerStoppingErrorHandlerRecordModeTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public void stopContainerAfterException() throws Exception {
101101
Collections.singletonMap(new TopicPartition("foo", 0), new OffsetAndMetadata(2L)));
102102
inOrder.verify(this.consumer).commitSync(
103103
Collections.singletonMap(new TopicPartition("foo", 1), new OffsetAndMetadata(1L)));
104-
inOrder.verify(this.consumer).wakeup();
105104
inOrder.verify(this.consumer).unsubscribe();
106105
inOrder.verify(this.consumer).close();
107106
inOrder.verifyNoMoreInteractions();

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.BDDMockito.willAnswer;
2626
import static org.mockito.Mockito.inOrder;
2727
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.never;
2829
import static org.mockito.Mockito.spy;
2930
import static org.mockito.Mockito.times;
3031
import static org.mockito.Mockito.verify;
@@ -550,6 +551,68 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
550551
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
551552
inOrder.verify(consumer).commitSync(any(Map.class));
552553
container.stop();
554+
verify(consumer).wakeup();
555+
}
556+
557+
@SuppressWarnings("unchecked")
558+
@Test
559+
public void testRecordAckAfterStop() throws Exception {
560+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
561+
Consumer<Integer, String> consumer = mock(Consumer.class);
562+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull())).willReturn(consumer);
563+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
564+
records.put(new TopicPartition("foo", 0), Collections.singletonList(
565+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo")));
566+
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
567+
given(consumer.poll(anyLong())).willAnswer(i -> {
568+
Thread.sleep(50);
569+
return consumerRecords;
570+
});
571+
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
572+
new TopicPartitionInitialOffset("foo", 0) };
573+
ContainerProperties containerProps = new ContainerProperties(topicPartition);
574+
containerProps.setGroupId("grp");
575+
containerProps.setAckMode(AckMode.RECORD);
576+
final CountDownLatch latch1 = new CountDownLatch(1);
577+
final CountDownLatch latch2 = new CountDownLatch(1);
578+
MessageListener<Integer, String> messageListener = spy(
579+
new MessageListener<Integer, String>() { // Cannot be lambda: Mockito doesn't mock final classes
580+
581+
@Override
582+
public void onMessage(ConsumerRecord<Integer, String> data) {
583+
latch1.countDown();
584+
try {
585+
latch2.await(10, TimeUnit.SECONDS);
586+
}
587+
catch (InterruptedException e) {
588+
Thread.currentThread().interrupt();
589+
}
590+
}
591+
592+
});
593+
594+
final CountDownLatch commitLatch = new CountDownLatch(1);
595+
willAnswer(i -> {
596+
commitLatch.countDown();
597+
return null;
598+
}
599+
).given(consumer).commitSync(any());
600+
601+
containerProps.setMessageListener(messageListener);
602+
containerProps.setClientId("clientId");
603+
containerProps.setShutdownTimeout(5L);
604+
KafkaMessageListenerContainer<Integer, String> container =
605+
new KafkaMessageListenerContainer<>(cf, containerProps);
606+
container.start();
607+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
608+
container.stop();
609+
latch2.countDown();
610+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
611+
InOrder inOrder = inOrder(messageListener, consumer);
612+
inOrder.verify(consumer).poll(ContainerProperties.DEFAULT_POLL_TIMEOUT);
613+
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
614+
inOrder.verify(consumer).commitSync(any());
615+
verify(consumer, never()).wakeup();
553616
}
554617

555618
@Test

0 commit comments

Comments
 (0)