Skip to content

Commit 0e411db

Browse files
authored
GH-1582: Add GenericErrorHandler.setAckAfterHandle
Resolves #1582 - allow users to use `getContainerProperties().getGenericErrorHandler().setAckAfterHandle(false)` - add the property to error handlers where it was missing (except logging handlers) **cherry-pick to 2.5.x**
1 parent 625bee2 commit 0e411db

File tree

4 files changed

+45
-14
lines changed

4 files changed

+45
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,4 +66,14 @@ default boolean isAckAfterHandle() {
6666
return true;
6767
}
6868

69+
/**
70+
* Set to false to prevent the container from committing the offset of a recovered
71+
* record (when the error handler does not itself throw an exception).
72+
* @param ack false to not commit.
73+
* @since 2.5.6
74+
*/
75+
default void setAckAfterHandle(boolean ack) {
76+
throw new UnsupportedOperationException("This error handler does not support setting this property");
77+
}
78+
6979
}

spring-kafka/src/main/java/org/springframework/kafka/listener/RecoveringBatchErrorHandler.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class RecoveringBatchErrorHandler extends FailedRecordProcessor
6060

6161
private final SeekToCurrentBatchErrorHandler fallbackHandler = new SeekToCurrentBatchErrorHandler();
6262

63+
private boolean ackAfterHandle = true;
64+
6365
/**
6466
* Construct an instance with the default recoverer which simply logs the record after
6567
* {@value SeekUtils#DEFAULT_MAX_FAILURES} (maxFailures) have occurred for a
@@ -102,6 +104,16 @@ public RecoveringBatchErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Ex
102104
this.fallbackHandler.setBackOff(backOff);
103105
}
104106

107+
@Override
108+
public boolean isAckAfterHandle() {
109+
return this.ackAfterHandle;
110+
}
111+
112+
@Override
113+
public void setAckAfterHandle(boolean ackAfterHandle) {
114+
this.ackAfterHandle = ackAfterHandle;
115+
}
116+
105117
@Override
106118
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
107119
MessageListenerContainer container) {

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
5353

5454
private final SeekToCurrentBatchErrorHandler seeker = new SeekToCurrentBatchErrorHandler();
5555

56+
private boolean ackAfterHandle = true;
57+
5658
/**
5759
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
5860
* a 5 second back off).
@@ -80,6 +82,17 @@ public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecove
8082
};
8183
}
8284

85+
@Override
86+
public boolean isAckAfterHandle() {
87+
return this.ackAfterHandle;
88+
}
89+
90+
@Override
91+
public void setAckAfterHandle(boolean ackAfterHandle) {
92+
this.ackAfterHandle = ackAfterHandle;
93+
}
94+
95+
8396
@Override
8497
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
8598
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,22 @@ public void setCommitRecovered(boolean commitRecovered) { // NOSONAR enhanced ja
9696
super.setCommitRecovered(commitRecovered);
9797
}
9898

99-
@Override
100-
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
101-
Consumer<?, ?> consumer, MessageListenerContainer container) {
102-
103-
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
104-
getSkipPredicate(records, thrownException), this.logger, getLogLevel());
105-
}
106-
10799
@Override
108100
public boolean isAckAfterHandle() {
109101
return this.ackAfterHandle;
110102
}
111103

112-
/**
113-
* Set to false to tell the container to NOT commit the offset for a recovered record.
114-
* @param ackAfterHandle false to suppress committing the offset.
115-
* @since 2.3.2
116-
*/
104+
@Override
117105
public void setAckAfterHandle(boolean ackAfterHandle) {
118106
this.ackAfterHandle = ackAfterHandle;
119107
}
120108

109+
@Override
110+
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
111+
Consumer<?, ?> consumer, MessageListenerContainer container) {
112+
113+
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
114+
getSkipPredicate(records, thrownException), this.logger, getLogLevel());
115+
}
116+
121117
}

0 commit comments

Comments
 (0)