Skip to content

Commit f9a7e4f

Browse files
garyrussellartembilan
authored andcommitted
GH-1382: RetryingBatchErrorHandler
Resolves #1382 **cherry-pick to 2.4.x, 2.3.x** * * Polishing; use `Runnable` instead of `Supplier`; only pause/resume once. # Conflicts: # src/reference/asciidoc/kafka.adoc Fix broken JavaDocs
1 parent e148db5 commit f9a7e4f

File tree

10 files changed

+608
-25
lines changed

10 files changed

+608
-25
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchErrorHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,4 +42,19 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consu
4242
handle(thrownException, data);
4343
}
4444

45+
/**
46+
* Handle the exception.
47+
* @param thrownException the exception.
48+
* @param data the consumer records.
49+
* @param consumer the consumer.
50+
* @param container the container.
51+
* @param invokeListener a callback to re-invoke the listener.
52+
* @since 2.3.7
53+
*/
54+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
55+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
56+
57+
handle(thrownException, data);
58+
}
59+
4560
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerAwareBatchErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
4141
@Override
4242
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4343
MessageListenerContainer container) {
44+
4445
handle(thrownException, data, consumer);
4546
}
4647

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerAwareBatchErrorHandler.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,4 +39,21 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consu
3939
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
4040
MessageListenerContainer container);
4141

42+
/**
43+
* Handle the exception.
44+
* @param thrownException the exception.
45+
* @param data the consumer records.
46+
* @param consumer the consumer.
47+
* @param container the container.
48+
* @param invokeListener a callback to re-invoke the listener.
49+
* @since 2.3.7
50+
*/
51+
@Override
52+
@SuppressWarnings("unused")
53+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
54+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
55+
56+
handle(thrownException, data, consumer, container);
57+
}
58+
4259
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,7 +1380,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
13801380
throw e;
13811381
}
13821382
try {
1383-
invokeBatchErrorHandler(records, e);
1383+
invokeBatchErrorHandler(records, recordList, e);
13841384
// unlikely, but possible, that a batch error handler "handles" the error
13851385
if ((!acked && !this.autoCommit && this.batchErrorHandler.isAckAfterHandle()) || producer != null) {
13861386
this.acks.addAll(getHighestOffsetRecords(records));
@@ -1425,18 +1425,10 @@ private void failureTimer(@Nullable Object sample) {
14251425
}
14261426

14271427
private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONAR - Cyclomatic Complexity
1428-
List<ConsumerRecord<K, V>> recordList, @SuppressWarnings(RAW_TYPES) Producer producer) throws InterruptedException {
1428+
List<ConsumerRecord<K, V>> recordList, @SuppressWarnings(RAW_TYPES) Producer producer)
1429+
throws InterruptedException {
14291430

1430-
if (this.wantsFullRecords) {
1431-
this.batchListener.onMessage(records,
1432-
this.isAnyManualAck
1433-
? new ConsumerBatchAcknowledgment(records)
1434-
: null,
1435-
this.consumer);
1436-
}
1437-
else {
1438-
doInvokeBatchOnMessage(records, recordList);
1439-
}
1431+
invokeBatchOnMessage(records, recordList);
14401432
List<ConsumerRecord<?, ?>> toSeek = null;
14411433
if (this.nackSleep >= 0) {
14421434
int index = 0;
@@ -1467,6 +1459,21 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
14671459
}
14681460
}
14691461

1462+
private void invokeBatchOnMessage(final ConsumerRecords<K, V> records,
1463+
@Nullable List<ConsumerRecord<K, V>> recordList) {
1464+
1465+
if (this.wantsFullRecords) {
1466+
this.batchListener.onMessage(records,
1467+
this.isAnyManualAck
1468+
? new ConsumerBatchAcknowledgment(records)
1469+
: null,
1470+
this.consumer);
1471+
}
1472+
else {
1473+
doInvokeBatchOnMessage(records, recordList);
1474+
}
1475+
}
1476+
14701477
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
14711478
List<ConsumerRecord<K, V>> recordList) {
14721479

@@ -1492,14 +1499,12 @@ private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
14921499
}
14931500
}
14941501

1495-
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records, RuntimeException e) {
1496-
if (this.batchErrorHandler instanceof ContainerAwareBatchErrorHandler) {
1497-
this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
1498-
KafkaMessageListenerContainer.this.thisOrParentContainer);
1499-
}
1500-
else {
1501-
this.batchErrorHandler.handle(decorateException(e), records, this.consumer);
1502-
}
1502+
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
1503+
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException e) {
1504+
1505+
this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
1506+
KafkaMessageListenerContainer.this.thisOrParentContainer,
1507+
() -> invokeBatchOnMessage(records, list));
15031508
}
15041509

