Skip to content

Commit c43a85f

Browse files
garyrussellartembilan
authored andcommitted
FailedRecordProcessors - selectable BackOff
Resolves #1559 Provide a mechanism to select which `BackOff` to apply.
1 parent bc65c9e commit c43a85f

File tree

5 files changed

+123
-6
lines changed

5 files changed

+123
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.function.BiConsumer;
23+
import java.util.function.BiFunction;
2324
import java.util.function.BiPredicate;
2425

2526
import org.apache.commons.logging.LogFactory;
@@ -108,6 +109,17 @@ public void setCommitRecovered(boolean commitRecovered) {
108109
this.commitRecovered = commitRecovered;
109110
}
110111

112+
/**
113+
* Set a function to dynamically determine the {@link BackOff} to use, based on the
114+
* consumer record and/or exception. If null is returned, the default BackOff will be
115+
* used.
116+
* @param backOffFunction the function.
117+
* @since 2.6
118+
*/
119+
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
120+
this.failureTracker.setBackOffFunction(backOffFunction);
121+
}
122+
111123
/**
112124
* Set to false to immediately attempt to recover on the next attempt instead
113125
* of repeating the BackOff cycle when recovery fails.

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Map;
2121
import java.util.concurrent.atomic.AtomicInteger;
2222
import java.util.function.BiConsumer;
23+
import java.util.function.BiFunction;
2324

2425
import org.apache.kafka.clients.consumer.ConsumerRecord;
2526
import org.apache.kafka.common.TopicPartition;
@@ -48,6 +49,8 @@ class FailedRecordTracker {
4849

4950
private final BackOff backOff;
5051

52+
private BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction;
53+
5154
private boolean resetStateOnRecoveryFailure = true;
5255

5356
FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
@@ -75,6 +78,17 @@ class FailedRecordTracker {
7578
this.backOff = backOff;
7679
}
7780

81+
/**
82+
* Set a function to dynamically determine the {@link BackOff} to use, based on the
83+
* consumer record and/or exception. If null is returned, the default BackOff will be
84+
* used.
85+
* @param backOffFunction the function.
86+
* @since 2.6
87+
*/
88+
public void setBackOffFunction(BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> backOffFunction) {
89+
this.backOffFunction = backOffFunction;
90+
}
91+
7892
/**
7993
* Set to false to immediately attempt to recover on the next attempt instead
8094
* of repeating the BackOff cycle when recovery fails.
@@ -98,7 +112,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
98112
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
99113
FailedRecord failedRecord = map.get(topicPartition);
100114
if (failedRecord == null || failedRecord.getOffset() != record.offset()) {
101-
failedRecord = new FailedRecord(record.offset(), this.backOff.start());
115+
failedRecord = new FailedRecord(record.offset(), determineBackOff(record, exception).start());
102116
map.put(topicPartition, failedRecord);
103117
}
104118
else {
@@ -124,6 +138,14 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
124138
}
125139
}
126140

141+
private BackOff determineBackOff(ConsumerRecord<?, ?> record, Exception exception) {
142+
if (this.backOffFunction == null) {
143+
return this.backOff;
144+
}
145+
BackOff backOff = this.backOffFunction.apply(record, exception);
146+
return backOff != null ? backOff : this.backOff;
147+
}
148+
127149
private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
128150
try {
129151
this.recoverer.accept(record, exception);

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,16 +17,22 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
2021
import static org.mockito.Mockito.mock;
2122

2223
import java.util.ArrayList;
2324
import java.util.List;
25+
import java.util.Map;
2426
import java.util.concurrent.atomic.AtomicBoolean;
2527

2628
import org.apache.kafka.clients.consumer.ConsumerRecord;
29+
import org.apache.kafka.common.TopicPartition;
2730
import org.junit.jupiter.api.Test;
2831

2932
import org.springframework.core.log.LogAccessor;
33+
import org.springframework.kafka.test.utils.KafkaTestUtils;
34+
import org.springframework.util.backoff.BackOff;
35+
import org.springframework.util.backoff.BackOffExecution;
3036
import org.springframework.util.backoff.FixedBackOff;
3137

3238
/**
@@ -37,7 +43,7 @@
3743
public class FailedRecordTrackerTests {
3844

3945
@Test
40-
public void testNoRetries() {
46+
void testNoRetries() {
4147
AtomicBoolean recovered = new AtomicBoolean();
4248
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
4349
recovered.set(true);
@@ -48,7 +54,7 @@ public void testNoRetries() {
4854
}
4955

5056
@Test
51-
public void testThreeRetries() {
57+
void testThreeRetries() {
5258
AtomicBoolean recovered = new AtomicBoolean();
5359
FailedRecordTracker tracker = new FailedRecordTracker((r, e) -> {
5460
recovered.set(true);
@@ -62,7 +68,7 @@ public void testThreeRetries() {
6268
}
6369

6470
@Test
65-
public void testSuccessAfterFailure() {
71+
void testSuccessAfterFailure() {
6672
FailedRecordTracker tracker = new FailedRecordTracker(null, new FixedBackOff(0L, 1L), mock(LogAccessor.class));
6773
ConsumerRecord<?, ?> record = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
6874
assertThat(tracker.skip(record, new RuntimeException())).isFalse();
@@ -77,7 +83,7 @@ record = new ConsumerRecord<>("bar", 1, 1L, "bar", "baz");
7783
}
7884

7985
@Test
80-
public void testDifferentOrder() {
86+
void testDifferentOrder() {
8187
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
8288
FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> {
8389
records.add(rec);
@@ -93,4 +99,36 @@ public void testDifferentOrder() {
9399
assertThat(records).hasSize(2);
94100
}
95101

102+
@Test
103+
void multiBackOffs() {
104+
BackOff bo1 = mock(BackOff.class);
105+
BackOffExecution be1 = mock(BackOffExecution.class);
106+
given(bo1.start()).willReturn(be1);
107+
BackOff bo2 = mock(BackOff.class);
108+
BackOffExecution be2 = mock(BackOffExecution.class);
109+
given(bo2.start()).willReturn(be2);
110+
FailedRecordTracker tracker = new FailedRecordTracker((rec, ex) -> { }, bo1, mock(LogAccessor.class));
111+
tracker.setBackOffFunction((record, ex) -> {
112+
if (record.topic().equals("foo")) {
113+
return bo2;
114+
}
115+
else {
116+
return null;
117+
}
118+
});
119+
@SuppressWarnings("unchecked")
120+
ThreadLocal<Map<TopicPartition, Object>> failures = (ThreadLocal<Map<TopicPartition, Object>>) KafkaTestUtils
121+
.getPropertyValue(tracker, "failures");
122+
ConsumerRecord<?, ?> record1 = new ConsumerRecord<>("foo", 0, 0L, "bar", "baz");
123+
tracker.skip(record1, new RuntimeException());
124+
assertThat(KafkaTestUtils.getPropertyValue(failures.get()
125+
.get(new TopicPartition("foo", 0)), "backOffExecution"))
126+
.isSameAs(be2);
127+
ConsumerRecord<?, ?> record2 = new ConsumerRecord<>("bar", 0, 0L, "bar", "baz");
128+
tracker.skip(record2, new RuntimeException());
129+
assertThat(KafkaTestUtils.getPropertyValue(failures.get()
130+
.get(new TopicPartition("bar", 0)), "backOffExecution"))
131+
.isSameAs(be1);
132+
}
133+
96134
}

src/reference/asciidoc/kafka.adoc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2159,6 +2159,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset
21592159
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
21602160
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
21612161

2162+
Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:
2163+
2164+
====
2165+
[source, java]
2166+
----
2167+
handler.setBackOffFunction((record, ex) -> { ... });
2168+
----
2169+
====
2170+
2171+
If the function returns `null`, the handler's default `BackOff` will be used.
2172+
21622173
[[container-props]]
21632174
==== Listener Container Properties
21642175

@@ -4654,6 +4665,17 @@ To revert to the previous behavior, set the error handler's `resetStateOnRecover
46544665
Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured).
46554666
To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false.
46564667

4668+
Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:
4669+
4670+
====
4671+
[source, java]
4672+
----
4673+
handler.setBackOffFunction((record, ex) -> { ... });
4674+
----
4675+
====
4676+
4677+
If the function returns `null`, the handler's default `BackOff` will be used.
4678+
46574679
Also see <<delivery-header>>.
46584680

46594681
[[retrying-batch-eh]]
@@ -4759,6 +4781,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset
47594781
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
47604782
To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
47614783

4784+
Starting with version 2.6, you can now provide the error handler with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:
4785+
4786+
====
4787+
[source, java]
4788+
----
4789+
handler.setBackOffFunction((record, ex) -> { ... });
4790+
----
4791+
====
4792+
4793+
If the function returns `null`, the handler's default `BackOff` will be used.
4794+
47624795
===== Container Stopping Error Handlers
47634796

47644797
The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception.
@@ -4814,6 +4847,17 @@ Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset
48144847
With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
48154848
To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`.
48164849

4850+
Starting with version 2.6, you can now provide the processor with a `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` to determine the `BackOff` to use, based on the failed record and/or the exception:
4851+
4852+
====
4853+
[source, java]
4854+
----
4855+
handler.setBackOffFunction((record, ex) -> { ... });
4856+
----
4857+
====
4858+
4859+
If the function returns `null`, the processor's default `BackOff` will be used.
4860+
48174861
Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
48184862
The exceptions that are considered fatal, by default, are:
48194863

src/reference/asciidoc/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ The default `EOSMode` is now `BETA`.
1414
See <<exactly-once>> for more information.
1515

1616
Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` if recovery fails.
17+
In addition, you can now select the `BackOff` to use based on the failed record and/or exception.
1718
See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-rollback>> for more information.
1819

1920
==== @KafkaLisener Changes

0 commit comments

Comments
 (0)