Skip to content

Commit f173706

Browse files
garyrussellartembilan
authored andcommitted
GH-2826: Fix CommonDelegatingErrorHandler
Resolves #2826 The `addDelegate` method did not update the classifier, so it failed to work when cause chain traversal is enabled. **cherry-pick to 3.0.x, 2.9.x** * Do not mutate the delegates field until after the validity check. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java
1 parent 1304168 commit f173706

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonDelegatingErrorHandler.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -70,15 +70,7 @@ public void setErrorHandlers(Map<Class<? extends Throwable>, CommonErrorHandler>
7070
Assert.notNull(delegates, "'delegates' cannot be null");
7171
this.delegates.clear();
7272
this.delegates.putAll(delegates);
73-
checkDelegates();
74-
updateClassifier(delegates);
75-
}
76-
77-
private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
78-
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
79-
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
80-
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
81-
this.classifier = new BinaryExceptionClassifier(classifications);
73+
checkDelegatesAndUpdateClassifier(this.delegates);
8274
}
8375

8476
/**
@@ -125,12 +117,17 @@ public void setAckAfterHandle(boolean ack) {
125117
* @param handler the handler.
126118
*/
127119
public void addDelegate(Class<? extends Throwable> throwable, CommonErrorHandler handler) {
128-
this.delegates.put(throwable, handler);
129-
checkDelegates();
120+
Map<Class<? extends Throwable>, CommonErrorHandler> delegatesToCheck = new LinkedHashMap<>(this.delegates);
121+
delegatesToCheck.put(throwable, handler);
122+
checkDelegatesAndUpdateClassifier(delegatesToCheck);
123+
this.delegates.clear();
124+
this.delegates.putAll(delegatesToCheck);
130125
}
131126

132127
@SuppressWarnings("deprecation")
133-
private void checkDelegates() {
128+
private void checkDelegatesAndUpdateClassifier(Map<Class<? extends Throwable>,
129+
CommonErrorHandler> delegatesToCheck) {
130+
134131
boolean remainingRecords = this.defaultErrorHandler.remainingRecords();
135132
boolean ackAfterHandle = this.defaultErrorHandler.isAckAfterHandle();
136133
boolean seeksAfterHandling = this.defaultErrorHandler.seeksAfterHandling();
@@ -142,6 +139,14 @@ private void checkDelegates() {
142139
Assert.isTrue(seeksAfterHandling == handler.seeksAfterHandling(),
143140
"All delegates must return the same value when calling 'seeksAfterHandling()'");
144141
});
142+
updateClassifier(delegatesToCheck);
143+
}
144+
145+
private void updateClassifier(Map<Class<? extends Throwable>, CommonErrorHandler> delegates) {
146+
Map<Class<? extends Throwable>, Boolean> classifications = delegates.keySet().stream()
147+
.map(commonErrorHandler -> Map.entry(commonErrorHandler, true))
148+
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
149+
this.classifier = new BinaryExceptionClassifier(classifications);
145150
}
146151

147152
@Override

spring-kafka/src/test/java/org/springframework/kafka/listener/CommonDelegatingErrorHandlerTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.never;
@@ -31,6 +32,7 @@
3132

3233
import org.springframework.kafka.KafkaException;
3334
import org.springframework.kafka.core.KafkaProducerException;
35+
import org.springframework.kafka.test.utils.KafkaTestUtils;
3436

3537
/**
3638
* Tests for {@link CommonDelegatingErrorHandler}.
@@ -134,7 +136,7 @@ void testDelegateForThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
134136
}
135137

136138
@Test
137-
@SuppressWarnings("ConstantConditions")
139+
@SuppressWarnings({ "ConstantConditions", "unchecked" })
138140
void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnabled() {
139141
var defaultHandler = mock(CommonErrorHandler.class);
140142

@@ -147,6 +149,10 @@ void testDelegateForClassifiableThrowableCauseIsAppliedWhenCauseTraversingIsEnab
147149
delegatingErrorHandler.setErrorHandlers(Map.of(
148150
KafkaException.class, directCauseErrorHandler
149151
));
152+
delegatingErrorHandler.addDelegate(IllegalStateException.class, mock(CommonErrorHandler.class));
153+
assertThat(KafkaTestUtils.getPropertyValue(delegatingErrorHandler, "classifier.classified", Map.class).keySet())
154+
.contains(IllegalStateException.class);
155+
150156

151157
delegatingErrorHandler.handleRemaining(exc, Collections.emptyList(), mock(Consumer.class),
152158
mock(MessageListenerContainer.class));

0 commit comments

Comments
 (0)