Skip to content

Commit 87ac522

Browse files
GH-3276: Draft and test codes.
1 parent fdc59f0 commit 87ac522

File tree

7 files changed

+1413
-535
lines changed

7 files changed

+1413
-535
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2016-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* ...
19+
* @author Sanghyeok An
20+
*
21+
* @since 3.3
22+
*/
23+
24+
package org.springframework.kafka.core;
25+
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
28+
public record FailedRecordTuple<K, V>(
29+
ConsumerRecord<K, V> record,
30+
RuntimeException ex) {
31+
32+
};

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.lang.Thread.UncaughtExceptionHandler;
2019
import java.nio.ByteBuffer;
2120
import java.time.Duration;
2221
import java.util.AbstractMap.SimpleEntry;
@@ -39,6 +38,7 @@
3938
import java.util.concurrent.BlockingQueue;
4039
import java.util.concurrent.CompletableFuture;
4140
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.ConcurrentLinkedDeque;
4242
import java.util.concurrent.CountDownLatch;
4343
import java.util.concurrent.LinkedBlockingQueue;
4444
import java.util.concurrent.ScheduledFuture;
@@ -84,6 +84,7 @@
8484
import org.springframework.core.task.SimpleAsyncTaskExecutor;
8585
import org.springframework.kafka.KafkaException;
8686
import org.springframework.kafka.core.ConsumerFactory;
87+
import org.springframework.kafka.core.FailedRecordTuple;
8788
import org.springframework.kafka.core.KafkaAdmin;
8889
import org.springframework.kafka.core.KafkaResourceHolder;
8990
import org.springframework.kafka.event.ConsumerFailedToStartEvent;
@@ -107,6 +108,7 @@
107108
import org.springframework.kafka.listener.ContainerProperties.AckMode;
108109
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
109110
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
111+
import org.springframework.kafka.listener.FailedRecordTracker.FailedRecord;
110112
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
111113
import org.springframework.kafka.support.Acknowledgment;
112114
import org.springframework.kafka.support.KafkaHeaders;
@@ -842,6 +844,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
842844

843845
private volatile long lastPoll = System.currentTimeMillis();
844846

847+
private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque();
848+
849+
845850
@SuppressWarnings(UNCHECKED)
846851
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
847852
ObservationRegistry observationRegistry) {
@@ -899,10 +904,9 @@ else if (listener instanceof MessageListener) {
899904
this.observationEnabled = this.containerProperties.isObservationEnabled();
900905

901906
if (!AopUtils.isAopProxy(listener)) {
902-
final BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback
903-
= (cRecord, runtimeException) ->
904-
this.invokeErrorHandlerBySingleRecord(cRecord, runtimeException);
905-
this.listener.setAsyncRetryCallback(asyncRetryCallback);
907+
final java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue =
908+
(fRecord) -> this.failedRecords.addLast(fRecord);
909+
this.listener.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue);
906910
}
907911
}
908912
else {
@@ -1303,6 +1307,15 @@ public void run() {
13031307
boolean failedAuthRetry = false;
13041308
this.lastReceive = System.currentTimeMillis();
13051309
while (isRunning()) {
1310+
1311+
try {
1312+
handleAsyncFailure();
1313+
} catch (Exception e) {
1314+
// TODO: Need to improve error handling.
1315+
// TODO: Need to determine how to handle a failed message.
1316+
logger.error("Failed to process re-try messages. ");
1317+
}
1318+
13061319
try {
13071320
pollAndInvoke();
13081321
if (failedAuthRetry) {
@@ -1444,6 +1457,19 @@ protected void pollAndInvoke() {
14441457
}
14451458
}
14461459

1460+
protected void handleAsyncFailure() {
1461+
List<FailedRecordTuple> copyFailedRecords = new ArrayList<>();
1462+
while (!this.failedRecords.isEmpty()) {
1463+
FailedRecordTuple failedRecordTuple = this.failedRecords.pollFirst();
1464+
copyFailedRecords.add(failedRecordTuple);
1465+
}
1466+
1467+
if (!copyFailedRecords.isEmpty()) {
1468+
copyFailedRecords.forEach(failedRecordTuple ->
1469+
this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex()));
1470+
}
1471+
}
1472+
14471473
private void doProcessCommits() {
14481474
if (!this.autoCommit && !this.isRecordAck) {
14491475
try {

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.util.function.BiConsumer;
19+
import java.util.function.Consumer;
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.springframework.kafka.core.FailedRecordTuple;
2223

2324
/**
2425
* Listener for handling individual incoming Kafka messages.
@@ -28,11 +29,12 @@
2829
*
2930
* @author Marius Bogoevici
3031
* @author Gary Russell
32+
* @author Sanghyeok An
3133
*/
3234
@FunctionalInterface
3335
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {
3436

35-
default void setAsyncRetryCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback) {
37+
default void setCallbackForAsyncFailureQueue(Consumer<FailedRecordTuple> asyncRetryCallback) {
3638
//
3739
}
3840
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 4 additions & 13 deletions
Large diffs are not rendered by default.

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@
4747
import org.springframework.expression.spel.standard.SpelExpressionParser;
4848
import org.springframework.expression.spel.support.StandardEvaluationContext;
4949
import org.springframework.expression.spel.support.StandardTypeConverter;
50+
import org.springframework.kafka.core.FailedRecordTuple;
5051
import org.springframework.kafka.core.KafkaTemplate;
51-
import org.springframework.kafka.listener.AsyncRetryableException;
5252
import org.springframework.kafka.listener.ConsumerSeekAware;
5353
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
5454
import org.springframework.kafka.listener.ListenerExecutionFailedException;
@@ -94,6 +94,7 @@
9494
* @author Wang ZhiYang
9595
* @author Huijin Hong
9696
* @author Soby Chacko
97+
* @author Sanghyeok An
9798
*/
9899
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, AsyncRepliesAware {
99100

@@ -157,13 +158,16 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
157158

158159
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback;
159160

161+
private java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue;
162+
160163
/**
161164
* Create an instance with the provided bean and method.
162165
* @param bean the bean.
163166
* @param method the method.
164167
*/
165168
protected MessagingMessageListenerAdapter(Object bean, Method method) {
166169
this(bean, method, null);
170+
System.out.println("here");
167171
}
168172

169173
/**
@@ -689,7 +693,8 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
689693
if (request instanceof ConsumerRecord &&
690694
ex instanceof RuntimeException) {
691695
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
692-
asyncRetryCallback.accept(record, (RuntimeException) ex);
696+
FailedRecordTuple failedRecordTuple = new FailedRecordTuple(record, (RuntimeException) ex);
697+
this.callbackForAsyncFailureQueue.accept(failedRecordTuple);
693698
}
694699
}
695700
}
@@ -910,4 +915,8 @@ public void setAsyncRetryCallback(BiConsumer<ConsumerRecord<K, V>, RuntimeExcept
910915
this.asyncRetryCallback = asyncRetryCallback;
911916
}
912917

918+
public void putInAsyncFailureQueue(java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue) {
919+
this.callbackForAsyncFailureQueue = callbackForAsyncFailureQueue;
920+
}
921+
913922
}

0 commit comments

Comments
 (0)