Skip to content

Commit 5e32ad9

Browse files
authored
FailedRecordProcessor Polishing
- remove unnecessary assertions - it is no longer possible to set the classifier - add varargs version of `addNotRetryableExceptions` * Fix assertion in test.
1 parent 35bdf28 commit 5e32ad9

File tree

3 files changed

+43
-16
lines changed

3 files changed

+43
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public abstract class FailedRecordProcessor extends KafkaExceptionLogLevelAware
5252

5353
private final FailedRecordTracker failureTracker;
5454

55-
private BinaryExceptionClassifier classifier;
55+
private ExtendedBinaryExceptionClassifier classifier;
5656

5757
private boolean commitRecovered;
5858

@@ -85,6 +85,7 @@ protected BinaryExceptionClassifier getClassifier() {
8585
* @param classifications the classifications.
8686
* @param defaultValue whether or not to retry non-matching exceptions.
8787
* @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
88+
* @see #addNotRetryableExceptions(Class...)
8889
*/
8990
public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
9091
Assert.notNull(classifications, "'classifications' + cannot be null");
@@ -123,8 +124,8 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
123124
}
124125

125126
/**
126-
* Add an exception type to the default list; if and only if an external classifier
127-
* has not been provided. By default, the following exceptions will not be retried:
127+
* Add an exception type to the default list. By default, the following exceptions
128+
* will not be retried:
128129
* <ul>
129130
* <li>{@link DeserializationException}</li>
130131
* <li>{@link MessageConversionException}</li>
@@ -134,19 +135,47 @@ public int deliveryAttempt(TopicPartitionOffset topicPartitionOffset) {
134135
* </ul>
135136
* All others will be retried.
136137
* @param exceptionType the exception type.
138+
* @deprecated in favor of {@link #addNotRetryableExceptions(Class...)}.
137139
* @see #removeNotRetryableException(Class)
138140
* @see #setClassifications(Map, boolean)
139141
*/
142+
@Deprecated
140143
public void addNotRetryableException(Class<? extends Exception> exceptionType) {
141-
Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
142-
"Cannot add exception types to a supplied classifier");
143-
((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().put(exceptionType, false);
144+
Assert.notNull(exceptionType, "'exceptionType' cannot be null");
145+
this.classifier.getClassified().put(exceptionType, false);
144146
}
145147

146148
/**
147-
* Remove an exception type from the configured list; if and only if an external
148-
* classifier has not been provided. By default, the following exceptions will not be
149-
* retried:
149+
* Add exception types to the default list. By default, the following exceptions will
150+
* not be retried:
151+
* <ul>
152+
* <li>{@link DeserializationException}</li>
153+
* <li>{@link MessageConversionException}</li>
154+
* <li>{@link MethodArgumentResolutionException}</li>
155+
* <li>{@link NoSuchMethodException}</li>
156+
* <li>{@link ClassCastException}</li>
157+
* </ul>
158+
* All others will be retried.
159+
* @param exceptionTypes the exception types.
160+
* @since 2.6
161+
* @see #removeNotRetryableException(Class)
162+
* @see #setClassifications(Map, boolean)
163+
*/
164+
@SafeVarargs
165+
@SuppressWarnings("varargs")
166+
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
167+
Assert.notNull(exceptionTypes, "'exceptionTypes' cannot be null");
168+
Assert.noNullElements(exceptionTypes, "'exceptionTypes' cannot contain nulls");
169+
for (Class<? extends Exception> exceptionType : exceptionTypes) {
170+
Assert.isTrue(Exception.class.isAssignableFrom(exceptionType),
171+
() -> "exceptionType " + exceptionType + " must be an Exception");
172+
this.classifier.getClassified().put(exceptionType, false);
173+
}
174+
}
175+
176+
/**
177+
* Remove an exception type from the configured list. By default, the following
178+
* exceptions will not be retried:
150179
* <ul>
151180
* <li>{@link DeserializationException}</li>
152181
* <li>{@link MessageConversionException}</li>
@@ -157,13 +186,11 @@ public void addNotRetryableException(Class<? extends Exception> exceptionType) {
157186
* All others will be retried.
158187
* @param exceptionType the exception type.
159188
* @return true if the removal was successful.
160-
* @see #addNotRetryableException(Class)
189+
* @see #addNotRetryableExceptions(Class...)
161190
* @see #setClassifications(Map, boolean)
162191
*/
163192
public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
164-
Assert.isTrue(this.classifier instanceof ExtendedBinaryExceptionClassifier,
165-
"Cannot remove exception types from a supplied classifier");
166-
return ((ExtendedBinaryExceptionClassifier) this.classifier).getClassified().remove(exceptionType);
193+
return this.classifier.getClassified().remove(exceptionType);
167194
}
168195

169196
protected BiPredicate<ConsumerRecord<?, ?>, Exception> getSkipPredicate(List<ConsumerRecord<?, ?>> records,
@@ -191,7 +218,7 @@ public void clearThreadState() {
191218
this.failureTracker.clearThreadState();
192219
}
193220

194-
private static BinaryExceptionClassifier configureDefaultClassifier() {
221+
private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
195222
Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
196223
classified.put(DeserializationException.class, false);
197224
classified.put(MessageConversionException.class, false);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void testClassifier() {
7575
verify(template).sendOffsetsToTransaction(anyMap());
7676
verify(template, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
7777
assertThat(recovered.get()).isSameAs(record1);
78-
processor.addNotRetryableException(IllegalStateException.class);
78+
processor.addNotRetryableExceptions(IllegalStateException.class);
7979
recovered.set(null);
8080
recovererShouldFail.set(true);
8181
processor.process(records, consumer, illegalState, true, EOSMode.ALPHA);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ consumer, mock(MessageListenerContainer.class)))
6868
handler.handle(new DeserializationException("intended", null, false, illegalState), records,
6969
consumer, mock(MessageListenerContainer.class));
7070
assertThat(recovered.get()).isSameAs(record1);
71-
handler.addNotRetryableException(IllegalStateException.class);
71+
handler.addNotRetryableExceptions(IllegalStateException.class);
7272
recovered.set(null);
7373
recovererShouldFail.set(true);
7474
assertThatExceptionOfType(RuntimeException.class).isThrownBy(() ->

0 commit comments

Comments
 (0)