Skip to content

Commit e518f1a

Browse files
sobychackoartembilan
authored andcommitted
GH-3011: Support enforced consumer rebalance
Fixes: #3011 Kafka consumer API supports an enforced rebalance. Provide an option via the message listener container to trigger this operation. * Update spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/enforced-rebalance.adoc **Auto-cherry-pick to `3.1.x`** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
1 parent 0087e8f commit e518f1a

File tree

8 files changed

+262
-3
lines changed

8 files changed

+262
-3
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
**** xref:kafka/receiving-messages/kafkalistener-lifecycle.adoc[]
2121
**** xref:kafka/receiving-messages/validation.adoc[]
2222
**** xref:kafka/receiving-messages/rebalance-listeners.adoc[]
23+
**** xref:kafka/receiving-messages/enforced-rebalance.adoc[]
2324
**** xref:kafka/receiving-messages/annotation-send-to.adoc[]
2425
**** xref:kafka/receiving-messages/filtering.adoc[]
2526
**** xref:kafka/receiving-messages/retrying-deliveries.adoc[]
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
[[enforced-rebalance]]
2+
= Enforcing Consumer Rebalance
3+
4+
Kafka clients now support an option to trigger an https://cwiki.apache.org/confluence/display/KAFKA/KIP-568%3A+Explicit+rebalance+triggering+on+the+Consumer[enforced rebalance].
5+
Starting with version `3.1.2`, Spring for Apache Kafka provides an option to invoke this API on the Kafka consumer via the message listener container.
6+
When calling this API, it is simply alerting the Kafka consumer to trigger an enforced rebalance; the actual rebalance will only occur as part of the next `poll()` operation.
7+
If there is already a rebalance in progress, calling an enforced rebalance is a NO-OP.
8+
The caller must wait for the current rebalance to complete before invoking another one.
9+
See the javadocs for `enfroceRebalance` for more details.
10+
11+
The following code snippet shows the essence of enforcing a rebalance using the message listener container.
12+
13+
[source, java]
14+
----
15+
@KafkaListener(id = "my.id", topics = "my-topic")
16+
void listen(ConsumerRecord<String, String> in) {
17+
System.out.println("From KafkaListener: " + in);
18+
}
19+
20+
@Bean
21+
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
22+
return args -> {
23+
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
24+
System.out.println("Enforcing a rebalance");
25+
Thread.sleep(5_000);
26+
listenerContainer.enforceRebalance();
27+
Thread.sleep(5_000);
28+
};
29+
}
30+
----
31+
32+
As the code above shows, the application uses the `KafkaListenerEndpointRegistry` to gain access to the message listener container and then calling the `enforceRebalnce` API on it.
33+
When calling the `enforceRebalance` on the listener container, it delegates the call to the underlying Kafka consumer.
34+
The Kafka consumer will trigger a rebalance as part of the next `poll()` operation.

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.concurrent.locks.ReentrantLock;
3031
import java.util.function.Function;
3132
import java.util.regex.Pattern;
@@ -67,6 +68,7 @@
6768
* @author Marius Bogoevici
6869
* @author Artem Bilan
6970
* @author Tomaz Fernandes
71+
* @author Soby Chacko
7072
*/
7173
public abstract class AbstractMessageListenerContainer<K, V>
7274
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
@@ -89,6 +91,8 @@ public abstract class AbstractMessageListenerContainer<K, V>
8991

9092
protected final ReentrantLock lifecycleLock = new ReentrantLock(); // NOSONAR
9193

94+
protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();
95+
9296
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
9397

9498
@NonNull
@@ -134,6 +138,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
134138
@Nullable
135139
private KafkaAdmin kafkaAdmin;
136140