15051510
private void invokeRecordListener(final ConsumerRecords<K, V> records) {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
21+
22+
/**
23+
* A batch error handler that is capable of invoking the listener during error handling.
24+
*
25+
* @author Gary Russell
26+
* @since 2.3.7
27+
*
28+
*/
29+
@FunctionalInterface
30+
public interface ListenerInvokingBatchErrorHandler extends ContainerAwareBatchErrorHandler {
31+
32+
@Override
33+
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
34+
MessageListenerContainer container) {
35+
36+
throw new UnsupportedOperationException("Container should never call this");
37+
}
38+
39+
@Override
40+
void handle(Exception thrownException, ConsumerRecords<?, ?> records,
41+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener);
42+
43+
}
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.time.Duration;
20+
import java.util.function.BiConsumer;
21+
22+
import org.apache.commons.logging.LogFactory;
23+
import org.apache.kafka.clients.consumer.Consumer;
24+
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
26+
import org.springframework.core.log.LogAccessor;
27+
import org.springframework.kafka.KafkaException;
28+
import org.springframework.lang.Nullable;
29+
import org.springframework.util.backoff.BackOff;
30+
import org.springframework.util.backoff.BackOffExecution;
31+
import org.springframework.util.backoff.FixedBackOff;
32+
33+
/**
34+
* A batch error handler that invokes the listener according to the supplied
35+
* {@link BackOff}. The consumer is paused/polled/resumed before each retry in order to
36+
* avoid a rebalance. If/when retries are exhausted, the provided
37+
* {@link ConsumerRecordRecoverer} is invoked. If the recoverer throws an exception, or
38+
* the thread is interrupted while sleeping, seeks are performed so that the batch will be
39+
* redelivered on the next poll.
40+
*
41+
* @author Gary Russell
42+
* @since 2.3.7
43+
*
44+
*/
45+
public class RetryingBatchErrorHandler implements ListenerInvokingBatchErrorHandler {
46+
47+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryingBatchErrorHandler.class));
48+
49+
private final BackOff backOff;
50+
51+
private final BiConsumer<ConsumerRecords<?, ?>, Exception> recoverer;
52+
53+
private final SeekToCurrentBatchErrorHandler seeker = new SeekToCurrentBatchErrorHandler();
54+
55+
/**
56+
* Construct an instance with a default {@link FixedBackOff} (unlimited attempts with
57+
* a 5 second back off).
58+
*/
59+
public RetryingBatchErrorHandler() {
60+
this(new FixedBackOff(), null);
61+
}
62+
63+
/**
64+
* Construct an instance with the provided {@link BackOff} and
65+
* {@link ConsumerRecordRecoverer}. If the recoverer is {@code null}, the discarded
66+
* records (topic-partition{@literal @}offset) will be logged.
67+
* @param backOff the back off.
68+
* @param recoverer the recoverer.
69+
*/
70+
public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecoverer recoverer) {
71+
this.backOff = backOff;
72+
this.recoverer = (crs, ex) -> {
73+
if (recoverer == null) {
74+
LOGGER.error(ex, () -> "Records discarded: " + tpos(crs));
75+
}
76+
else {
77+
crs.spliterator().forEachRemaining(rec -> recoverer.accept(rec, ex));
78+
}
79+
};
80+
}
81+
82+
@Override
83+
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
84+
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
85+
86+
BackOffExecution execution = this.backOff.start();
87+
long nextBackOff = execution.nextBackOff();
88+
String failed = null;
89+
consumer.pause(consumer.assignment());
90+
try {
91+
while (nextBackOff != BackOffExecution.STOP) {
92+
consumer.poll(Duration.ZERO);
93+
try {
94+
Thread.sleep(nextBackOff);
95+
}
96+
catch (InterruptedException e1) {
97+
Thread.currentThread().interrupt();
98+
this.seeker.handle(thrownException, records, consumer, container);
99+
throw new KafkaException("Interrupted during retry", e1);
100+
}
101+
try {
102+
invokeListener.run();
103+
return;
104+
}
105+
catch (Exception e) {
106+
if (failed == null) {
107+
failed = tpos(records);
108+
}
109+
String toLog = failed;
110+
LOGGER.debug(e, () -> "Retry failed for: " + toLog);
111+
}
112+
nextBackOff = execution.nextBackOff();
113+
}
114+
try {
115+
this.recoverer.accept(records, thrownException);
116+
}
117+
catch (Exception e) {
118+
LOGGER.error(e, () -> "Recoverer threw an exception; re-seeking batch");
119+
this.seeker.handle(thrownException, records, consumer, container);
120+
}
121+
}
122+
finally {
123+
consumer.resume(consumer.assignment());
124+
}
125+
}
126+
127+
private String tpos(ConsumerRecords<?, ?> records) {
128+
StringBuffer sb = new StringBuffer();
129+
records.spliterator().forEachRemaining(rec -> sb
130+
.append(rec.topic())
131+
.append('-')
132+
.append(rec.partition())
133+
.append('@')
134+
.append(rec.offset())
135+
.append(','));
136+
sb.deleteCharAt(sb.length() - 1);
137+
return sb.toString();
138+
}
139+
140+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1247,7 +1247,7 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
12471247
InOrder inOrder = inOrder(messageListener, consumer, errorHandler);
12481248
inOrder.verify(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
12491249
inOrder.verify(messageListener).onMessage(any());
1250-
inOrder.verify(errorHandler).handle(any(), any(), any());
1250+
inOrder.verify(errorHandler).handle(any(), any(), any(), any(), any());
12511251
inOrder.verify(consumer).commitSync(anyMap(), any());
12521252
container.stop();
12531253
}

0 commit comments

Comments
 (0)