
Commit 7aa3d61

GH-1547: Reset BackOff State on Recovery Failure
Resolves #1547

It generally makes sense to repeat the back offs after a recovery failure, instead of attempting recovery immediately on the next delivery. Add an option to revert to the previous behavior.

* Fix code typos in docs
1 parent ec98428 commit 7aa3d61

7 files changed: +131 / -8 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 10 additions & 0 deletions

@@ -107,6 +107,16 @@ public void setCommitRecovered(boolean commitRecovered) {
 		this.commitRecovered = commitRecovered;
 	}
 
+	/**
+	 * Set to false to immediately attempt to recover on the next attempt instead
+	 * of repeating the BackOff cycle when recovery fails.
+	 * @param resetStateOnRecoveryFailure false to retain state.
+	 * @since 2.5.5
+	 */
+	public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
+		this.failureTracker.setResetStateOnRecoveryFailure(resetStateOnRecoveryFailure);
+	}
+
 	@Override
 	public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
 		return this.failureTracker.deliveryAttempt(topicPartitionOffset);
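For context (not part of this diff), an application could use this new setter to opt back into the pre-2.5.5 behavior. The bean method below is a hypothetical wiring sketch; it assumes spring-kafka 2.5.5+ on the classpath:

```java
// Hypothetical configuration sketch: opt out of resetting BackOff state
// when the recoverer itself throws. Bean name and back-off values are
// illustrative, not taken from the commit.
@Bean
public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
	// Retry each failed delivery up to 2 times with a 1-second fixed delay.
	SeekToCurrentErrorHandler handler =
			new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 2L));
	// false = retain the back-off state when recovery fails, so recovery is
	// re-attempted immediately on the next delivery (the previous behavior).
	handler.setResetStateOnRecoveryFailure(false);
	return handler;
}
```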

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 26 additions & 2 deletions

@@ -48,6 +48,8 @@ class FailedRecordTracker {
 
 	private final BackOff backOff;
 
+	private boolean resetStateOnRecoveryFailure = true;
+
 	FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff,
 			LogAccessor logger) {
 
@@ -73,9 +75,19 @@ class FailedRecordTracker {
 		this.backOff = backOff;
 	}
 
+	/**
+	 * Set to false to immediately attempt to recover on the next attempt instead
+	 * of repeating the BackOff cycle when recovery fails.
+	 * @param resetStateOnRecoveryFailure false to retain state.
+	 * @since 2.5.5
+	 */
+	public void setResetStateOnRecoveryFailure(boolean resetStateOnRecoveryFailure) {
+		this.resetStateOnRecoveryFailure = resetStateOnRecoveryFailure;
+	}
+
 	boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 		if (this.noRetries) {
-			this.recoverer.accept(record, exception);
+			attemptRecovery(record, exception, null);
 			return true;
 		}
 		Map<TopicPartition, FailedRecord> map = this.failures.get();
@@ -103,7 +115,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 			return false;
 		}
 		else {
-			this.recoverer.accept(record, exception);
+			attemptRecovery(record, exception, topicPartition);
 			map.remove(topicPartition);
 			if (map.isEmpty()) {
 				this.failures.remove();
@@ -112,6 +124,18 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
 		}
 	}
 
+	private void attemptRecovery(ConsumerRecord<?, ?> record, Exception exception, @Nullable TopicPartition tp) {
+		try {
+			this.recoverer.accept(record, exception);
+		}
+		catch (RuntimeException e) {
+			if (tp != null && this.resetStateOnRecoveryFailure) {
+				this.failures.get().remove(tp);
+			}
+			throw e;
+		}
+	}
+
 	void clearThreadState() {
 		this.failures.remove();
 	}
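Abstracting away the spring-kafka internals, the core of the new `attemptRecovery(...)` logic can be sketched with a small dependency-free model (the class, field, and map types here are simplified stand-ins, not the real `FailedRecordTracker` API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Minimal model of the change above: when the recoverer throws and
// resetStateOnRecoveryFailure is true, the per-partition back-off state is
// discarded so the next delivery repeats the full BackOff cycle.
public class TrackerSketch {

	static class Tracker {
		final Map<String, Integer> failures = new HashMap<>(); // partition -> attempts
		final BiConsumer<String, RuntimeException> recoverer;
		boolean resetStateOnRecoveryFailure = true;

		Tracker(BiConsumer<String, RuntimeException> recoverer) {
			this.recoverer = recoverer;
		}

		void attemptRecovery(String partition, RuntimeException exception) {
			try {
				this.recoverer.accept(partition, exception);
			}
			catch (RuntimeException e) {
				if (partition != null && this.resetStateOnRecoveryFailure) {
					this.failures.remove(partition); // reset BackOff state
				}
				throw e; // recovery failure still propagates
			}
		}
	}

	public static void main(String[] args) {
		BiConsumer<String, RuntimeException> failingRecoverer = (tp, ex) -> {
			throw new RuntimeException("recovery failed");
		};

		// Default: state is reset when recovery fails.
		Tracker resetting = new Tracker(failingRecoverer);
		resetting.failures.put("foo-0", 3);
		try {
			resetting.attemptRecovery("foo-0", new RuntimeException("original"));
		}
		catch (RuntimeException expected) { /* recovery failure rethrown */ }
		System.out.println("reset: tracked=" + resetting.failures.containsKey("foo-0"));

		// Opt-out: state is retained, so recovery is re-attempted immediately.
		Tracker retaining = new Tracker(failingRecoverer);
		retaining.resetStateOnRecoveryFailure = false;
		retaining.failures.put("foo-0", 3);
		try {
			retaining.attemptRecovery("foo-0", new RuntimeException("original"));
		}
		catch (RuntimeException expected) { /* recovery failure rethrown */ }
		System.out.println("retain: tracked=" + retaining.failures.containsKey("foo-0"));
	}
}
```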

spring-kafka/src/test/java/org/springframework/kafka/listener/RecoveringBatchErrorHandlerIntegrationTests.java

Lines changed: 1 addition & 0 deletions

@@ -179,6 +179,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 
 		};
 		RecoveringBatchErrorHandler errorHandler = new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		errorHandler.setResetStateOnRecoveryFailure(false);
 		container.setBatchErrorHandler(errorHandler);
 		final CountDownLatch stopLatch = new CountDownLatch(1);
 		container.setApplicationEventPublisher(e -> {

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java

Lines changed: 67 additions & 0 deletions

@@ -17,10 +17,12 @@
 package org.springframework.kafka.listener;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -36,6 +38,7 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
@@ -194,6 +197,70 @@ public void seekToCurrentErrorHandlerRecovers() {
 		verify(recoverer).accept(eq(records.get(0)), any());
 	}
 
+	@Test
+	public void seekToCurrentErrorHandlerRecovererFailsBackOffReset() {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		AtomicBoolean fail = new AtomicBoolean(true);
+		willAnswer(invocation -> {
+			if (fail.getAndSet(false)) {
+				throw new RuntimeException("recovery failed");
+			}
+			return null;
+		}).given(recoverer).accept(any(), any());
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verifyNoMoreInteractions(consumer);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
+		eh.handle(new RuntimeException(), records, consumer, null);
+		verify(consumer, times(3)).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 0), 1L);
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer, times(2)).accept(eq(records.get(0)), any());
+	}
+
+	@Test
+	public void seekToCurrentErrorHandlerRecovererFailsBackOffNotReset() {
+		@SuppressWarnings("unchecked")
+		BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = mock(BiConsumer.class);
+		AtomicBoolean fail = new AtomicBoolean(true);
+		willAnswer(invocation -> {
+			if (fail.getAndSet(false)) {
+				throw new RuntimeException("recovery failed");
+			}
+			return null;
+		}).given(recoverer).accept(any(), any());
+		SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 1));
+		eh.setResetStateOnRecoveryFailure(false);
+		List<ConsumerRecord<?, ?>> records = new ArrayList<>();
+		records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
+		records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
+		Consumer<?, ?> consumer = mock(Consumer.class);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer).seek(new TopicPartition("foo", 0), 0L);
+		verifyNoMoreInteractions(consumer);
+		assertThatExceptionOfType(KafkaException.class).isThrownBy(
+				() -> eh.handle(new RuntimeException(), records, consumer, null));
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		eh.handle(new RuntimeException(), records, consumer, null); // immediate re-attempt recovery
+		verify(consumer, times(2)).seek(new TopicPartition("foo", 0), 0L);
+		verify(consumer).seek(new TopicPartition("foo", 0), 1L);
+		verifyNoMoreInteractions(consumer);
+		verify(recoverer, times(2)).accept(eq(records.get(0)), any());
+	}
+
 	@Test
 	public void seekToCurrentErrorHandlerRecoversManualAcksAsync() {
 		seekToCurrentErrorHandlerRecoversManualAcks(false);

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 0 deletions

@@ -625,6 +625,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
 		};
 		DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
 				spy(new DefaultAfterRollbackProcessor<>(recoverer, new FixedBackOff(0L, 2L), dlTemplate, true));
+		afterRollbackProcessor.setResetStateOnRecoveryFailure(false);
 		container.setAfterRollbackProcessor(afterRollbackProcessor);
 		final CountDownLatch stopLatch = new CountDownLatch(1);
 		container.setApplicationEventPublisher(e -> {

src/reference/asciidoc/kafka.adoc

Lines changed: 23 additions & 6 deletions

@@ -2128,7 +2128,10 @@ public SeekToCurrentErrorHandler eh() {
 
 However, see the note at the beginning of this section; you can avoid using the `RetryTemplate` altogether.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
 
 [[container-props]]
 ==== Listener Container Properties
@@ -4587,7 +4590,7 @@ Here is an example that adds `IllegalArgumentException` to the not-retryable exc
 [source, java]
 ----
 @Bean
-public SeekToCurrentErrorHandler errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
+public SeekToCurrentErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
 	SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(recoverer);
 	handler.addNotRetryableException(IllegalArgumentException.class);
 	return handler;
@@ -4618,7 +4621,10 @@ However, since this error handler has no mechanism to "recover" after retries ar
 Again, the maximum delay must be less than the `max.poll.interval.ms` consumer property.
 Also see <<retrying-batch-eh>>.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
 
 Starting with version 2.3.2, after a record has been recovered, its offset will be committed (if one of the container `AckMode` s is configured).
 To revert to the previous behavior, set the error handler's `ackAfterHandle` property to false.
@@ -4721,7 +4727,12 @@ public void listen(List<Thing> things) {
 ----
 ====
 
-IMPORTANT: This error handler cannot be used with transactions.
+IMPORTANT: This error handler cannot be used with transactions.
+
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` to `false`.
 
 ===== Container Stopping Error Handlers
 
@@ -4773,7 +4784,10 @@ Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked
 Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
 To enable this feature, set the `commitRecovered` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the processor's `resetStateOnRecoveryFailure` property to `false`.
 
 Starting with version 2.3.1, similar to the `SeekToCurrentErrorHandler`, the `DefaultAfterRollbackProcessor` considers certain exceptions to be fatal, and retries are skipped for such exceptions; the recoverer is invoked on the first failure.
 The exceptions that are considered fatal, by default, are:
@@ -4905,7 +4919,10 @@ A `LinkedHashMap` is recommended so that the keys are examined in order.
 
 When publishing `null` values, when there are multiple templates, the recoverer will look for a template for the `Void` class; if none is present, the first template from the `values().iterator()` will be used.
 
-IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
+IMPORTANT: If the recoverer fails (throws an exception), the failed record will be included in the seeks.
+Starting with version 2.5.5, if the recoverer fails, the `BackOff` will be reset by default and redeliveries will again go through the back offs before recovery is attempted again.
+With earlier versions, the `BackOff` was not reset and recovery was re-attempted on the next failure.
+To revert to the previous behavior, set the error handler's `resetStateOnRecoveryFailure` property to `false`.
 
 Starting with version 2.3, the recoverer can also be used with Kafka Streams - see <<streams-deser-recovery>> for more information.
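The redelivery timelines described in these IMPORTANT notes can be illustrated with a small dependency-free simulation (names are hypothetical; the real behavior lives in `FailedRecordTracker`), using one retry as with `FixedBackOff(0L, 1)` and a recoverer that fails on its first call:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the two redelivery timelines: with the default (state reset on
// recovery failure) the back offs repeat before recovery is retried; with the
// opt-out, recovery is re-attempted immediately on the next delivery.
public class BackOffTimeline {

	// 'maxRetries' models FixedBackOff(0L, 1): one retry before recovery.
	static List<String> timeline(boolean resetStateOnRecoveryFailure, int maxRetries) {
		List<String> events = new ArrayList<>();
		int attempts = 0;              // back-off state for the failed record
		boolean recovererFails = true; // recoverer throws on its first call only
		for (int delivery = 1; delivery <= 10; delivery++) {
			attempts++;
			if (attempts <= maxRetries) {
				events.add("delivery " + delivery + ": retry");
				continue;
			}
			if (recovererFails) {
				recovererFails = false;
				events.add("delivery " + delivery + ": recovery FAILED");
				if (resetStateOnRecoveryFailure) {
					attempts = 0; // state discarded -> back offs repeat
				}
				continue; // state retained -> recovery retried next delivery
			}
			events.add("delivery " + delivery + ": recovered");
			return events;
		}
		return events;
	}

	public static void main(String[] args) {
		System.out.println("reset=true:  " + timeline(true, 1));
		System.out.println("reset=false: " + timeline(false, 1));
	}
}
```

With the default (`reset=true`) the record is recovered on the fourth delivery, after repeating the back off; with `reset=false` it is recovered on the third, matching the `seekToCurrentErrorHandlerRecovererFails*` tests in this commit.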

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions

@@ -12,3 +12,6 @@ This version requires the 2.6.0 `kafka-clients`.
 
 The default `EOSMode` is now `BETA`.
 See <<exactly-once>> for more information.
+
+Various error handlers (that extend `FailedRecordProcessor`) and the `DefaultAfterRollbackProcessor` now reset the `BackOff` if recovery fails.
+See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-rollback>> for more information.
