Skip to content

Commit e30319d

Browse files
Fixes lint errors.
1 parent ac110b4 commit e30319d

9 files changed

+1056
-946
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@
108108
import org.springframework.kafka.listener.ContainerProperties.AckMode;
109109
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
110110
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
111-
import org.springframework.kafka.listener.FailedRecordTracker.FailedRecord;
112111
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
113112
import org.springframework.kafka.support.Acknowledgment;
114113
import org.springframework.kafka.support.KafkaHeaders;
@@ -846,7 +845,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
846845

847846
private final ConcurrentLinkedDeque<FailedRecordTuple<K, V>> failedRecords = new ConcurrentLinkedDeque();
848847

849-
850848
@SuppressWarnings(UNCHECKED)
851849
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
852850
ObservationRegistry observationRegistry) {
@@ -904,7 +902,7 @@ else if (listener instanceof MessageListener) {
904902
this.observationEnabled = this.containerProperties.isObservationEnabled();
905903

906904
if (!AopUtils.isAopProxy(listener)) {
907-
final java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue =
905+
final java.util.function.Consumer<FailedRecordTuple<K, V>> callbackForAsyncFailureQueue =
908906
(fRecord) -> this.failedRecords.addLast(fRecord);
909907
this.listener.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue);
910908
}
@@ -1310,10 +1308,11 @@ public void run() {
13101308

13111309
try {
13121310
handleAsyncFailure();
1313-
} catch (Exception e) {
1311+
}
1312+
catch (Exception e) {
13141313
// TODO: Need to improve error handling.
13151314
// TODO: Need to determine how to handle a failed message.
1316-
logger.error("Failed to process re-try messages. ");
1315+
this.logger.error("Failed to process re-try messages. ");
13171316
}
13181317

13191318
try {
@@ -1466,7 +1465,7 @@ protected void handleAsyncFailure() {
14661465

14671466
if (!copyFailedRecords.isEmpty()) {
14681467
copyFailedRecords.forEach(failedRecordTuple ->
1469-
this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex()));
1468+
this.invokeErrorHandlerBySingleRecord(failedRecordTuple.record(), failedRecordTuple.ex()));
14701469
}
14711470
}
14721471

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListener.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.function.Consumer;
2020

2121
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
2223
import org.springframework.kafka.core.FailedRecordTuple;
2324

2425
/**
@@ -34,7 +35,7 @@
3435
@FunctionalInterface
3536
public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {
3637

37-
default void setCallbackForAsyncFailureQueue(Consumer<FailedRecordTuple> asyncRetryCallback) {
38+
default void setCallbackForAsyncFailureQueue(Consumer<FailedRecordTuple<K, V>> asyncRetryCallback) {
3839
//
3940
}
4041
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.time.Clock;
2121
import java.time.Instant;
2222
import java.util.Optional;
23-
import java.util.function.BiConsumer;
2423

2524
import org.apache.kafka.clients.consumer.Consumer;
2625
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -148,7 +147,7 @@ public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
148147
}
149148

150149
@Override
151-
public void setCallbackForAsyncFailureQueue(java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue) {
150+
public void setCallbackForAsyncFailureQueue(java.util.function.Consumer<FailedRecordTuple<K, V>> callbackForAsyncFailureQueue) {
152151
this.delegate.setCallbackForAsyncFailureQueue(callbackForAsyncFailureQueue);
153152
}
154153
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
158158

159159
private BiConsumer<ConsumerRecord<K, V>, RuntimeException> asyncRetryCallback;
160160

161-
private java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue;
161+
private java.util.function.Consumer<FailedRecordTuple<K, V>> callbackForAsyncFailureQueue;
162162

163163
/**
164164
* Create an instance with the provided bean and method.
@@ -167,7 +167,6 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
167167
*/
168168
protected MessagingMessageListenerAdapter(Object bean, Method method) {
169169
this(bean, method, null);
170-
System.out.println("here");
171170
}
172171

173172
/**
@@ -692,7 +691,7 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
692691
acknowledge(acknowledgment);
693692
if (canAsyncRetry(request, ex)) {
694693
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
695-
FailedRecordTuple failedRecordTuple = new FailedRecordTuple(record, (RuntimeException) ex);
694+
FailedRecordTuple<K, V> failedRecordTuple = new FailedRecordTuple<>(record, (RuntimeException) ex);
696695
this.callbackForAsyncFailureQueue.accept(failedRecordTuple);
697696
}
698697
}
@@ -897,6 +896,10 @@ private boolean rawByParameterIsType(Type parameterType, Type type) {
897896
return parameterType instanceof ParameterizedType pType && pType.getRawType().equals(type);
898897
}
899898

899+
public void putInAsyncFailureQueue(java.util.function.Consumer<FailedRecordTuple<K, V>> callbackForAsyncFailureQueue) {
900+
this.callbackForAsyncFailureQueue = callbackForAsyncFailureQueue;
901+
}
902+
900903
/**
901904
* Root object for reply expression evaluation.
902905
* @param request the request.
@@ -915,8 +918,4 @@ public void acknowledge() {
915918

916919
}
917920

918-
public void putInAsyncFailureQueue(java.util.function.Consumer<FailedRecordTuple> callbackForAsyncFailureQueue) {
919-
this.callbackForAsyncFailureQueue = callbackForAsyncFailureQueue;
920-
}
921-
922921
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
8888

8989
@Override
9090
public void setCallbackForAsyncFailureQueue(
91-
java.util.function.Consumer<FailedRecordTuple> asyncRetryCallback) {
91+
java.util.function.Consumer<FailedRecordTuple<K, V>> asyncRetryCallback) {
9292
putInAsyncFailureQueue(asyncRetryCallback);
9393
}
9494
}

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.java

Lines changed: 75 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.common.serialization.StringSerializer;
4848
import org.assertj.core.api.InstanceOfAssertFactories;
4949
import org.junit.jupiter.api.Test;
50+
5051
import org.springframework.beans.factory.annotation.Autowired;
5152
import org.springframework.context.annotation.Bean;
5253
import org.springframework.context.annotation.Configuration;
@@ -94,12 +95,13 @@
9495

9596
@SpringJUnitConfig
9697
@DirtiesContext
97-
@EmbeddedKafka(topics = { AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.FIRST_TOPIC,
98-
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.SECOND_TOPIC,
99-
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.THIRD_TOPIC,
100-
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.FOURTH_TOPIC,
101-
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.TWO_LISTENERS_TOPIC,
102-
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.MANUAL_TOPIC })
98+
@EmbeddedKafka(topics = {
99+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.FIRST_TOPIC,
100+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.SECOND_TOPIC,
101+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.THIRD_TOPIC,
102+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.FOURTH_TOPIC,
103+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.TWO_LISTENERS_TOPIC,
104+
AsyncCompletableFutureRetryTopicClassLevelIntegrationTests.MANUAL_TOPIC })
103105
@TestPropertySource(properties = { "five.attempts=5", "kafka.template=customKafkaTemplate"})
104106
public class AsyncCompletableFutureRetryTopicClassLevelIntegrationTests {
105107

@@ -167,8 +169,9 @@ void shouldRetrySecondTopic() {
167169
}
168170

169171
@Test
170-
void shouldRetryThirdTopicWithTimeout(@Autowired KafkaAdmin admin,
171-
@Autowired KafkaListenerEndpointRegistry registry) throws Exception {
172+
void shouldRetryThirdTopicWithTimeout(
173+
@Autowired KafkaAdmin admin,
174+
@Autowired KafkaListenerEndpointRegistry registry) throws Exception {
172175

173176
kafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
174177
assertThat(awaitLatch(latchContainer.countDownLatch3)).isTrue();
@@ -215,26 +218,34 @@ void shouldRetryFourthTopicWithNoDlt() {
215218
}
216219

217220
@Test
218-
void shouldRetryFifthTopicWithTwoListenersAndManualAssignment(@Autowired
219-
FifthTopicListener1 listener1,
220-
@Autowired FifthTopicListener2 listener2) {
221+
void shouldRetryFifthTopicWithTwoListenersAndManualAssignment(
222+
@Autowired FifthTopicListener1 listener1,
223+
@Autowired FifthTopicListener2 listener2) {
221224

222225
kafkaTemplate.send(TWO_LISTENERS_TOPIC, 0, "0", "Testing topic 5 - 0");
223226
kafkaTemplate.send(TWO_LISTENERS_TOPIC, 1, "0", "Testing topic 5 - 1");
224227
assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue();
225228
assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue();
226229
assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue();
227-
assertThat(listener1.topics).containsExactly(TWO_LISTENERS_TOPIC, TWO_LISTENERS_TOPIC
228-
+ "-listener1-0", TWO_LISTENERS_TOPIC + "-listener1-1", TWO_LISTENERS_TOPIC + "-listener1-2",
229-
TWO_LISTENERS_TOPIC + "-listener1-dlt");
230-
assertThat(listener2.topics).containsExactly(TWO_LISTENERS_TOPIC, TWO_LISTENERS_TOPIC
231-
+ "-listener2-0", TWO_LISTENERS_TOPIC + "-listener2-1", TWO_LISTENERS_TOPIC + "-listener2-2",
232-
TWO_LISTENERS_TOPIC + "-listener2-dlt");
230+
assertThat(listener1.topics).containsExactly(
231+
TWO_LISTENERS_TOPIC,
232+
TWO_LISTENERS_TOPIC + "-listener1-0",
233+
TWO_LISTENERS_TOPIC + "-listener1-1",
234+
TWO_LISTENERS_TOPIC + "-listener1-2",
235+
TWO_LISTENERS_TOPIC + "-listener1-dlt"
236+
);
237+
assertThat(listener2.topics).containsExactly(
238+
TWO_LISTENERS_TOPIC,
239+
TWO_LISTENERS_TOPIC + "-listener2-0",
240+
TWO_LISTENERS_TOPIC + "-listener2-1",
241+
TWO_LISTENERS_TOPIC + "-listener2-2",
242+
TWO_LISTENERS_TOPIC + "-listener2-dlt");
233243
}
234244

235245
@Test
236-
void shouldRetryManualTopicWithDefaultDlt(@Autowired KafkaListenerEndpointRegistry registry,
237-
@Autowired ConsumerFactory<String, String> cf) {
246+
void shouldRetryManualTopicWithDefaultDlt(
247+
@Autowired KafkaListenerEndpointRegistry registry,
248+
@Autowired ConsumerFactory<String, String> cf) {
238249

239250
kafkaTemplate.send(MANUAL_TOPIC, "Testing topic 6");
240251
assertThat(awaitLatch(latchContainer.countDownLatch6)).isTrue();
@@ -243,9 +254,10 @@ void shouldRetryManualTopicWithDefaultDlt(@Autowired KafkaListenerEndpointRegist
243254
.forEach(id -> {
244255
ConcurrentMessageListenerContainer<?, ?> container =
245256
(ConcurrentMessageListenerContainer<?, ?>) registry.getListenerContainer(id);
246-
assertThat(container).extracting("commonErrorHandler")
247-
.extracting("seekAfterError", InstanceOfAssertFactories.BOOLEAN)
248-
.isFalse();
257+
assertThat(container)
258+
.extracting("commonErrorHandler")
259+
.extracting("seekAfterError", InstanceOfAssertFactories.BOOLEAN)
260+
.isFalse();
249261
});
250262
Consumer<String, String> consumer = cf.createConsumer("manual-dlt", "");
251263
Set<org.apache.kafka.common.TopicPartition> tp =
@@ -276,14 +288,21 @@ void shouldFirstReuseRetryTopic(@Autowired
276288
assertThat(awaitLatch(latchContainer.countDownLatchReuseOne)).isTrue();
277289
assertThat(awaitLatch(latchContainer.countDownLatchReuseTwo)).isTrue();
278290
assertThat(awaitLatch(latchContainer.countDownLatchReuseThree)).isTrue();
279-
assertThat(listener1.topics).containsExactly(FIRST_REUSE_RETRY_TOPIC,
280-
FIRST_REUSE_RETRY_TOPIC + "-retry");
281-
assertThat(listener2.topics).containsExactly(SECOND_REUSE_RETRY_TOPIC,
282-
SECOND_REUSE_RETRY_TOPIC + "-retry-30", SECOND_REUSE_RETRY_TOPIC + "-retry-60",
283-
SECOND_REUSE_RETRY_TOPIC + "-retry-100", SECOND_REUSE_RETRY_TOPIC + "-retry-100");
284-
assertThat(listener3.topics).containsExactly(THIRD_REUSE_RETRY_TOPIC,
285-
THIRD_REUSE_RETRY_TOPIC + "-retry", THIRD_REUSE_RETRY_TOPIC + "-retry",
286-
THIRD_REUSE_RETRY_TOPIC + "-retry", THIRD_REUSE_RETRY_TOPIC + "-retry");
291+
assertThat(listener1.topics).containsExactly(
292+
FIRST_REUSE_RETRY_TOPIC,
293+
FIRST_REUSE_RETRY_TOPIC + "-retry");
294+
assertThat(listener2.topics).containsExactly(
295+
SECOND_REUSE_RETRY_TOPIC,
296+
SECOND_REUSE_RETRY_TOPIC + "-retry-30",
297+
SECOND_REUSE_RETRY_TOPIC + "-retry-60",
298+
SECOND_REUSE_RETRY_TOPIC + "-retry-100",
299+
SECOND_REUSE_RETRY_TOPIC + "-retry-100");
300+
assertThat(listener3.topics).containsExactly(
301+
THIRD_REUSE_RETRY_TOPIC,
302+
THIRD_REUSE_RETRY_TOPIC + "-retry",
303+
THIRD_REUSE_RETRY_TOPIC + "-retry",
304+
THIRD_REUSE_RETRY_TOPIC + "-retry",
305+
THIRD_REUSE_RETRY_TOPIC + "-retry");
287306
}
288307

289308
@Test
@@ -324,7 +343,8 @@ public CompletableFuture<Void> listen(String message, @Header(KafkaHeaders.RECEI
324343
container.countDownLatch1.countDown();
325344
try {
326345
Thread.sleep(1);
327-
} catch (InterruptedException e) {
346+
}
347+
catch (InterruptedException e) {
328348
throw new RuntimeException(e);
329349
}
330350
throw new RuntimeException("Woooops... in topic " + receivedTopic);
@@ -345,7 +365,8 @@ public CompletableFuture<Void> listenAgain(String message, @Header(KafkaHeaders.
345365
container.countDownIfNotKnown(receivedTopic, container.countDownLatch2);
346366
try {
347367
Thread.sleep(1);
348-
} catch (InterruptedException e) {
368+
}
369+
catch (InterruptedException e) {
349370
throw new RuntimeException(e);
350371
}
351372
throw new IllegalStateException("Another woooops... " + receivedTopic);
@@ -377,7 +398,8 @@ public CompletableFuture<Void> listenWithAnnotation(String message, @Header(Kafk
377398
container.countDownIfNotKnown(receivedTopic, container.countDownLatch3);
378399
try {
379400
Thread.sleep(1);
380-
} catch (InterruptedException e) {
401+
}
402+
catch (InterruptedException e) {
381403
throw new RuntimeException(e);
382404
}
383405
throw new MyRetryException("Annotated woooops... " + receivedTopic);
@@ -405,7 +427,8 @@ public CompletableFuture<Void> listenNoDlt(String message, @Header(KafkaHeaders.
405427
container.countDownIfNotKnown(receivedTopic, container.countDownLatch4);
406428
try {
407429
Thread.sleep(1);
408-
} catch (InterruptedException e) {
430+
}
431+
catch (InterruptedException e) {
409432
throw new RuntimeException(e);
410433
}
411434
throw new IllegalStateException("Another woooops... " + receivedTopic);
@@ -456,7 +479,8 @@ public CompletableFuture<Void> listenWithAnnotation(String message, @Header(Kafk
456479
container.countDownIfNotKnown(receivedTopic, container.countDownLatch51);
457480
try {
458481
Thread.sleep(1);
459-
} catch (InterruptedException e) {
482+
}
483+
catch (InterruptedException e) {
460484
throw new RuntimeException(e);
461485
}
462486
throw new RuntimeException("Annotated woooops... " + receivedTopic);
@@ -488,7 +512,8 @@ public CompletableFuture<Void> listenWithAnnotation2(String message, @Header(Kaf
488512
container.countDownLatch52.countDown();
489513
try {
490514
Thread.sleep(1);
491-
} catch (InterruptedException e) {
515+
}
516+
catch (InterruptedException e) {
492517
throw new RuntimeException(e);
493518
}
494519
throw new RuntimeException("Annotated woooops... " + receivedTopic);
@@ -511,13 +536,16 @@ static class SixthTopicDefaultDLTListener {
511536
CountDownLatchContainer container;
512537

513538
@KafkaHandler
514-
public CompletableFuture<Void> listenNoDlt(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
515-
@SuppressWarnings("unused") Acknowledgment ack) {
539+
public CompletableFuture<Void> listenNoDlt(
540+
String message,
541+
@Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic,
542+
@SuppressWarnings("unused") Acknowledgment ack) {
516543
return CompletableFuture.supplyAsync(() -> {
517544
container.countDownIfNotKnown(receivedTopic, container.countDownLatch6);
518545
try {
519546
Thread.sleep(1);
520-
} catch (InterruptedException e) {
547+
}
548+
catch (InterruptedException e) {
521549
throw new RuntimeException(e);
522550
}
523551
throw new IllegalStateException("Another woooops... " + receivedTopic);
@@ -545,7 +573,8 @@ public CompletableFuture<Object> listenWithAnnotation2(String message, @Header(K
545573
container.countDownIfNotKnown(receivedTopic, container.countDownLatchNoRetry);
546574
try {
547575
Thread.sleep(1);
548-
} catch (InterruptedException e) {
576+
}
577+
catch (InterruptedException e) {
549578
throw new RuntimeException(e);
550579
}
551580
throw new MyDontRetryException("Annotated second woooops... " + receivedTopic);
@@ -579,7 +608,8 @@ public CompletableFuture<Void> listen1(String message, @Header(KafkaHeaders.RECE
579608
container.countDownLatchReuseOne.countDown();
580609
try {
581610
Thread.sleep(1);
582-
} catch (InterruptedException e) {
611+
}
612+
catch (InterruptedException e) {
583613
throw new RuntimeException(e);
584614
}
585615
throw new RuntimeException("Another woooops... " + receivedTopic);
@@ -609,7 +639,8 @@ public CompletableFuture<Void> listen2(String message, @Header(KafkaHeaders.RECE
609639
container.countDownLatchReuseTwo.countDown();
610640
try {
611641
Thread.sleep(1);
612-
} catch (InterruptedException e) {
642+
}
643+
catch (InterruptedException e) {
613644
throw new RuntimeException(e);
614645
}
615646
throw new RuntimeException("Another woooops... " + receivedTopic);
@@ -635,7 +666,8 @@ public CompletableFuture<Void> listen3(String message, @Header(KafkaHeaders.RECE
635666
container.countDownLatchReuseThree.countDown();
636667
try {
637668
Thread.sleep(1);
638-
} catch (InterruptedException e) {
669+
}
670+
catch (InterruptedException e) {
639671
throw new RuntimeException(e);
640672
}
641673
throw new RuntimeException("Another woooops... " + receivedTopic);

0 commit comments

Comments
 (0)