Commit 0474eb9

garyrussell authored and artembilan committed

GH-763: TX rollback and SeekErrHandler: Recovery
* GH-763: DefaultAfterRollbackProcessor: recovery

  Resolves #763

  Add recovery capability to the default ARP.

* Rebase, polishing - PR Comment
* Polishing - refactor code for reuse.
* BiPredicate instead of BiFunction.
* Polishing - PR Comments; also extend functionality to SeekToCurrentErrorHandler
  for when not using transactions.

  Resolves #766

* Polishing - map the typed list to a wildcard list.
* Polishing - raw types
* Polishing - PR Comments
1 parent 02129d9 commit 0474eb9

11 files changed: +537 -29 lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 17 additions & 2 deletions
@@ -40,10 +40,25 @@
 public interface AfterRollbackProcessor<K, V> {

 	/**
-	 * Process the remaining records.
+	 * Process the remaining records. Recoverable will be true if the container is
+	 * processing individual records; this allows the processor to recover (skip) the
+	 * failed record rather than re-seeking it. This is not possible with a batch listener
+	 * since only the listener itself knows which record in the batch keeps failing.
 	 * @param records the records.
 	 * @param consumer the consumer.
+	 * @param exception the exception
+	 * @param recoverable the recoverable.
+	 * @since 2.2
 	 */
-	void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer);
+	void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception, boolean recoverable);
+
+	/**
+	 * Optional method to clear thread state; will be called just before a consumer
+	 * thread terminates.
+	 * @since 2.2
+	 */
+	default void clearThreadState() {
+		// NOSONAR
+	}

 }
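For orientation, here is a minimal sketch of an implementation of the widened contract. The class name, the logging, and the decision to report only the first record when recoverable is true are illustrative choices, not part of this commit; the real default implementation follows in the next file.

	import java.util.List;
	import java.util.stream.Collectors;

	import org.apache.kafka.clients.consumer.Consumer;
	import org.apache.kafka.clients.consumer.ConsumerRecord;
	import org.apache.kafka.common.TopicPartition;

	import org.springframework.kafka.listener.AfterRollbackProcessor;

	// Hypothetical implementation: always re-seek, but log the failed record when the
	// container reports that recovery would be possible (record listener, not batch).
	public class LoggingAfterRollbackProcessor<K, V> implements AfterRollbackProcessor<K, V> {

		@Override
		public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
				Exception exception, boolean recoverable) {

			if (recoverable && !records.isEmpty()) {
				// The remaining records begin with the one that failed.
				System.err.println("Failed record: " + records.get(0) + " due to " + exception);
			}
			// Records arrive in offset order, so keeping the first offset seen per
			// partition re-seeks each partition to its earliest unprocessed record.
			records.stream()
					.collect(Collectors.toMap(
							r -> new TopicPartition(r.topic(), r.partition()),
							ConsumerRecord::offset,
							(first, later) -> first))
					.forEach(consumer::seek);
		}

	}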

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 53 additions & 16 deletions
@@ -16,20 +16,24 @@

 package org.springframework.kafka.listener;

-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.function.BiConsumer;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
+
+import org.springframework.kafka.support.SeekUtils;
+import org.springframework.lang.Nullable;

 /**
  * Default implementation of {@link AfterRollbackProcessor}. Seeks all
  * topic/partitions so the records will be re-fetched, including the failed
- * record.
+ * record. Starting with version 2.2 after a configurable number of failures
+ * for the same topic/partition/offset, that record will be skipped after
+ * calling a {@link BiConsumer} recoverer. The default recoverer simply logs
+ * the failed record.
  *
  * @param <K> the key type.
  * @param <V> the value type.
@@ -43,19 +47,52 @@ public class DefaultAfterRollbackProcessor<K, V> implements AfterRollbackProcess

 	private static final Log logger = LogFactory.getLog(DefaultAfterRollbackProcessor.class);

+	private final FailedRecordTracker failureTracker;
+
+	/**
+	 * Construct an instance with the default recoverer which simply logs the record after
+	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
+	 * topic/partition/offset.
+	 * @since 2.2
+	 */
+	public DefaultAfterRollbackProcessor() {
+		this(null, SeekUtils.DEFAULT_MAX_FAILURES);
+	}
+
+	/**
+	 * Construct an instance with the provided recoverer which will be called after
+	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
+	 * topic/partition/offset.
+	 * @param recoverer the recoverer.
+	 * @since 2.2
+	 */
+	public DefaultAfterRollbackProcessor(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
+		this(recoverer, SeekUtils.DEFAULT_MAX_FAILURES);
+	}
+
+	/**
+	 * Construct an instance with the provided recoverer which will be called after
+	 * maxFailures have occurred for a topic/partition/offset.
+	 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
+	 * @param maxFailures the maxFailures.
+	 * @since 2.2
+	 */
+	public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer,
+			int maxFailures) {
+		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, logger);
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Override
+	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
+			boolean recoverable) {
+		SeekUtils.doSeeks(((List) records),
+				consumer, exception, recoverable, this.failureTracker::skip, logger);
+	}
+
 	@Override
-	public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer) {
-		Map<TopicPartition, Long> partitions = new HashMap<>();
-		records.forEach(r -> partitions.computeIfAbsent(new TopicPartition(r.topic(), r.partition()),
-				offset -> r.offset()));
-		partitions.forEach((topicPartition, offset) -> {
-			try {
-				consumer.seek(topicPartition, offset);
-			}
-			catch (Exception e) {
-				logger.error("Failed to seek " + topicPartition + " to " + offset);
-			}
-		});
+	public void clearThreadState() {
+		this.failureTracker.clearThreadState();
 	}

 }
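A sketch of how the new constructors might be wired up, assuming an existing listener container; the recoverer lambda and the maxFailures value of 3 are illustrative. setAfterRollbackProcessor is the pre-existing container property that this processor plugs into.

	import org.springframework.kafka.listener.AbstractMessageListenerContainer;
	import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;

	public final class RollbackRecoveryConfig {

		// After 3 rollbacks (illustrative) at the same topic/partition/offset, the record
		// is handed to the recoverer and skipped instead of being replayed indefinitely.
		static void configure(AbstractMessageListenerContainer<String, String> container) {
			container.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
					(record, ex) -> System.err.println("Skipping after retries: " + record),
					3));
		}

	}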

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandler.java

Lines changed: 9 additions & 0 deletions
@@ -41,4 +41,13 @@ default void handle(Exception thrownException, List<ConsumerRecord<?, ?>> record
 		handle(thrownException, null);
 	}

+	/**
+	 * Optional method to clear thread state; will be called just before a consumer
+	 * thread terminates.
+	 * @since 2.2
+	 */
+	default void clearThreadState() {
+		// NOSONAR
+	}
+
 }
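To see why the new default method matters: a handler that accumulates per-consumer-thread state should drop that state when the thread exits, since pooled threads can be reused across container restarts. The handler below is a hypothetical illustration, not part of this commit.

	import org.apache.kafka.clients.consumer.ConsumerRecord;

	import org.springframework.kafka.listener.ErrorHandler;

	// Hypothetical stateful handler: counts failures seen on the current consumer thread.
	public class CountingErrorHandler implements ErrorHandler {

		private final ThreadLocal<Integer> failures = ThreadLocal.withInitial(() -> 0);

		@Override
		public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
			this.failures.set(this.failures.get() + 1);
			System.err.println("Failure #" + this.failures.get() + " handling " + record);
		}

		@Override
		public void clearThreadState() {
			this.failures.remove(); // avoid leaking counts into a reused thread
		}

	}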
spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import java.util.function.BiConsumer;
+
+import org.apache.commons.logging.Log;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.springframework.lang.Nullable;
+
+/**
+ * Track record processing failure counts.
+ *
+ * @author Gary Russell
+ * @since 2.2
+ *
+ */
+class FailedRecordTracker {
+
+	private final ThreadLocal<FailedRecord> failures = new ThreadLocal<>(); // intentionally not static
+
+	private final BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer;
+
+	private final int maxFailures;
+
+	FailedRecordTracker(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures, Log logger) {
+		if (recoverer == null) {
+			this.recoverer = (r, t) -> logger.error("Max failures (" + maxFailures + ") reached for: " + r, t);
+		}
+		else {
+			this.recoverer = recoverer;
+		}
+		this.maxFailures = maxFailures;
+	}
+
+	boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
+		FailedRecord failedRecord = this.failures.get();
+		if (failedRecord == null || !failedRecord.getTopic().equals(record.topic())
+				|| failedRecord.getPartition() != record.partition() || failedRecord.getOffset() != record.offset()) {
+			this.failures.set(new FailedRecord(record.topic(), record.partition(), record.offset()));
+			return false;
+		}
+		else {
+			if (failedRecord.incrementAndGet() >= this.maxFailures) {
+				this.recoverer.accept(record, exception);
+				return true;
+			}
+			return false;
+		}
+	}
+
+	void clearThreadState() {
+		this.failures.remove();
+	}
+
+	private static final class FailedRecord {
+
+		private final String topic;
+
+		private final int partition;
+
+		private final long offset;
+
+		private int count;
+
+		FailedRecord(String topic, int partition, long offset) {
+			this.topic = topic;
+			this.partition = partition;
+			this.offset = offset;
+			this.count = 1;
+		}
+
+		private String getTopic() {
+			return this.topic;
+		}
+
+		private int getPartition() {
+			return this.partition;
+		}
+
+		private long getOffset() {
+			return this.offset;
+		}
+
+		private int incrementAndGet() {
+			return ++this.count;
+		}
+
+	}
+
+}
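To illustrate the counting contract (this snippet is for orientation only and is not a test from this commit; the class is package-private, so it is reachable only from org.springframework.kafka.listener): with maxFailures = 3, skip() returns true on the third consecutive failure at the same topic/partition/offset, and the count restarts whenever a different record fails.

	package org.springframework.kafka.listener;

	import org.apache.commons.logging.LogFactory;
	import org.apache.kafka.clients.consumer.ConsumerRecord;

	final class FailedRecordTrackerDemo {

		public static void main(String[] args) {
			// The same record failing repeatedly on one consumer thread:
			ConsumerRecord<String, String> rec = new ConsumerRecord<>("topic", 0, 42L, "key", "value");
			FailedRecordTracker tracker = new FailedRecordTracker(null, 3, LogFactory.getLog("demo"));

			System.out.println(tracker.skip(rec, new RuntimeException())); // false - first failure, count = 1
			System.out.println(tracker.skip(rec, new RuntimeException())); // false - count = 2, below maxFailures
			System.out.println(tracker.skip(rec, new RuntimeException())); // true  - count = 3; recoverer invoked
		}

	}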

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 3 deletions
@@ -789,6 +789,10 @@ public void run() {
 				((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
 			}
 			this.consumer.close();
+			getAfterRollbackProcessor().clearThreadState();
+			if (this.errorHandler != null) {
+				this.errorHandler.clearThreadState();
+			}
 			this.logger.info("Consumer stopped");
 			publishConsumerStoppedEvent();
 		}
@@ -924,10 +928,10 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 			catch (RuntimeException e) {
 				this.logger.error("Transaction rolled back", e);
 				if (recordList == null) {
-					getAfterRollbackProcessor().process(createRecordList(records), this.consumer);
+					getAfterRollbackProcessor().process(createRecordList(records), this.consumer, e, false);
 				}
 				else {
-					getAfterRollbackProcessor().process(recordList, this.consumer);
+					getAfterRollbackProcessor().process(recordList, this.consumer, e, false);
 				}
 			}
 		}
@@ -1077,7 +1081,7 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
 					while (iterator.hasNext()) {
 						unprocessed.add(iterator.next());
 					}
-					getAfterRollbackProcessor().process(unprocessed, this.consumer);
+					getAfterRollbackProcessor().process(unprocessed, this.consumer, e, true);
 				}
 			}
 		}

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 47 additions & 7 deletions
@@ -16,15 +16,17 @@

 package org.springframework.kafka.listener;

-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.function.BiConsumer;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;

 import org.springframework.kafka.KafkaException;
+import org.springframework.kafka.support.SeekUtils;
+import org.springframework.lang.Nullable;

 /**
  * An error handler that seeks to the current offset for each topic in the remaining
@@ -37,14 +39,52 @@
  */
 public class SeekToCurrentErrorHandler implements ContainerAwareErrorHandler {

+	private static final Log logger = LogFactory.getLog(SeekToCurrentErrorHandler.class);
+
+	private final FailedRecordTracker failureTracker;
+
+	/**
+	 * Construct an instance with the default recoverer which simply logs the record after
+	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
+	 * topic/partition/offset.
+	 * @since 2.2
+	 */
+	public SeekToCurrentErrorHandler() {
+		this(null, SeekUtils.DEFAULT_MAX_FAILURES);
+	}
+
+	/**
+	 * Construct an instance with the provided recoverer which will be called after
+	 * {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
+	 * topic/partition/offset.
+	 * @param recoverer the recoverer.
+	 * @since 2.2
+	 */
+	public SeekToCurrentErrorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
+		this(recoverer, SeekUtils.DEFAULT_MAX_FAILURES);
+	}
+
+	/**
+	 * Construct an instance with the provided recoverer which will be called after
+	 * maxFailures have occurred for a topic/partition/offset.
+	 * @param recoverer the recoverer; if null, the default (logging) recoverer is used.
+	 * @param maxFailures the maxFailures.
+	 * @since 2.2
+	 */
+	public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, int maxFailures) {
+		this.failureTracker = new FailedRecordTracker(recoverer, maxFailures, logger);
+	}
+
 	@Override
 	public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
 			Consumer<?, ?> consumer, MessageListenerContainer container) {
-		Map<TopicPartition, Long> offsets = new LinkedHashMap<>();
-		records.forEach(r ->
-				offsets.computeIfAbsent(new TopicPartition(r.topic(), r.partition()), k -> r.offset()));
-		offsets.forEach(consumer::seek);
+		SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger);
 		throw new KafkaException("Seek to current after exception", thrownException);
 	}

+	@Override
+	public void clearThreadState() {
+		this.failureTracker.clearThreadState();
+	}
+
 }
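A construction sketch; the recoverer lambda and the maxFailures value of 3 are illustrative. With the no-arg constructor, the handler keeps re-seeking until SeekUtils.DEFAULT_MAX_FAILURES attempts have been made, after which the default recoverer logs the record and it is skipped.

	import org.apache.kafka.clients.consumer.ConsumerRecord;

	import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

	public final class SeekToCurrentExample {

		// After 3 delivery attempts (illustrative) for the same record, hand it to the
		// recoverer and move on instead of seeking back to it again.
		static SeekToCurrentErrorHandler retryingHandler() {
			return new SeekToCurrentErrorHandler(
					(ConsumerRecord<?, ?> record, Exception ex) ->
							System.err.println("Skipping after retries: " + record),
					3);
		}

	}

The handler is installed on the listener container the same way as before; only its construction changes in this commit.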
