Skip to content

Commit 463ca22

Browse files
DidierLoiseausobychacko
authored andcommitted
Fixes: #3066
Issue link: #3066 Signed-off-by: DidierLoiseau <[email protected]>
1 parent 9a311e1 commit 463ca22

File tree

2 files changed

+104
-14
lines changed

2 files changed

+104
-14
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@
165165
* @author Byungjun You
166166
* @author Oliver Führer
167167
* @author Omer Celik
168+
* @author Didier Loiseau
168169
*/
169170
public class KafkaMessageChannelBinder extends
170171
// @checkstyle:off
@@ -566,18 +567,24 @@ protected boolean useNativeEncoding(
566567
}
567568

568569
@Override
569-
@SuppressWarnings("unchecked")
570570
protected MessageProducer createConsumerEndpoint(
571571
final ConsumerDestination destination, final String group,
572572
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
573+
return createConsumerEndpointCaptureHelper(destination, group, extendedConsumerProperties);
574+
}
575+
576+
@SuppressWarnings("unchecked")
577+
private <K, V> MessageProducer createConsumerEndpointCaptureHelper(
578+
final ConsumerDestination destination, final String group,
579+
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
573580

574581
boolean anonymous = !StringUtils.hasText(group);
575582
Assert.isTrue(
576583
!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
577584
"DLQ support is not available for anonymous subscriptions");
578585
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString()
579586
: group;
580-
final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(
587+
final ConsumerFactory<K, V> consumerFactory = (ConsumerFactory<K, V>) createKafkaConsumerFactory(
581588
anonymous, consumerGroup, extendedConsumerProperties, destination.getName() + ".consumer", destination.getName());
582589
int partitionCount = extendedConsumerProperties.getInstanceCount()
583590
* extendedConsumerProperties.getConcurrency();
@@ -647,9 +654,8 @@ protected MessageProducer createConsumerEndpoint(
647654
}
648655
resetOffsetsForAutoRebalance(extendedConsumerProperties, consumerFactory, containerProperties);
649656
containerProperties.setAuthExceptionRetryInterval(this.configurationProperties.getAuthorizationExceptionRetryInterval());
650-
@SuppressWarnings("rawtypes")
651-
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer = new ConcurrentMessageListenerContainer(
652-
consumerFactory, containerProperties) {
657+
final ConcurrentMessageListenerContainer<K, V> messageListenerContainer =
658+
new ConcurrentMessageListenerContainer<>(consumerFactory, containerProperties) {
653659

654660
@Override
655661
public void stop(Runnable callback) {
@@ -677,8 +683,7 @@ else if (applicationContext != null) {
677683
ContainerProperties.AckMode ackMode = extendedConsumerProperties.getExtension().getAckMode();
678684

679685
if (ackMode != null) {
680-
if ((extendedConsumerProperties.isBatchMode() && ackMode != ContainerProperties.AckMode.RECORD) ||
681-
!extendedConsumerProperties.isBatchMode()) {
686+
if (!extendedConsumerProperties.isBatchMode() || ackMode != ContainerProperties.AckMode.RECORD) {
682687
messageListenerContainer.getContainerProperties()
683688
.setAckMode(ackMode);
684689
}
@@ -715,7 +720,7 @@ else if (applicationContext != null) {
715720
}
716721
}
717722
else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
718-
messageListenerContainer.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(
723+
var afterRollbackProcessor = new DefaultAfterRollbackProcessor<K, V>(
719724
(record, exception) -> {
720725
MessagingException payload =
721726
new MessagingException(((RecordMessageConverter) messageConverter)
@@ -740,7 +745,31 @@ else if (!extendedConsumerProperties.isBatchMode() && transMan != null) {
740745
}
741746
}, createBackOff(extendedConsumerProperties),
742747
new KafkaTemplate<>(transMan.getProducerFactory()),
743-
extendedConsumerProperties.getExtension().isTxCommitRecovered()));
748+
extendedConsumerProperties.getExtension().isTxCommitRecovered());
749+
if (!CollectionUtils.isEmpty(extendedConsumerProperties.getRetryableExceptions())) {
750+
// mimic AbstractBinder.buildRetryTemplate(properties)’s retryPolicy
751+
if (!extendedConsumerProperties.isDefaultRetryable()) {
752+
afterRollbackProcessor.defaultFalse(true);
753+
}
754+
extendedConsumerProperties.getRetryableExceptions()
755+
.forEach((t, retry) -> {
756+
if (Exception.class.isAssignableFrom(t)) {
757+
var ex = t.asSubclass(Exception.class);
758+
if (retry) {
759+
afterRollbackProcessor.addRetryableExceptions(ex);
760+
}
761+
else {
762+
afterRollbackProcessor.addNotRetryableExceptions(ex);
763+
}
764+
}
765+
else {
766+
throw new IllegalArgumentException(
767+
"Only Exception types can be configured as retryable-exceptions together with transactions. "
768+
+ "Unsupported type: " + t.getName());
769+
}
770+
});
771+
}
772+
messageListenerContainer.setAfterRollbackProcessor(afterRollbackProcessor);
744773
}
745774
else {
746775
kafkaMessageDrivenChannelAdapter.setErrorChannel(errorInfrastructure.getErrorChannel());

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.atomic.AtomicBoolean;
3737
import java.util.concurrent.atomic.AtomicInteger;
3838
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.function.Supplier;
3940
import java.util.stream.IntStream;
4041

4142
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -172,7 +173,8 @@
172173
* @author Henryk Konsek
173174
* @author Gary Russell
174175
* @author Chris Bono
175-
* @Author Oliver Führer
176+
* @author Oliver Führer
177+
* @author Didier Loiseau
176178
*/
177179
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = "error.pollableDlq.group-pcWithDlq", brokerProperties = {"transaction.state.log.replication.factor=1",
178180
"transaction.state.log.min.isr=1"})
@@ -1052,11 +1054,41 @@ void dlqAndRetry() throws Exception {
10521054
testDlqGuts(true, null, null, false, false);
10531055
}
10541056

1057+
@Test
1058+
void dlqAndRetryWithNonRetryableException() throws Exception {
1059+
testDlqGuts(true, null, null, false, false, true, true);
1060+
}
1061+
1062+
@Test
1063+
void dlqAndRetryDefaultFalse() throws Exception {
1064+
testDlqGuts(true, null, null, false, false, false, false);
1065+
}
1066+
1067+
@Test
1068+
void dlqAndRetryDefaultFalseWithRetryableException() throws Exception {
1069+
testDlqGuts(true, null, null, false, false, false, true);
1070+
}
1071+
10551072
@Test
10561073
void dlqAndRetryTransactional() throws Exception {
10571074
testDlqGuts(true, null, null, true, false);
10581075
}
10591076

1077+
@Test
1078+
void dlqAndRetryWithNonRetryableExceptionTransactional() throws Exception {
1079+
testDlqGuts(true, null, null, true, false, true, true);
1080+
}
1081+
1082+
@Test
1083+
void dlqAndRetryDefaultFalseTransactional() throws Exception {
1084+
testDlqGuts(true, null, null, true, false, false, false);
1085+
}
1086+
1087+
@Test
1088+
void dlqAndRetryDefaultFalseWithRetryableExceptionTransactional() throws Exception {
1089+
testDlqGuts(true, null, null, true, false, false, true);
1090+
}
1091+
10601092
@Test
10611093
void dlq() throws Exception {
10621094
testDlqGuts(false, null, 3, false, false);
@@ -1084,6 +1116,14 @@ void dlqEmbedded() throws Exception {
10841116

10851117
private void testDlqGuts(boolean withRetry, HeaderMode headerMode, Integer dlqPartitions,
10861118
boolean transactional, boolean useDlqDestResolver) throws Exception {
1119+
testDlqGuts(withRetry, headerMode, dlqPartitions, transactional,
1120+
useDlqDestResolver, true, false);
1121+
}
1122+
1123+
private void testDlqGuts(boolean withRetry, HeaderMode headerMode,
1124+
Integer dlqPartitions, boolean transactional, boolean useDlqDestResolver,
1125+
boolean defaultRetryable, boolean useConfiguredRetryableException)
1126+
throws Exception {
10871127

10881128
int expectedDlqPartition = dlqPartitions == null ? 0 : dlqPartitions - 1;
10891129
KafkaBinderConfigurationProperties binderConfig = createConfigurationProperties();
@@ -1128,12 +1168,18 @@ else if (dlqPartitions == null) {
11281168
consumerProperties.getExtension().setDlqPartitions(dlqPartitions);
11291169
consumerProperties.setConcurrency(2);
11301170
consumerProperties.populateBindingName("foobar");
1171+
consumerProperties.setDefaultRetryable(defaultRetryable);
1172+
consumerProperties.getRetryableExceptions().put(NumberFormatException.class,
1173+
!defaultRetryable);
11311174

11321175
DirectChannel moduleInputChannel = createBindableChannel("input",
11331176
createConsumerBindingProperties(consumerProperties));
11341177

11351178
var dlqChannel = new QueueChannel();
1136-
var handler = new FailingInvocationCountingMessageHandler();
1179+
var handler = new FailingInvocationCountingMessageHandler(
1180+
() -> useConfiguredRetryableException
1181+
? new NumberFormatException("fail")
1182+
: new RuntimeException("fail"));
11371183
moduleInputChannel.subscribe(handler);
11381184

11391185
long uniqueBindingId = System.currentTimeMillis();
@@ -1253,8 +1299,10 @@ else if (dlqPartitions == null) {
12531299
.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(expectedDlqPartition);
12541300
}
12551301
else if (!HeaderMode.none.equals(headerMode)) {
1302+
boolean shouldHaveRetried = defaultRetryable != useConfiguredRetryableException;
12561303
assertThat(handler.getInvocationCount())
1257-
.isEqualTo(consumerProperties.getMaxAttempts());
1304+
.isEqualTo(
1305+
shouldHaveRetried ? consumerProperties.getMaxAttempts() : 1);
12581306

12591307
assertThat(receivedMessage.getHeaders()
12601308
.get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
@@ -4090,14 +4138,27 @@ private static void assertionsOnKafkaTemplate(TestObservationRegistry observatio
40904138
private final class FailingInvocationCountingMessageHandler
40914139
implements MessageHandler {
40924140

4141+
private final Supplier<? extends RuntimeException> exceptionProvider;
4142+
40934143
private volatile int invocationCount;
40944144

40954145
private final LinkedHashMap<Long, Message<?>> receivedMessages = new LinkedHashMap<>();
40964146

40974147
private final CountDownLatch latch;
40984148

4099-
private FailingInvocationCountingMessageHandler(int latchSize) {
4149+
private FailingInvocationCountingMessageHandler(int latchSize,
4150+
Supplier<? extends RuntimeException> exceptionProvider) {
41004151
latch = new CountDownLatch(latchSize);
4152+
this.exceptionProvider = exceptionProvider;
4153+
}
4154+
4155+
private FailingInvocationCountingMessageHandler(
4156+
Supplier<? extends RuntimeException> exceptionProvider) {
4157+
this(1, exceptionProvider);
4158+
}
4159+
4160+
private FailingInvocationCountingMessageHandler(int latchSize) {
4161+
this(latchSize, () -> new RuntimeException("fail"));
41014162
}
41024163

41034164
private FailingInvocationCountingMessageHandler() {
@@ -4115,7 +4176,7 @@ public void handleMessage(Message<?> message) throws MessagingException {
41154176
receivedMessages.put(offset, message);
41164177
latch.countDown();
41174178
}
4118-
throw new RuntimeException("fail");
4179+
throw exceptionProvider.get();
41194180
}
41204181

41214182
public LinkedHashMap<Long, Message<?>> getReceivedMessages() {

0 commit comments

Comments
 (0)