Skip to content

Commit 959d344

Browse files
garyrussellartembilan
authored andcommitted
GH-1354: Fix default BackOff with deprecated CTOR
Resolves #1354 When a negative `maxAttempts` was used, the default backOff interval was 5s instead of 0. **cherry-pick to 2.3.x**
1 parent e16b433 commit 959d344

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Excep
7373

7474
private static FixedBackOff maxFailuresToBackOff(int maxFailures) {
7575
if (maxFailures < 0) {
76-
return new FixedBackOff();
76+
return new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS);
7777
}
7878
return new FixedBackOff(0L, maxFailures == 0 ? 0 : maxFailures - 1);
7979
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.kafka.test.utils.KafkaTestUtils;
24+
25+
/**
26+
* @author Gary Russell
27+
* @since 2.3.6
28+
*
29+
*/
30+
public class FailedRecordProcessorTests {
31+
32+
@Test
33+
void testDefaultBackOff() {
34+
FailedRecordProcessor frp = new FailedRecordProcessor(null, 1) {
35+
};
36+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.interval", Long.class)).isEqualTo(0L);
37+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.maxAttempts", Long.class))
38+
.isEqualTo(0L);
39+
frp = new FailedRecordProcessor(null, 0) {
40+
};
41+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.interval", Long.class)).isEqualTo(0L);
42+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.maxAttempts", Long.class))
43+
.isEqualTo(0L);
44+
frp = new FailedRecordProcessor(null, -1) {
45+
};
46+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.interval", Long.class)).isEqualTo(0L);
47+
assertThat(KafkaTestUtils.getPropertyValue(frp, "failureTracker.backOff.maxAttempts", Long.class))
48+
.isEqualTo(Long.MAX_VALUE);
49+
}
50+
51+
}

0 commit comments

Comments
 (0)