Skip to content

Commit feb4fa0

Browse files
garyrussellartembilan
authored andcommitted
GH-1289: STCEH - Fix IndexOutOfBoundsException
Resolves #1289 Fix IOOBE If a not-retryable exception was thrown outside of record handling (when no records are passed to the error handler). Throw an illegal state exception if this condition occurs.
1 parent 30dca49 commit feb4fa0

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.messaging.converter.MessageConversionException;
3838
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
3939
import org.springframework.util.Assert;
40+
import org.springframework.util.ObjectUtils;
4041
import org.springframework.util.backoff.BackOff;
4142
import org.springframework.util.backoff.FixedBackOff;
4243

@@ -181,10 +182,17 @@ public void setClassifier(BinaryExceptionClassifier classifier) {
181182
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
182183
Consumer<?, ?> consumer, MessageListenerContainer container) {
183184

184-
if (thrownException instanceof SerializationException) {
185-
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly, "
186-
+ "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key "
187-
+ "deserializer", thrownException);
185+
if (ObjectUtils.isEmpty(records)) {
186+
if (thrownException instanceof SerializationException) {
187+
throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; "
188+
+ "please consider configuring an 'ErrorHandlingDeserializer2' in the value and/or key "
189+
+ "deserializer", thrownException);
190+
}
191+
else {
192+
throw new IllegalStateException("This error handler cannot process '"
193+
+ thrownException.getClass().getName()
194+
+ "'s; no record information is available", thrownException);
195+
}
188196
}
189197

190198
if (!SeekUtils.doSeeks(records, consumer, thrownException, true, getSkipPredicate(records, thrownException),

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentErrorHandlerTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static org.mockito.Mockito.times;
2525

2626
import java.util.Arrays;
27+
import java.util.Collections;
2728
import java.util.List;
2829
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.atomic.AtomicReference;
@@ -104,4 +105,13 @@ void testSerializationException() {
104105
.withCause(thrownException);
105106
}
106107

108+
@Test
109+
void testNotRetryableWithNoRecords() {
110+
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler();
111+
ClassCastException thrownException = new ClassCastException();
112+
assertThatIllegalStateException().isThrownBy(
113+
() -> handler.handle(thrownException, Collections.emptyList(), null, null))
114+
.withCause(thrownException);
115+
}
116+
107117
}

0 commit comments

Comments
 (0)