Skip to content

Commit 8c22b78

Browse files
committed
GH-1247: Polishing
- revert lastPoll position - fix return/break issue - fix race in existing test
1 parent 309d28e commit 8c22b78

File tree

2 files changed

+2
-3
lines changed

2 files changed

+2
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,6 @@ public void run() {
730730
}
731731
publishConsumerPausedEvent(this.consumer.assignment());
732732
}
733-
this.lastPoll = System.currentTimeMillis();
734733
this.polling.set(true);
735734
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
736735
if (!this.polling.compareAndSet(true, false)) {
@@ -741,8 +740,9 @@ public void run() {
741740
if (records.count() > 0 && this.logger.isDebugEnabled()) {
742741
this.logger.debug("Discarding polled records, container stopped: " + records.count());
743742
}
744-
return;
743+
break;
745744
}
745+
this.lastPoll = System.currentTimeMillis();
746746
if (this.consumerPaused && !isPaused()) {
747747
if (this.logger.isDebugEnabled()) {
748748
this.logger.debug("Resuming consumption from: " + this.consumer.paused());

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,6 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
551551
inOrder.verify(messageListener).onMessage(any(ConsumerRecord.class));
552552
inOrder.verify(consumer).commitSync(any(Map.class));
553553
container.stop();
554-
verify(consumer).wakeup();
555554
}
556555

557556
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)