Skip to content

Commit a306c58

Browse files
committed
GH-1643: Add More ConsumerStoppedEvent.Reasons
Resolves #1643 **cherry-pick to 2.5.x**
1 parent 1118056 commit a306c58

File tree

4 files changed

+43
-5
lines changed

4 files changed

+43
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class ConsumerStoppedEvent extends KafkaEvent {
3131

3232
private static final long serialVersionUID = 1L;
3333

34+
/**
35+
* Reasons for stopping a consumer.
36+
* @since 2.5.9
37+
*
38+
*/
3439
public enum Reason {
3540

3641
/**
@@ -44,6 +49,18 @@ public enum Reason {
4449
*/
4550
FENCED,
4651

52+
/**
53+
* An authorization exception occurred.
54+
* @since 2.5.10
55+
*/
56+
AUTH,
57+
58+
/**
59+
* No offset found for a partition and no reset policy.
60+
* @since 2.5.10
61+
*/
62+
NO_OFFSET,
63+
4764
/**
4865
* A {@link java.lang.Error} was thrown.
4966
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,24 @@ private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
396396

397397
private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
398398
if (getApplicationEventPublisher() != null) {
399+
Reason reason;
400+
if (throwable instanceof Error) {
401+
reason = Reason.ERROR;
402+
}
403+
else if (throwable instanceof StopAfterFenceException || throwable instanceof FencedInstanceIdException) {
404+
reason = Reason.FENCED;
405+
}
406+
else if (throwable instanceof AuthorizationException) {
407+
reason = Reason.AUTH;
408+
}
409+
else if (throwable instanceof NoOffsetForPartitionException) {
410+
reason = Reason.NO_OFFSET;
411+
}
412+
else {
413+
reason = Reason.NORMAL;
414+
}
399415
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
400-
throwable instanceof Error
401-
? Reason.ERROR
402-
: throwable instanceof StopAfterFenceException
403-
? Reason.FENCED
404-
: Reason.NORMAL));
416+
reason));
405417
}
406418
}
407419

@@ -1050,12 +1062,14 @@ public void run() {
10501062
catch (NoOffsetForPartitionException nofpe) {
10511063
this.fatalError = true;
10521064
ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
1065+
exitThrowable = nofpe;
10531066
break;
10541067
}
10551068
catch (AuthorizationException ae) {
10561069
if (this.authorizationExceptionRetryInterval == null) {
10571070
ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
10581071
this.fatalError = true;
1072+
exitThrowable = ae;
10591073
break;
10601074
}
10611075
else {
@@ -1071,6 +1085,7 @@ public void run() {
10711085
this.fatalError = true;
10721086
ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
10731087
+ "' has been fenced");
1088+
exitThrowable = fie;
10741089
break;
10751090
}
10761091
catch (StopAfterFenceException e) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.springframework.kafka.event.ConsumerPausedEvent;
9595
import org.springframework.kafka.event.ConsumerResumedEvent;
9696
import org.springframework.kafka.event.ConsumerStoppedEvent;
97+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
9798
import org.springframework.kafka.event.ConsumerStoppingEvent;
9899
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
99100
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@@ -2726,16 +2727,19 @@ void testFatalErrorOnAuthorizationException() throws Exception {
27262727
KafkaMessageListenerContainer<Integer, String> container =
27272728
new KafkaMessageListenerContainer<>(cf, containerProps);
27282729

2730+
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
27292731
CountDownLatch stopped = new CountDownLatch(1);
27302732

27312733
container.setApplicationEventPublisher(e -> {
27322734
if (e instanceof ConsumerStoppedEvent) {
2735+
reason.set(((ConsumerStoppedEvent) e).getReason());
27332736
stopped.countDown();
27342737
}
27352738
});
27362739

27372740
container.start();
27382741
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
2742+
assertThat(reason.get()).isEqualTo(Reason.AUTH);
27392743
container.stop();
27402744
}
27412745

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2555,6 +2555,8 @@ In addition, the `ConsumerStoppedEvent` has the following additional property:
25552555
** `NORMAL` - the consumer stopped normally (container was stopped).
25562556
** `ERROR` - a `java.lang.Error` was thrown.
25572557
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
2558+
** `AUTH` - an `AuthorizationException` was thrown and the `authorizationExceptionRetryInterval` is not configured.
2559+
** `NO_OFFSET` - there is no offset for a partition and the `auto.offset.reset` policy is `none`.
25582560

25592561
You can use this event to restart the container after such a condition:
25602562

0 commit comments

Comments
 (0)