141+
137142
/**
138143
* Construct an instance with the provided factory and properties.
139144
* @param consumerFactory the factory.

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2023 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -392,6 +392,20 @@ && getContainerProperties().isRestartAfterAuthExceptions()
392392
}
393393
}
394394

395+
@Override
396+
public void enforceRebalance() {
397+
this.lifecycleLock.lock();
398+
try {
399+
// Since the rebalance is for the whole consumer group, there is no need to
400+
// initiate this operation for every single container in the group.
401+
final KafkaMessageListenerContainer<K, V> listenerContainer = this.containers.get(0);
402+
listenerContainer.enforceRebalance();
403+
}
404+
finally {
405+
this.lifecycleLock.unlock();
406+
}
407+
}
408+
395409
@Override
396410
public void pause() {
397411
this.lifecycleLock.lock();

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,15 @@ public boolean isInExpectedState() {
311311
return isRunning() || isStoppedNormally();
312312
}
313313

314+
@Override
315+
public void enforceRebalance() {
316+
this.thisOrParentContainer.enforceRebalanceRequested.set(true);
317+
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
318+
if (consumer != null) {
319+
consumer.wakeIfNecessary();
320+
}
321+
}
322+
314323
@Override
315324
public void pause() {
316325
super.pause();
@@ -1412,6 +1421,7 @@ protected void pollAndInvoke() {
14121421
if (!this.seeks.isEmpty()) {
14131422
processSeeks();
14141423
}
1424+
enforceRebalanceIfNecessary();
14151425
pauseConsumerIfNecessary();
14161426
pausePartitionsIfNecessary();
14171427
this.lastPoll = System.currentTimeMillis();
@@ -1730,6 +1740,20 @@ private void sleepFor(Duration duration) {
17301740
}
17311741
}
17321742

1743+
private void enforceRebalanceIfNecessary() {
1744+
try {
1745+
if (KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.get()) {
1746+
String enforcedRebalanceReason = String.format("Enforced rebalance requested for container: %s",
1747+
KafkaMessageListenerContainer.this.getListenerId());
1748+
this.logger.info(enforcedRebalanceReason);
1749+
this.consumer.enforceRebalance(enforcedRebalanceReason);
1750+
}
1751+
}
1752+
finally {
1753+
KafkaMessageListenerContainer.this.thisOrParentContainer.enforceRebalanceRequested.set(false);
1754+
}
1755+
}
1756+
17331757
private void pauseConsumerIfNecessary() {
17341758
if (this.offsetsInThisBatch != null) {
17351759
synchronized (this) {

spring-kafka/src/main/java/org/springframework/kafka/listener/MessageListenerContainer.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
3838
* @author Vladimir Tsanev
3939
* @author Tomaz Fernandes
4040
* @author Francois Rosiere
41+
* @author Soby Chacko
4142
*/
4243
public interface MessageListenerContainer extends SmartLifecycle, DisposableBean {
4344

@@ -85,6 +86,16 @@ default Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
8586
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
8687
}
8788

