Skip to content

Commit 6cabfe5

Browse files
garyrussellartembilan
authored andcommitted
GH-992: Fix SeekToCurrent zero retries
Fixes #992 The `SeekToCurrentErrorHandler` and `DefaultAfterRollbackProcessor` always retried at least one time, even if `maxAttempts` was 1. **cherry-pick to 2.2.x** * Fix infinite retries * Fix test class name and new Sonar issue.
1 parent c226c9e commit 6cabfe5

File tree

3 files changed

+85
-10
lines changed

3 files changed

+85
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import java.time.temporal.ValueRange;
1920
import java.util.function.BiConsumer;
2021

2122
import org.apache.commons.logging.Log;
@@ -38,6 +39,8 @@ class FailedRecordTracker {
3839

3940
private final int maxFailures;
4041

42+
private final boolean noRetries;
43+
4144
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures, Log logger) {
4245
if (recoverer == null) {
4346
this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t);
@@ -46,24 +49,34 @@ class FailedRecordTracker {
4649
this.recoverer = recoverer;
4750
}
4851
this.maxFailures = maxFailures;
52+
this.noRetries = ValueRange.of(0, 1).isValidIntValue(maxFailures);
4953
}
5054

5155
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
56+
if (this.noRetries) {
57+
this.recoverer.accept(record, exception);
58+
return true;
59+
}
5260
FailedRecord failedRecord = this.failures.get();
53-
if (failedRecord == null || !failedRecord.getTopic().equals(record.topic())
54-
|| failedRecord.getPartition() != record.partition() || failedRecord.getOffset() != record.offset()) {
61+
if (this.maxFailures > 0 && (failedRecord == null || newFailure(record, failedRecord))) {
5562
this.failures.set(new FailedRecord(record.topic(), record.partition(), record.offset()));
5663
return false;
5764
}
58-
else {
59-
if (this.maxFailures >= 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
65+
else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
6066
this.recoverer.accept(record, exception);
6167
return true;
62-
}
68+
}
69+
else {
6370
return false;
6471
}
6572
}
6673

74+
private boolean newFailure(ConsumerRecord<?, ?> record, FailedRecord failedRecord) {
75+
return !failedRecord.getTopic().equals(record.topic())
76+
|| failedRecord.getPartition() != record.partition()
77+
|| failedRecord.getOffset() != record.offset();
78+
}
79+
6780
void clearThreadState() {
6881
this.failures.remove();
6982
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@
115115
public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
116116
extends AbstractMessageListenerContainer<K, V> {
117117

118+
private static final String UNUSED = "unused";
119+
118120
private static final int DEFAULT_ACK_TIME = 5000;
119121

120122
private final AbstractMessageListenerContainer<K, V> container;
@@ -695,7 +697,7 @@ public void run() {
695697
try {
696698
pollAndInvoke();
697699
}
698-
catch (@SuppressWarnings("unused") WakeupException e) {
700+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
699701
// Ignore, we're stopping
700702
}
701703
catch (NoOffsetForPartitionException nofpe) {
@@ -814,7 +816,7 @@ public void wrapUp() {
814816
try {
815817
this.consumer.unsubscribe();
816818
}
817-
catch (@SuppressWarnings("unused") WakeupException e) {
819+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
818820
// No-op. Continue process
819821
}
820822
}
@@ -900,7 +902,7 @@ private void processAck(ConsumerRecord<K, V> record) {
900902
try {
901903
ackImmediate(record);
902904
}
903-
catch (@SuppressWarnings("unused") WakeupException e) {
905+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
904906
// ignore - not polling
905907
}
906908
}
@@ -1040,7 +1042,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
10401042
throw er;
10411043
}
10421044
}
1043-
catch (@SuppressWarnings("unused") InterruptedException e) {
1045+
catch (@SuppressWarnings(UNUSED) InterruptedException e) {
10441046
Thread.currentThread().interrupt();
10451047
}
10461048
return null;
@@ -1539,7 +1541,7 @@ private void commitIfNecessary() {
15391541
this.consumer.commitAsync(commits, this.commitCallback);
15401542
}
15411543
}
1542-
catch (@SuppressWarnings("unused") WakeupException e) {
1544+
catch (@SuppressWarnings(UNUSED) WakeupException e) {
15431545
// ignore - not polling
15441546
this.logger.debug("Woken up during commit");
15451547
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.apache.commons.logging.Log;
25+
import org.apache.kafka.clients.consumer.ConsumerRecord;
26+
import org.junit.jupiter.api.Test;
27+
28+
/**
29+
* @author Gary Russell
30+
* @since 2.2.5
31+
*
32+
*/
33+
public class FailedRecordTrackerTests {
34+
35+
@Test
36+
public void testNoRetries() {
37+
AtomicBoolean recovered = new AtomicBoolean();
38+
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
39+
recovered.set(true);
40+
}, 1, mock(Log.class));
41+
ConsumerRecord<?, ?> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
42+
assertThat(tracker.skip(record, new RuntimeException())).isTrue();
43+
assertThat(recovered.get()).isTrue();
44+
}
45+
46+
@Test
47+
public void testThreeRetries() {
48+
AtomicBoolean recovered = new AtomicBoolean();
49+
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
50+
recovered.set(true);
51+
}, 4, mock(Log.class));
52+
ConsumerRecord<?, ?> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
53+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
54+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
55+
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
56+
assertThat(tracker.skip(record, new RuntimeException())).isTrue();
57+
assertThat(recovered.get()).isTrue();
58+
}
59+
60+
}

0 commit comments

Comments
 (0)