Skip to content

Commit 2f2010d

Browse files
garyrussellartembilan
authored andcommitted
GH-1643: Add More ConsumerStoppedEvent.Reasons
Resolves #1643 **cherry-pick to 2.5.x**
1 parent 1140d5e commit 2f2010d

File tree

4 files changed

+43
-5
lines changed

4 files changed

+43
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public class ConsumerStoppedEvent extends KafkaEvent {
3131

3232
private static final long serialVersionUID = 1L;
3333

34+
/**
35+
* Reasons for stopping a consumer.
36+
* @since 2.5.9
37+
*
38+
*/
3439
public enum Reason {
3540

3641
/**
@@ -44,6 +49,18 @@ public enum Reason {
4449
*/
4550
FENCED,
4651

52+
/**
53+
* An authorization exception occurred.
54+
* @since 2.5.10
55+
*/
56+
AUTH,
57+
58+
/**
59+
* No offset found for a partition and no reset policy.
60+
* @since 2.5.10
61+
*/
62+
NO_OFFSET,
63+
4764
/**
4865
* A {@link java.lang.Error} was thrown.
4966
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -404,12 +404,24 @@ private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
404404

405405
private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
406406
if (getApplicationEventPublisher() != null) {
407+
Reason reason;
408+
if (throwable instanceof Error) {
409+
reason = Reason.ERROR;
410+
}
411+
else if (throwable instanceof StopAfterFenceException || throwable instanceof FencedInstanceIdException) {
412+
reason = Reason.FENCED;
413+
}
414+
else if (throwable instanceof AuthorizationException) {
415+
reason = Reason.AUTH;
416+
}
417+
else if (throwable instanceof NoOffsetForPartitionException) {
418+
reason = Reason.NO_OFFSET;
419+
}
420+
else {
421+
reason = Reason.NORMAL;
422+
}
407423
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
408-
throwable instanceof Error
409-
? Reason.ERROR
410-
: throwable instanceof StopAfterFenceException
411-
? Reason.FENCED
412-
: Reason.NORMAL));
424+
reason));
413425
}
414426
}
415427

@@ -1066,12 +1078,14 @@ public void run() { // NOSONAR complexity
10661078
catch (NoOffsetForPartitionException nofpe) {
10671079
this.fatalError = true;
10681080
ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
1081+
exitThrowable = nofpe;
10691082
break;
10701083
}
10711084
catch (AuthorizationException ae) {
10721085
if (this.authorizationExceptionRetryInterval == null) {
10731086
ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
10741087
this.fatalError = true;
1088+
exitThrowable = ae;
10751089
break;
10761090
}
10771091
else {
@@ -1087,6 +1101,7 @@ public void run() { // NOSONAR complexity
10871101
this.fatalError = true;
10881102
ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
10891103
+ "' has been fenced");
1104+
exitThrowable = fie;
10901105
break;
10911106
}
10921107
catch (StopAfterFenceException e) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import org.springframework.kafka.event.ConsumerPausedEvent;
9595
import org.springframework.kafka.event.ConsumerResumedEvent;
9696
import org.springframework.kafka.event.ConsumerStoppedEvent;
97+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
9798
import org.springframework.kafka.event.ConsumerStoppingEvent;
9899
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
99100
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@@ -2726,16 +2727,19 @@ void testFatalErrorOnAuthorizationException() throws Exception {
27262727
KafkaMessageListenerContainer<Integer, String> container =
27272728
new KafkaMessageListenerContainer<>(cf, containerProps);
27282729

2730+
AtomicReference<ConsumerStoppedEvent.Reason> reason = new AtomicReference<>();
27292731
CountDownLatch stopped = new CountDownLatch(1);
27302732

27312733
container.setApplicationEventPublisher(e -> {
27322734
if (e instanceof ConsumerStoppedEvent) {
2735+
reason.set(((ConsumerStoppedEvent) e).getReason());
27332736
stopped.countDown();
27342737
}
27352738
});
27362739

27372740
container.start();
27382741
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
2742+
assertThat(reason.get()).isEqualTo(Reason.AUTH);
27392743
container.stop();
27402744
}
27412745

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2655,6 +2655,8 @@ In addition, the `ConsumerStoppedEvent` has the following additional property:
26552655
** `NORMAL` - the consumer stopped normally (container was stopped).
26562656
** `ERROR` - a `java.lang.Error` was thrown.
26572657
** `FENCED` - the transactional producer was fenced and the `stopContainerWhenFenced` container property is `true`.
2658+
** `AUTH` - an `AuthorizationException` was thrown and the `authorizationExceptionRetryInterval` is not configured.
2659+
** `NO_OFFSET` - there is no offset for a partition and the `auto.offset.reset` policy is `none`.
26582660

26592661
You can use this event to restart the container after such a condition:
26602662

0 commit comments

Comments
 (0)