Skip to content

Commit e2167ac

Browse files
garyrussellartembilan
authored andcommitted
AckMode.MANUAL_IMMEDIATE - wake Consumer
When processing manual acks on a "foreign" thread, the commits are not made until the consumer wakes from his `poll()`. For `AckMode.MANUAL_IMMEDIATE`, we should wake the consumer so the offset(s) are committed as soon as possible. **cherry-pick to 2.2.x**
1 parent 5fa8601 commit e2167ac

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ public void run() {
700700
pollAndInvoke();
701701
}
702702
catch (@SuppressWarnings(UNUSED) WakeupException e) {
703-
// Ignore, we're stopping
703+
// Ignore, we're stopping or applying immediate foreign acks
704704
}
705705
catch (NoOffsetForPartitionException nofpe) {
706706
this.fatalError = true;
@@ -893,6 +893,9 @@ private void processAck(ConsumerRecord<K, V> record) {
893893
if (!Thread.currentThread().equals(this.consumerThread)) {
894894
try {
895895
this.acks.put(record);
896+
if (this.isManualImmediateAck) {
897+
this.consumer.wakeup();
898+
}
896899
}
897900
catch (InterruptedException e) {
898901
Thread.currentThread().interrupt();

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -608,15 +608,25 @@ private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exceptio
608608
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
609609
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
610610
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
611+
long sleepFor = ackMode.equals(AckMode.MANUAL_IMMEDIATE) ? 20_000 : 50;
612+
AtomicBoolean first = new AtomicBoolean(true);
611613
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
612-
Thread.sleep(50);
614+
if (!first.getAndSet(false)) {
615+
try {
616+
Thread.sleep(sleepFor);
617+
}
618+
catch (@SuppressWarnings("unused") InterruptedException ex) {
619+
throw new WakeupException();
620+
}
621+
}
613622
return consumerRecords;
614623
});
615624
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
616625
new TopicPartitionInitialOffset("foo", 0) };
617626
ContainerProperties containerProps = new ContainerProperties(topicPartition);
618627
containerProps.setGroupId("grp");
619628
containerProps.setAckMode(ackMode);
629+
containerProps.setMissingTopicsFatal(false);
620630
final CountDownLatch latch = new CountDownLatch(2);
621631
final List<Acknowledgment> acks = new ArrayList<>();
622632
final AtomicReference<Thread> consumerThread = new AtomicReference<>();
@@ -634,6 +644,10 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
634644
}
635645

636646
});
647+
willAnswer(inv -> {
648+
consumerThread.get().interrupt();
649+
return null;
650+
}).given(consumer).wakeup();
637651

638652
final CountDownLatch commitLatch = new CountDownLatch(1);
639653
final AtomicReference<Thread> commitThread = new AtomicReference<>();
@@ -650,6 +664,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
650664
new KafkaMessageListenerContainer<>(cf, containerProps);
651665
container.start();
652666
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
667+
long t1 = System.currentTimeMillis();
653668
acks.get(1).acknowledge();
654669
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
655670
InOrder inOrder = inOrder(messageListener, consumer);
@@ -658,6 +673,7 @@ public void onMessage(ConsumerRecord<Integer, String> data, Acknowledgment ackno
658673
inOrder.verify(consumer).commitSync(any(Map.class));
659674
container.stop();
660675
assertThat(commitThread.get()).isSameAs(consumerThread.get());
676+
assertThat(System.currentTimeMillis() - t1).isLessThan(15_000);
661677
}
662678

663679
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)