89+
/**
90+
* Alerting the consumer to trigger an enforced rebalance. The actual enforce will happen
91+
* when the next poll() operation is invoked.
92+
* @since 3.1.2
93+
* @see org.apache.kafka.clients.consumer.KafkaConsumer#enforceRebalance()
94+
*/
95+
default void enforceRebalance() {
96+
throw new UnsupportedOperationException("This container doesn't support enforced rebalance");
97+
}
98+
8899
/**
89100
* Pause this container before the next poll(). This is a thread-safe operation, the
90101
* actual pause is processed by the consumer thread.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/*
2+
* Copyright 2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.awaitility.Awaitility.await;
21+
22+
import java.time.Duration;
23+
import java.util.Collection;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
26+
27+
import org.apache.kafka.clients.consumer.Consumer;
28+
import org.apache.kafka.clients.consumer.ConsumerRecord;
29+
import org.apache.kafka.common.TopicPartition;
30+
import org.junit.jupiter.api.Test;
31+
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.kafka.annotation.EnableKafka;
36+
import org.springframework.kafka.annotation.KafkaListener;
37+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
38+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
39+
import org.springframework.kafka.core.ConsumerFactory;
40+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
42+
import org.springframework.kafka.core.KafkaTemplate;
43+
import org.springframework.kafka.core.ProducerFactory;
44+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
45+
import org.springframework.kafka.test.context.EmbeddedKafka;
46+
import org.springframework.kafka.test.utils.KafkaTestUtils;
47+
import org.springframework.test.annotation.DirtiesContext;
48+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
49+
50+
/**
51+
* @author Soby Chacko
52+
* @since 3.1.2
53+
*/
54+
@SpringJUnitConfig
55+
@DirtiesContext
56+
@EmbeddedKafka(topics = "enforce-rebalance-topic")
57+
public class ContainerEnforceRebalanceTests {
58+
59+
@Test
60+
void enforceRebalance(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template,
61+
@Autowired KafkaListenerEndpointRegistry registry) throws InterruptedException {
62+
template.send("enforce-rebalance-topic", "my-data");
63+
final MessageListenerContainer listenerContainer = registry.getListenerContainer("enforce-rebalance-grp");
64+
assertThat(config.listenerLatch.await(10, TimeUnit.SECONDS)).isTrue();
65+
assertThat(listenerContainer).isNotNull();
66+
listenerContainer.enforceRebalance();
67+
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).enforceRebalanceRequested).isTrue();
68+
// The test is expecting partition revoke once and assign twice.
69+
assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue();
70+
assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
71+
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer).enforceRebalanceRequested).isFalse();
72+
listenerContainer.pause();
73+
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isPauseRequested()).isTrue());
74+
await().timeout(Duration.ofSeconds(10)).untilAsserted(() -> assertThat(listenerContainer.isContainerPaused()).isTrue());
75+
// resetting the latches
76+
config.partitionRevokedLatch = new CountDownLatch(1);
77+
config.partitionAssignedLatch = new CountDownLatch(1);
78+
listenerContainer.enforceRebalance();
79+
assertThat(config.partitionRevokedLatch.await(10, TimeUnit.SECONDS)).isTrue();
80+
assertThat(config.partitionAssignedLatch.await(10, TimeUnit.SECONDS)).isTrue();
81+
// Although the rebalance causes the consumer to resume again, since the container is paused,
82+
// it will pause the rebalanced consumers again.
83+
assertThat(listenerContainer.isPauseRequested()).isTrue();
84+
assertThat(listenerContainer.isContainerPaused()).isTrue();
85+
}
86+
87+
@Configuration
88+
@EnableKafka
89+
public static class Config {
90+
91+
@Autowired
92+
EmbeddedKafkaBroker broker;
93+
94+
CountDownLatch partitionRevokedLatch = new CountDownLatch(1);
95+
96+
CountDownLatch partitionAssignedLatch = new CountDownLatch(2);
97+
98+
CountDownLatch listenerLatch = new CountDownLatch(1);
99+
100+
@KafkaListener(id = "enforce-rebalance-grp", topics = "enforce-rebalance-topic")
101+
void listen(ConsumerRecord<Integer, String> ignored) {
102+
listenerLatch.countDown();
103+
}
104+
105+
@Bean
106+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
107+
return new KafkaTemplate<>(pf);
108+
}
109+
110+
@Bean
111+
ProducerFactory<Integer, String> pf() {
112+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
113+
}
114+
115+
@Bean
116+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
117+
ConsumerFactory<Integer, String> cf) {
118+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
119+
new ConcurrentKafkaListenerContainerFactory<>();
120+
factory.setConsumerFactory(cf);
121+
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
122+
@Override
123+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
124+
partitionAssignedLatch.countDown();
125+
}
126+
127+
@Override
128+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
129+
partitionRevokedLatch.countDown();
130+
}
131+
});
132+
return factory;
133+
}
134+
135+
@Bean
136+
ConsumerFactory<Integer, String> cf() {
137+
return new DefaultKafkaConsumerFactory<>(
138+
KafkaTestUtils.consumerProps("enforce-rebalance-topic", "false", this.broker));
139+
}
140+
}
141+
142+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2487,6 +2487,34 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
24872487
logger.info("Stop rebalance after failed record");
24882488
}
24892489

2490+
@Test
2491+
void enforceRabalanceOnTheConsumer() throws Exception {
2492+
ConsumerFactory<Integer, String> cf = mock();
2493+
ContainerProperties containerProps = new ContainerProperties("enforce-rebalance-test-topic");
2494+
containerProps.setGroupId("grp");
2495+
containerProps.setAckMode(AckMode.RECORD);
2496+
containerProps.setClientId("clientId");
2497+
containerProps.setIdleBetweenPolls(10000L);
2498+
2499+
Consumer<Integer, String> consumer = mock();
2500+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2501+
2502+
CountDownLatch enforceRebalanceLatch = new CountDownLatch(1);
2503+
containerProps.setMessageListener((MessageListener<Object, Object>) data -> {
2504+
});
2505+
KafkaMessageListenerContainer<Integer, String> container =
2506+
new KafkaMessageListenerContainer<>(cf, containerProps);
2507+
willAnswer(i -> {
2508+
enforceRebalanceLatch.countDown();
2509+
container.stop();
2510+
return null;
2511+
}).given(consumer).enforceRebalance(any());
2512+
2513+
container.start();
2514+
container.enforceRebalance();
2515+
assertThat(enforceRebalanceLatch.await(10, TimeUnit.SECONDS)).isTrue();
2516+
}
2517+
24902518
@SuppressWarnings({ "unchecked" })
24912519
@Test
24922520
public void testPauseResumeAndConsumerSeekAware() throws Exception {

0 commit comments

Comments
 (0)