Skip to content

Commit 978868e

Browse files
committed
GH-1446: Support Static Group Membership
Resolves #1446 See https://kafka.apache.org/documentation/#static_membership Add a qualifier to `group.instance.id` for each child container. Catch `FencedInstanceIdException` and stop the container. Fix delay on self-stopping containers. Treat `FencedInstanceIdException` the same as `ProducerFencedException` when using transactions (do not call `AfterRollbackProcessor`). **cherry-pick to 2.4.x, 2.3.x** * Add docs. * Fix container stopped event publishing. * Publish stop event after actually stopped. * Restore publishContainerStoppedEvent() to proper place.
1 parent 2bbcc8f commit 978868e

File tree

5 files changed

+125
-16
lines changed

5 files changed

+125
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,16 +406,32 @@ public void checkGroupId() {
406406

407407
@Override
408408
public final void stop() {
409+
stop(true);
410+
}
411+
412+
/**
413+
* Stop the container.
414+
* @param wait wait for the listener to terminate.
415+
* @since 2.3.8
416+
*/
417+
public final void stop(boolean wait) {
409418
synchronized (this.lifecycleMonitor) {
410419
if (isRunning()) {
411-
final CountDownLatch latch = new CountDownLatch(1);
412-
doStop(latch::countDown);
413-
try {
414-
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
415-
publishContainerStoppedEvent();
420+
if (wait) {
421+
final CountDownLatch latch = new CountDownLatch(1);
422+
doStop(latch::countDown);
423+
try {
424+
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
425+
publishContainerStoppedEvent();
426+
}
427+
catch (@SuppressWarnings("unused") InterruptedException e) {
428+
Thread.currentThread().interrupt();
429+
}
416430
}
417-
catch (@SuppressWarnings("unused") InterruptedException e) {
418-
Thread.currentThread().interrupt();
431+
else {
432+
doStop(() -> {
433+
publishContainerStoppedEvent();
434+
});
419435
}
420436
}
421437
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.kafka.common.MetricName;
5555
import org.apache.kafka.common.TopicPartition;
5656
import org.apache.kafka.common.errors.AuthorizationException;
57+
import org.apache.kafka.common.errors.FencedInstanceIdException;
5758
import org.apache.kafka.common.errors.ProducerFencedException;
5859
import org.apache.kafka.common.errors.WakeupException;
5960

@@ -97,6 +98,7 @@
9798
import org.springframework.transaction.support.TransactionTemplate;
9899
import org.springframework.util.Assert;
99100
import org.springframework.util.ClassUtils;
101+
import org.springframework.util.StringUtils;
100102
import org.springframework.util.concurrent.ListenableFuture;
101103
import org.springframework.util.concurrent.ListenableFutureCallback;
102104

@@ -611,6 +613,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
611613
@SuppressWarnings(UNCHECKED)
612614
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
613615
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
616+
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
614617
this.autoCommit = determineAutoCommit(consumerProperties);
615618
this.consumer =
616619
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
@@ -689,6 +692,23 @@ else if (listener instanceof MessageListener) {
689692
this.micrometerHolder = obtainMicrometerHolder();
690693
}
691694

695+
private void checkGroupInstance(Properties properties, ConsumerFactory<K, V> consumerFactory) {
696+
String groupInstance = properties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
697+
if (!StringUtils.hasText(groupInstance)) {
698+
Object factoryConfig = consumerFactory.getConfigurationProperties()
699+
.get(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG);
700+
if (factoryConfig instanceof String) {
701+
groupInstance = (String) factoryConfig;
702+
}
703+
}
704+
if (StringUtils.hasText(KafkaMessageListenerContainer.this.clientIdSuffix)
705+
&& StringUtils.hasText(groupInstance)) {
706+
707+
properties.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
708+
groupInstance + KafkaMessageListenerContainer.this.clientIdSuffix);
709+
}
710+
}
711+
692712
private boolean determineCommitCurrent(Properties consumerProperties) {
693713
if (AssignmentCommitOption.NEVER.equals(this.autoCommitOption)) {
694714
return false;
@@ -951,6 +971,12 @@ public void run() {
951971
sleepFor(this.authorizationExceptionRetryInterval);
952972
}
953973
}
974+
catch (FencedInstanceIdException fie) {
975+
this.fatalError = true;
976+
ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
977+
+ "' has been fenced");
978+
break;
979+
}
954980
catch (Exception e) {
955981
handleConsumerException(e);
956982
}
@@ -1144,7 +1170,7 @@ private void wrapUp() {
11441170
}
11451171
else {
11461172
this.logger.error("Fatal consumer exception; stopping container");
1147-
KafkaMessageListenerContainer.this.stop();
1173+
KafkaMessageListenerContainer.this.stop(false);
11481174
}
11491175
this.monitorTask.cancel(true);
11501176
if (!this.taskSchedulerExplicitlySet) {
@@ -1305,8 +1331,10 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
13051331
}
13061332
});
13071333
}
1308-
catch (ProducerFencedException e) {
1309-
this.logger.error(e, "Producer fenced during transaction");
1334+
catch (ProducerFencedException | FencedInstanceIdException e) {
1335+
this.logger.error(e, "Producer or '"
1336+
+ ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
1337+
+ "' fenced during transaction");
13101338
}
13111339
catch (RuntimeException e) {
13121340
this.logger.error(e, "Transaction rolled back");
@@ -1557,8 +1585,8 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
15571585

15581586
});
15591587
}
1560-
catch (ProducerFencedException e) {
1561-
this.logger.error(e, "Producer fenced during transaction");
1588+
catch (ProducerFencedException | FencedInstanceIdException e) {
1589+
this.logger.error(e, "Producer or 'group.instance.id' fenced during transaction");
15621590
break;
15631591
}
15641592
catch (RuntimeException e) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
import org.apache.kafka.clients.producer.ProducerConfig;
6969
import org.apache.kafka.common.TopicPartition;
7070
import org.apache.kafka.common.errors.AuthorizationException;
71+
import org.apache.kafka.common.errors.FencedInstanceIdException;
72+
import org.apache.kafka.common.errors.TopicAuthorizationException;
7173
import org.apache.kafka.common.errors.WakeupException;
7274
import org.apache.kafka.common.serialization.IntegerDeserializer;
7375
import org.junit.jupiter.api.BeforeAll;
@@ -2676,16 +2678,72 @@ void testFatalErrorOnAuthorizationException() throws Exception {
26762678
KafkaMessageListenerContainer<Integer, String> container =
26772679
new KafkaMessageListenerContainer<>(cf, containerProps);
26782680

2679-
CountDownLatch stopping = new CountDownLatch(1);
2681+
CountDownLatch stopped = new CountDownLatch(1);
26802682

26812683
container.setApplicationEventPublisher(e -> {
2682-
if (e instanceof ConsumerStoppingEvent) {
2683-
stopping.countDown();
2684+
if (e instanceof ConsumerStoppedEvent) {
2685+
stopped.countDown();
2686+
}
2687+
});
2688+
2689+
container.start();
2690+
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
2691+
container.stop();
2692+
}
2693+
2694+
@SuppressWarnings({ "unchecked", "rawtypes" })
2695+
@Test
2696+
void testNotFatalErrorOnAuthorizationException() throws Exception {
2697+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2698+
Consumer<Integer, String> consumer = mock(Consumer.class);
2699+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2700+
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
2701+
CountDownLatch latch = new CountDownLatch(2);
2702+
willAnswer(invoc -> {
2703+
latch.countDown();
2704+
throw new TopicAuthorizationException("test");
2705+
}).given(consumer).poll(any());
2706+
2707+
ContainerProperties containerProps = new ContainerProperties(topic1);
2708+
containerProps.setGroupId("grp");
2709+
containerProps.setClientId("clientId");
2710+
containerProps.setMessageListener((MessageListener) r -> { });
2711+
containerProps.setAuthorizationExceptionRetryInterval(Duration.ofMillis(100));
2712+
KafkaMessageListenerContainer<Integer, String> container =
2713+
new KafkaMessageListenerContainer<>(cf, containerProps);
2714+
container.start();
2715+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
2716+
container.stop();
2717+
}
2718+
2719+
@SuppressWarnings({ "unchecked", "rawtypes" })
2720+
@Test
2721+
void testFatalErrorOnFencedInstanceException() throws Exception {
2722+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2723+
Consumer<Integer, String> consumer = mock(Consumer.class);
2724+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2725+
given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
2726+
2727+
willThrow(FencedInstanceIdException.class)
2728+
.given(consumer).poll(any());
2729+
2730+
ContainerProperties containerProps = new ContainerProperties(topic1);
2731+
containerProps.setGroupId("grp");
2732+
containerProps.setClientId("clientId");
2733+
containerProps.setMessageListener((MessageListener) r -> { });
2734+
KafkaMessageListenerContainer<Integer, String> container =
2735+
new KafkaMessageListenerContainer<>(cf, containerProps);
2736+
2737+
CountDownLatch stopped = new CountDownLatch(1);
2738+
2739+
container.setApplicationEventPublisher(e -> {
2740+
if (e instanceof ConsumerStoppedEvent) {
2741+
stopped.countDown();
26842742
}
26852743
});
26862744

26872745
container.start();
2688-
assertThat(stopping.await(10, TimeUnit.SECONDS)).isTrue();
2746+
assertThat(stopped.await(10, TimeUnit.SECONDS)).isTrue();
26892747
container.stop();
26902748
}
26912749

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,10 @@ Starting with version 2.3.4, you can set the listener container's `interceptBefo
666666

667667
No interceptor is provided for batch listeners because Kafka already provides a `ConsumerInterceptor`.
668668

669+
Starting with versions 2.3.8 and 2.4.6, the `ConcurrentMessageListenerContainer` supports https://kafka.apache.org/documentation/#static_membership[Static Membership] when the concurrency is greater than one.
670+
The `group.instance.id` is suffixed with `-n` with `n` starting at `1`.
671+
This, together with an increased `session.timeout.ms`, can be used to reduce rebalance events, for example, when application instances are restarted.
672+
669673
[[kafka-container]]
670674
====== Using `KafkaMessageListenerContainer`
671675

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ See <<container-factory>> for more information
7878
The `ContainerProperties` provides an `authorizationExceptionRetryInterval` option to let the listener container retry after any `AuthorizationException` is thrown by the `KafkaConsumer`.
7979
See its JavaDocs and <<kafka-container>> for more information.
8080

81+
Static group membership is now supported.
82+
See <<message-listener-container>> for more information.
83+
8184
==== @KafkaListener
8285

8386
The `@KafkaListener` annotation has a new property `splitIterables`; default true.

0 commit comments

Comments
 (0)