Skip to content

Commit 705930c

Browse files
garyrussellartembilan
authored andcommitted
GH-1353: Fix sleep with BatchListener nack
Resolves #1363 **cherry-pick to 2.3.x** (cherry picked from commit cd6c2aa)
1 parent 516783d commit 705930c

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -567,9 +567,10 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
567567

568568
private final boolean subBatchPerPartition = this.containerProperties.isSubBatchPerPartition();
569569

570-
private Map<TopicPartition, OffsetMetadata> definedPartitions;
570+
private final Duration authorizationExceptionRetryInterval =
571+
this.containerProperties.getAuthorizationExceptionRetryInterval();
571572

572-
private Duration authorizationExceptionRetryInterval = this.containerProperties.getAuthorizationExceptionRetryInterval();
573+
private Map<TopicPartition, OffsetMetadata> definedPartitions;
573574

574575
private int count;
575576

@@ -1438,7 +1439,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
14381439
processCommits();
14391440
}
14401441
SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger);
1441-
this.nackSleep = -1;
1442+
nackSleepAndReset();
14421443
}
14431444
}
14441445

@@ -1606,6 +1607,10 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
16061607
}
16071608
}
16081609
SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger);
1610+
nackSleepAndReset();
1611+
}
1612+
1613+
private void nackSleepAndReset() {
16091614
try {
16101615
Thread.sleep(this.nackSleep);
16111616
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public class ManualNackBatchTests {
9191
@Test
9292
public void discardRemainingRecordsFromPollAndSeek() throws Exception {
9393
assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
94+
assertThat(this.config.replayTime).isBetween(50L, 30_000L);
9495
assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
9596
assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
9697
this.registry.stop();
@@ -117,24 +118,27 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
117118
@EnableKafka
118119
public static class Config {
119120

120-
private final List<String> contents = new ArrayList<>();
121+
final List<String> contents = new ArrayList<>();
121122

122-
private final CountDownLatch pollLatch = new CountDownLatch(3);
123+
final CountDownLatch pollLatch = new CountDownLatch(3);
123124

124-
private final CountDownLatch deliveryLatch = new CountDownLatch(2);
125+
final CountDownLatch deliveryLatch = new CountDownLatch(2);
125126

126-
private final CountDownLatch commitLatch = new CountDownLatch(2);
127+
final CountDownLatch commitLatch = new CountDownLatch(2);
127128

128-
private final CountDownLatch closeLatch = new CountDownLatch(1);
129+
final CountDownLatch closeLatch = new CountDownLatch(1);
129130

130-
private final AtomicBoolean fail = new AtomicBoolean(true);
131+
final AtomicBoolean fail = new AtomicBoolean(true);
132+
133+
volatile long replayTime;
131134

132135
@KafkaListener(id = CONTAINER_ID, topics = "foo")
133136
public void foo(List<String> in, Acknowledgment ack) {
134137
contents.addAll(in);
138+
this.replayTime = System.currentTimeMillis() - this.replayTime;
135139
this.deliveryLatch.countDown();
136140
if (this.fail.getAndSet(false)) {
137-
ack.nack(3, 0);
141+
ack.nack(3, 50);
138142
}
139143
else {
140144
ack.acknowledge();

0 commit comments

Comments
 (0)