Skip to content

Commit b05b04c

Browse files
garyrussell authored and artembilan committed
GH-1259: Handle Failed Record Recovery
Resolves #1259 Previously if the recoverer in a `SeekToCurrentErrorHandler` or `DefaultAfterRollbackProcessor` failed to recover a record, the record could be lost; the `FailedRecordTracker` simply logged the exception. Change the `SeekUtils` to detect a failure in the recoverer (actually any failure when determining if the failed record should be recovered) and include the failed record in the seeks. In this way the recovery will be attempted once more on each delivery attempt. **cherry-pick to 2.2.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java # spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java # src/reference/asciidoc/kafka.adoc
1 parent ee6e9da commit b05b04c

File tree

4 files changed

+24
-15
lines changed

4 files changed

+24
-15
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ class FailedRecordTracker {
6060

6161
boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
6262
if (this.noRetries) {
63-
recover(record, exception);
63+
this.recoverer.accept(record, exception);
6464
return true;
6565
}
6666
Map<TopicPartition, FailedRecord> map = this.failures.get();
@@ -76,7 +76,7 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
7676
return false;
7777
}
7878
else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailures) {
79-
recover(record, exception);
79+
this.recoverer.accept(record, exception);
8080
map.remove(topicPartition);
8181
if (map.isEmpty()) {
8282
this.failures.remove();
@@ -88,15 +88,6 @@ else if (this.maxFailures > 0 && failedRecord.incrementAndGet() >= this.maxFailu
8888
}
8989
}
9090

91-
private void recover(ConsumerRecord<?, ?> record, Exception exception) {
92-
try {
93-
this.recoverer.accept(record, exception);
94-
}
95-
catch (Exception ex) {
96-
this.logger.error("Recoverer threw exception", ex);
97-
}
98-
}
99-
10091
void clearThreadState() {
10192
this.failures.remove();
10293
}

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,14 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
6363
AtomicBoolean skipped = new AtomicBoolean();
6464
records.forEach(record -> {
6565
if (recoverable && first.get()) {
66-
skipped.set(skipper.test(record, exception));
66+
try {
67+
boolean test = skipper.test(record, exception);
68+
skipped.set(test);
69+
}
70+
catch (Exception ex) {
71+
logger.error("Failed to determine if this record should be recovererd, including in seeks", ex);
72+
skipped.set(false);
73+
}
6774
if (skipped.get() && logger.isDebugEnabled()) {
6875
logger.debug("Skipping seek of: " + record);
6976
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -533,10 +533,14 @@ public void testMaxFailures() throws Exception {
533533
container.setBeanName("testMaxFailures");
534534
final CountDownLatch recoverLatch = new CountDownLatch(1);
535535
final KafkaTemplate<Object, Object> dlTemplate = spy(new KafkaTemplate<>(pf));
536+
AtomicBoolean recovererShouldFail = new AtomicBoolean(true);
536537
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(dlTemplate) {
537538

538539
@Override
539540
public void accept(ConsumerRecord<?, ?> record, Exception exception) {
541+
if (recovererShouldFail.getAndSet(false)) {
542+
throw new RuntimeException("test recoverer failure");
543+
}
540544
super.accept(record, exception);
541545
recoverLatch.countDown();
542546
}
@@ -587,8 +591,8 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
587591
assertThat(headers.get("baz")).isEqualTo("qux".getBytes());
588592
pf.destroy();
589593
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
590-
verify(afterRollbackProcessor, times(3)).isProcessInTransaction();
591-
verify(afterRollbackProcessor, times(3)).process(any(), any(), any(), anyBoolean());
594+
verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
595+
verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), anyBoolean());
592596
verify(afterRollbackProcessor).clearThreadState();
593597
verify(dlTemplate).send(any(ProducerRecord.class));
594598
verify(dlTemplate).sendOffsetsToTransaction(
@@ -629,8 +633,11 @@ public void testRollbackProcessorCrash() throws Exception {
629633
KafkaMessageListenerContainer<Integer, String> container =
630634
new KafkaMessageListenerContainer<>(cf, containerProps);
631635
container.setBeanName("testRollbackNoRetries");
636+
AtomicBoolean recovererShouldFail = new AtomicBoolean(true);
632637
BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer = (rec, ex) -> {
633-
throw new RuntimeException("arbp fail");
638+
if (recovererShouldFail.getAndSet(false)) {
639+
throw new RuntimeException("arbp fail");
640+
}
634641
};
635642
DefaultAfterRollbackProcessor<Object, Object> afterRollbackProcessor =
636643
spy(new DefaultAfterRollbackProcessor<>(recoverer, 0));

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2866,6 +2866,8 @@ This error handler does not support recovery, because the framework cannot know
28662866
After seeking, an exception that wraps the `ListenerExecutionFailedException` is thrown.
28672867
This is to cause the transaction to roll back (if transactions are enabled).
28682868

2869+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
2870+
28692871
===== Container Stopping Error Handlers
28702872

28712873
The `ContainerStoppingErrorHandler` (used with record listeners) stops the container if the listener throws an exception.
@@ -2916,6 +2918,8 @@ Starting with version 2.2.5, the `DefaultAfterRollbackProcessor` can be invoked
29162918
Then, if you are using the `DeadLetterPublishingRecoverer` to publish a failed record, the processor will send the recovered record's offset in the original topic/partition to the transaction.
29172919
To enable this feature, set the `processInTransaction` and `kafkaTemplate` properties on the `DefaultAfterRollbackProcessor`.
29182920

2921+
IMPORTANT: If the recoverer fails (throws an exception), the record will be included in the seeks and recovery will be attempted again during the next delivery.
2922+
29192923
[[dead-letters]]
29202924
===== Publishing Dead-letter Records
29212925

0 commit comments

Comments (0)